December 8, 2022 (6 minute read)

Declarative Scheduling for Data Assets

Sandy Ryza (@s_ryz)

New files arrive on S3, and, within 5 minutes, the core tables that depend on them are re-built.

In time for the daily 9 AM executive meeting, the daily rollup table is recomputed from those core tables.

You push a bugfix to the code that derives one of your core tables. A backfill launches that recomputes it, as well as the downstream tables and machine learning models that depend on it.

Through all of this, you haven’t scheduled any jobs, Airflow DAGs, or Prefect Flows. You’ve just declared how your data flows and when you expect it to be up-to-date.

This is the vision behind declarative, asset-based scheduling, introduced in Dagster 1.1: a principled way of managing change that models each data asset as a function of its predecessors and schedules work based on how up-to-date data needs to be. As source code and upstream data change, the updates ripple through the asset graph, eagerly or lazily depending on the requirements of the data products they support.

Sandy Ryza provided an overview of Declarative Scheduling during Dagster's Community Day in Dec 2022.

Orchestrators keep data assets up-to-date

Data practitioners use orchestrators to keep data assets, like tables and machine learning models, up-to-date. Scheduling a data pipeline effectively requires thinking about how up-to-date your assets need to be and what causes those assets to become out-of-date.

Up-to-date has different meanings for different data assets. For some assets, it’s important that they incorporate relatively recent data. For example, someone monitoring load on the power grid might want to see hot-spots within 30 minutes of when they emerge. Other assets, for example a complex analysis that’s reviewed at a weekly meeting, might need to be updated less often.

Assets typically only become out-of-date when the data or code that they depend on changes. If we’ve received no new data and have deployed no new code, there’s usually no reason to spend resources running anything. When new data arrives in an upstream asset, downstream assets eventually need to be updated to incorporate that data. When new code is deployed, the asset built using that code eventually needs to be updated using that code.

If we’re able to express our intentions to the orchestrator directly in these terms, then it can reliably schedule updates at the times we need them, and avoid unnecessary work when we don’t.

Workflows != data pipelines

But most orchestration tools don’t think in these terms. They instead require translating data pipelines into imperative workflows - sequences of tasks that run together on fixed schedules. On a personal note, I’ve spent many hours building workflows in Airflow that try to do this.

Trying to represent and schedule pipelines of assets as workflows is often painful, because workflows are an awkward metaphor for data pipelines. A single data source can power dozens of data products, each with different data freshness requirements. A single data product can consume data from dozens of different sources, each of which is updated at a different cadence. Trying to chop up this graph of data assets into discrete workflows that each get executed synchronously often feels like forcing a square peg into a round hole.

You don’t need a Google-sized asset graph to experience this awkwardness. Workflow-based orchestrators have trouble with even basic patterns, like a daily table and an hourly table that both depend on the same upstream table. Or an hourly table that depends on a daily table.

Different ways to break up “a daily table and an hourly table that depend on the same upstream table” into workflows. Each is awkward in its own way.

Another friction with imperative, workflow-based orchestration is that, every time you add an asset, you have to find a DAG to put it in to get it scheduled. This means you have to worry about whether DAGs are getting too large and unwieldy, on one extreme, or too small and fragmented, on the other. It can also run into organizational friction: which team’s workflow should a shared task belong to?

Last, imperative, workflow-based orchestrators send alerts when tasks fail, not when data is out of date, which is often what stakeholders actually care about. If the system can retry and self-correct before the deadline, then waking someone up on PagerDuty is a waste.

Declarative, asset-based scheduling with Dagster

The Dagster 1.1 release introduces a system for scheduling data pipelines that allows you to escape writing workflows entirely. Instead, you directly specify how up-to-date you expect each asset to be, as well as how to determine whether source data has changed. Dagster then automatically schedules asset materializations to ensure that data arrives on time, while avoiding unnecessary computation.

As a quick aside, Dagster also supports workflow-based scheduling and will continue to treat it as first-class. If you like using jobs, schedules, and sensors to schedule your data pipelines, these are not going away! Dagster 1.1 also includes a number of improvements to them. But they’re not the focus of this blog post.

These are the core abstractions of Dagster’s declarative, asset-based scheduling system:

Data assets

Dagster thinks in terms of materializing data assets, rather than just executing tasks. My post introducing software-defined assets gives a full view of why and how this works. A crucial element of Dagster’s software-defined assets approach is that the graph of data assets is different from the execution DAGs that show up in systems like Airflow. Execution DAGs track execution dependencies: do task Y after task X. The data asset graph tracks data dependencies: data asset Y is derived from data asset X.

Execution dependencies are context-dependent: if a daily downstream asset depends on an hourly asset, you often won’t update the daily asset every time you update the hourly asset. Or if you have an ML model that’s expensive to train, you often won’t retrain it every time the data it’s based on changes - although you might update them in order when backfilling.

Data dependencies hold no matter how you schedule or manually update your assets. If a daily asset depends on an hourly asset, you’ll read the latest data from the hourly asset every time you update the daily asset.
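To make the distinction concrete, the data asset graph can be thought of as a mapping from each asset to the assets it is derived from. The sketch below is plain Python rather than Dagster's API; the asset names and the `downstream_of` helper are hypothetical. It finds everything that transitively depends on a given asset, which is the set of assets a change could eventually ripple into.

```python
from collections import deque

# Hypothetical asset graph: each asset maps to the assets it is derived from.
UPSTREAMS = {
    "raw_events": [],
    "hourly_events": ["raw_events"],
    "daily_rollup": ["hourly_events"],
    "churn_model": ["daily_rollup", "hourly_events"],
}


def downstream_of(asset, upstreams):
    """Return every asset that transitively depends on `asset`."""
    # Invert the mapping: asset -> assets that read from it directly.
    children = {name: [] for name in upstreams}
    for name, deps in upstreams.items():
        for dep in deps:
            children[dep].append(name)

    # Breadth-first walk over the inverted graph.
    seen = set()
    queue = deque(children[asset])
    while queue:
        current = queue.popleft()
        if current in seen:
            continue
        seen.add(current)
        queue.extend(children[current])
    return seen
```

Note that this only answers "what is affected?". When each affected asset is actually rebuilt is a separate decision, which is the point of keeping data dependencies apart from execution dependencies.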

Asset freshness policies

Freshness policies are how you tell Dagster how up-to-date your data assets need to be and thus how often they need to be re-materialized. Freshness policies work similarly to the “latency requirements” that Benn Stancil describes in Down with the DAG. A freshness policy might state that “at any point in time, this asset should incorporate all data that arrived through 30 minutes before that time.”

You can set up Dagster to automatically schedule materializations of assets so they meet their freshness policies.

Here are a couple of assets with different freshness policies that depend on the same upstream asset:

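from dagster import FreshnessPolicy, asset

# Shared upstream asset; it has no freshness policy of its own.
@asset
def fact_table():
    ...

# Must incorporate upstream data from no more than 60 minutes ago.
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60))
def aggregation_table_1(fact_table):
    ...

# Must incorporate upstream data from no more than 24 hours ago.
@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=60 * 24))
def aggregation_table_2(fact_table):
    ...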

And here’s how the freshness policy is represented in Dagster’s web UI:

fact_table is a data warehouse table that’s built from a table in an app database. The app database table is receiving updates continuously. To meet the FreshnessPolicy on aggregation_table_1, in every 60-minute window there must be a materialization of fact_table followed by a materialization of aggregation_table_1. This ensures that all updates to the app database will be incorporated into aggregation_table_1 within 60 minutes.
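One way to read the policy is as a check on how old the upstream data behind an asset is, rather than on when the asset itself last ran. The following is a simplified sketch of that reading in plain Python, not Dagster's implementation; `lag_minutes` and `meets_policy` are hypothetical helpers, and the timestamps are made up for illustration.

```python
from datetime import datetime, timedelta


def lag_minutes(upstream_data_time, now):
    """Minutes between `now` and the upstream data the asset incorporates."""
    return (now - upstream_data_time) / timedelta(minutes=1)


def meets_policy(upstream_data_time, now, maximum_lag_minutes):
    """True if the asset reflects all data up to `maximum_lag_minutes` ago."""
    return lag_minutes(upstream_data_time, now) <= maximum_lag_minutes


# Assumed scenario: fact_table was materialized at 09:00, and
# aggregation_table_1 was rebuilt from that snapshot at 09:10,
# so the data it incorporates is "as of" 09:00.
fact_table_materialized = datetime(2022, 12, 8, 9, 0)

at_0950 = meets_policy(fact_table_materialized, datetime(2022, 12, 8, 9, 50), 60)
at_1005 = meets_policy(fact_table_materialized, datetime(2022, 12, 8, 10, 5), 60)
```

Under these assumptions the asset is within its policy at 09:50 but not at 10:05, even though at 10:05 it was rebuilt only 55 minutes earlier. That is why meeting the policy requires re-materializing fact_table as well, not just aggregation_table_1.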

Granular versioning

Versions are how you help Dagster avoid performing redundant computations. Assets are derived from upstream assets by running code. If the code and upstream data haven't changed, there's usually no reason to spend cycles updating an asset. Dagster can communicate when the versions of the upstream data or code that an asset depends on have changed since the given asset was last materialized.

After making a change to the code that derives an asset, you can increment its code_version to let Dagster know that it has changed.

from dagster import asset

@asset(code_version="1")
def customers():
    ...

@asset(code_version="1")
def orders():
    ...

@asset(code_version="1")
def order_stats(customers, orders):
    ...

If customers was materialized when its code_version was “1”, but then we bump its code version to “2”, Dagster will communicate that its code version has changed. You can set up Dagster to automatically re-materialize an asset when its code version, or the code version of any upstream asset, has changed.
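The comparison behind this can be sketched in plain Python. This is an illustration of the idea, not how Dagster stores or compares versions; the `Materialization` record, the `stale_reasons` helper, and the version strings are all hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Materialization:
    """What a hypothetical orchestrator recorded when an asset was last built."""
    code_version: str
    # Data version of each upstream asset at the time it was read.
    upstream_versions: dict = field(default_factory=dict)


def stale_reasons(current_code_version, current_upstream_versions, last):
    """List why an asset is out of date; an empty list means it is up to date."""
    if last is None:
        return ["never materialized"]
    reasons = []
    if last.code_version != current_code_version:
        reasons.append("code version changed")
    for name, version in current_upstream_versions.items():
        if last.upstream_versions.get(name) != version:
            reasons.append(f"new upstream data: {name}")
    return reasons


# order_stats was last built with code version "1" against these upstream versions.
last_run = Materialization("1", {"customers": "a1", "orders": "b7"})

unchanged = stale_reasons("1", {"customers": "a1", "orders": "b7"}, last_run)
changed = stale_reasons("2", {"customers": "a1", "orders": "b8"}, last_run)
```

Here `unchanged` is an empty list, so there is nothing to recompute, while `changed` reports both a code change and new data in orders.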

Versions can also be applied to source assets. Source assets are assets that Dagster knows about, but did not compute itself. For example, you could use a source asset to model a file that another company posts to an SFTP server every day.

If you have a source asset that changes at discrete times, such as a file that gets overwritten every so often, you can supply a function that observes it and reports its version. You can then set up downstream assets to auto-materialize whenever the version changes.

Here’s an asset that represents a CSV file, and whose version is defined as the last time that file was modified:

import os
from dagster import DataVersion, observable_source_asset

@observable_source_asset
def raw_customers(_):
    # The version is the file's last-modified timestamp, as a string.
    return DataVersion(str(os.path.getmtime("raw_customers.csv")))

If the modification time changes and the source asset is observed again, then all assets downstream of it will get a marker that indicates new "Upstream data".
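Modification time is only one possible choice of version: it changes whenever the file is rewritten, even if the contents are identical. A possible alternative, sketched here with the standard library only, is to derive the version from a hash of the file's contents. The `content_version` helper is hypothetical; its return value is a string that could be wrapped in `DataVersion` in place of the modification time.

```python
import hashlib


def content_version(path, chunk_size=1 << 20):
    """Return a hex SHA-256 digest of the file at `path`, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read fixed-size chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The trade-off is cost: hashing reads the whole file on every observation, whereas checking the modification time is a single metadata lookup.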

Partitions

You can declare that a single asset is composed of multiple partitions, e.g. one for each day. Dagster can then materialize individual partitions that are missing or have new upstream data, instead of re-materializing the entire asset. A single asset partition can depend on multiple partitions of an upstream asset, and vice versa. For example, a daily partition of one asset might depend on 24 hourly partitions of an upstream asset.
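The daily-to-hourly example can be sketched as a mapping between partition keys. This is plain Python for illustration, not Dagster's partition mapping API; the key formats and both helper functions are assumptions.

```python
from datetime import datetime, timedelta


def hourly_partitions_for_day(day_key):
    """Map a daily partition key (YYYY-MM-DD) to its 24 hourly partition keys."""
    start = datetime.strptime(day_key, "%Y-%m-%d")
    return [
        (start + timedelta(hours=h)).strftime("%Y-%m-%d-%H:00")
        for h in range(24)
    ]


def missing_upstream(day_key, materialized_hours):
    """Hourly partitions the daily partition still needs before it can be built."""
    return [
        key
        for key in hourly_partitions_for_day(day_key)
        if key not in materialized_hours
    ]
```

With a mapping like this, a scheduler can tell that the daily partition for a given date is not ready until all 24 of its hourly inputs exist, and can rebuild only that one daily partition when a single hour is re-materialized.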

Learning more

That was the 10k-foot view of the why and what of declarative asset scheduling with Dagster. To learn more, check out the:

Special thanks to Benn Stancil and Abhi Sivasailam for their thoughtful feedback.

