Dagster 0.12.0: Into the Groove

Published on 2021-07-19


We are happy to announce Dagster 0.12.0, affectionately named “Into the Groove”. For this post, we’ve chosen to highlight a few of the more impactful and highly anticipated features:

Release Highlights!

  • Pipeline Failure Sensors allow you to execute arbitrary Python code when a pipeline in your repo fails.
  • Solid-Level Retries make it easy to set up policies that automatically retry flaky operations.
  • Direct Invocation of solids, resources, and more greatly simplifies unit tests for these components.
  • Dagster is coiffed to impress

    Of course, this isn’t all there is to “Into the Groove”. A short sample of the new additions includes asset sensors, which launch pipeline runs in response to asset updates; a new dbt_cli_resource that makes it easier to interface with dbt; a reference deployment for running Dagster on AWS ECS; a community-contributed MLflow integration (thanks @hug0l1ma!); and even a fully fledged example repo that demonstrates best practices for a ton of Dagster features!

    We've also made some major (experimental) changes to many of our core APIs. As we look towards a 1.0 release, we've put a ton of work into settling on stable APIs that will stand the test of time. These changes are significant and impactful, including major renames (for example, @solid will become @op), unifying concepts such as presets and partition sets, and separating DAG definitions from their modes / resources. We believe these changes will make Dagster significantly more intuitive and ergonomic (we'll elaborate on that bit in a future blog post), and we encourage you to try them out and let us know what you think! To learn more about these new APIs, check out the Migration Guide.

    For a complete list of changes (including smaller improvements and bug fixes), check out our changelog here.

    Pipeline Failure Sensors

    Unexpected failures are a near-universal experience for anyone who works with pipelines. In practice, even well-tested pipelines fail for a wide variety of reasons. Data arrives in unexpected formats, API calls go unanswered, spot instances vanish, bugs suddenly surface, and so on. Especially for workflows that run in production, it can be critical to perform automated actions (send alerts, page on-call engineers, update status pages, etc.) when a pipeline does not successfully complete.

    While some types of errors can be caught within a pipeline and handled therein, sometimes this is inconvenient, tedious, or flat-out impossible. For example, if the process that your pipeline is running in fails to initialize or unexpectedly terminates, then that pipeline definitely can’t alert you anymore!

    With the new @pipeline_failure_sensor decorator, it becomes simple to execute arbitrary code when a pipeline fails, regardless of what went wrong. At a high level, a pipeline failure sensor works similarly to any other Dagster sensor. Once a failure sensor is defined and added to your Dagster repo, the Dagster daemon will regularly check for failed pipeline runs. If it finds one, it will run your sensor code.

    We provide out-of-the-box support for sensors that send emails or post Slack messages when pipelines fail, but of course there are countless other actions you might want to take in response to a failed pipeline run.

    Imagine that you’re on a team that uses some FooAlertSystem™ to alert stakeholders when key pipelines fail. You might define a sensor that looks something like this:

    from dagster import pipeline_failure_sensor, PipelineFailureSensorContext
    
    # FooClient is a stand-in for your alerting system's client; it is not part of Dagster.
    @pipeline_failure_sensor
    def send_foo_on_pipeline_failure(context: PipelineFailureSensorContext):
        my_client = FooClient(creds="MY_API_KEY")
        my_client.send_alert(f"Pipeline {context.pipeline_run.pipeline_name} failed!")
    
    

    Note that, unlike traditional sensors, a failure sensor does not launch a pipeline run in response to the detected event; it simply runs arbitrary Python code. Once you’ve defined this sensor, you can add it to your repository in the same way that you would add any other sensor...

    from dagster import repository
    
    # pipeline_1 and pipeline_2 are assumed to be defined elsewhere
    @repository
    def my_repo():
        return [
            pipeline_1,
            pipeline_2,
            send_foo_on_pipeline_failure,
        ]
    

    ... and you’re done! Now, whenever a pipeline in this repository fails, send_foo_on_pipeline_failure will run and update your FooAlertSystem™. Of course, you might want to have different failure behavior for different pipelines, which we support with the pipeline_selection argument. Here, you can pass in a list of pipeline names that your sensor will detect failures from:

    @pipeline_failure_sensor(pipeline_selection=["pipeline_1"])
    def my_pipeline_1_failure_sensor(context: PipelineFailureSensorContext):
        # do something
        pass
    

    This allows you to control exactly who and what gets notified for each pipeline. Nightly batch analytics pipelines might simply send an email that can be addressed the next morning, while real-time business-critical pipelines might sound the alarm on all channels, sending pages, posting Slack messages, and updating dashboards.
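
    To make the selection behavior concrete, here is a minimal standard-library sketch of the idea: each tick scans for runs it has not seen yet and hands failed ones to every sensor whose selection matches. This is a conceptual model only, not how the Dagster daemon is implemented; RunRecord, FailureSensor, and tick are hypothetical names invented for the illustration.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class RunRecord:
    """Hypothetical stand-in for a stored pipeline run."""
    run_id: int
    pipeline_name: str
    status: str  # "SUCCESS" or "FAILURE"


@dataclass
class FailureSensor:
    """Hypothetical model of a failure sensor: a handler plus an optional filter."""
    handler: Callable[[RunRecord], None]
    pipeline_selection: Optional[List[str]] = None  # None means "all pipelines"

    def matches(self, run: RunRecord) -> bool:
        return self.pipeline_selection is None or run.pipeline_name in self.pipeline_selection


def tick(runs: List[RunRecord], sensors: List[FailureSensor], cursor: int) -> int:
    """Handle failed runs newer than `cursor` and return the advanced cursor."""
    for run in runs:
        if run.run_id <= cursor:
            continue  # already processed on an earlier tick
        if run.status == "FAILURE":
            for sensor in sensors:
                if sensor.matches(run):
                    sensor.handler(run)
        cursor = max(cursor, run.run_id)
    return cursor


emails: List[str] = []
pages: List[str] = []

sensors = [
    # Notifies for every pipeline.
    FailureSensor(handler=lambda run: emails.append(run.pipeline_name)),
    # Notifies only for pipeline_1.
    FailureSensor(
        handler=lambda run: pages.append(run.pipeline_name),
        pipeline_selection=["pipeline_1"],
    ),
]

runs = [
    RunRecord(1, "pipeline_1", "SUCCESS"),
    RunRecord(2, "pipeline_2", "FAILURE"),
    RunRecord(3, "pipeline_1", "FAILURE"),
]

cursor = tick(runs, sensors, cursor=0)
cursor = tick(runs, sensors, cursor=cursor)  # nothing new, so no duplicate alerts
```

    In this model the unfiltered sensor hears about both failures, the filtered one only about pipeline_1, and the cursor keeps a second tick from alerting twice.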

    We believe that this feature addresses a pain point for many users, and we're excited for you to try it out!

    Easy Solid-Level Retries

    Many data pipelines, especially those that interface with cloud services, occasionally experience transient failures. Perhaps the database that you were querying cancelled your job, or a server under high load timed out your API call. Maybe a spotty network connection caused a request to drop. These sorts of errors are likely self-correcting — running the same code after a small delay may succeed where the original invocation failed.

    In such cases, it can be valuable to automatically retry the flaky steps when they experience failures. This prevents a single (likely recoverable) error from causing the entire pipeline to fail. Let’s take, as a simplified example, a solid that calls an external API which is known to periodically time out requests. It then returns the response after some processing:

    from dagster import solid
    
    @solid
    def get_api_info():
        response = call_api()
        return do_process(response)
    

    If this call_api() function fails, then your entire pipeline will fail. While it is possible to manually re-execute a pipeline run from the point of failure, you expect this failure to happen somewhat often, so it would be nice if Dagster could automatically retry the failed step. With the 0.12.0 release, this is now possible by including a retry_policy parameter in the solid definition:

    from dagster import RetryPolicy, solid
    
    @solid(retry_policy=RetryPolicy())
    def get_api_info():
        x = call_api()
        return do_process(x)
    

    This argument accepts a RetryPolicy object, which defines how Dagster should retry the solid in the event of a failure. A policy with no arguments (as seen above) instructs Dagster to retry running the solid a single time, immediately after the solid fails.
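
    As a rough mental model of a single immediate retry, consider the plain-Python loop below. It is a sketch written for this post rather than Dagster's executor code; run_with_retries and flaky_call are hypothetical helpers, and the default of one retry simply mirrors the behavior described above.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], max_retries: int = 1) -> T:
    """Call fn, retrying immediately up to max_retries times after a failure."""
    attempts_allowed = 1 + max_retries  # the first attempt plus the retries
    for attempt in range(1, attempts_allowed + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts_allowed:
                raise  # out of retries: surface the last error
    raise AssertionError("unreachable")


calls: List[int] = []


def flaky_call() -> str:
    """Hypothetical flaky operation: fails on the first call, then succeeds."""
    calls.append(1)
    if len(calls) == 1:
        raise TimeoutError("simulated timeout")
    return "ok"


result = run_with_retries(flaky_call)
```

    Here the first call raises, the single retry succeeds, and the caller never sees the transient error.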

    Of course, depending on the specific variety of flaky operation that you’re dealing with, you might have more specific ways that you would like to handle your retry logic. Instantly sending another API call to an already-busy server might compound the issue, flooding it with duplicate requests. To address this and other related issues, the RetryPolicy object is configurable to allow for delay between retries, backoff to increase the delay between subsequent retries, and jitter to add randomness around the delay. A complex retry policy incorporating all of these concepts might look a bit like the following:

    from dagster import Backoff, Jitter, RetryPolicy, solid
    
    @solid(
        retry_policy=RetryPolicy(
            max_retries=3,
            delay=2,
            backoff=Backoff.EXPONENTIAL,
            jitter=Jitter.FULL
        )
    )
    def get_api_info():
        x = call_api()
        return do_process(x)
    

    Now, when this solid fails, it can gracefully reattempt the failed request without requiring manual intervention or inadvertently causing a stampede of API calls:

    Figure: A flaky solid in a pipeline being automatically retried in Dagit. Note that each retry happens after a successively longer delay because we set the backoff parameter.
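
    To see how delay, backoff, and jitter interact, the sketch below computes a wait time for each retry using one common formulation: double the base delay on every retry, then, with full jitter, pick a random wait between zero and that value. The exact formula Dagster uses internally may differ; retry_delays and its parameters are hypothetical names used only for this illustration.

```python
import random
from typing import List, Optional


def retry_delays(
    max_retries: int,
    delay: float,
    exponential: bool = False,
    full_jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait (in seconds) before each retry under a simple policy."""
    rng = rng or random.Random()
    delays = []
    for retry_number in range(1, max_retries + 1):
        # Exponential backoff doubles the wait on each subsequent retry.
        wait = delay * (2 ** (retry_number - 1)) if exponential else delay
        if full_jitter:
            # Full jitter: pick uniformly between 0 and the computed wait.
            wait = rng.uniform(0, wait)
        delays.append(wait)
    return delays


fixed = retry_delays(max_retries=3, delay=2)
backed_off = retry_delays(max_retries=3, delay=2, exponential=True)
jittered = retry_delays(
    max_retries=3, delay=2, exponential=True, full_jitter=True, rng=random.Random(0)
)
```

    Without backoff every retry waits the same two seconds; with it, the waits grow to two, four, and eight seconds, and jitter spreads each one randomly below that cap so that many failing clients do not all retry at the same instant.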

    Direct Invocation for Testing

    Testing components of pipelines (solids, resources, io managers, etc.) in isolation is an important strategy in accelerating development and improving reliability. Dagster is built in a way that makes it possible to separate these individual pieces so that they can be verified independently. However, prior to 0.12.0, the ergonomics of actually writing these tests lagged behind the underlying power of this abstraction.

    As an example, take the obvious pattern of testing that a solid, when provided a given input, will produce the expected output. In prior releases, if you wanted to run this solid by itself, you’d need to explicitly call a helper function (such as execute_solid), which would return a result object. Then, you’d need to unwrap that result object (result.output_value()) to see the actual return value of the function. This process required a bit too much boilerplate to express such a simple expectation:

    from dagster import execute_solid, solid
    
    @solid
    def return_five():
        return 5
    
    # the old method for testing
    result = execute_solid(return_five)
    assert result.output_value() == 5
    

    This complexity wasn’t for nothing, of course. The @solid decorator does a lot of work, morphing a regular Python function into an object understandable by Dagster, and invoking a solid does much more than simply running that underlying function: resource initialization, custom error messages, and output type checking are all bundled together with the original code. Direct invocation is simply a layer on top of this machinery to make it easier to test all of this behavior at once.

    As a result of this new interface, in 0.12.0, writing tests has become significantly easier and more intuitive. Most Dagster constructs (solids, resources, schedules, etc.) can now be invoked directly, rather than requiring this additional scaffolding. For people who have written unit tests in Dagster before, we expect this new interface will feel like a breath of fresh air:

    @solid
    def return_five():
        return 5
    
    assert return_five() == 5 # works!
    

    This, of course, is a very simple example, with a solid that doesn’t take a context argument. However, if you’re taking advantage of resources and configuration in your solid, you’ll probably have something that looks a bit more like this:

    from dagster import build_solid_context, solid
    
    @solid(
        config_schema={"foo": str},
        required_resource_keys={"bar"},
    )
    def hello_solid(context):
        foo = context.solid_config["foo"]
        bar = context.resources.bar
        return f"{foo} {bar}!"
    
    context = build_solid_context(
        config={"foo": "hello"},
        resources={"bar": "dagster"},
    )
    
    assert hello_solid(context) == "hello dagster!"
    

    Here, the build_solid_context function is used to create a valid context object that contains all of the necessary attributes to run hello_solid.
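
    For intuition about why a decorated function can still be called like a normal one, here is a heavily simplified standard-library sketch. It is not Dagster's implementation: SketchSolid, sketch_solid, and build_sketch_context are hypothetical names, and the only checks modeled are for missing config keys and missing resources.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, Iterable, Optional


class SketchSolid:
    """Hypothetical, heavily simplified stand-in for a decorated solid."""

    def __init__(
        self,
        fn: Callable[[Any], Any],
        required_config_keys: Iterable[str],
        required_resource_keys: Iterable[str],
    ) -> None:
        self.fn = fn
        self.required_config_keys = set(required_config_keys)
        self.required_resource_keys = set(required_resource_keys)

    def __call__(self, context: Any) -> Any:
        # Direct invocation: validate the context, then run the wrapped function.
        missing_config = self.required_config_keys - set(context.solid_config)
        if missing_config:
            raise ValueError(f"missing config keys: {sorted(missing_config)}")
        missing_resources = {
            key for key in self.required_resource_keys
            if not hasattr(context.resources, key)
        }
        if missing_resources:
            raise ValueError(f"missing resources: {sorted(missing_resources)}")
        return self.fn(context)


def sketch_solid(
    required_config_keys: Iterable[str] = (),
    required_resource_keys: Iterable[str] = (),
) -> Callable[[Callable[[Any], Any]], SketchSolid]:
    """Decorator factory that wraps a function in a SketchSolid."""
    def decorator(fn: Callable[[Any], Any]) -> SketchSolid:
        return SketchSolid(fn, required_config_keys, required_resource_keys)
    return decorator


def build_sketch_context(
    config: Optional[Dict[str, Any]] = None,
    resources: Optional[Dict[str, Any]] = None,
) -> SimpleNamespace:
    """Build a minimal context exposing solid_config and resources."""
    return SimpleNamespace(
        solid_config=dict(config or {}),
        resources=SimpleNamespace(**(resources or {})),
    )


@sketch_solid(required_config_keys={"foo"}, required_resource_keys={"bar"})
def hello(context: Any) -> str:
    return f"{context.solid_config['foo']} {context.resources.bar}!"


greeting = hello(build_sketch_context(config={"foo": "hello"}, resources={"bar": "dagster"}))

try:
    hello(build_sketch_context(config={"foo": "hello"}))  # resource "bar" omitted
    missing_resource_error = None
except ValueError as err:
    missing_resource_error = str(err)
```

    The point of the sketch is that the wrapper object owns the validation, so a test that calls it directly exercises the checks and the underlying function at the same time.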

    While solids are the chief building blocks of pipelines, the other elements are all useful to test in similar ways. In past releases, testing resources was particularly hairy, requiring you to create ephemeral pipelines within your tests. This experience has been dramatically improved. To illustrate, take a simple resource function that has some configuration:

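    from dagster import InitResourceContext, resource
    
    # MyResourceClass is a stand-in for your own resource class
    @resource(config_schema={"foo": int})
    def my_resource(context: InitResourceContext):
        return MyResourceClass(foo=context.resource_config["foo"])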
    

    Previously, this function could not be invoked outside of a pipeline run, requiring you to create a mock pipeline that used the resource, and then execute that pipeline. Now, we can directly invoke the my_resource function, passing in a properly-formatted InitResourceContext object:

    from dagster import build_init_resource_context
    
    # creates context with given config
    context = build_init_resource_context(config={"foo": 123})
    
    # returns instance of MyResourceClass()
    test_resource = my_resource(context)
    

    These new APIs represent a consistent effort towards maturing and simplifying the core workflows that we believe are essential for building trustworthy data pipelines. For an in-depth look at all of the different APIs, check out the docs! Solids, schedules, sensors, IO managers, loggers, and hooks all follow similar patterns for testing.

    Thank You!

    We’d like to extend our sincere thanks to all the community members that contributed to this release:

    @jrouly, @hug0l1ma, @makotonium, @Andrew-Crosby, @gogi2811, @mrdavidlaing, @deveshi, @orf, @slamer59, @hebo-yang, @dwallace0723, @gdoron, @elsenorbw, @trevenrawr, @keypointt, @pawelad, @PenguinToast, @esztermarton, @a-cid, @saulius, @michaellynton, and @zuik!

    Beyond those that directly contributed to the codebase, we’ve greatly benefited from the insights and feedback from the members of our Slack channel, which we welcome you to join if you haven’t already! If you have any thoughts, feedback, or questions, please don't hesitate to reach out!