This job filters for logs from the Uniswap V3 NFT contract address.
Key Processing Steps
Collecting Token Data:
Filter token ownership transfers and liquidity adjustments for each tokenId.
    for log in logs:
        if log.address == self._nft_address:
            if topic0 in [
                constants.TRANSFER_TOPIC0,
                constants.UNISWAP_V3_ADD_LIQUIDITY_TOPIC0,
                constants.UNISWAP_V3_REMOVE_LIQUIDITY_TOPIC0,
            ]:
                # Process and collect token data
    for data in token_infos:
        if token_id not in self._exist_token_ids:
            # Create new token
            self._collect_item(UniswapV3Token.type(), new_token)

        detail = UniswapV3TokenDetail(...)
        self._collect_item(UniswapV3TokenDetail.type(), detail)

        # Update current status
        token_id_current_status[token_id] = create_token_status(detail)
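The filtering and first-seen logic above can be tried in isolation with a minimal sketch. Everything here is an assumption made for illustration: the `Log` record, the `collect_token_data` function, and the placeholder topic strings are hypothetical and are not the job's real classes or constants. The sketch also assumes addresses are already normalized to the same case and that the token ID has already been decoded from the log.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Set, Tuple

# Placeholder topic values (assumption); the real job reads them from `constants`.
TRANSFER_TOPIC0 = "0xtransfer"
ADD_LIQUIDITY_TOPIC0 = "0xadd"
REMOVE_LIQUIDITY_TOPIC0 = "0xremove"
TOKEN_TOPICS = {TRANSFER_TOPIC0, ADD_LIQUIDITY_TOPIC0, REMOVE_LIQUIDITY_TOPIC0}


@dataclass
class Log:
    """Hypothetical minimal log record with the token ID already decoded."""
    address: str
    topic0: str
    token_id: int
    block_number: int


def collect_token_data(
    logs: Iterable[Log], nft_address: str, exist_token_ids: Set[int]
) -> Tuple[List[int], Dict[int, int]]:
    """Return (newly seen token IDs, latest block number per token ID)."""
    seen = set(exist_token_ids)  # copy so the caller's set is not mutated
    new_tokens: List[int] = []
    current_status: Dict[int, int] = {}
    for log in logs:
        # Keep only logs from the NFT contract with a relevant topic0.
        if log.address != nft_address or log.topic0 not in TOKEN_TOPICS:
            continue
        if log.token_id not in seen:
            new_tokens.append(log.token_id)
            seen.add(log.token_id)
        # Later logs overwrite earlier ones, so the last status wins.
        current_status[log.token_id] = log.block_number
    return new_tokens, current_status
```

Here the "status" is reduced to a block number to keep the example short; the actual job builds a richer status object from the token detail.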
Collecting Liquidity Updates and Fee Collection Events:
    if topic0 in [
        constants.UNISWAP_V3_REMOVE_LIQUIDITY_TOPIC0,
        constants.UNISWAP_V3_ADD_LIQUIDITY_TOPIC0,
    ]:
        self._collect_item(UniswapV3TokenUpdateLiquidity.type(), liquidity_event)
    elif topic0 == constants.UNISWAP_V3_TOKEN_COLLECT_FEE_TOPIC0:
        self._collect_item(UniswapV3TokenCollectFee.type(), fee_event)
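The routing step can be illustrated on its own with a small sketch. The `route_events` function, the `(topic0, payload)` tuple shape, and the placeholder topic strings are assumptions for this example only; the output keys simply reuse the class names from the snippet above as plain strings.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

# Placeholder topic values (assumption); the real job reads them from `constants`.
ADD_LIQUIDITY_TOPIC0 = "0xadd"
REMOVE_LIQUIDITY_TOPIC0 = "0xremove"
COLLECT_FEE_TOPIC0 = "0xcollect"


def route_events(events: Iterable[Tuple[str, Any]]) -> Dict[str, List[Any]]:
    """Group (topic0, payload) pairs by the kind of item they would produce."""
    collected: Dict[str, List[Any]] = defaultdict(list)
    for topic0, payload in events:
        if topic0 in (ADD_LIQUIDITY_TOPIC0, REMOVE_LIQUIDITY_TOPIC0):
            # Both directions of a liquidity change share one item type.
            collected["UniswapV3TokenUpdateLiquidity"].append(payload)
        elif topic0 == COLLECT_FEE_TOPIC0:
            collected["UniswapV3TokenCollectFee"].append(payload)
        # Any other topic0 is ignored by this step.
    return dict(collected)
```

Add and remove events land in the same bucket, so a consumer would need a field on the payload (not shown here) to tell them apart.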