Trigger and Function
Last updated
Last updated
The `deposit_to_l2` job is triggered when a deposit-related transaction occurs or an event is produced. Specifically:
- `bridgeETHTo`: When the transaction content involves depositing WETH to a specific L2 chain through a contract
- `depositERC20`: When the transaction content involves depositing ERC20 to a specific L2 chain through a contract
def parse_deposit_transaction_function(
    transactions: List[Transaction],
    contract_set: set,
    chain_mapping: dict,
    sig_function_mapping: dict,
    sig_parse_mapping: dict,
) -> List[TokenDepositTransaction]:
    """Select the deposit transactions and turn each one into a parsed record.

    A transaction is picked up only when its ``to_address`` is a member of
    ``contract_set`` and the first 10 characters of its ``input`` (presumably
    the ``0x``-prefixed 4-byte function selector) are a key of
    ``sig_function_mapping``.

    Args:
        transactions: Transactions to scan, in the order they should be emitted.
        contract_set: Addresses of the contracts whose calls are of interest.
        chain_mapping: Passed through untouched to the parser callables.
        sig_function_mapping: Maps an input prefix to the function description
            handed to the parser as ``function``.
        sig_parse_mapping: Maps an input prefix to the callable that builds the
            record; it is called with the keyword arguments ``transaction``,
            ``chain_mapping`` and ``function``.

    Returns:
        The parser results, one per matching transaction, in input order.

    Raises:
        KeyError: If a prefix is present in ``sig_function_mapping`` but
            missing from ``sig_parse_mapping``.
    """
    parsed_deposits = []
    for tx in transactions:
        # Ignore calls that do not target one of the watched contracts.
        if tx.to_address not in contract_set:
            continue

        selector = tx.input[0:10]
        if selector not in sig_function_mapping:
            continue

        parse = sig_parse_mapping[selector]
        parsed_deposits.append(
            parse(
                transaction=tx,
                chain_mapping=chain_mapping,
                function=sig_function_mapping[selector],
            )
        )
    return parsed_deposits
The following functions extract the deposit information from the transaction input data:
def eth_deposit_parse(transaction: Transaction, chain_mapping: dict, function: ABIFunction) -> TokenDepositTransaction:
    """Build the deposit record for a transaction whose deposited amount is its own value.

    The token address is the ``ETH_ADDRESS`` constant and the amount is
    ``transaction.value``; the input data is not decoded.

    Args:
        transaction: The deposit transaction.
        chain_mapping: Maps the called contract address to the chain id stored
            on the record.
        function: Not used here; accepted so every parser shares one signature.

    Returns:
        The ``TokenDepositTransaction`` describing this deposit.

    Raises:
        KeyError: If ``transaction.to_address`` is not a key of ``chain_mapping``.
    """
    bridge_contract = transaction.to_address
    deposit_fields = {
        "transaction_hash": transaction.hash,
        "wallet_address": transaction.from_address,
        "chain_id": chain_mapping[bridge_contract],
        "contract_address": bridge_contract,
        "token_address": ETH_ADDRESS,
        "value": transaction.value,
        "block_number": transaction.block_number,
        "block_timestamp": transaction.block_timestamp,
    }
    return TokenDepositTransaction(**deposit_fields)
def usdc_deposit_parse(transaction: Transaction, chain_mapping: dict, function: ABIFunction) -> TokenDepositTransaction:
    """Build the deposit record for a transaction that deposits the ``USDC_ADDRESS`` token.

    The amount is read from the ``_amount`` argument decoded out of the
    transaction input; the token address is the ``USDC_ADDRESS`` constant.

    Args:
        transaction: The deposit transaction.
        chain_mapping: Maps the called contract address to the chain id stored
            on the record.
        function: Function description used to decode ``transaction.input``.

    Returns:
        The ``TokenDepositTransaction`` describing this deposit.

    Raises:
        KeyError: If ``transaction.to_address`` is not a key of
            ``chain_mapping`` or the decoded input has no ``_amount``.
    """
    # Decode first so a malformed input fails before any record is assembled.
    call_args = decode_transaction_data(function, HexStr(transaction.input))
    bridge_contract = transaction.to_address
    deposit_fields = {
        "transaction_hash": transaction.hash,
        "wallet_address": transaction.from_address,
        "chain_id": chain_mapping[bridge_contract],
        "contract_address": bridge_contract,
        "token_address": USDC_ADDRESS,
        "value": call_args["_amount"],
        "block_number": transaction.block_number,
        "block_timestamp": transaction.block_timestamp,
    }
    return TokenDepositTransaction(**deposit_fields)
