Prefect Integration with DataHub
Overview
DataHub supports integration with Prefect, allowing you to ingest:
- Prefect flow and task metadata
- Flow run and Task run information
- Lineage information (when available)
This integration enables you to track and monitor your Prefect workflows within DataHub, providing a comprehensive view of your data pipeline activities.
Prefect DataHub Block
What is a Prefect DataHub Block?
Blocks in Prefect are primitives that enable the storage of configuration and provide an interface for interacting with external systems. The prefect-datahub block uses the DataHub REST emitter to send metadata events while running Prefect flows.
Prerequisites
- Use either Prefect Cloud (recommended) or a self-hosted Prefect server. 
- For Prefect Cloud setup, refer to the Cloud Quickstart guide. 
- For self-hosted Prefect server setup, refer to the Host Prefect Server guide. 
- Ensure the Prefect API URL is set correctly. Verify using: - prefect profile inspect
- API URL format: - Prefect Cloud: https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>
- Self-hosted: http://<host>:<port>/api
 
- Prefect Cloud: 
Setup Instructions
1. Installation
Install prefect-datahub using pip:
pip install 'prefect-datahub'
Note: Requires Python 3.7+
2. Saving Configurations to a Block
Save your configuration to the Prefect block document store:
from prefect_datahub.datahub_emitter import DatahubEmitter
DatahubEmitter(
    datahub_rest_url="http://localhost:8080",
    env="PROD",
    platform_instance="local_prefect"
).save("MY-DATAHUB-BLOCK")
Configuration options:
| Config | Type | Default | Description | 
|---|---|---|---|
| datahub_rest_url | str | http://localhost:8080 | DataHub GMS REST URL | 
| env | str | PROD | Environment for assets (see FabricType) | 
| platform_instance | str | None | Platform instance for assets (see Platform Instances) | 
3. Using the Block in Prefect Workflows
Load and use the saved block in your Prefect workflows:
from prefect import flow, task
from prefect_datahub.dataset import Dataset
from prefect_datahub.datahub_emitter import DatahubEmitter
datahub_emitter = DatahubEmitter.load("MY-DATAHUB-BLOCK")
@task(name="Transform", description="Transform the data")
def transform(data):
    data = data.split(" ")
    datahub_emitter.add_task(
        inputs=[Dataset("snowflake", "mydb.schema.tableA")],
        outputs=[Dataset("snowflake", "mydb.schema.tableC")],
    )
    return data
@flow(name="ETL flow", description="Extract transform load flow")
def etl():
    data = transform("This is data")
    datahub_emitter.emit_flow()
Note: To emit tasks, you must call emit_flow(). Otherwise, no metadata will be emitted.
Concept Mapping
| Prefect Concept | DataHub Concept | 
|---|---|
| Flow | DataFlow | 
| Flow Run | DataProcessInstance | 
| Task | DataJob | 
| Task Run | DataProcessInstance | 
| Task Tag | Tag | 
Validation and Troubleshooting
Validating the Setup
- Check the Prefect UI's Blocks menu for the DataHub emitter. 
- Run a Prefect workflow and look for DataHub-related log messages: - Emitting flow to datahub...
 Emitting tasks to datahub...
Debugging Common Issues
Incorrect Prefect API URL
If the Prefect API URL is incorrect, set it manually:
prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api'
DataHub Connection Error
If you encounter a ConnectionError: HTTPConnectionPool(host='localhost', port=8080), ensure that your DataHub GMS service is running.
Additional Resources
For more information or support, please refer to the official Prefect and DataHub documentation or reach out to their respective communities.