This job processes only the logs emitted by the Uniswap V3 NFT contract, matching each log on the contract address.
Key Processing Steps
Collecting Token Data:
Filter token ownership transfers and liquidity adjustments for each tokenId.
for log in logs:
if log.address == self._nft_address:
if topic0 in [constants.TRANSFER_TOPIC0, constants.UNISWAP_V3_ADD_LIQUIDITY_TOPIC0,
constants.UNISWAP_V3_REMOVE_LIQUIDITY_TOPIC0]:
# Process and collect token data
for data in token_infos:
if token_id not in self._exist_token_ids:
# Create new token
self._collect_item(UniswapV3Token.type(), new_token)
detail = UniswapV3TokenDetail(...)
self._collect_item(UniswapV3TokenDetail.type(), detail)
# Update current status
token_id_current_status[token_id] = create_token_status(detail)
Collecting Liquidity Updates and Fee Collection Events:
if topic0 in [constants.UNISWAP_V3_REMOVE_LIQUIDITY_TOPIC0,
constants.UNISWAP_V3_ADD_LIQUIDITY_TOPIC0]:
self._collect_item(UniswapV3TokenUpdateLiquidity.type(), liquidity_event)
elif topic0 == constants.UNISWAP_V3_TOKEN_COLLECT_FEE_TOPIC0:
self._collect_item(UniswapV3TokenCollectFee.type(), fee_event)