This section walks through the steps to define, implement, and integrate a User-Defined Function (UDF) that processes blockchain data, extracts specific information, and stores it in a PostgreSQL database.
We'll be following 4 primary steps:
Setting up input and output data classes
Defining database models
Implementing job logic
Integrating the job into Hemera Indexer
Prerequisites
Before beginning, set up the development environment by following Hemera’s setup guide for Docker or from source.
Step 1: Define the Input and Output Data Classes
Input Data Class
The input data class represents the structure of the data being processed by your job. For blockchain-based UDFs, this might be a Transaction or Log object representing blockchain transactions or events.
Output Data Class
The output data class represents the processed data that will be stored in the database. This class should capture relevant fields from the input data and convert them into a format suitable for saving to the database.
    # your_udf_data_classes.py
    from dataclasses import dataclass


    @dataclass
    class YourCustomDataClass:
        field1: str
        field2: int
        field3: str
        # Add other fields as necessary
This structure ensures that you are working with typed data and can easily process it within your UDF logic.
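As an illustration of how typed output data might be built from raw input, here is a minimal sketch. The helper `from_raw_transaction` and the dictionary keys `hash`, `value`, and `from` are assumptions modelled on JSON-RPC transaction objects, not part of Hemera's API.

```python
from dataclasses import dataclass


@dataclass
class YourCustomDataClass:
    field1: str
    field2: int
    field3: str


def from_raw_transaction(raw: dict) -> YourCustomDataClass:
    """Build the typed output record from a raw transaction dict.

    Assumption: "value" arrives as a hex string, as in JSON-RPC responses.
    """
    return YourCustomDataClass(
        field1=raw["hash"],
        field2=int(raw["value"], 16),  # hex string -> int
        field3=raw["from"],
    )


raw_tx = {"hash": "0xabc", "value": "0xde0b6b3a7640000", "from": "0x1111"}
record = from_raw_transaction(raw_tx)
print(record.field2)  # the value as an integer number of wei
```

Doing the type conversion once, at the boundary, means the rest of the job can rely on `field2` being an `int`.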
Step 2: Define the Database Model
The model maps the processed data to the database table schema. You will use SQLAlchemy to define your model, which ensures that your data class output is correctly persisted in the PostgreSQL database.
    # models/your_udf_models.py
    from sqlalchemy import Column, String, Integer
    from hemera.models.base import Base  # Import Base class from Hemera Indexer


    class YourCustomModel(Base):
        __tablename__ = 'your_custom_table'

        field1 = Column(String, primary_key=True)
        field2 = Column(Integer)
        field3 = Column(String)
        # Add other fields matching your data class
Make sure that the fields defined in the model correspond to those in the data class to ensure proper mapping of the processed data.
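One way to catch a mismatch early is to compare the data class fields against the model's column names. The sketch below uses a plain list as a stand-in for the model's columns so that it runs without a database. The helper `missing_columns` is hypothetical. With SQLAlchemy, the column names could be read from `YourCustomModel.__table__.columns.keys()`.

```python
from dataclasses import dataclass, fields


@dataclass
class YourCustomDataClass:
    field1: str
    field2: int
    field3: str


def missing_columns(data_class, column_names):
    """Return the data class fields that have no matching column name."""
    known = set(column_names)
    return [f.name for f in fields(data_class) if f.name not in known]


# Stand-in for the model's column names (assumption for this sketch).
model_columns = ["field1", "field2", "field3"]

print(missing_columns(YourCustomDataClass, model_columns))
print(missing_columns(YourCustomDataClass, ["field1", "field2"]))
```

An empty list means every field in the data class has a column to land in.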
Step 3: Implement the UDF Job
The job logic is where your UDF processes the input data and produces output data. The job needs to:
Define dependencies: Specify which data classes you depend on, such as Transaction or Log.
Process transactions: Implement the logic to filter, extract, and transform data from the input.
Define get_filter(): Specify which blockchain events to process. Filter on criteria such as addresses and topics to limit the events that reach your job.
Core Logic in _process() Function:
Iterate through each transaction.
Apply filtering logic to select specific transactions, such as high-value transactions.
Transform data from the transaction and map it to the fields in the output data class.
Save to database: Convert the output into a format that matches your model and store it in the database.
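The address and topic filtering described in the list above can be illustrated independently of Hemera. The following is a minimal sketch only: `LogFilter`, `matches`, and the log dictionary layout are hypothetical names chosen for illustration, and the object that Hemera's `get_filter()` actually returns may look different.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class LogFilter:
    """Hypothetical filter spec: contract addresses and first topics to keep."""
    addresses: List[str] = field(default_factory=list)
    topics: List[str] = field(default_factory=list)


def matches(log: dict, log_filter: Optional[LogFilter]) -> bool:
    """Return True if the log passes the filter; None keeps everything."""
    if log_filter is None:
        return True
    if log_filter.addresses:
        allowed = {a.lower() for a in log_filter.addresses}
        if log["address"].lower() not in allowed:
            return False
    if log_filter.topics:
        wanted = {t.lower() for t in log_filter.topics}
        topic0 = log["topics"][0].lower() if log["topics"] else None
        if topic0 not in wanted:
            return False
    return True


example_filter = LogFilter(addresses=["0xAbC"], topics=["0xT1"])
kept = matches({"address": "0xabc", "topics": ["0xt1"]}, example_filter)
dropped = matches({"address": "0xdef", "topics": ["0xt1"]}, example_filter)
print(kept, dropped)
```

Comparing lowercased hex strings avoids false mismatches between checksummed and non-checksummed addresses.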
    # your_udf_jobs.py
    from hemera.jobs.base_job import BaseJob
    from hemera.dataclasses import Transaction

    from your_udf_data_classes import YourCustomDataClass
    from your_udf_models import YourCustomModel


    class YourCustomJob(BaseJob):
        """A custom job to process blockchain data and output custom data."""

        def __init__(self):
            super().__init__()
            self.dependencies = [Transaction]  # Define input data classes
            self.output_data_classes = [YourCustomDataClass]  # Define output data classes

        def process(self, transactions):
            """Your custom processing logic goes here."""
            output_data = []
            for tx in transactions:
                if self._is_target_transaction(tx):
                    data = YourCustomDataClass(
                        field1=tx.hash,
                        field2=int(tx.value, 16),
                        field3=tx.from_address,
                    )
                    output_data.append(data)
            return output_data

        def _is_target_transaction(self, tx):
            """Placeholder predicate; replace with your own selection criteria."""
            return True

        def get_filter(self):
            """Implement any filtering logic needed for transactions."""
            # Example: return transactions that match a specific criterion
            return None
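To try the filter-and-transform logic without a running indexer, it can be exercised against stand-in classes. In this sketch the `Transaction` class is a simplified substitute for Hemera's, the threshold `HIGH_VALUE_WEI` is an arbitrary illustrative choice, and `value` is assumed to be a hex string, as in the job code above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Transaction:
    """Simplified stand-in for Hemera's Transaction (assumption)."""
    hash: str
    value: str  # hex string
    from_address: str


@dataclass
class YourCustomDataClass:
    field1: str
    field2: int
    field3: str


HIGH_VALUE_WEI = 10**18  # illustrative threshold: 1 ether


def is_target_transaction(tx: Transaction) -> bool:
    """Select high-value transactions only."""
    return int(tx.value, 16) >= HIGH_VALUE_WEI


def process(transactions: List[Transaction]) -> List[YourCustomDataClass]:
    """Filter the input and map each kept transaction to the output class."""
    return [
        YourCustomDataClass(
            field1=tx.hash,
            field2=int(tx.value, 16),
            field3=tx.from_address,
        )
        for tx in transactions
        if is_target_transaction(tx)
    ]


sample = [
    Transaction(hash="0x01", value="0x1", from_address="0xaaa"),
    Transaction(hash="0x02", value="0xde0b6b3a7640000", from_address="0xbbb"),
]
results = process(sample)
print([r.field1 for r in results])
```

Keeping the predicate separate from the transformation makes each piece easy to unit test before wiring it into a job class.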
Step 4: Integrate Your UDF Job into Hemera Indexer
Once your UDF follows the structure above, you can call it through the CLI. To run it by default whenever the indexer runs, integrate the job into Hemera Indexer so that it executes as part of the indexing process. You may need to:
Register the job: Place your job file in the appropriate directory within the Hemera Indexer project (e.g., hemera/jobs/custom/).
Update configuration: Ensure that your job is included in the Hemera Indexer job registry or configuration files so that it is executed during indexing.
    # Example job registration (this can vary depending on the indexer setup)
    job_registry.register(YourCustomJob)
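For readers unfamiliar with the registry pattern, the following sketch shows the general idea behind a call such as `job_registry.register(...)`. The `JobRegistry` class here is entirely hypothetical and is not Hemera's implementation; the actual registration mechanism depends on the indexer setup.

```python
from typing import Dict, List, Type


class JobRegistry:
    """Hypothetical registry mapping job names to job classes."""

    def __init__(self) -> None:
        self._jobs: Dict[str, Type] = {}

    def register(self, job_class: Type) -> Type:
        name = job_class.__name__
        if name in self._jobs:
            raise ValueError(f"job {name!r} is already registered")
        self._jobs[name] = job_class
        return job_class  # returning the class allows use as a decorator

    def create_all(self) -> List[object]:
        """Instantiate every registered job, in registration order."""
        return [job_class() for job_class in self._jobs.values()]


job_registry = JobRegistry()


class YourCustomJob:
    """Stand-in job; a real one would subclass Hemera's base job."""


job_registry.register(YourCustomJob)
jobs = job_registry.create_all()
print([type(job).__name__ for job in jobs])
```

Rejecting duplicate names guards against the same job running twice in one indexing pass.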
Once updated, run the Hemera Indexer. For more detailed steps and deployment options, refer to the Testing and Running UDF section.
Example: ERC721 Minting Job
Here’s an example of implementing a UDF job that processes ERC721 token minting transactions from a blockchain and saves the mint data to a PostgreSQL database.
Step 1: Define Input and Output Data Classes
Input Data Class: The input for this job is a Transaction, which is provided by the Hemera Indexer.
Output Data Class: The output class ERC721TokenMint represents the minting information for each ERC721 token.
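To make the shape of the output concrete, here is a rough sketch of what an `ERC721TokenMint` record and its extraction could look like. The field names are illustrative assumptions, not Hemera's definition. The sketch also assumes a mint is recognised from the ERC721 `Transfer` event in the transaction's logs, where the sender is the zero address and the token ID is the fourth topic. The log dictionary keys are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional

# keccak256("Transfer(address,address,uint256)")
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
ZERO_ADDRESS = "0x" + "00" * 20


@dataclass
class ERC721TokenMint:
    # Field names are assumptions made for this sketch.
    token_address: str
    token_id: int
    to_address: str
    transaction_hash: str


def topic_to_address(topic: str) -> str:
    """Take the low 20 bytes of a 32-byte topic as an address."""
    return "0x" + topic.lower()[-40:]


def decode_mint(log: dict) -> Optional[ERC721TokenMint]:
    """Return a mint record if the log is an ERC721 Transfer from the zero address."""
    topics = log["topics"]
    # ERC721 Transfer indexes from, to, and tokenId, giving four topics in total.
    if len(topics) != 4 or topics[0].lower() != TRANSFER_TOPIC:
        return None
    if topic_to_address(topics[1]) != ZERO_ADDRESS:
        return None
    return ERC721TokenMint(
        token_address=log["address"],
        token_id=int(topics[3], 16),
        to_address=topic_to_address(topics[2]),
        transaction_hash=log["transaction_hash"],
    )


mint_log = {
    "address": "0xcontract",
    "transaction_hash": "0xtx",
    "topics": [
        TRANSFER_TOPIC,
        "0x" + "0" * 64,
        "0x" + "0" * 24 + "ab" * 20,
        "0x" + "0" * 63 + "7",
    ],
}
mint = decode_mint(mint_log)
print(mint)
```

Checking for exactly four topics also excludes ERC20 transfers, which share the same event signature but index only the two addresses.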
An example UDF follows. The ERC721 Minting Job UDF retrieves the mints for a particular ERC721 contract address. See the example and implementation below.