def token_deposit_parse(
    transaction: Transaction, chain_mapping: dict, function: ABIFunction
) -> TokenDepositTransaction:
    """Build the deposit record for a transaction whose token is named in its input.

    Both the token address (``_l1Token``) and the amount (``_amount``) are
    read from the arguments decoded out of the transaction input.

    Args:
        transaction: The deposit transaction.
        chain_mapping: Maps the called contract address to the chain id stored
            on the record.
        function: Function description used to decode ``transaction.input``.

    Returns:
        The ``TokenDepositTransaction`` describing this deposit.

    Raises:
        KeyError: If ``transaction.to_address`` is not a key of
            ``chain_mapping`` or the decoded input lacks ``_l1Token`` or
            ``_amount``.
    """
    # Decode first so a malformed input fails before any record is assembled.
    call_args = decode_transaction_data(function, HexStr(transaction.input))
    bridge_contract = transaction.to_address
    deposit_fields = {
        "transaction_hash": transaction.hash,
        "wallet_address": transaction.from_address,
        "chain_id": chain_mapping[bridge_contract],
        "contract_address": bridge_contract,
        "token_address": call_args["_l1Token"],
        "value": call_args["_amount"],
        "block_number": transaction.block_number,
        "block_timestamp": transaction.block_timestamp,
    }
    return TokenDepositTransaction(**deposit_fields)
Meanwhile, after `deposit_to_l2_job` is triggered, the `AddressTokenDeposit` value is continuously aggregated from the `TokenDepositTransaction` records:
# Fold every pre-aggregated deposit into the running total kept per
# (wallet, chain, contract, token) and emit the updated total.
for deposit in pre_aggregate_deposit_in_same_block(deposit_tokens):
    cache_key = (deposit.wallet_address, deposit.chain_id, deposit.contract_address, deposit.token_address)
    cached = self.cache.get(cache_key)

    if cached and cached.block_number < deposit.block_number:
        # The cached total is older than this deposit: add the deposit on top of it.
        total_value = deposit.value + cached.value
    elif cached is None:
        # Nothing cached yet: fall back to the history lookup
        # (presumably a database query - confirm against the helper).
        history = self.check_history_deposit_from_db(
            deposit.wallet_address, deposit.chain_id, deposit.token_address
        )
        if not (history is None or history.block_number < deposit.block_number):
            # The stored history is not older than this deposit, so there is nothing to add.
            continue
        total_value = (deposit.value + history.value) if history else deposit.value
    else:
        # A cached entry exists but is not older than this deposit: leave it untouched.
        continue

    token_deposit = AddressTokenDeposit(
        wallet_address=deposit.wallet_address,
        chain_id=deposit.chain_id,
        contract_address=deposit.contract_address,
        token_address=deposit.token_address,
        value=total_value,
        block_number=deposit.block_number,
        block_timestamp=deposit.block_timestamp,
    )
    # Remember the new total for later deposits and queue it for output.
    self.cache.set(cache_key, token_deposit)
    self._collect_item(AddressTokenDeposit.type(), token_deposit)

# Collapse the collected totals by the four key fields, selecting on
# block_number (presumably the highest block wins - confirm against
# distinct_collections_by_group).
collected_type = AddressTokenDeposit.type()
self._data_buff[collected_type] = distinct_collections_by_group(
    collections=self._data_buff[AddressTokenDeposit.type()],
    group_by=["wallet_address", "chain_id", "contract_address", "token_address"],
    max_key="block_number",
)