December 22, 2022 · 5 minute read

Converting an ETL script to Software-Defined Assets

Pete Hunt (@floydophone)

In this post we’re going to talk about the process of moving from an ETL script to a robust Dagster pipeline using Software-Defined Assets.

Pete Hunt walks us through the 'Why' and 'How-to' convert ETL scripts to Dagster Software-Defined Assets.

🧠 What is ETL?

ETL is an acronym for “Extract-Transform-Load.” It is one of the most common activities that data practitioners perform day to day. Broadly, an ETL job consists of three steps:

  • Extract: fetch data from some upstream source, like a database, file system, or third-party API.
  • Transform: take the extracted data and change it in some way that adds business value.
  • Load: take the transformed data and store it in some downstream system (like a database or filesystem) where it can be used by others.

ETL scripts are often called ETL pipelines because what starts out as a single script eventually grows into a series of interdependent scripts that must be run in the correct order. When visualized, this resembles a pipeline.

Usually, ETL pipelines start out as a set of Python scripts that are run via cron. While this is easy to get started with, it can introduce numerous problems.

💥 What’s wrong with ETL scripts?

ETL scripts are easy to write but are hard to maintain as the number of scripts and the complexity of the pipeline grows. They introduce a number of issues:

  • Development velocity. Complex ETL pipelines may take many minutes or hours to run, and the entire pipeline must be re-run for every change. Any change, however simple, is therefore expensive to make, because the developer has to re-run the pipeline from start to finish each time.
  • Scheduling. ETL scripts need to be run on a regular basis. They are often triggered either by an event or the passage of time. cron works for a while, but as interdependencies between scripts get more complex, cron becomes unwieldy.
  • Robustness. ETL scripts can fail and should page their developers. However, ideally an ETL script would attempt to recover from the failure before waking someone up in the middle of the night with a page.
  • Testability. Running prototype ETL scripts in production is dangerous and expensive. They should be developed locally with automated testing to improve quality and development velocity. Refactoring ETL scripts to be testable in a local environment can be quite painful.
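To make the robustness point concrete, here is a minimal sketch of the retry logic a plain script has to carry by hand. The helper name `call_with_retries` and its parameters are illustrative assumptions and are not part of the example pipeline in this post.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    max_retries: int = 3,
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying up to max_retries times after a failure.

    Hypothetical helper for illustration only.
    """
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                # Out of retries: re-raise so the failure stays visible
                raise
            attempt += 1
            # Wait before the next attempt (sleep is injectable for tests)
            sleep(delay_seconds)
```

Every script that talks to a flaky service ends up needing something like this, which is the kind of boilerplate an orchestrator can take over.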

Introducing a data orchestrator can solve many of these problems. Introducing an asset-oriented data orchestrator is even better. But we’ll get to that in a second.

🧑‍🏫 Introducing the example

Let’s consider a simple ETL script example. It will fetch the top 500 Hacker News stories and create a wordcloud visualization of the top headlines.

It’ll have three steps, of course: extract, transform and load. But before we get to this, let’s create a new Python file called hackernews.py and add some imports:

import base64
from io import BytesIO
import matplotlib.pyplot as plt
import pandas as pd
import requests
from wordcloud import STOPWORDS, WordCloud
from tqdm import tqdm

Let’s also install the dependencies we need.

pip install matplotlib pandas requests wordcloud tqdm

Extract step

Next, let’s add our extract step. This will hit the Hacker News API to fetch the IDs of the top 500 stories, and then fetch metadata for each story, returning the result as a pandas DataFrame.

def extract() -> pd.DataFrame:
    # The topstories endpoint returns the IDs of the current top stories
    topstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    hackernews_topstory_ids = requests.get(topstories_url).json()

    results = []
    for item_id in tqdm(hackernews_topstory_ids):
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

    hackernews_topstories = pd.DataFrame(results)

    return hackernews_topstories
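Because extract() calls the network directly, it is hard to exercise offline. One possible refactor, sketched below, passes the HTTP call in as an argument so that a test can substitute canned responses. The names `extract_items`, `fetch_json`, and `limit` are hypothetical and not part of the original script, and the sketch returns a list of dicts rather than a DataFrame to stay dependency-free.

```python
from typing import Any, Callable, Dict, List

BASE_URL = "https://hacker-news.firebaseio.com/v0"


def extract_items(
    fetch_json: Callable[[str], Any],
    limit: int = 500,
) -> List[Dict[str, Any]]:
    """Fetch metadata for the first `limit` top stories.

    `fetch_json` is any callable that maps a URL to parsed JSON, for example
    `lambda url: requests.get(url, timeout=10).json()` in production.
    """
    # First request: the list of story IDs, truncated to the requested size
    story_ids = fetch_json(f"{BASE_URL}/topstories.json")[:limit]
    # One request per story to fetch its metadata
    return [fetch_json(f"{BASE_URL}/item/{story_id}.json") for story_id in story_ids]
```

Wrapping the result in `pd.DataFrame(...)` would give the same kind of table the original extract() returns, and a small `limit` keeps local iterations fast.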

Transform step

Next, we’ll take the data fetched in the extract step and turn it into a wordcloud. I won’t dig too deep into the details here, but we:

  • Split the titles into words and remove commonly used words.
  • Create a wordcloud using matplotlib and the wordcloud library.
  • Embed the wordcloud in a markdown document using base64 encoding.

Here’s the code.

def transform(hackernews_topstories: pd.DataFrame) -> str:
    stopwords = set(STOPWORDS)
    stopwords.update(["Ask", "Show", "HN"])
    titles_text = " ".join([str(item) for item in hackernews_topstories["title"]])
    titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(
        titles_text
    )

    # Generate the word cloud image
    plt.figure(figsize=(8, 8), facecolor=None)
    plt.imshow(titles_cloud, interpolation="bilinear")
    plt.axis("off")
    plt.tight_layout(pad=0)

    # Save the image to a buffer
    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    plt.close()  # release the figure so repeated runs don't accumulate open figures
    image_data = base64.b64encode(buffer.getvalue())
    return f"""
# Wordcloud of top Hacker News stories

![img](data:image/png;base64,{image_data.decode()})
""".strip()

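The first bullet in the transform step (splitting titles into words and dropping common ones) can be illustrated without the wordcloud library. The sketch below is a simplified approximation: the WordCloud class does its own tokenization and stopword handling internally, and the tiny stopword set and the `top_words` function here are assumptions chosen for the example.

```python
import re
from collections import Counter
from typing import Iterable, List, Set, Tuple

# Deliberately small, illustrative stopword set (the library's STOPWORDS is much larger)
EXAMPLE_STOPWORDS: Set[str] = {"a", "the", "to", "of", "in", "ask", "show", "hn"}


def top_words(
    titles: Iterable[str],
    stopwords: Set[str],
    n: int = 3,
) -> List[Tuple[str, int]]:
    """Count case-insensitive word frequencies across titles, skipping stopwords."""
    counts: Counter = Counter()
    for title in titles:
        # Lowercase the title and pull out runs of letters, digits, and apostrophes
        for word in re.findall(r"[a-z0-9']+", title.lower()):
            if word not in stopwords:
                counts[word] += 1
    return counts.most_common(n)
```

The word cloud then sizes each word according to a frequency table of this kind.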
Load step

For the final step we write the markdown to a file on disk. In production we may choose to store it somewhere else, like Amazon S3.

def load(md_content: str):
    with open("output.md", "w") as f:
        f.write(md_content)

Running the pipeline

Finally, let’s add some code to pull all the pieces together.

if __name__ == "__main__":
    input = extract()
    output = transform(input)
    load(output)

Running the script should produce an output.md file containing the wordcloud.

🚅 Migrating to Software-Defined Assets

The first step when migrating an ETL script to Dagster is actually quite simple. Just wrap your entire script in a single software-defined asset. Create hn_dagster.py with the following code:

from hackernews import extract, transform, load
from dagster import asset

@asset
def hackernews_wordcloud():
    input = extract()
    output = transform(input)
    load(output)

As you can see, we’ve basically just copy-pasted the __main__ block from our script into a specially-decorated Python function. Now we can install and run Dagster and Dagit, the Dagster UI:

pip install dagster dagit
dagit -f hn_dagster.py

Open Dagit in your browser (it serves on http://localhost:3000 by default) and you will see our first unmaterialized asset.

Now, click “materialize all” in the UI and watch your pipeline come to life.

🧱 Making your pipeline robust

Our pipeline makes a lot of HTTP requests, and any of them could fail intermittently. Rather than page our on-call engineer for every failure, we should have the script retry a few times before giving up. Dagster makes this a one-liner. Modify the hackernews_wordcloud asset to look like this:

from dagster import RetryPolicy

@asset(retry_policy=RetryPolicy(max_retries=5, delay=5))
def hackernews_wordcloud():
    input = extract()
    output = transform(input)
    load(output)

We’ve declaratively added a RetryPolicy that tells Dagster to retry this asset up to 5 times, waiting 5 seconds between attempts, before giving up.

🗓 Scheduling your pipeline

Like any report, the wordcloud produced at the end of our pipeline needs to be kept up to date, which means putting the pipeline on a schedule. In this case, we want to ensure that the wordcloud is never more than 30 minutes out of date. With Dagster, this is also a one-liner. Add one line of declarative configuration to the hackernews_wordcloud asset:

from dagster import FreshnessPolicy

@asset(
    retry_policy=RetryPolicy(max_retries=5, delay=5),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=30),
)
def hackernews_wordcloud():
    input = extract()
    output = transform(input)
    load(output)

With this one line of code, Dagster will be aware of your freshness expectations and can automatically refresh the wordcloud for you, as well as alert if it gets too old.
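Conceptually, a freshness policy is a bound on how old the data is allowed to get. The sketch below is a deliberately simplified model of that check and not Dagster's implementation; the function name `is_stale` and its arguments are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def is_stale(
    last_materialized: datetime,
    now: datetime,
    maximum_lag_minutes: float,
) -> bool:
    """Return True when the asset is older than the allowed lag."""
    return now - last_materialized > timedelta(minutes=maximum_lag_minutes)
```

With a 30 minute limit, an asset last materialized 45 minutes ago counts as stale and is a candidate for a refresh, while one materialized 10 minutes ago is left alone.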

♻️ Increase development velocity

We may want to iterate on the rendering of the wordcloud (the transform step) without running the expensive extract step. We can do this by refactoring our code to spin out the extract step into its own asset, hackernews_source_data.

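# The HTTP requests now happen in this asset, so it gets the retry policy too
@asset(retry_policy=RetryPolicy(max_retries=5, delay=5))
def hackernews_source_data():
    return extract()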

@asset(
    retry_policy=RetryPolicy(max_retries=5, delay=5),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=30),
)
def hackernews_wordcloud(hackernews_source_data):
    output = transform(hackernews_source_data)
    load(output)

Now we have two assets: hackernews_source_data, which does the expensive extract step, and hackernews_wordcloud, which does the much cheaper transform step. By providing a hackernews_source_data argument to hackernews_wordcloud, we’ve told Dagster to set up a dependency between the two assets. In the Dagster UI, the two assets now appear as connected nodes in the asset graph.

Now you can materialize each of these independently, allowing you to iterate quickly on the hackernews_wordcloud asset without rematerializing the expensive hackernews_source_data asset. Over time, this becomes truly invaluable as you add many different ETL scripts that may want to reuse different steps in unexpected ways.
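The argument-name convention described above can be modeled in a few lines of plain Python. The following is a toy illustration of the idea and not how Dagster is implemented: it reads each function's parameter names with `inspect.signature`, treats them as upstream dependencies, and runs the functions in dependency order using the standard library's `graphlib` (Python 3.9+). All function names here are hypothetical.

```python
import inspect
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List


def run_in_dependency_order(functions: List[Callable[..., Any]]) -> Dict[str, Any]:
    """Run each function once, passing it the results of the functions
    whose names match its parameter names."""
    by_name = {fn.__name__: fn for fn in functions}
    # Map each function name to the set of names it depends on
    graph = {
        name: set(inspect.signature(fn).parameters)
        for name, fn in by_name.items()
    }
    unknown = set().union(*graph.values()) - set(by_name)
    if unknown:
        raise KeyError(f"no function provides: {sorted(unknown)}")

    results: Dict[str, Any] = {}
    # static_order() yields every dependency before the functions that need it
    for name in TopologicalSorter(graph).static_order():
        kwargs = {dep: results[dep] for dep in graph[name]}
        results[name] = by_name[name](**kwargs)
    return results


def source_data() -> List[str]:
    return ["Show HN: something", "Ask HN: anything"]


def title_count(source_data: List[str]) -> int:
    return len(source_data)
```

Calling `run_in_dependency_order([title_count, source_data])` runs `source_data` first regardless of list order, because `title_count` names it as a parameter.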

☁️ Abstracting away storage

Interleaving storage code with your business logic has a lot of downsides. It can make testing pipelines challenging, since it is hard to isolate the test version of your pipeline from the production data storage systems. Additionally, it can make migrating to different data storage systems in the future very painful. Finally, developing business logic in isolation from storage makes for a more pleasant development experience and simpler mental model.

Rather than introduce a rats' nest of if statements, Dagster provides built-in support for this through its IO Manager abstraction. Let’s refactor our code to make hackernews_wordcloud free of storage logic (i.e. get rid of any reference to the load() step), and instead provide that functionality through a new IO Manager.

Let’s start by adding the IO Manager.

from dagster import IOManager

class HackerNewsIOManager(IOManager):
    def load_input(self, context):
        # Reads are out of scope for this example
        raise NotImplementedError()

    def handle_output(self, context, md_content):
        # Delegate the write to the existing load() step
        load(md_content)

IO Managers abstract both reads from and writes to storage. For this example, we’re only interested in abstracting away writes, so we leave load_input() unimplemented. Our handle_output() function is easy: it just calls the pre-existing load() step.

Next, let’s add the IO Manager to the hackernews_wordcloud asset, and refactor it to be completely free of storage logic.

@asset(
    retry_policy=RetryPolicy(max_retries=5, delay=5),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=30),
    io_manager_key="hn_io_manager",
)
def hackernews_wordcloud(hackernews_source_data):
    return transform(hackernews_source_data)

Notice that we’ve made two changes. First, we removed the call to load(). Second, we’ve added io_manager_key="hn_io_manager" to indicate that this asset should save its data using our new IO Manager.

Finally, we need to register our IO Manager with Dagster under the name hn_io_manager. Add this to the end of the file:

from dagster import Definitions

defs = Definitions(
    assets=[hackernews_source_data, hackernews_wordcloud],
    resources={"hn_io_manager": HackerNewsIOManager()},
)

This tells Dagster about all of the definitions in our project. It includes the two assets we’ve built, and the IO Manager (IO Managers are a type of Dagster Resource, so they’re passed in via the resources arg).

If you re-run your pipeline, everything will work just as it used to. However, we’ve substantially modularized our codebase: the hackernews_wordcloud asset is pure business logic, and the underlying storage can be swapped out during tests or for future migrations.
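To illustrate what swapping out storage during a test could look like, here is a plain-Python sketch of an in-memory stand-in that exposes the same two methods as the IO Manager above. It intentionally does not import Dagster so that it runs anywhere; to use it in a real project it would need to subclass dagster's IOManager and receive Dagster's own context objects. The class name `InMemoryMarkdownStore` and the `FakeContext` helper are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class FakeContext:
    """Hypothetical stand-in for the context object Dagster passes in."""

    name: str


class InMemoryMarkdownStore:
    """Keeps outputs in a dict instead of writing output.md to disk."""

    def __init__(self) -> None:
        self.values: Dict[str, Any] = {}

    def handle_output(self, context: FakeContext, obj: Any) -> None:
        # Store the value under the name carried by the context
        self.values[context.name] = obj

    def load_input(self, context: FakeContext) -> Any:
        # Return whatever was stored earlier under the same name
        return self.values[context.name]
```

A test can then assert on the contents of `values` rather than reading a file back from disk.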

🧬 Future work

I hope you found this guide useful. Now that you can migrate ETL scripts into Dagster, check out the crash course to learn how to test and deploy your pipelines, and continue on to the official Dagster docs to learn more.

See you in the next one!

We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack or start a GitHub discussion. If you run into any bugs, let us know with a GitHub issue. And if you're interested in working with us, check out our open roles!
