Trigger and Function

The deposit_to_l2 job is triggered when an deposit-related transaction occurs or an event is produced. Specifically:

def parse_deposit_transaction_function(
    transactions: List[Transaction],
    contract_set: set,
    chain_mapping: dict,
    sig_function_mapping: dict,
    sig_parse_mapping: dict,
) -> List[TokenDepositTransaction]:
    deposit_tokens = []
    for transaction in transactions:
        if transaction.to_address in contract_set:
            input_sig = transaction.input[0:10]
            if input_sig in sig_function_mapping:
                deposit_transaction = sig_parse_mapping[input_sig](
                    transaction=transaction,
                    chain_mapping=chain_mapping,
                    function=sig_function_mapping[input_sig],
                )
                deposit_tokens.append(deposit_transaction)
    return deposit_tokens

and the function to extract the deposit info from the transaction input data:

def eth_deposit_parse(transaction: Transaction, chain_mapping: dict, function: ABIFunction) -> TokenDepositTransaction:
    return TokenDepositTransaction(
        transaction_hash=transaction.hash,
        wallet_address=transaction.from_address,
        chain_id=chain_mapping[transaction.to_address],
        contract_address=transaction.to_address,
        token_address=ETH_ADDRESS,
        value=transaction.value,
        block_number=transaction.block_number,
        block_timestamp=transaction.block_timestamp,
    )


def usdc_deposit_parse(transaction: Transaction, chain_mapping: dict, function: ABIFunction) -> TokenDepositTransaction:
    decoded_input = decode_transaction_data(function, HexStr(transaction.input))
    return TokenDepositTransaction(
        transaction_hash=transaction.hash,
        wallet_address=transaction.from_address,
        chain_id=chain_mapping[transaction.to_address],
        contract_address=transaction.to_address,
        token_address=USDC_ADDRESS,
        value=decoded_input["_amount"],
        block_number=transaction.block_number,
        block_timestamp=transaction.block_timestamp,
    )


def token_deposit_parse(
    transaction: Transaction, chain_mapping: dict, function: ABIFunction
) -> TokenDepositTransaction:
    decoded_input = decode_transaction_data(function, HexStr(transaction.input))
    return TokenDepositTransaction(
        transaction_hash=transaction.hash,
        wallet_address=transaction.from_address,
        chain_id=chain_mapping[transaction.to_address],
        contract_address=transaction.to_address,
        token_address=decoded_input["_l1Token"],
        value=decoded_input["_amount"],
        block_number=transaction.block_number,
        block_timestamp=transaction.block_timestamp,
    )

Meanwhile, after deposit_to_l2_job is triggered, the AddressTokenDeposit value is continuously aggregated by TokenDepositTransaction:

    for deposit in pre_aggregate_deposit_in_same_block(deposit_tokens):
        cache_key = (deposit.wallet_address, deposit.chain_id, deposit.contract_address, deposit.token_address)
        cache_value = self.cache.get(cache_key)
        if cache_value and cache_value.block_number < deposit.block_number:
            # add and save 2 cache
            token_deposit = AddressTokenDeposit(
                wallet_address=deposit.wallet_address,
                chain_id=deposit.chain_id,
                contract_address=deposit.contract_address,
                token_address=deposit.token_address,
                value=deposit.value + cache_value.value,
                block_number=deposit.block_number,
                block_timestamp=deposit.block_timestamp,
            )
    
            self.cache.set(cache_key, token_deposit)
            self._collect_item(AddressTokenDeposit.type(), token_deposit)
    
        elif cache_value is None:
            # check from db and save 2 cache
            history_deposit = self.check_history_deposit_from_db(
                deposit.wallet_address, deposit.chain_id, deposit.token_address
            )
            if history_deposit is None or history_deposit.block_number < deposit.block_number:
                token_deposit = AddressTokenDeposit(
                    wallet_address=deposit.wallet_address,
                    chain_id=deposit.chain_id,
                    contract_address=deposit.contract_address,
                    token_address=deposit.token_address,
                    value=deposit.value + history_deposit.value if history_deposit else deposit.value,
                    block_number=deposit.block_number,
                    block_timestamp=deposit.block_timestamp,
                )
                self.cache.set(cache_key, token_deposit)
                self._collect_item(AddressTokenDeposit.type(), token_deposit)
    
    self._data_buff[AddressTokenDeposit.type()] = distinct_collections_by_group(
        collections=self._data_buff[AddressTokenDeposit.type()],
        group_by=["wallet_address", "chain_id", "contract_address", "token_address"],
        max_key="block_number",
    )

Last updated