Trigger and Function
UniSwapV3PoolJob
Filter Logic
def get_filter(self):
return TransactionFilterByLogs(
[
TopicSpecification(addresses=[self._factory_address], topics=[self._create_pool_topic0]),
TopicSpecification(topics=self._pool_price_topic0_list),
]
)
This job matches two kinds of logs: logs emitted by the factory address with the pool-creation topic, and logs whose topic indicates a pool price change (this second specification sets no address restriction).
Key Processing Steps
Collecting Pool Data:
def _collect_pool_batch(self, logs):
for log in logs:
if self._factory_address == log.address and self._create_pool_topic0 == log.topic0:
entity = decode_pool_created(self._nft_address, self._factory_address, log)
self._collect_item(UniswapV3Pool.type(), entity)
Collecting Swap Events:
if log.topic0 == constants.UNISWAP_V3_POOL_SWAP_TOPIC0:
# Process and collect swap event
self._collect_item(UniswapV3SwapEvent.type(), swap_event)
Updating Pool Prices:
Call the `slot0` function to fetch the current pool prices.
pool_prices = slot0_rpc_requests(
self._web3,
self._batch_web3_provider.make_request,
requests,
self._is_batch,
self._abi_list,
self._batch_size,
self._max_worker,
)
for data in pool_prices:
detail = UniswapV3PoolPrice(
factory_address=self._factory_address,
pool_address=data["pool_address"],
sqrt_price_x96=data["sqrtPriceX96"],
tick=data["tick"],
block_number=data["block_number"],
block_timestamp=data["block_timestamp"],
)
self._collect_item(UniswapV3PoolPrice.type(), detail)
UniswapV3TokenJob
Filter Logic
def get_filter(self):
return TransactionFilterByLogs(
[
TopicSpecification(addresses=[self._nft_address]),
]
)
This job filters for logs from the Uniswap V3 NFT contract address.
Key Processing Steps
Collecting Token Data:
Filter token ownership transfers and liquidity adjustments for each tokenId.
for log in logs:
if log.address == self._nft_address:
if topic0 in [constants.TRANSFER_TOPIC0, constants.UNISWAP_V3_ADD_LIQUIDITY_TOPIC0,
constants.UNISWAP_V3_REMOVE_LIQUIDITY_TOPIC0]:
# Process and collect token data
Fetching Token Ownership and Position Data:
Call `owner` and `positions`:
owner_info = owner_rpc_requests(...)
token_infos = positions_rpc_requests(...)
Collect Token Information:
for data in token_infos:
if token_id not in self._exist_token_ids:
# Create new token
self._collect_item(UniswapV3Token.type(), new_token)
detail = UniswapV3TokenDetail(...)
self._collect_item(UniswapV3TokenDetail.type(), detail)
# Update current status
token_id_current_status[token_id] = create_token_status(detail)
Collect Liquidity Updates and Fee Collection Events:
if topic0 in [constants.UNISWAP_V3_REMOVE_LIQUIDITY_TOPIC0,
constants.UNISWAP_V3_ADD_LIQUIDITY_TOPIC0]:
self._collect_item(UniswapV3TokenUpdateLiquidity.type(), liquidity_event)
elif topic0 == constants.UNISWAP_V3_TOKEN_COLLECT_FEE_TOPIC0:
self._collect_item(UniswapV3TokenCollectFee.type(), fee_event)
Last updated