Building User Defined Functions(UDF)

This section walks the user through the steps to define, implement, and integrating a User-Defined Function (UDF) to process blockchain data, extract specific information, and store it in a PostgreSQL database.

We'll be following 4 primary steps:

  • Setting up input and output data classes

  • Defining database models

  • Implementing job logic

  • Integrating the job into Hemera Indexer

Prerequisites

Before beginning, set up the development environment by following Hemera’s setup guide for Docker or from source​.


Step 1: Define the Input and Output Data Classes

Input Data Class

The input data class represents the structure of the data being processed by your job. For blockchain-based UDFs, this might be a Transaction or Log object representing blockchain transactions or events.

Output Data Class

The output data class represents the processed data that will be stored in the database. This class should capture relevant fields from the input data and convert them into a format suitable for saving to the database.

# your_udf_data_classes.py
from dataclasses import dataclass

@dataclass
class YourCustomDataClass:
    field1: str
    field2: int
    field3: str
    # Add other fields as necessary

This structure ensures that you are working with typed data and can easily process it within your UDF logic.


Step 2: Define the Database Model

The model maps the processed data to the database table schema. You will use SQLAlchemy to define your model, which ensures that your data class output is correctly persisted in the PostgreSQL database.

# models/your_udf_models.py
from sqlalchemy import Column, String, Integer
from hemera.models.base import Base  # Import Base class from Hemera Indexer

class YourCustomModel(Base):
    __tablename__ = 'your_custom_table'

    field1 = Column(String, primary_key=True)
    field2 = Column(Integer)
    field3 = Column(String)
    # Add other fields matching your data class

Make sure that the fields defined in the model correspond to those in the data class to ensure proper mapping of the processed data.


Step 3: Implement the UDF Job

The job logic is where your UDF processes the input data and produces output data. The job needs to:

  • Define dependencies: Specify which data classes you depend on, such as Transaction or Log.

  • Process transactions: Implement the logic to filter, extract, and transform data from the input.

  • Define get_filter() to Specify Which Blockchain Events to Process. Filter based on criteria like addresses and topics to limit which blockchain events are processed.

  • Core Logic in _process() Function:

    • Iterate through each transaction.

    • Apply filtering logic to select specific transactions, such as high-value transactions.

    • Transform data from the transaction and map it to the fields in the output data class.

  • Save to database: Convert the output into a format that matches your model and store it in the database.

# your_udf_jobs.py
from hemera.jobs.base_job import BaseJob
from hemera.dataclasses import Transaction
from your_udf_data_classes import YourCustomDataClass
from your_udf_models import YourCustomModel

class YourCustomJob(BaseJob):
    """
    A custom job to process blockchain data and output custom data.
    """

    def __init__(self):
        super().__init__()
        self.dependencies = [Transaction]  # Define input data classes
        self.output_data_classes = [YourCustomDataClass]  # Define output data classes

    def process(self, transactions):
        """
        Your custom processing logic goes here.
        """
        output_data = []
        for tx in transactions:
            if self._is_target_transaction(tx):
                data = YourCustomDataClass(
                    field1=tx.hash,
                    field2=int(tx.value, 16),
                    field3=tx.from_address
                )
                output_data.append(data)
        return output_data

    def get_filter(self):
        """
        Implement any filtering logic needed for transactions.
        """
        # Example: return transactions that match a specific criterion
        return None

Step 4: Integrate Your UDF Job into Hemera Indexer

After defining your job, integrate it into Hemera Indexer so it can run as part of the indexing process. You may need to:

  1. Register the job: Place your job file in the appropriate directory within the Hemera Indexer project (e.g., hemera/jobs/custom/).

  2. Update configuration: Ensure that your job is included in the Hemera Indexer job registry or configuration files so that it is executed during indexing.

# Example job registration (this can vary depending on the indexer setup)
job_registry.register(YourCustomJob)


f Once updated, run the hemera indexer. For more detailed steps and deployment options, refer to the Testing and Running UDFsection.


Example: ERC721 Minting Job

Here’s an example of implementing a UDF job that processes ERC721 token minting transactions from a blockchain and saves the mint data to a PostgreSQL database.

Step 1: Define Input and Output Data Classes

Input Data Class: The input for this job is a Transaction, which is provided by the Hemera Indexer.

Output Data Class: The output class ERC721TokenMint represents the minting information for each ERC721 token.

# indexer/modules/custom/erc721_token_mint/domain/erc721_mint_time.py
from dataclasses import dataclass
from indexer.domain import FilterData

@dataclass
class ERC721TokenMint(FilterData):
    token_address: str
    token_id: int
    block_number: int
    block_timestamp: int
    transaction_hash: str

Step 2: Define the Database Model

# indexer/modules/custom/erc721_token_mint/models/erc721_token_mint.py
from sqlalchemy import Column, BIGINT, TIMESTAMP, BYTEA, NUMERIC
from hemera.models.base import Base

class ERC721TokenMint(Base):
    __tablename__ = 'erc721_token_mint'

    token_address = Column(BYTEA, primary_key=True)
    token_id = Column(NUMERIC(100), primary_key=True)
    block_number = Column(BIGINT)
    block_timestamp = Column(TIMESTAMP)
    transaction_hash = Column(BYTEA)

Step 3: Implement the UDF Job

# indexer/modules/custom/erc721_token_mint/export_erc721_token_mint_job.py
import logging
from typing import List
from indexer.domain.log import Log
from indexer.domain.token_transfer import TokenTransfer, extract_transfer_from_log
from indexer.domain.transaction import Transaction
from indexer.jobs.base_job import FilterTransactionDataJob
from indexer.modules.custom.erc721_token_mint.domain.erc721_mint_time import ERC721TokenMint

logger = logging.getLogger(__name__)

TARGET_TOKEN_ADDRESS = ["0x144e8e2450d8660c6de415a56452b10187343ad6"]

def _filter_mint_tokens(logs: List[Log]) -> List[TokenTransfer]:
    token_transfers = []
    for log in logs:
        token_transfers += extract_transfer_from_log(log)
    return [
        transfer
        for transfer in token_transfers
        if transfer.from_address == "0x0000000000000000000000000000000000000000"
    ]

class ExportERC721MintTimeJob(FilterTransactionDataJob):
    dependency_types = [Transaction]
    output_types = [ERC721TokenMint]

    def _collect(self):
        logs = self._data_buff[Log.type()]
        mint_tokens = _filter_mint_tokens(logs)
        erc721_mint_infos = [
            ERC721TokenMint(
                token_address=token.token_address,
                token_id=token.value,
                block_timestamp=token.block_timestamp,
                block_number=token.block_number,
                transaction_hash=token.transaction_hash
            )
            for token in mint_tokens
        ]
        self._collect_domains(erc721_mint_infos)

Step 4: Integration

Register the ExportERC721MintTimeJob within the Hemera Indexer so that it becomes part of the indexing workflow.

This example demonstrates a complete workflow for a UDF job that processes blockchain transactions and stores the result in a database.

Notes:

  • Dependencies: Specify the data classes your job depends on (e.g., Transaction).

  • Output Data Classes: Specify the data classes your job outputs.

  • Processing Logic: Implement your custom logic in the process method.

  • Database Interaction: Use SQLAlchemy sessions to save data to the database.

Step 4: Integrate Your UDF into Hemera Indexer

To have your UDF executed as part of the indexing process, you need to register it within Hemera Indexer.

Depending on how Hemera Indexer is set up, you may need to:

  • Place your job files in the appropriate directories (e.g., hemera/jobs/custom/).

  • Update any job registries or configuration files to include your job.

Last updated