October 6, 2022 · 7 minute read

A Dagster Crash Course

Pete Hunt (@floydophone)

Hey - I’m the head of engineering at Elementl, the company that builds Dagster. This post is my take on a crash-course introduction to Dagster.

And if you want to support the Dagster Open Source project, be sure to star our GitHub repo.

💡 What does Dagster do?
👩‍💻 Who is Dagster for?
🧠 How is Dagster different?
🏇 Getting started with Dagster
🛠 Improving the code
👨‍🏫 Learning more

💡 What does Dagster do?

Dagster is a data orchestrator. What’s that mean?

An illustration of a company's data platform with the data orchestration layer at its center.

Think of Dagster as a framework for building data pipelines, similar to how Django is a framework for building web apps.

Data pipelines produce data assets. Data assets can be many things, but they’re usually machine learning models, tables in a data warehouse, or reports. To build a data asset, you basically need to do four things:

  • Ingest data from external sources or other data assets.
  • Combine and transform the data in a meaningful way.
  • Store the asset in a place where it can be used.
  • Re-run this process incrementally whenever the asset is out of date - either on a schedule or when an external system triggers the run.
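The last step, re-running only when an asset is out of date, can be made concrete with a toy staleness check. This is a minimal sketch in plain Python, not Dagster’s API: `UPSTREAM`, `is_stale`, and `stale_assets` are hypothetical names, and the rule used here (never built, an upstream asset is newer, or an upstream asset is itself stale) is one simple model of “out of date”.

```python
from datetime import datetime
from typing import Dict, List, Optional

# Hypothetical asset graph: each asset maps to the assets it reads from.
# Listed in dependency order so upstream assets come first.
UPSTREAM: Dict[str, List[str]] = {
    "raw_events": [],
    "daily_summary": ["raw_events"],
    "report": ["daily_summary"],
}


def is_stale(asset: str, last_materialized: Dict[str, Optional[datetime]]) -> bool:
    """True if the asset was never built, or any upstream asset is newer or stale."""
    built_at = last_materialized.get(asset)
    if built_at is None:
        return True
    for parent in UPSTREAM[asset]:
        parent_at = last_materialized.get(parent)
        # Upstream was never built, was rebuilt after us, or needs a rebuild itself.
        if parent_at is None or parent_at > built_at or is_stale(parent, last_materialized):
            return True
    return False


def stale_assets(last_materialized: Dict[str, Optional[datetime]]) -> List[str]:
    """Return every asset that needs a re-run, in dependency order."""
    return [name for name in UPSTREAM if is_stale(name, last_materialized)]


if __name__ == "__main__":
    times = {
        "raw_events": datetime(2022, 10, 6, 9, 0),
        "daily_summary": datetime(2022, 10, 5, 9, 0),
        "report": datetime(2022, 10, 5, 10, 0),
    }
    print(stale_assets(times))  # ['daily_summary', 'report']
```

Note that `report` is flagged even though it is newer than its direct parent: once `daily_summary` is rebuilt, `report` will be out of date too.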

There are tons of tools that do these things. Why choose Dagster? Dagster is for a particular type of user and has several unique differentiators that make it a great choice to sit at the center of your data platform for both new and existing data pipelines.

👩‍💻 Who is Dagster for?

An illustration of the Dagster ideal user at the intersection of data practitioners and software engineering.

Dagster is meant for data engineers and machine learning engineers. These are people who:

  • Are software engineers. They know languages like Python and SQL, and use tools like git and Visual Studio Code.
  • Are knee-deep in data. They’re usually tasked with “cleaning data” and fixing broken pipelines in the middle of the night, and they’re often on call for questions and requests from business intelligence analysts and data scientists.
  • Bridge the engineering and data worlds. They’re frequently required to work in both the infrastructure stack (managing AWS resources, debugging production services, etc.) and the data domain.
  • Respect the complexity of the data domain. Data engineers know that building data apps is a complex problem that requires sharp tools to manage.

🧠 How is Dagster different?

In my opinion, Dagster has three key differentiators that make it better than the alternatives:

  • Local development and testing. Dagster was built from the ground-up to make local development and automated testing easy through its emphasis on separating business logic from I/O concerns such as storage and interacting with external systems.
  • Software defined assets (SDAs). Dagster’s primary abstraction is the SDA: a declarative, pure Python function that computes the value of an asset and has associated metadata. Other orchestrators use imperative tasks as their primary abstraction, which is much more primitive on a number of dimensions:
    • Engineers generally find the declarative mental model of SDAs much easier to work with.
    • SDAs unambiguously document which assets are meant to exist.
    • SDAs have clear, fine-grained data lineage that makes debugging and understanding the state of an asset easy.
    • SDAs decouple the business logic for computing the asset’s value from the I/O logic that reads from and writes to storage (docs).
    • SDAs can be imported from any tool in your stack, so if you use an external tool like dbt that creates multiple tables in your data warehouse, Dagster can track the lineage of every individual table (other orchestrators will simply have a “black box” dbt task in the graph).
    • SDAs support rich, searchable metadata and grouping tools to support scaling to large, complex organizations.
    • SDAs support time partitioning and backfills out of the box.
  • Decoupling pipelines from the environment. Dagster was built from the ground up to abstract away the environment from the business logic in your data pipeline, which leads to a number of elegant capabilities that are clunky or nonexistent in other orchestrators:
    • Staging and testing environments are much easier to set up by swapping out external services (docs).
    • The underlying runtime can be swapped out without changing any user code (see the docs on run launchers and executors if you want the gritty details)
    • Dagster was built with containers in mind from day 1, so you don’t have to deal with pip-hell managing conflicting Python environments in large projects (docs)
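To make “separating business logic from I/O” concrete, here is a toy sketch in plain Python. It is not Dagster’s I/O manager API; `Store`, `InMemoryStore`, `JsonFileStore`, and `run_asset` are hypothetical names. The business logic is a pure function, and the storage layer is handed in from outside, so a test can keep everything in memory while another environment writes to disk.

```python
import json
import os
import tempfile
from typing import Any, Callable, Dict, Protocol


class Store(Protocol):
    """Where asset values live; business logic never touches this directly."""

    def save(self, key: str, value: Any) -> None: ...

    def load(self, key: str) -> Any: ...


class InMemoryStore:
    """Keeps values in a dict; handy for unit tests."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def save(self, key: str, value: Any) -> None:
        self._data[key] = value

    def load(self, key: str) -> Any:
        return self._data[key]


class JsonFileStore:
    """Writes each value to <directory>/<key>.json; a stand-in for real storage."""

    def __init__(self, directory: str) -> None:
        self.directory = directory

    def _path(self, key: str) -> str:
        return os.path.join(self.directory, f"{key}.json")

    def save(self, key: str, value: Any) -> None:
        with open(self._path(key), "w") as f:
            json.dump(value, f)

    def load(self, key: str) -> Any:
        with open(self._path(key)) as f:
            return json.load(f)


def total_stars(stars_per_week: list) -> int:
    """Pure business logic: knows nothing about where data is stored."""
    return sum(stars_per_week)


def run_asset(fn: Callable[[Any], Any], input_key: str, output_key: str, store: Store) -> Any:
    """Load the input, call the pure function, and persist the result."""
    value = fn(store.load(input_key))
    store.save(output_key, value)
    return value


if __name__ == "__main__":
    memory = InMemoryStore()
    memory.save("weekly_stars", [3, 5, 2])
    print(run_asset(total_stars, "weekly_stars", "total", memory))  # 10

    with tempfile.TemporaryDirectory() as directory:
        disk = JsonFileStore(directory)
        disk.save("weekly_stars", [3, 5, 2])
        print(run_asset(total_stars, "weekly_stars", "total", disk))  # 10
```

Swapping `InMemoryStore` for `JsonFileStore` changes where data lives without touching `total_stars`, which is the property the bullets above describe.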

🏇 Getting started with Dagster

Let Pete Hunt walk you through the Dagster Crash Course

Let’s build a quick, realistic example that pulls some data from GitHub and visualizes it. This is an example of an ETL pipeline.


This tutorial assumes you have basic familiarity with Python and Python data tools like Jupyter and pandas.

If you want to just see the code, it’s available on GitHub.

Installing Dagster

Let’s start by following the setup instructions. tl;dr:

$ pip install dagster
$ dagster project scaffold --name my-dagster-project
$ cd my-dagster-project
$ pip install -e '.[dev]'
$ dagit

This will scaffold a new project with default settings and launch the Dagster UI (called dagit) at http://localhost:3000.

Installing the dependencies for this example

For this tutorial we’ll need to install a few dependencies. Modify your setup.py file to add the required dependencies:

from setuptools import find_packages, setup

if __name__ == "__main__":
    setup(
        name="my_dagster_project",
        packages=find_packages(exclude=["my_dagster_project_tests"]),
        install_requires=[
            "dagster",
            "PyGithub",
            "matplotlib",
            "pandas",
            "nbconvert",
            "nbformat",
            "ipykernel",
            "jupytext",
        ],
        extras_require={"dev": ["dagit", "pytest"]},
    )

Once this is done, install by running pip install -e '.[dev]' and restart dagit.

Creating an asset for GitHub stars

Before we begin, go to GitHub and generate a personal access token with the gist permission. Then, let’s create an asset that fetches the GitHub stars for the Dagster repo by updating the my_dagster_project/assets/__init__.py file:

from dagster import asset
from github import Github

ACCESS_TOKEN = "ghp_YOUR_TOKEN_HERE"

@asset
def github_stargazers():
    return list(Github(ACCESS_TOKEN).get_repo("dagster-io/dagster").get_stargazers_with_dates())

Aggregate the GitHub stars by week

Let’s add a second asset that aggregates the raw stargazers data into a weekly count and stores it in a pandas.DataFrame. Let’s add some more code to my_dagster_project/assets/__init__.py:

import pandas as pd
from datetime import timedelta

@asset
def github_stargazers_by_week(github_stargazers):
    df = pd.DataFrame(
        [
            {
                "users": stargazer.user.login,
                "week": stargazer.starred_at.date()
                + timedelta(days=6 - stargazer.starred_at.weekday()),
            }
            for stargazer in github_stargazers
        ]
    )
    return df.groupby("week").count().sort_values(by="week")

Most of this code is just data transformation using pandas; see the pandas docs for more information.
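The one non-obvious line is the `week` computation. Python’s `weekday()` returns 0 for Monday and 6 for Sunday, so `date() + timedelta(days=6 - weekday())` rolls each timestamp forward to the Sunday that ends its week. Here it is in isolation; `week_ending` and `stars_per_week` are hypothetical helper names, and the grouping uses `collections.Counter` instead of pandas only to keep the sketch dependency-free.

```python
from collections import Counter
from datetime import date, datetime, timedelta
from typing import Dict, Iterable


def week_ending(ts: datetime) -> date:
    """Map a timestamp to the Sunday that closes its Monday-to-Sunday week."""
    return ts.date() + timedelta(days=6 - ts.weekday())


def stars_per_week(starred_at: Iterable[datetime]) -> Dict[date, int]:
    """Count stars per week, sorted by week, like groupby("week").count()."""
    counts = Counter(week_ending(ts) for ts in starred_at)
    return dict(sorted(counts.items()))


if __name__ == "__main__":
    print(week_ending(datetime(2021, 1, 1)))  # 2021-01-03 (a Friday rolls to Sunday)
    print(stars_per_week([datetime(2021, 1, 1), datetime(2021, 1, 1), datetime(2021, 2, 1)]))
```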

Notice that this asset takes an argument called github_stargazers. Dagster will automatically find the asset named github_stargazers and materialize it before calling github_stargazers_by_week. This might seem like magic at first, but it’s very easy to get used to, and extremely convenient when you’re building large pipelines.
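If the “magic” feels opaque, the core idea fits in a few lines: read each function’s parameter names with `inspect.signature`, treat each name as an upstream asset, and compute dependencies first. This toy resolver is only an illustration; `ASSETS`, `toy_asset`, and `materialize_all` are hypothetical names, and Dagster’s real implementation also deals with storage, partitions, and the reserved `context` parameter.

```python
import inspect
from typing import Any, Callable, Dict

# Registry of asset functions, keyed by function name (hypothetical).
ASSETS: Dict[str, Callable[..., Any]] = {}


def toy_asset(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Register a function as an asset under its own name."""
    ASSETS[fn.__name__] = fn
    return fn


def materialize_all() -> Dict[str, Any]:
    """Compute every registered asset, resolving inputs by parameter name."""
    results: Dict[str, Any] = {}

    def compute(name: str, visiting: frozenset) -> Any:
        if name in results:
            return results[name]
        if name in visiting:
            raise ValueError(f"cycle detected at {name!r}")
        fn = ASSETS[name]  # KeyError here means no asset has that name
        # Each parameter name is treated as the name of an upstream asset.
        kwargs = {
            param: compute(param, visiting | {name})
            for param in inspect.signature(fn).parameters
        }
        results[name] = fn(**kwargs)
        return results[name]

    for name in ASSETS:
        compute(name, frozenset())
    return results


@toy_asset
def stargazers():
    return ["ann", "bob", "cy"]


@toy_asset
def stargazer_count(stargazers):
    return len(stargazers)


if __name__ == "__main__":
    print(materialize_all())  # {'stargazers': ['ann', 'bob', 'cy'], 'stargazer_count': 3}
```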

Visualize the GitHub stars

Now that we have a dataset of GitHub stars per week, let’s visualize it as a bar chart. Jupyter Notebooks are a great tool for this. We’ll use a neat library called jupytext which lets us author notebooks as Markdown strings instead of using raw .ipynb files. Add the following to my_dagster_project/assets/__init__.py to create an asset representing the notebook:

import nbformat
from nbconvert.preprocessors import ExecutePreprocessor
import pickle
import jupytext

@asset
def github_stars_notebook(github_stargazers_by_week):
    markdown = f"""
# Github Stars

```python
import pickle
github_stargazers_by_week = pickle.loads({pickle.dumps(github_stargazers_by_week)!r})
```

## Github Stars by Week, last 52 weeks

```python
github_stargazers_by_week.tail(52).reset_index().plot.bar(x="week", y="users")
```
    """
    nb = jupytext.reads(markdown, "md")
    ExecutePreprocessor().preprocess(nb)
    return nbformat.writes(nb)

There are a few things going on here.

  • We create a markdown string representing our notebook.
  • We use pickle to pass the DataFrame to the notebook.
  • We use pandas to plot the last 52 weeks as a bar chart.
  • We use jupytext to convert the markdown string to a Jupyter NotebookNode.
  • We use ExecutePreprocessor().preprocess() to execute the notebook in a new kernel.
  • And we use nbformat.writes() to write out the NotebookNode as ipynb file contents.
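The pickle step works because `repr()` of a `bytes` object is a valid Python bytes literal, so the generated notebook cell contains source code that rebuilds the same object. A minimal standard-library sketch of that round trip, assuming a plain dict in place of the DataFrame and `exec` in place of the notebook kernel; `embed_as_source` is a hypothetical helper name:

```python
import pickle


def embed_as_source(name: str, obj: object) -> str:
    """Return Python source that recreates `obj` under the variable `name`."""
    # repr(bytes) yields a literal like b'\x80\x04...' that Python parses back.
    return f"import pickle\n{name} = pickle.loads({pickle.dumps(obj)!r})\n"


if __name__ == "__main__":
    weekly = {"2021-01-03": 2, "2021-02-07": 1}
    source = embed_as_source("github_stargazers_by_week", weekly)
    namespace: dict = {}
    exec(source, namespace)  # stands in for the notebook kernel running the cell
    print(namespace["github_stargazers_by_week"] == weekly)  # True
```

Since `pickle.loads` can execute arbitrary code, this pattern only makes sense for data your own pipeline produced.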

Share the notebook as a GitHub gist

Now we have a notebook. How can we view it?

One easy way is to upload the ipynb as a GitHub gist. GitHub has built-in support for visualizing notebooks, and they’re very easy to share with stakeholders. Update my_dagster_project/assets/__init__.py with the following:

from github import InputFileContent

@asset
def github_stars_notebook_gist(context, github_stars_notebook):
    gist = (
        Github(ACCESS_TOKEN)
        .get_user()
        .create_gist(
            public=False,
            files={
                "github_stars.ipynb": InputFileContent(github_stars_notebook),
            },
        )
    )
    context.log.info(f"Notebook created at {gist.html_url}")
    return gist.html_url

This is a fairly straightforward asset that takes the contents of the github_stars_notebook asset, attaches them to a new GitHub gist, and returns the gist’s URL.

Note the context argument. This is a special argument that does not correspond to the name of an asset. It contains various useful pieces of information and utilities, including context.log - the primary way to log information to the user in Dagster. Read the docs for more information.

Adding a schedule

Finally, let’s be sure that we refresh the notebook every day, so we always have the latest numbers. We can use Schedules to do this.

Update your my_dagster_project/repository.py file to read:

from dagster import (
    load_assets_from_package_module,
    repository,
    define_asset_job,
    ScheduleDefinition,
)
from my_dagster_project import assets

daily_job = define_asset_job(name="daily_refresh", selection="*")
daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule="@daily",
)

@repository
def my_dagster_project():
    return [
        daily_job,
        daily_schedule,
        load_assets_from_package_module(assets),
    ]

We define two new entities:

  • daily_job is a Dagster job that materializes all of the assets in the project.
  • daily_schedule runs daily_job once a day.

Finally, we add them to our Dagster repository (which is just Dagster’s word for “project”).
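For reference, `@daily` is standard cron shorthand for `0 0 * * *`: one run per day at midnight. The sketch below computes the next ticks of that expression with the standard library. `next_daily_tick` and `upcoming_ticks` are hypothetical helpers, and they ignore time zones; ScheduleDefinition lets you set an execution timezone, which this sketch does not model.

```python
from datetime import datetime, timedelta
from typing import List


def next_daily_tick(now: datetime) -> datetime:
    """Next run time for the cron expression '0 0 * * *' (alias '@daily')."""
    midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # Today's midnight is always at or before `now`, so the next tick is tomorrow's.
    return midnight_today + timedelta(days=1)


def upcoming_ticks(now: datetime, count: int) -> List[datetime]:
    """The next `count` daily ticks after `now`."""
    first = next_daily_tick(now)
    return [first + timedelta(days=i) for i in range(count)]


if __name__ == "__main__":
    print(next_daily_tick(datetime(2022, 10, 6, 15, 30)))  # 2022-10-07 00:00:00
```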

At this stage, your my_dagster_project/assets/__init__.py should contain the following, and your my_dagster_project/repository.py should match the code shown above.

from dagster import asset
from github import Github

import pandas as pd
from datetime import timedelta

import nbformat
from nbconvert.preprocessors import ExecutePreprocessor
import pickle
import jupytext

from github import InputFileContent

ACCESS_TOKEN = "ghp_YOUR_ACCESS_TOKEN"

@asset
def github_stargazers():
    return list(Github(ACCESS_TOKEN).get_repo("dagster-io/dagster").get_stargazers_with_dates())

@asset
def github_stargazers_by_week(github_stargazers):
    df = pd.DataFrame(
        [
            {
                "users": stargazer.user.login,
                "week": stargazer.starred_at.date()
                + timedelta(days=6 - stargazer.starred_at.weekday()),
            }
            for stargazer in github_stargazers
        ]
    )
    return df.groupby("week").count().sort_values(by="week")

@asset
def github_stars_notebook(github_stargazers_by_week):
    markdown = f"""
# Github Stars

```python
import pickle
github_stargazers_by_week = pickle.loads({pickle.dumps(github_stargazers_by_week)!r})
```

## Github Stars by Week, last 52 weeks

```python
github_stargazers_by_week.tail(52).reset_index().plot.bar(x="week", y="users")
```
    """
    nb = jupytext.reads(markdown, "md")
    ExecutePreprocessor().preprocess(nb)
    return nbformat.writes(nb)

@asset
def github_stars_notebook_gist(context, github_stars_notebook):
    gist = (
        Github(ACCESS_TOKEN)
        .get_user()
        .create_gist(
            public=False,
            files={
                "github_stars.ipynb": InputFileContent(github_stars_notebook),
            },
        )
    )
    context.log.info(f"Notebook created at {gist.html_url}")
    return gist.html_url

Actually run the job

Now it’s time to run the job with Dagster.

First, open the UI by going to http://localhost:3000/. Then click “workspace” in the upper right nav, and then click “reload all”.

A screenshot of Dagster's workspace.

Next, click “Deployment” in the upper right nav, and select the “schedules” tab. We should see our daily schedule. You’ll see a warning that your daemon isn’t running; that’s fine to ignore for this tutorial.

A screenshot of Dagster's workspace.

Click the job corresponding to the schedule: daily_refresh. Then hit “materialize all” to run the job.

A screenshot of Dagster's workspace.

The process will run for a bit, and when it completes, you should see a GitHub gist URL printed to the log in the Dagit UI. Note that the first step of this pipeline can take a while; as you iterate, you only have to materialize that asset once and subsequent runs can reuse it.

A screenshot of Dagster's workspace.

And when you navigate to the gist, it should look something like this:

A github star chart generated from the Dagster orchestration run

🛠 Improving the code

Our current example is working great, but it has a couple of issues:

  • The GitHub access token is present in the source code in cleartext, which is a big security hole.
  • There are no tests. We should have some!

Handling secrets the right way: Config and Resources

The first thing we need to do is get the GitHub access token out of the source code. We’ll use two Dagster concepts to do this: Config and Resources.

The first thing we will do is create a GitHub Resource. A Resource is usually a connection to an external service, like GitHub. There are two main advantages to using resources:

  1. They can be configured separately from the rest of your app. We can configure the GitHub Resource once, and then reuse it in our whole pipeline.
  2. They can be swapped out in different environments. For example, it’s easy to use a different GitHub account in staging, or swap in a mock GitHub implementation during testing.
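Stripped of Dagster, the second advantage is plain dependency injection: the business logic depends on an interface, and each environment supplies its own implementation. A sketch with hypothetical names (`StarSource`, `FakeStarSource`, `count_stars_since`); in this project the PyGithub client would play the production role and a fake like this one the test role.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Protocol


class StarSource(Protocol):
    """Anything that can list the timestamps at which a repo was starred."""

    def starred_at(self, repo: str) -> List[datetime]: ...


@dataclass
class FakeStarSource:
    """Test double: returns canned data instead of calling GitHub."""

    timestamps: List[datetime]

    def starred_at(self, repo: str) -> List[datetime]:
        return list(self.timestamps)


def count_stars_since(source: StarSource, repo: str, since: datetime) -> int:
    """Business logic that never knows which implementation it was given."""
    return sum(1 for ts in source.starred_at(repo) if ts >= since)


if __name__ == "__main__":
    fake = FakeStarSource([datetime(2021, 1, 1), datetime(2021, 6, 1), datetime(2022, 1, 1)])
    print(count_stars_since(fake, "dagster-io/dagster", datetime(2021, 6, 1)))  # 2
```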

Create the Resource

To start, let’s create a new file my_dagster_project/resources.py:

from dagster import StringSource, resource
from github import Github

@resource(config_schema={"access_token": StringSource})
def github_api(init_context):
    return Github(init_context.resource_config["access_token"])

There are a few things going on here:

  • We’ve defined a resource called github_api, which returns an instantiated PyGithub client.
  • The resource takes a config parameter called access_token which may be provided by the user at runtime.
  • The config parameter is of type StringSource, so it can come from either an environment variable or be provided directly in the configuration (see the docs).
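To make the two StringSource shapes concrete, here is a toy resolver: a value is either a literal string, or a `{"env": "VAR_NAME"}` mapping that is looked up in the environment. `resolve_string_source` is a hypothetical helper that mimics the idea; it is not Dagster’s implementation, and its error messages are invented.

```python
import os
from collections.abc import Mapping


def resolve_string_source(value: object) -> str:
    """Resolve a StringSource-style value: a literal, or {"env": VAR_NAME}."""
    if isinstance(value, str):
        # Literal string provided directly in config.
        return value
    if isinstance(value, Mapping) and set(value.keys()) == {"env"}:
        name = value["env"]
        # Read from the environment when resolved, so nothing lives in source code.
        if name not in os.environ:
            raise ValueError(f"environment variable {name!r} is not set")
        return os.environ[name]
    raise TypeError('expected a string or a mapping of the form {"env": NAME}')


if __name__ == "__main__":
    os.environ["GITHUB_ACCESS_TOKEN"] = "ghp_example_only"
    print(resolve_string_source({"env": "GITHUB_ACCESS_TOKEN"}))  # ghp_example_only
    print(resolve_string_source("ghp_literal_value"))  # ghp_literal_value
```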

Configure the Resource and add it to our project

Next, let’s add it to our repository. Change my_dagster_project/repository.py to look like this:

from dagster import (
    load_assets_from_package_module,
    repository,
    with_resources,
    define_asset_job,
    ScheduleDefinition,
)
from my_dagster_project import assets
from my_dagster_project.resources import github_api

daily_job = define_asset_job(name="daily_refresh", selection="*")
daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule="@daily",
)

@repository
def my_dagster_project():
    return [
        daily_job,
        daily_schedule,
        with_resources(
            load_assets_from_package_module(assets),
            {"github_api": github_api.configured({"access_token": {"env": "GITHUB_ACCESS_TOKEN"}})},
        ),
    ]

There are a few things going on here:

  • with_resources() allows us to add one or more resources to our assets. In this case, we use it to add the resource to every asset, but more complicated apps can add resources to smaller subsets of assets.
  • We introduce a resource key called github_api that assets can use to reference the Resource. Usually the key will simply match the Resource’s name, unless, for example, you need multiple GitHub clients with different tokens.
  • github_api.configured() passes configuration information to the resource. In this case, we’re telling Dagster to load the GitHub access token from the GITHUB_ACCESS_TOKEN environment variable.
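Conceptually, with_resources() attaches a dictionary of resources, keyed by resource key, to a set of assets, and each key an asset lists in `required_resource_keys` has to be present in that dictionary. The toy check below only illustrates that bookkeeping; `ToyAsset` and `bind_resources` are hypothetical names, not Dagster APIs, and Dagster performs its own validation with its own errors.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, FrozenSet, List


@dataclass
class ToyAsset:
    name: str
    fn: Callable[..., Any]
    required_resource_keys: FrozenSet[str] = frozenset()
    resources: Dict[str, Any] = field(default_factory=dict)


def bind_resources(assets: List[ToyAsset], resources: Dict[str, Any]) -> List[ToyAsset]:
    """Attach resources to each asset, failing fast on any missing key."""
    bound = []
    for a in assets:
        missing = a.required_resource_keys - set(resources)
        if missing:
            raise KeyError(f"{a.name} needs resources {sorted(missing)}")
        # Hand each asset only the resources it asked for.
        provided = {key: resources[key] for key in a.required_resource_keys}
        bound.append(ToyAsset(a.name, a.fn, a.required_resource_keys, provided))
    return bound


if __name__ == "__main__":
    client = object()  # stands in for an instantiated PyGithub client
    stargazers = ToyAsset("github_stargazers", lambda: None, frozenset({"github_api"}))
    print(bind_resources([stargazers], {"github_api": client})[0].resources == {"github_api": client})  # True
```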

Use the Resource from our assets

Next, we need to make two changes to our assets. First, change our github_stargazers asset to use the new Resource:

@asset(required_resource_keys={"github_api"})
def github_stargazers(context):
    return list(
        context.resources.github_api.get_repo("dagster-io/dagster").get_stargazers_with_dates()
    )

And change the github_stars_notebook_gist asset to use the Resource.

@asset(required_resource_keys={"github_api"})
def github_stars_notebook_gist(context, github_stars_notebook):
    gist = context.resources.github_api.get_user().create_gist(
        public=False,
        files={
            "github_stars.ipynb": InputFileContent(github_stars_notebook),
        },
    )
    context.log.info(f"Notebook created at {gist.html_url}")
    return gist.html_url

Re-run the job with the environment variable set

Now, relaunch dagit with the environment variable set, and kick off the job:

$ GITHUB_ACCESS_TOKEN=ghp_YOUR_TOKEN_HERE dagit

At this point you can safely remove the ACCESS_TOKEN from your codebase.

Adding some tests

Now it’s time to add some tests. Open up my_dagster_project_tests/test_assets.py and drop the following code in:

from dagster import materialize_to_memory
from unittest.mock import MagicMock
from my_dagster_project.assets import (
    github_stars_notebook_gist,
    github_stars_notebook,
    github_stargazers_by_week,
    github_stargazers,
)
from datetime import date, datetime
import pandas as pd

def test_smoke():
    mock_stargazers = [
        ("user1", datetime(2021, 1, 1)),
        ("user2", datetime(2021, 1, 1)),
        ("user3", datetime(2021, 2, 1)),
    ]

    github_api = MagicMock()
    github_api.get_repo("dagster-io/dagster").get_stargazers_with_dates.return_value = [
        MagicMock(
            user=MagicMock(login=login),
            starred_at=starred_at,
        )
        for (login, starred_at) in mock_stargazers
    ]

    github_api.get_user().create_gist.return_value = MagicMock(
        html_url="https://gist.github.com/test_id"
    )

    result = materialize_to_memory(
        [
            github_stars_notebook_gist,
            github_stars_notebook,
            github_stargazers_by_week,
            github_stargazers,
        ],
        resources={"github_api": github_api},
    )

    assert result.success
    assert result.output_for_node("github_stargazers_by_week").reset_index().to_dict("records") == [
        {"users": 2, "week": date(2021, 1, 3)},
        {"users": 1, "week": date(2021, 2, 7)},
    ]
    assert result.output_for_node("github_stars_notebook_gist") == "https://gist.github.com/test_id"
    assert "# Github Stars" in result.output_for_node("github_stars_notebook")
    assert github_api.get_user().create_gist.call_args[1]["public"] is False

You can run the test using pytest my_dagster_project_tests -s. It should pass quickly.

There are three main parts to this test.

  1. We use Python’s built-in MagicMock to create a mock version of the PyGithub client and populate it with some test data.
  2. We use materialize_to_memory() to materialize our assets. This function allows us to pass in any resources that should be used for the test job.
  3. We use result.success and result.output_for_node() to examine the outputs of the assets to ensure that they executed as expected, and inspect the mocked client to ensure it was called with the correct parameters.
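The test leans on two MagicMock behaviors that are easy to miss. First, calling a mock returns the same child mock regardless of the arguments, which is why configuring `github_api.get_repo("dagster-io/dagster")` also covers the asset’s own `get_repo(...)` call. Second, `call_args` is an `(args, kwargs)` pair, so `call_args[1]` is the keyword-argument dict of the most recent call. A standalone check using only the standard library (the `client` name is illustrative):

```python
from unittest.mock import MagicMock

client = MagicMock()
# Configure the chain once; the argument passed to get_repo() does not matter.
client.get_repo("dagster-io/dagster").get_stargazers_with_dates.return_value = ["star"]

# A call with a different argument still returns the same configured child mock.
assert client.get_repo("some/other-repo").get_stargazers_with_dates() == ["star"]

client.get_user().create_gist(public=False, files={})
# Index 1 of call_args is the kwargs dict of the most recent call.
assert client.get_user().create_gist.call_args[1]["public"] is False
```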

👨‍🏫 Learning more

Hopefully, this is enough to get you up and running with building a real-ish data pipeline with Dagster. I encourage you to read the docs to learn more!

Good luck! And if you need any help, join our expanding Slack community. Also, I’m just a tweet away: floydophone on Twitter.


