Trigger and Function

UniSwapV3PoolJob

Filter Logic

def get_filter(self):
    return TransactionFilterByLogs(
        [
            TopicSpecification(addresses=[self._factory_address], topics=[self._create_pool_topic0]),
            TopicSpecification(topics=self._pool_price_topic0_list),
        ]
    )

This job filters for logs from the factory address with the pool creation topic, and logs with topics indicating pool price changes.

Key Processing Steps

  1. Collecting Pool Data:

def _collect_pool_batch(self, logs):
    for log in logs:
        if self._factory_address == log.address and self._create_pool_topic0 == log.topic0:
            entity = decode_pool_created(self._nft_address, self._factory_address, log)
            self._collect_item(UniswapV3Pool.type(), entity)
  1. Collecting Swap Events:

if log.topic0 == constants.UNISWAP_V3_POOL_SWAP_TOPIC0:
    # Process and collect swap event
    self._collect_item(UniswapV3SwapEvent.type(), swap_event)
  1. Updating Pool Prices:

  2. call slot0 function.

pool_prices = slot0_rpc_requests(
            self._web3,
            self._batch_web3_provider.make_request,
            requests,
            self._is_batch,
            self._abi_list,
            self._batch_size,
            self._max_worker,
        )
for data in pool_prices:
    detail = UniswapV3PoolPrice(
        factory_address=self._factory_address,
        pool_address=data["pool_address"],
        sqrt_price_x96=data["sqrtPriceX96"],
        tick=data["tick"],
        block_number=data["block_number"],
        block_timestamp=data["block_timestamp"],
    )
    self._collect_item(UniswapV3PoolPrice.type(), detail)

UniswapV3TokenJob

Filter Logic

def get_filter(self):
    return TransactionFilterByLogs(
        [
            TopicSpecification(addresses=[self._nft_address]),
        ]
    )

This job filters for logs from the Uniswap V3 NFT contract address.

Key Processing Steps

  1. Collecting Token Data:

Filter token ownership transfers and liquidity adjustments for each tokenId.

for log in logs:
    if log.address == self._nft_address:
        if topic0 in [constants.TRANSFER_TOPIC0, constants.UNISWAP_V3_ADD_LIQUIDITY_TOPIC0, 
                      constants.UNISWAP_V3_REMOVE_LIQUIDITY_TOPIC0]:
            # Process and collect token data
  1. Fetching Token Ownership and Position Data:

call owner and positions

owner_info = owner_rpc_requests(...)
token_infos = positions_rpc_requests(...)
  1. Collect Token Information:

for data in token_infos:
    if token_id not in self._exist_token_ids:
        # Create new token
        self._collect_item(UniswapV3Token.type(), new_token)
    
    detail = UniswapV3TokenDetail(...)
    self._collect_item(UniswapV3TokenDetail.type(), detail)
    
    # Update current status
    token_id_current_status[token_id] = create_token_status(detail)
  1. Collect Liquidity Updates and Fee Collection Events::

if topic0 in [constants.UNISWAP_V3_REMOVE_LIQUIDITY_TOPIC0, 
              constants.UNISWAP_V3_ADD_LIQUIDITY_TOPIC0]:
    self._collect_item(UniswapV3TokenUpdateLiquidity.type(), liquidity_event)
elif topic0 == constants.UNISWAP_V3_TOKEN_COLLECT_FEE_TOPIC0:
    self._collect_item(UniswapV3TokenCollectFee.type(), fee_event)

Last updated