February 5, 2024 · 14 minute read

Thinking in Assets When Building Data Pipelines

How to develop data pipelines using Software-defined Assets.
By Tim Castillo (@tim) and Sandy Ryza (@s_ryz)

So you want to build a data pipeline.

Maybe it’s to:

  • Take data from your application database and use it to populate your data warehouse with metrics for reporting
  • Train a machine learning model that powers your product
  • Take daily data exports that arrive in S3 and extract images

The Dagster Data Asset Approach

Dagster is a system for building and maintaining data pipelines. Many orchestration tools focus on executing tasks, but Dagster’s unique focus on data pipelines encourages you to view your work as the creation of data assets: the files, tables, or ML models that the pipeline creates and uses.

Building pipelines in this way has some big advantages:

  • You get out-of-the-box data lineage, making data dependencies clear both within an individual pipeline and across your entire data platform.
  • The relationship between tasks and data assets is clear. This means that when you encounter task failures, you can reliably understand what tables they affect. And when you find a table in your data warehouse, you can reliably understand what code generates it.
  • Your code is more maintainable because you avoid unwieldy DAG objects. Each Dagster asset specifies the assets that it depends on, so you don’t need a separate object to tie them together. This, in turn, helps with locality of behavior (see the sketch below).
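
For instance, here’s a minimal, hypothetical sketch of what this looks like in practice. The asset names raw_orders and order_metrics are invented for illustration; the point is that the dependency is declared on the downstream asset itself, with no separate DAG object:

from dagster import asset

@asset
def raw_orders():
    # e.g., pull rows from an application database and write them somewhere
    ...

@asset(deps=[raw_orders])  # the dependency is declared right where it's used
def order_metrics():
    # e.g., aggregate raw_orders into reporting metrics
    ...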

Here, we’ll explain how to “think in assets” by walking through the process of developing a data pipeline in Dagster.

Step 1: Design Your Pipeline

Imagine you get into a heated argument with your friends about who the most important character is in your favorite TV show.

With your data engineering knowledge, you decide to build a data pipeline that fetches data about the show and analyzes it. You believe that a report of how many times each character is mentioned in each episode’s plot summary will put the discussion to rest.

We’ll use the classic 90s sitcom Friends as an example, but this guide can easily be applied to any of your favorite TV shows.

When designing a data pipeline, the first step is to ask two questions:

  • What data assets does the pipeline need to produce? If you're creating metrics for a stakeholder, it might mean a table they can query or a report they can view. If you're training a machine learning model to use in your company’s product, it might be the machine learning model itself.
  • What source data does the pipeline need to start with? For example, you might need data from your application database, Shopify, Salesforce, or other APIs.

Let’s dive into each of these questions for the data pipeline that we’ll build.

What data assets does the pipeline need to produce?

In this case, the end goal of your data pipeline is to produce a single data asset: a report of how often each character is mentioned in the summaries of a show’s episodes.

This report will be saved to your computer’s file system. Let’s call this data asset appearances_report.

What source data does the pipeline need to start with?

Let’s list out the source data needed to produce this report. You’ll need the following data sets:

  • A list of the main characters for the show
  • Information about which characters were included in each episode of the show

Luckily, there’s TV Maze, an online platform and database for television shows. It offers extensive information about TV shows, including airing schedules, complete episode lists, and plot summaries, and it provides API endpoints to fetch the data sets we need. To access this data, we’ll need another piece of data: the show's ID. Therefore, we’ll also search the API to get the ID for our show.

There are three data sets you’ll need from this API:

  • tv_show_info: basic details about the show, including its ID
  • episodes: the show’s list of episodes, with their plot summaries
  • characters: the show’s cast of characters

As noted, you’ll need to know the TV show's ID before getting the list of characters and episodes. This means that the episodes and characters assets depend on the tv_show_info asset.

Asset names should be based on the noun or object produced, rather than the step that computes it. For example:

  • Good: episodes
  • Bad: get_episodes

If you draw out your data “pipeline” at this point, it looks something like this:

The incomplete pipeline, drawn in Eraser.io

At this point, you know what you’re starting from and where you want to be. The next step is to figure out what assets go in the middle.

Assets in the Middle

In the above sketch, we broke the data pipeline into at least five steps/assets. You could write your pipeline as one big step that fetches all the data from the data sources, extracts it, transforms it, and immediately produces appearances_report.

Instead, we generally recommend making one asset per step.

There are a few reasons to break this up into multiple steps:

  • Recovering After Errors: If a run fails after the data is fetched but before the report is made, you can make your fixes and re-run your pipeline from the point of failure, rather than running the whole pipeline again. This is possible because you’ve already fetched the data and materialized the assets for it.
  • Reusability: If you also want to analyze the average episode rating per season, reusability follows for the same reasons as error recovery. Because episodes exists as a separate asset, extending your pipeline with a new average_ratings asset is as easy as saying that average_ratings depends on the episodes asset (see the sketch after this list).
  • Transparency: Having an asset per step means you know exactly what computation created an asset. You can see the relationships between steps/assets and understand how the logic flows.
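
To make the reusability point concrete, here’s a rough sketch of what that extension could look like, using the @asset syntax introduced in Step 2 below. The string dependency on "episodes" and the "rating" field are assumptions for illustration; the data the API returns for your show may differ:

import json

from dagster import asset

@asset(deps=["episodes"])  # reuses the existing episodes asset
def average_ratings():
    # Assumes episodes.json was written by the episodes asset and that each
    # episode record carries a "season" and a "rating": {"average": ...} field.
    with open("episodes.json", "r") as f:
        episodes_response = json.loads(f.read())

    ratings_by_season = {}
    for episode in episodes_response:
        rating = (episode.get("rating") or {}).get("average")
        if rating is not None:
            ratings_by_season.setdefault(episode["season"], []).append(rating)

    averages = {season: sum(r) / len(r) for season, r in ratings_by_season.items()}

    with open("average_ratings.json", "w") as f:
        f.write(json.dumps(averages))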

Knowing this, how can we apply this to our data pipeline?

To produce the appearances_report asset, you’ll first need to produce the data that will populate the report. In this case, the data is a table that tracks how many times each character is mentioned across the episode summaries. Here’s an example of what we want it to look like:

{
  "Chandler": 159,
  "Joey": 151,
  "Monica": 169,
  "Phoebe": 152,
  "Rachel": 194,
  "Ross": 187
}

This data set will be stored in an asset that we’ll call character_appearances. To compute character_appearances, you’ll need to go through all of the plot summaries (in episodes) and count all the times each character (from characters) is mentioned in each summary. Therefore, this new character_appearances asset depends on the episodes and characters assets.

This is how designing pipelines with assets works. You start with the end goal and work your way back, identifying what data assets are needed, breaking them down into more atomic assets, and repeating the process until you reach your data sources.

The finished pipeline, drawn in Eraser.io

Let’s look at all the nouns in our pipeline:

  • tv_show_info
  • episodes
  • characters
  • character_appearances
  • appearances_report

Step 2: Implement Your Assets

To produce the data assets in your pipeline, you need to execute code. In Dagster, we call this action of executing the code to generate the asset materializing the asset.

As a framework, Dagster is flexible and unopinionated about how your code produces an asset. For example, a valid asset materialization might:

  • Run a SQL query that creates or overwrites a table
  • Execute Pandas/Polars code and write a DataFrame to a Parquet file
  • Communicate with an LLM and store the output in AWS S3
  • Fetch data from a service’s API and save it in your data warehouse

What’s important to note is that each of these operations creates (i.e. materializes) data.
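
As a concrete illustration of the second option, here’s a minimal sketch. It assumes pandas and a Parquet engine (such as pyarrow) are installed; the asset name, data, and file path are invented for illustration:

import pandas as pd

from dagster import asset

@asset
def daily_order_summary():
    # Build a small DataFrame and materialize it as a Parquet file on disk.
    df = pd.DataFrame({"day": ["2024-02-01", "2024-02-02"], "orders": [120, 98]})
    df.to_parquet("daily_order_summary.parquet")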

This flexibility also means that Dagster does not have a strict definition of what happens during a materialization. The following actions are all valid materializations:

  • Replacing a file by deleting the old one
  • Inserting new rows into an existing table
  • A combination of the two when working with partitions, such as an individual week or a country

The most common way in Dagster to define how an asset is materialized is with the @asset decorator. Adding this decorator to a function communicates to Dagster that the function produces an asset. By default, the name of the function is used as the name of the asset.
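
By the way, if you’d rather keep a verb-style function name, the decorator also accepts a name argument so the asset itself can still follow the noun convention from Step 1. A small, hypothetical sketch with the body elided:

from dagster import asset

@asset(name="episodes")  # the asset is named "episodes", not "fetch_episodes"
def fetch_episodes():
    ...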

After installing Dagster, you'll implement your data pipeline by starting at the source data and working your way over to the last asset on the right of the flow chart. Copy and paste the code below into a Python file (call it assets.py) to write your first asset:

from dagster import asset

import requests
import json

TV_SHOW_FOR_ANALYSIS = "friends"
TV_SHOW_FILE = "show.json"

@asset
def tv_show_info():
    url_for_search = f"https://api.tvmaze.com/search/shows?q={TV_SHOW_FOR_ANALYSIS}"
    search_results = requests.get(url_for_search).json()
    top_result = search_results[0]

    with open(TV_SHOW_FILE, "w") as f:
        f.write(json.dumps(top_result))

Hopefully, this feels like writing normal Python code, because it is normal Python code. Dagster simplifies data orchestration by enhancing your existing code and systems with an understanding that your code produces data assets.

After implementing the first asset in the pipeline you designed, you can write out the rest of them pretty easily. For more detail on what's happening here, check out the Dagster tutorial:

from dagster import asset
import requests
import json

TV_SHOW_FOR_ANALYSIS = "friends"

TV_SHOW_FILE = "show.json"
EPISODES_FILE = "episodes.json"
CHARACTERS_FILE = "characters.json"
CHARACTER_APPEARANCES_FILE = "character_appearances.json"
APPEARANCES_REPORT_FILE = "appearances.png"

@asset
def tv_show_info():
    url_for_search = f"https://api.tvmaze.com/search/shows?q={TV_SHOW_FOR_ANALYSIS}"
    search_results = requests.get(url_for_search).json()
    top_result = search_results[0]

    with open(TV_SHOW_FILE, "w") as f:
        f.write(json.dumps(top_result))

@asset(deps=[tv_show_info])
def episodes():
    with open(TV_SHOW_FILE, "r") as f:
        show = json.loads(f.read())

    episodes_response = requests.get(f"https://api.tvmaze.com/shows/{show['show']['id']}/episodes").json()

    with open(EPISODES_FILE, "w") as f:
        f.write(json.dumps(episodes_response))

    # count the number of seasons (not used yet; we'll surface it as metadata in Step 4)
    seasons = set()
    for episode in episodes_response:
        seasons.add(episode["season"])

@asset(deps=[tv_show_info])
def characters():
    with open(TV_SHOW_FILE, "r") as f:
        show = json.loads(f.read())

    cast_response = requests.get(f"https://api.tvmaze.com/shows/{show['show']['id']}/cast").json()

    characters = []
    for cast_member in cast_response:
        characters.append(cast_member["character"]["name"].split(" ")[0])

    with open(CHARACTERS_FILE, "w") as f:
        f.write(json.dumps(characters))

@asset(deps=[episodes, characters])
def character_appearances():
    with open(EPISODES_FILE, "r") as f:
        episodes_response = json.loads(f.read())

    with open(CHARACTERS_FILE, "r") as f:
        characters = json.loads(f.read())
        character_appearances = {character: 0 for character in characters}

    for episode in episodes_response:
        summary = episode["summary"]
        for character in character_appearances.keys():
            character_appearances[character] += summary.count(character)

    with open(CHARACTER_APPEARANCES_FILE, "w") as f:
        f.write(json.dumps(character_appearances))

@asset(deps=[character_appearances])
def appearances_report():
    with open(CHARACTER_APPEARANCES_FILE, "r") as f:
        character_appearances = json.loads(f.read())
        character_appearances = {character: count for character, count in character_appearances.items() if count > 0}

    report = requests.get(f"https://quickchart.io/chart?c={{type:'bar',data:{{labels:{list(character_appearances.keys())},datasets:[{{label:'Appearances',data:{list(character_appearances.values())}}}]}}}}")

    with open(APPEARANCES_REPORT_FILE, "wb") as f:
        f.write(report.content)

To run your data pipeline, start Dagster by running the following command:

dagster dev -f assets.py

If you have Dagster installed, this will start Dagster on your computer and give you access to its UI at http://127.0.0.1:3000/asset-groups. There, you should see your asset graph and a Materialize all button towards the top-right corner of the screen. Click it to run your data pipeline!

A screenshot of Dagster's Materialize all button

After running the whole pipeline, you should have new files (assets!) created in the same directory as your assets.py file. Open up your appearances.png file to see your appearances_report asset.

Step 3: Run it Automatically, But Only When Needed

Now that you have a pipeline that generates a set of assets, you’ll want to keep those assets up to date. Running data pipelines has historically meant scheduling a set of operations on a fixed interval—say, every day or every 4 hours. However, this is often a poor proxy for what you really care about: that the data is up to date.

Having a pipeline run daily usually means “this data needs to be up to date every day.” But running data pipelines can be costly. With an asset-based perspective, data should only be materialized when re-running the computation will produce new results, such as when the upstream data or code changes.

In Dagster, this behavior is configured with auto-materialization policies—rules that define when an asset should be materialized. Some rules include:

  • Materialize this asset when the assets it depends on are all updated
  • Materialize this asset at 8AM every day
  • Materialize this asset if it was last materialized more than an hour ago

These rules can be combined or negated to match when each asset actually needs to be updated. This means data assets are updated precisely as needed, rather than through the imprecise art of running them only on a schedule.
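
For instance, a hypothetical policy that starts from the eager defaults, adds an 8AM cron, and drops the rule that re-materializes an asset whenever a parent updates might look roughly like this (the exact rule names are assumptions based on Dagster’s AutoMaterializeRule API at the time of writing, and my_asset is a placeholder; check the docs for the rules available in your version):

from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

custom_policy = (
    AutoMaterializePolicy.eager()
    .with_rules(AutoMaterializeRule.materialize_on_cron(cron_schedule="0 8 * * *"))
    .without_rules(AutoMaterializeRule.materialize_on_parent_updated())
)

@asset(auto_materialize_policy=custom_policy)
def my_asset():
    ...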

Let’s look at the updated version of our data pipeline, and note the auto_materialize_policy attached onto each asset:

import json

import requests
from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset

TV_SHOW_FOR_ANALYSIS = "friends"

TV_SHOW_FILE = "show.json"
EPISODES_FILE = "episodes.json"
CHARACTERS_FILE = "characters.json"
CHARACTER_APPEARANCES_FILE = "character_appearances.json"
APPEARANCES_REPORT_FILE = "appearances.png"

@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager().with_rules(
        AutoMaterializeRule.materialize_on_cron(cron_schedule="0 0 * * *")
    ),
)
def tv_show_info():
    url_for_search = f"https://api.tvmaze.com/search/shows?q={TV_SHOW_FOR_ANALYSIS}"
    search_results = requests.get(url_for_search).json()
    top_result = search_results[0]

    with open(TV_SHOW_FILE, "w") as f:
        f.write(json.dumps(top_result))

@asset(deps=[tv_show_info], auto_materialize_policy=AutoMaterializePolicy.eager())
def episodes():
    with open(TV_SHOW_FILE, "r") as f:
        show = json.loads(f.read())

    episodes_response = requests.get(
        f"https://api.tvmaze.com/shows/{show['show']['id']}/episodes"
    ).json()

    with open(EPISODES_FILE, "w") as f:
        f.write(json.dumps(episodes_response))

    # count the number of seasons (not used yet; we'll surface it as metadata in Step 4)
    seasons = set()
    for episode in episodes_response:
        seasons.add(episode["season"])

@asset(deps=[tv_show_info], auto_materialize_policy=AutoMaterializePolicy.eager())
def characters():
    with open(TV_SHOW_FILE, "r") as f:
        show = json.loads(f.read())

    cast_response = requests.get(f"https://api.tvmaze.com/shows/{show['show']['id']}/cast").json()

    characters = []
    for cast_member in cast_response:
        characters.append(cast_member["character"]["name"].split(" ")[0])

    with open(CHARACTERS_FILE, "w") as f:
        f.write(json.dumps(characters))

@asset(deps=[episodes, characters], auto_materialize_policy=AutoMaterializePolicy.eager())
def character_appearances():
    with open(EPISODES_FILE, "r") as f:
        episodes_response = json.loads(f.read())

    with open(CHARACTERS_FILE, "r") as f:
        characters = json.loads(f.read())
        character_appearances = {character: 0 for character in characters}

    for episode in episodes_response:
        summary = episode["summary"]
        for character in character_appearances.keys():
            character_appearances[character] += summary.count(character)

    with open(CHARACTER_APPEARANCES_FILE, "w") as f:
        f.write(json.dumps(character_appearances))

@asset(deps=[character_appearances], auto_materialize_policy=AutoMaterializePolicy.eager())
def appearances_report():
    with open(CHARACTER_APPEARANCES_FILE, "r") as f:
        character_appearances = json.loads(f.read())
        character_appearances = {
            character: count for character, count in character_appearances.items() if count > 0
        }

    report = requests.get(
        f"https://quickchart.io/chart?c={{type:'bar',data:{{labels:{list(character_appearances.keys())},datasets:[{{label:'Appearances',data:{list(character_appearances.values())}}}]}}}}"
    )

    with open(APPEARANCES_REPORT_FILE, "wb") as f:
        f.write(report.content)

Step 4 (Bonus): Add Documentation, Metadata, and Data Quality Checks

At this point, you have a working data pipeline that can be run both manually and automatically to update your TV appearances report.

For a quick-and-dirty pipeline that gets you the data that you need, this is a reasonable stopping point. But if you're going to deploy this pipeline inside an organization, or run it over time, it's important to document it. It's also important to instrument it, so that you can understand what it's doing and catch problems.

Dagster lets you attach documentation to your assets, record metadata for each materialization, and define checks for data quality.

Dagster leverages Python’s native docstrings, turning the docstring of each Dagster-decorated function into documentation for the asset it defines. This documentation can be anything you feel is valuable for your end users, such as:

  • A high-level summary of how the asset is produced
  • Model type, infrastructure, and hyperparameters of a trained machine learning model
  • Details of where the asset is stored
  • Considerations, assumptions, or limitations
  • Links to further documentation/architecture diagrams

Here’s an example:

@asset(deps=[tv_show_info])
def characters() -> MaterializeResult:
    """A JSON file containing the names of all the characters in the show. This is computed by
    querying the TV Maze API for the cast of the show."""

When materializing assets, adding metadata about the materialization helps users understand the state of those assets. You can embed helpful metadata on each materialization. Common metadata includes:

  • Subsets of your data
  • Performance metrics of your machine learning model
  • Embedded charts

Here is an example of metadata you might add to your Friends pipeline:

@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager().with_rules(
        AutoMaterializeRule.materialize_on_cron(cron_schedule="0 0 * * *")
    )
)
def tv_show_info() -> MaterializeResult:
    url_for_search = f"https://api.tvmaze.com/search/shows?q={TV_SHOW_FOR_ANALYSIS}"
    search_results = requests.get(url_for_search).json()
    top_result = search_results[0]

    with open(TV_SHOW_FILE, "w") as f:
        f.write(json.dumps(top_result))

    return MaterializeResult(
        metadata={
            "request_url": MetadataValue.url(url_for_search),
            "top_result": MetadataValue.json(top_result),
            "top_result_search_score": MetadataValue.float(top_result["score"]),
            "total_results": MetadataValue.int(len(search_results)),
        }
    )

Finally, data quality is an essential part of the data engineering process. Assets should be tested, and those tests should live in the data pipeline itself. Asset checks let you verify the quality of each asset in your pipeline. Common issues to test for include:

  • Anomalies in the data
  • Null values in a column
  • Statistical drift

Here’s how a check can be attached to the character_appearances asset:

@asset(
    deps=[episodes, characters],
    check_specs=[AssetCheckSpec(name="at_least_one_character_mentioned", asset="character_appearances")],
)
def character_appearances() -> MaterializeResult:
    # existing computation
    ...

    return MaterializeResult(
        check_results=[
            AssetCheckResult(
                check_name="at_least_one_character_mentioned",
                passed=sum(character_appearances.values()) > 0,
            )
        ],
    )

Below is the final state of your code, with all assets implemented, scheduled, documented, and some with data quality tests attached:

import json

import requests
from dagster import (
    AssetCheckResult,
    AssetCheckSpec,
    AutoMaterializePolicy,
    AutoMaterializeRule,
    MaterializeResult,
    MetadataValue,
    asset,
)

TV_SHOW_FOR_ANALYSIS = "friends"

TV_SHOW_FILE = "show.json"
EPISODES_FILE = "episodes.json"
CHARACTERS_FILE = "characters.json"
CHARACTER_APPEARANCES_FILE = "character_appearances.json"
APPEARANCES_REPORT_FILE = "appearances.png"

@asset(
    auto_materialize_policy=AutoMaterializePolicy.eager().with_rules(
        AutoMaterializeRule.materialize_on_cron(cron_schedule="0 0 * * *")
    ),
    compute_kind="Python",
    group_name="ingestion",
)
def tv_show_info() -> MaterializeResult:
    """The top result from querying the TV Maze API for the TV show we want to analyze."""
    url_for_search = f"https://api.tvmaze.com/search/shows?q={TV_SHOW_FOR_ANALYSIS}"
    search_results = requests.get(url_for_search).json()
    top_result = search_results[0]

    with open(TV_SHOW_FILE, "w") as f:
        f.write(json.dumps(top_result))

    return MaterializeResult(
        metadata={
            "request_url": MetadataValue.url(url_for_search),
            "top_result": MetadataValue.json(top_result),
            "top_result_search_score": MetadataValue.float(top_result["score"]),
            "total_results": MetadataValue.int(len(search_results)),
        }
    )

@asset(
    deps=[tv_show_info],
    compute_kind="Python",
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    group_name="ingestion",
)
def episodes() -> MaterializeResult:
    """A JSON file containing all the episodes of the show. This is computed by querying the
    TV Maze API for the episodes of the show."""
    with open(TV_SHOW_FILE, "r") as f:
        show = json.loads(f.read())

    episodes_response = requests.get(
        f"https://api.tvmaze.com/shows/{show['show']['id']}/episodes"
    ).json()

    with open(EPISODES_FILE, "w") as f:
        f.write(json.dumps(episodes_response))

    # count the number of seasons
    seasons = set()
    for episode in episodes_response:
        seasons.add(episode["season"])

    return MaterializeResult(
        metadata={
            "request_url": MetadataValue.url(
                f"https://api.tvmaze.com/shows/{show['show']['id']}/episodes"
            ),
            "num_episodes": MetadataValue.int(len(episodes_response)),
            "num_seasons": MetadataValue.int(len(seasons)),
        }
    )

@asset(
    deps=[tv_show_info],
    compute_kind="Python",
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    group_name="ingestion",
)
def characters() -> MaterializeResult:
    """A JSON file containing the names of all the characters in the show. This is computed by
    querying the TV Maze API for the cast of the show."""
    with open(TV_SHOW_FILE, "r") as f:
        show = json.loads(f.read())

    cast_response = requests.get(f"https://api.tvmaze.com/shows/{show['show']['id']}/cast").json()

    characters = []
    for cast_member in cast_response:
        characters.append(cast_member["character"]["name"].split(" ")[0])

    with open(CHARACTERS_FILE, "w") as f:
        f.write(json.dumps(characters))

    return MaterializeResult(
        metadata={
            "request_url": MetadataValue.url(
                f"https://api.tvmaze.com/shows/{show['show']['id']}/cast"
            ),
            "response": MetadataValue.json(cast_response),
        }
    )

@asset(
    deps=[episodes, characters],
    check_specs=[AssetCheckSpec(name="at_least_one_character_mentioned", asset="character_appearances")],
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    compute_kind="Python",
    group_name="analysis",
)
def character_appearances() -> MaterializeResult:
    """A JSON file containing the number of times each character is mentioned in the show.
    
    This is computed by counting the number of times each character's name appears in the summary of
    each episode."""
    with open(EPISODES_FILE, "r") as f:
        episodes_response = json.loads(f.read())

    with open(CHARACTERS_FILE, "r") as f:
        characters = json.loads(f.read())
        character_appearances = {character: 0 for character in characters}

    for episode in episodes_response:
        summary = episode["summary"]
        for character in character_appearances.keys():
            character_appearances[character] += summary.count(character)

    with open(CHARACTER_APPEARANCES_FILE, "w") as f:
        f.write(json.dumps(character_appearances))

    return MaterializeResult(
        metadata={"mentions": MetadataValue.json(character_appearances)},
        check_results=[
            AssetCheckResult(
                check_name="at_least_one_character_mentioned",
                passed=sum(character_appearances.values()) > 0,
            )
        ],
    )

@asset(
    deps=[character_appearances],
    compute_kind="Python",
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    group_name="analysis",
)
def appearances_report() -> MaterializeResult:
    """A bar chart of the number of times each character is mentioned in the show. This bar chart
    is generated using [the QuickChart API](https://quickchart.io/)."""
    with open(CHARACTER_APPEARANCES_FILE, "r") as f:
        character_appearances = json.loads(f.read())
        character_appearances = {
            character: count for character, count in character_appearances.items() if count > 0
        }

    report = requests.get(
        f"https://quickchart.io/chart?c={{type:'bar',data:{{labels:{list(character_appearances.keys())},datasets:[{{label:'Appearances',data:{list(character_appearances.values())}}}]}}}}"
    )

    with open("appearances.png", "wb") as f:
        f.write(report.content)

    return MaterializeResult(metadata={"chart_url": MetadataValue.url(report.url)})

... and here are the same assets as displayed in the Dagster UI:

A screenshot of the Asset graph of this project, as shown in Dagster's UI

And your final asset: the visualization of the data. Despite your wishes, Ross isn’t the character the show is most frequently about.

A screenshot of the final graph output by the Dagster Friends pipeline.

And That’s It

This exercise walked through the thought process and steps to build a data pipeline that is focused on the assets it produces, rather than the tasks it runs. In other situations, your final assets might alternatively be tables that you share with your stakeholders or machine learning models that power your product.

You now have a data pipeline that you can test on your laptop, a set of production data assets that other people and pipelines can build on top of, and an understanding of how to think about data first when designing a pipeline.

Tinker around with it! Replace the value of TV_SHOW_FOR_ANALYSIS with your favorite show and re-run your pipeline. If you enjoyed this experience of writing data pipelines, visit our documentation to get started with Dagster!



We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a GitHub discussion. If you run into any bugs, let us know with a GitHub issue. And if you're interested in working with us, check out our open roles!
