December 8, 2022 • 6 minute read
Declarative Scheduling for Data Assets
New files arrive on S3, and, within 5 minutes, the core tables that depend on them are re-built.
In time for the daily 9 AM executive meeting, the daily rollup table is recomputed from those core tables.
You push a bugfix to the code that derives one of your core tables. A backfill launches that recomputes it, as well as the downstream tables and machine learning models that depend on it.
Through all of this, you haven’t scheduled any jobs, Airflow DAGs, or Prefect Flows. You’ve just declared how your data flows and when you expect it to be up-to-date.
This is the vision behind declarative, asset-based scheduling, introduced in Dagster 1.1: a principled way of managing change that models each data asset as a function of its predecessors and schedules work based on how up-to-date data needs to be. As source code and upstream data change, the updates ripple through the asset graph, eagerly or lazily depending on the requirements of the data products they support.
Orchestrators keep data assets up-to-date
Data practitioners use orchestrators to keep data assets, like tables and machine learning models, up-to-date. Scheduling data pipelines effectively requires thinking about how up-to-date the assets need to be and what causes assets to become out-of-date.
Up-to-date has different meanings for different data assets. For some assets, it’s important that they incorporate relatively recent data. For example, someone monitoring load on the power grid might want to see hot-spots within 30 minutes of when they emerge. Other assets, for example a complex analysis that’s reviewed at a weekly meeting, might need to be updated less often.
Assets typically only become out-of-date when the data or code that they depend on changes. If we’ve received no new data and have deployed no new code, there’s usually no reason to spend resources running anything. When new data arrives, the assets that are derived from that data become “stale” until they’re updated to incorporate it. When new code is deployed, the asset that depends on that code is “stale” until it’s re-materialized using that code.
If we’re able to express our intentions to the orchestrator directly in these terms, then it can do a faithful job of scheduling materializations at the times we need them and avoid unnecessary work when we don’t.
Workflows != data pipelines
But most orchestration tools don’t think in these terms. They instead require translating data pipelines into imperative workflows - sequences of tasks that run together on fixed schedules. On a personal note, I’ve spent many hours building workflows in Airflow that try to do this.
Trying to represent and schedule pipelines of data assets as workflows is often painful, because workflows are an awkward metaphor for data pipelines. A single data source can power dozens of data products, each with different data freshness requirements. A single data product can consume data from dozens of different sources, each of which is updated at a different cadence. Trying to chop up this graph of data assets into discrete workflows that each get executed synchronously often feels like forcing a square peg into a round hole.
You don’t need a Google-sized asset graph to experience this awkwardness. Workflow-based orchestrators have trouble with even basic patterns, like a daily table and an hourly table that both depend on the same upstream table. Or an hourly table that depends on a daily table.
Another friction with imperative workflow-based orchestration is that, every time you add an asset, you have to find a DAG to put it in to get it scheduled. This means you have to worry about whether DAGs are getting too large and unwieldy, on one extreme, or too small and fragmented, on the other. This can intersect with organizational friction - which team’s workflow should a shared task belong to?
Last, imperative, workflow-based orchestrators send alerts when tasks fail, not when data is out of date, which is often what stakeholders actually care about. If the system can retry and self-correct before the deadline, then waking someone up on PagerDuty is a waste.
Declarative, asset-based scheduling with Dagster
The Dagster 1.1 release introduces a system for scheduling data pipelines that allows you to escape writing workflows entirely. Instead, you directly specify how up-to-date you expect each asset to be, as well as how to determine whether source data has changed. Dagster then automatically schedules asset materializations to ensure that data arrives on time, while avoiding unnecessary computation.
As a quick aside, Dagster also supports workflow-based scheduling and will continue to treat it as first-class. If you like using jobs, schedules, and sensors to schedule your data pipelines, these are not going away! Dagster 1.1 also includes a number of improvements to them. But they’re not the focus of this blog post.
These are the core abstractions of Dagster’s declarative, asset-based scheduling system:
Data assets
Dagster thinks in terms of materializing data assets, rather than just executing tasks. My post introducing software-defined assets gives a full view of why and how this works. A crucial element of Dagster’s software-defined assets approach is that the graph of data assets is different from the execution DAGs that show up in systems like Airflow. Execution DAGs track execution dependencies: do task Y after task X. The data asset graph tracks data dependencies: data asset Y is derived from data asset X.
Execution dependencies are context-dependent: if a daily asset depends on an hourly asset, you often won’t update the daily asset every time you update the hourly asset. Or if you have an ML model that’s expensive to train, you often won’t retrain it every time the data it’s based on changes - although you might update them in order when backfilling.
Data dependencies are vital no matter how you schedule or manually update your assets. If a daily asset depends on an hourly asset, you’ll read from the hourly asset every time you update the daily asset.
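To make the distinction concrete, here is a minimal sketch in plain Python. It is not how Dagster represents assets internally, and the asset names (hourly_events, daily_rollup, ml_model) are hypothetical. The data asset graph is just a mapping from each asset to the assets it is derived from. An execution order for a full backfill can be derived from that mapping, but the mapping itself says nothing about how often each asset runs.

```python
from graphlib import TopologicalSorter

# Hypothetical data asset graph: each asset maps to the assets it is derived from.
ASSET_DEPS = {
    "hourly_events": set(),
    "daily_rollup": {"hourly_events"},
    "ml_model": {"daily_rollup"},
}


def backfill_order(deps):
    """Return an order that materializes every asset after its upstream assets."""
    return list(TopologicalSorter(deps).static_order())


def upstream_of(asset, deps):
    """Return every asset the given asset is derived from, directly or transitively."""
    seen = set()
    stack = list(deps[asset])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(deps[current])
    return seen


print(backfill_order(ASSET_DEPS))  # ['hourly_events', 'daily_rollup', 'ml_model']
print(sorted(upstream_of("ml_model", ASSET_DEPS)))  # ['daily_rollup', 'hourly_events']
```

The same mapping answers both questions: what must exist before ml_model can be rebuilt, and in what order to run things when everything is being recomputed.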
Asset freshness policies
Freshness policies are how you tell Dagster how up-to-date your data assets need to be and thus how often they need to be re-materialized. Freshness policies work similarly to the “latency requirements” that Benn Stancil describes in Down with the DAG. A freshness policy might state that “at any point in time, this asset should incorporate all data that arrived through 30 minutes before that time.”
You can set up Dagster to automatically schedule materializations of assets so they meet their freshness policies.
Here are a couple assets with different freshness policies that depend on the same upstream asset:
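```python
from dagster import FreshnessPolicy, asset


# No freshness policy of its own: it is updated as needed by downstream assets.
@asset
def fact_table():
    ...


# Must incorporate upstream data that is at most 60 minutes old.
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60))
def aggregation_table_1(fact_table):
    ...


# Must incorporate upstream data that is at most 24 hours old.
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60 * 24))
def aggregation_table_2(fact_table):
    ...
```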
And here’s how the freshness policy is represented in Dagster’s web UI:
fact_table is a data warehouse table that’s built from a table in an app database. The app database table is receiving updates continuously. To meet the freshness policy of aggregation_table_1, in every 60-minute window there must be a materialization of fact_table followed by a materialization of aggregation_table_1. This ensures that all updates to the app database will be incorporated into aggregation_table_1 within 60 minutes.
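The arithmetic behind a maximum lag can be sketched in a few lines of plain Python. This is an illustrative model, not Dagster's implementation: the function name meets_freshness_policy and the timestamps are assumptions made up for the example. The idea is that an asset satisfies its policy at a given moment if the data it incorporates is no older than that moment minus the allowed lag.

```python
from datetime import datetime, timedelta


def meets_freshness_policy(now, data_time, maximum_lag_minutes):
    """True if the asset incorporates all upstream data up to (now - maximum lag)."""
    return data_time >= now - timedelta(minutes=maximum_lag_minutes)


# Hypothetical timeline: fact_table was materialized at 09:00 and
# aggregation_table_1 was then materialized from it, so aggregation_table_1
# incorporates app database updates through 09:00.
data_time = datetime(2022, 12, 8, 9, 0)

print(meets_freshness_policy(datetime(2022, 12, 8, 9, 50), data_time, 60))  # True
print(meets_freshness_policy(datetime(2022, 12, 8, 10, 5), data_time, 60))  # False
print(meets_freshness_policy(datetime(2022, 12, 8, 10, 5), data_time, 60 * 24))  # True
```

The last two calls show why the two aggregation tables can share an upstream asset and still be refreshed at different cadences: the same data is too old for a 60-minute policy but well within a 24-hour one.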
Versions
Versions are how you help Dagster track what assets are “stale” and avoid performing redundant computation. Assets are derived from upstream assets, by running code. If the versions of the upstream data or code that an asset depends on have changed since the asset was last materialized, then the asset is considered stale.
After making a change to the code that derives an asset, you can increment its code_version to let Dagster know that it has changed.
```python
from dagster import asset


# The code version was bumped from "1" to "2" after changing this function.
@asset(code_version="2")
def nabisco_cereals(cereals):
    return cereals[cereals["mfr"] == "A"]


@asset(code_version="1")
def cereal_protein_fractions(cereals):
    ...


@asset(code_version="1")
def highest_protein_nabisco_cereal(nabisco_cereals, cereal_protein_fractions):
    ...
```
If nabisco_cereals was materialized when its code_version was “1”, but then we bump the code version to “2”, nabisco_cereals is considered stale and will need to be re-materialized to be considered fresh. After bumping the code_version of nabisco_cereals, downstream assets like highest_protein_nabisco_cereal that were materialized using the earlier version of nabisco_cereals are also considered stale.
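The propagation described here can be modeled in plain Python. The sketch below is a simplification under stated assumptions, not Dagster's algorithm: the MATERIALIZED_WITH record is hypothetical, and the model only compares code versions, so it does not capture the case where an upstream asset has been re-materialized but its downstream assets have not.

```python
# Current code versions, as declared on the assets above.
CODE_VERSIONS = {
    "nabisco_cereals": "2",
    "cereal_protein_fractions": "1",
    "highest_protein_nabisco_cereal": "1",
}

# Upstream assets within this graph (the cereals source asset is left out).
UPSTREAM = {
    "nabisco_cereals": [],
    "cereal_protein_fractions": [],
    "highest_protein_nabisco_cereal": ["nabisco_cereals", "cereal_protein_fractions"],
}

# Hypothetical record of the code version each asset was last materialized with.
MATERIALIZED_WITH = {
    "nabisco_cereals": "1",
    "cereal_protein_fractions": "1",
    "highest_protein_nabisco_cereal": "1",
}


def is_stale(asset):
    """Stale if the asset's own code changed or any of its upstream assets is stale."""
    if CODE_VERSIONS[asset] != MATERIALIZED_WITH[asset]:
        return True
    return any(is_stale(parent) for parent in UPSTREAM[asset])


stale = sorted(name for name in CODE_VERSIONS if is_stale(name))
print(stale)  # ['highest_protein_nabisco_cereal', 'nabisco_cereals']
```

cereal_protein_fractions stays out of the stale set because neither its code nor anything upstream of it in this graph changed, which is exactly the computation that gets skipped.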
Versions can also be applied to source assets. Source assets are assets that Dagster knows about, but did not compute itself. For example, you could use a source asset to model a file that another company posts to an SFTP server every day.
If you have a source asset that changes at discrete times, such as a file that gets overwritten every so often, you can supply a function that observes it and reports its version, to use in the determination of what assets are stale.
Here’s an asset that determines the version of a file based on the last time it was modified:
```python
import os

from dagster import LogicalVersion, observable_source_asset


@observable_source_asset
def cereals(_):
    # Report the file's last-modified time as its version.
    return LogicalVersion(str(os.path.getmtime("raw_cereals.csv")))
```
If the modification time changes and the source asset is observed again, then all assets downstream of it will be marked as stale.
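A modification time changes whenever the file is rewritten, even if the bytes are identical. One alternative, which is not from this post and is offered only as a sketch, is to derive the version string from a hash of the file's contents. The helper name content_version is hypothetical; the string it returns could presumably be reported as the version in place of the modification time.

```python
import hashlib
import os
import tempfile


def content_version(path):
    """Return a version string that changes only when the file's bytes change."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Demonstration with a throwaway file standing in for raw_cereals.csv.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "raw_cereals.csv")
    with open(path, "w") as f:
        f.write("name,mfr\n")
    first = content_version(path)

    with open(path, "w") as f:
        f.write("name,mfr\n")  # rewritten with identical contents
    unchanged = content_version(path)

    with open(path, "a") as f:
        f.write("Shredded Wheat,N\n")  # new data arrives
    changed = content_version(path)

print(first == unchanged, first == changed)  # True False
```

The trade-off is cost: hashing reads the whole file on every observation, while checking the modification time does not.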
It’s easy to launch materializations that target only the stale assets:
Partitions
You can declare that a single asset is composed of multiple partitions, e.g. one for each day. Dagster can then materialize individual partitions that are missing or stale, instead of re-materializing the entire asset. A single asset partition can depend on multiple partitions of an upstream asset, and vice versa. For example, a daily partition of one asset might depend on 24 hourly partitions of an upstream asset.
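The daily-to-hourly relationship can be sketched in plain Python. This is an illustration rather than Dagster's partitioning API: the function names and the partition key formats ("2022-12-08" for days, "2022-12-08-00:00" for hours) are assumptions chosen for the example.

```python
from datetime import date, datetime, timedelta


def hourly_partitions_for_day(day):
    """Return the 24 hourly partition keys that one daily partition reads from."""
    start = datetime(day.year, day.month, day.day)
    return [(start + timedelta(hours=h)).strftime("%Y-%m-%d-%H:00") for h in range(24)]


def missing_daily_partitions(start, end, materialized):
    """Return the daily partition keys in [start, end] that have not been materialized."""
    days = (end - start).days + 1
    keys = [(start + timedelta(days=i)).isoformat() for i in range(days)]
    return [key for key in keys if key not in materialized]


hours = hourly_partitions_for_day(date(2022, 12, 8))
print(len(hours), hours[0], hours[-1])  # 24 2022-12-08-00:00 2022-12-08-23:00

print(missing_daily_partitions(date(2022, 12, 6), date(2022, 12, 8), {"2022-12-06"}))
# ['2022-12-07', '2022-12-08']
```

Tracking state per partition key is what lets an orchestrator rebuild only the two missing days instead of the whole asset.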
Asset reconciliation states: stale vs. late
A Dagster asset is considered stale if Dagster knows that its code or upstream data has changed, but the asset hasn’t been materialized since then to incorporate those changes. Any asset that depends on a stale asset is also stale.
Freshness policies dictate whether it’s OK for an asset to be stale at the current time. If a Dagster asset is more stale than its freshness policy allows, then it’s considered late. Otherwise, it’s considered on-time.
A Dagster asset can be both stale and on-time if – say – its upstream data changed five minutes ago and it hasn’t been updated since then, but its freshness policy allows it to wait 30 minutes before incorporating changed upstream data.
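The two states are independent, which a small sketch in plain Python can show. It is a simplified model under stated assumptions, not Dagster's reconciliation logic: AssetStatus and reconciliation_state are hypothetical names, and only upstream data changes are modeled.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class AssetStatus:
    # Time of the earliest upstream change the asset has not incorporated yet,
    # or None if it has incorporated everything.
    unincorporated_since: Optional[datetime]
    maximum_lag_minutes: int


def reconciliation_state(status, now):
    """Return (stale, late) for an asset at the given time."""
    if status.unincorporated_since is None:
        return (False, False)
    allowed = timedelta(minutes=status.maximum_lag_minutes)
    late = now - status.unincorporated_since > allowed
    return (True, late)


now = datetime(2022, 12, 8, 9, 0)
status = AssetStatus(now - timedelta(minutes=5), maximum_lag_minutes=30)

print(reconciliation_state(status, now))  # (True, False): stale but on-time
print(reconciliation_state(status, now + timedelta(minutes=40)))  # (True, True): stale and late
```

The first call mirrors the example above: the upstream data changed five minutes ago, the policy tolerates 30 minutes, so the asset is stale but not yet late.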
That was the 10k-foot view of the why and what of declarative asset scheduling with Dagster. To learn more:
- Read the Dagster 1.1 release notes.
- Read the docs on freshness policies, automatic asset reconciliation, code versions, and source asset versions.
- Ask us questions in Slack.
Special thanks to Benn Stancil and Abhi Sivasailam for their thoughtful feedback.