New in 0.14.0: Dagster-Airbyte Integration
The first step in many data pipelines is moving data from its original system of record into a location better suited for data transformation and analysis. Airbyte is a free, open-source tool that handles this data ingestion task.
Dagster is designed to give detailed views into every step of your data processing pipelines, and this integration is no different. When you orchestrate Airbyte using Dagster, you get deep visibility into the current and historical details of each sync, including detailed log output and a record of each table that Airbyte has processed, complete with detailed metadata.
Alongside the airbyte_resource, which supplies a client that is able to communicate with the Airbyte API, this library provides a pre-built op and a utility to generate software-defined assets for a given Airbyte connection.
Special thanks to Marcos Marx from the Airbyte core team for supplying the initial version of this integration!
If you're a Fivetran user, we have a similar dagster-fivetran library as well!
Overview
When you run a sync using this library, you'll get a live view containing not just the current status of the sync, but also all of the raw logs from Airbyte, meaning there's no need to jump between different tools to gather the relevant information about a run:
As the sync completes, Dagster will automatically parse the metadata that Airbyte provides, recording an event for each table that was updated during the sync. Each event contains metadata that can be tracked and visualized, giving you a longitudinal view of how each table has changed over time:
Having this historical record readily accessible is incredibly valuable: it makes it easy to spot when a schema changed, or when a particular sync had an unusual spike in the number of records updated.
Resource
Regardless of whether you're using ops or software-defined assets, you can easily configure the Airbyte resource to point at wherever you're hosting your Airbyte instance:
from dagster_airbyte import airbyte_resource

my_airbyte_resource = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
    }
)
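In a real deployment, you'll probably want to avoid hardcoding these values. Here's a minimal sketch that reads them from environment variables instead, assuming the host and port fields accept Dagster's standard environment-variable config source (the AIRBYTE_HOST and AIRBYTE_PORT variable names are just placeholders):

from dagster_airbyte import airbyte_resource

# Assumes AIRBYTE_HOST and AIRBYTE_PORT are set in the environment where
# Dagster runs; {"env": ...} is Dagster's env-var config syntax.
my_airbyte_resource = airbyte_resource.configured(
    {
        "host": {"env": "AIRBYTE_HOST"},
        "port": {"env": "AIRBYTE_PORT"},
    }
)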
Once you do this, you can supply this resource to either a job (if you're using the pre-built op), or an AssetGroup (if you're using software-defined assets).
Ops
For an end-to-end example of using this integration alongside dbt and custom Python, check out the Airbyte recipe or the Airbyte Community Call.
The airbyte_sync_op allows you to include an Airbyte sync as an operation in your job. Using it is as simple as configuring the op with the id of the connection you'd like to sync:
from dagster_airbyte import airbyte_sync_op

sync_my_connection = airbyte_sync_op.configured(
    {"connection_id": "some-connection-id"},
    name="sync_my_connection",
)
From there, you can include this op in a job as usual.
from dagster import job

@job(resource_defs={"airbyte": my_airbyte_resource})
def do_stuff():
    # do_something is a downstream op that consumes the output of the sync
    do_something(sync_my_connection())
This will render in Dagit just the same as any other job:
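If you want this sync to run on a regular cadence, you can attach the job to a schedule like any other Dagster job. Here's a minimal sketch; the hourly cron string, schedule name, and repository name are placeholders:

from dagster import ScheduleDefinition, repository

# Run the do_stuff job (which kicks off the Airbyte sync) at the top of every hour.
hourly_sync_schedule = ScheduleDefinition(
    job=do_stuff,
    cron_schedule="0 * * * *",
)

@repository
def my_repository():
    return [do_stuff, hourly_sync_schedule]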
Software-Defined Assets
For an end-to-end example of using this integration alongside dbt and custom Python, check out the Modern Data Stack Assets example or the corresponding video.
When you set up Airbyte to copy tables from system A to system B, you are in essence defining some data assets (the tables in the destination database) that you expect to exist. In 0.14.0, we formalized the concept of Software-Defined Assets, which allow you to explicitly declare the assets that you want to produce in code, unlocking significant observability and ergonomic improvements.
This library provides a utility to generate Software-Defined Assets from an Airbyte connection:
from dagster_airbyte import build_airbyte_assets

airbyte_assets = build_airbyte_assets(
    connection_id="some_connection_id",
    destination_tables=["users", "orders"],
)
When you supply this function with a connection id and a list of the tables you expect it to produce, it will return a set of assets that represent those tables in code. Refreshing these assets still requires only a single operation (running the Airbyte sync), but representing them as assets allows you to visualize them separately and to define other assets that depend on some or all of those tables:
from dagster import AssetGroup, asset

@asset
def other_asset(users, orders):
    # ... update the other_asset table using the users and orders tables
    ...

my_group = AssetGroup(
    assets=airbyte_assets + [other_asset],
    resource_defs={"airbyte": my_airbyte_resource},
)
Viewing this in Dagit, we'll get the following graph of assets:
This graph is generated automatically by Dagster, and it gives a more detailed look into the purpose of this sync than a traditional operational view would provide. After all, Dagster will only launch a single operation to refresh both of these Airbyte assets; the asset-focused view sits as a layer on top of the steps that will actually run, providing much deeper visibility into what Airbyte will do.
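Because the AssetGroup knows about both the Airbyte assets and everything downstream of them, you can also build a job that materializes the whole group. Here's a minimal sketch, assuming the AssetGroup.build_job method (the job name is a placeholder):

# Build a job that materializes every asset in the group: the Airbyte sync
# runs as a single step, and other_asset runs once its upstream tables exist.
materialize_all_assets = my_group.build_job(name="materialize_all_assets")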
To learn more about software-defined assets, check out the docs!
Wrapping Up
This integration reflects a focus on positioning the orchestrator as the centralized place to discover and manage the status of an entire data platform. Dagster's role is not simply to kick off arbitrary computation; it also understands (and helps you understand) the purpose of that computation.
We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a GitHub discussion. If you run into any bugs, let us know with a GitHub issue. And if you're interested in working with us, check out our open roles!