Dagster 0.10.0: The Edge of Glory

Published on 2021-01-19

Nick Schrock (@schrockn)
Max Gasner (@gasnerpants)

We are delighted to announce Dagster 0.10.0, codenamed "The Edge of Glory."

This release's focus is operational maturity, reliability, and the expansion of production-grade use cases.

Release highlights

  • Exactly-once, Fault-Tolerant Scheduling: Dagster now includes a fault-tolerant run scheduler with exactly-once semantics, based on a reconciliation loop.
  • Sensors: Dagster now supports sensors (event-based schedules) managed by our scheduler process. Dagster sensors are fault-tolerant, exactly-once, and supported by best-in-class operational tools.
  • Mature Kubernetes Execution Engine: Production-ready, cloud-native execution environment built on Kubernetes.
  • I/O Managers: Programming model improvements to cleanly separate I/O and compute. Especially compelling for users who would like to use data frames and target data warehouses and data lakes.

We hope you'll come away excited about the value this work provides to current users of the system and some of the future possibilities it unlocks.

Reconciliation-based scheduler

Exactly-once, fault-tolerant schedules

Dagster now comes with a built-in, fully integrated scheduler that runs as a daemon process. The new scheduler provides exactly-once, fault-tolerant scheduling of pipeline runs, as well as built-in queueing for scheduled runs. Schedules are now timezone-aware and correctly handle daylight saving time.

Prior to 0.10.0, although Dagster included support for schedules, our pluggable schedulers relied on external services such as cron or Kubernetes (using the CronJob abstraction) to kick off scheduled runs. As a consequence, there was the potential for drift between the scheduler and the external service, leading to subtle bugs, awkward deployment, and missed runs in the presence of node failures.

Schematic of the scheduler reconciliation loop

The new Dagster scheduler is based on a reconciliation loop. The scheduler loop examines the external state of the world (on a configurable interval), compares it to the internal state of the Dagster deployment, and then launches runs to reconcile the difference between the expected state and the actual state.

This idea of reconciliation is a powerful mental model that has been applied in successful systems in other domains. For example, Kubernetes operates on a reconciliation loop: users of the system provide a declarative notion of the state the system should be in, Kubernetes observes the state that the system is actually in, and it is Kubernetes' responsibility to reconcile the two. In a very different domain, React also reconciles the state of its user-computed virtual DOM with the observed state of the actual browser DOM.

In the case of an ordinary Dagster schedule, the external state in question is time. There is a set of scheduled runs that should already have been launched or completed at any given time. It is the responsibility of the scheduler to ensure that this expectation aligns with the actual state of the world. This mental model allows users to express complex schedules (e.g., skipping certain runs based on business rules) and allows the system to gracefully handle thorny issues such as daylight saving time and fault tolerance.
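As a plain-Python illustration (not Dagster's actual implementation), one tick of the reconciliation loop for a time-based schedule reduces to a set difference between the runs that should exist by now and the runs that actually do:

```python
from datetime import datetime, timedelta

def expected_ticks(start, interval, now):
    """All tick times that should have fired between start and now."""
    ticks, t = [], start
    while t <= now:
        ticks.append(t)
        t += interval
    return ticks

def reconcile(start, interval, now, already_launched):
    """One pass of the loop: launch whatever is expected but missing."""
    return [t for t in expected_ticks(start, interval, now)
            if t not in already_launched]

start = datetime(2021, 1, 1)
now = datetime(2021, 1, 4)
# Suppose the daemon was down on Jan 2: that run is launched late
# on the next pass, rather than silently dropped.
launched = {datetime(2021, 1, 1), datetime(2021, 1, 3), datetime(2021, 1, 4)}
missing = reconcile(start, timedelta(days=1), now, launched)
assert missing == [datetime(2021, 1, 2)]
```

Because each pass compares expected state against observed state, a crashed or paused daemon catches up automatically on restart, which is exactly the fault-tolerance property described above.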

Exactly-once, fault-tolerant sensors

We can extend this model to handle event-based schedules as well as time-based schedules. A typical schedule will say something like, "I want to run this computation every weeknight at 3 AM." But very often, temporal conditions like these are used as proxies for true underlying conditions in the external world, such as "I want to run this computation every business day, after our partner uploads a daily flat file to our shared S3 bucket."

Other systems, such as Airflow, model this kind of dependency using long-running tasks that poll until an external condition is met. With the reconciliation loop in our dedicated daemon process, we can instead treat sensors as a generalization of schedules. Whereas schedules reconcile based on time, sensors reconcile based on arbitrary external state.

For example, here's a toy sensor that ensures a pipeline has run once for each key in some S3 bucket.

from dagster import RunRequest, sensor
from dagster_aws.s3.sensor import get_s3_keys

bucket = "my-bucket"  # the S3 bucket to watch

@sensor(pipeline_name="log_s3_pipeline")
def toy_s3_sensor(context):
    s3_keys = get_s3_keys(bucket)
    for s3_key in s3_keys:
        yield RunRequest(run_key=s3_key)

The reconciliation loop of the scheduler daemon invokes this user-defined function. By default, it's invoked every 15 seconds, but the cadence of sensor ticks is configurable.
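The exactly-once property hinges on the run_key: the daemon records the keys it has already launched runs for and skips repeats, so re-evaluating the sensor function is idempotent. A simplified sketch of that dedup logic (plain Python with hypothetical names, not the actual daemon code):

```python
class SensorDaemonSketch:
    """Illustrative only: dedups run requests by run_key across ticks."""

    def __init__(self):
        self.launched_keys = set()

    def evaluate_tick(self, requested_keys):
        to_launch = []
        for key in requested_keys:
            if key in self.launched_keys:
                continue  # a run was already launched for this key: skip
            self.launched_keys.add(key)
            to_launch.append(key)
        return to_launch

daemon = SensorDaemonSketch()
# The first tick sees two S3 keys; the second tick re-lists the
# bucket and sees one new key -- only the new key produces a run.
first = daemon.evaluate_tick(["a.csv", "b.csv"])
second = daemon.evaluate_tick(["a.csv", "b.csv", "c.csv"])
assert first == ["a.csv", "b.csv"]
assert second == ["c.csv"]
```

This is why the toy sensor above can safely yield a RunRequest for every key on every tick: duplicates are filtered out downstream.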

Schematic of the sensor reconciliation loop

There are a bunch of advantages to this way of thinking about sensors. First of all, it makes it possible to build tools that let you monitor and operate sensors side-by-side with time-based schedules.

Monitoring the status of a sensor with the Dagit UI

Here you can see Dagit's real-time display of the reconciliation loop timeline, showing each time it checks external state (a tick) and what the result of that tick was (a skip, a requested run, an error, etc.).

This kind of visibility is critical when you're continuously monitoring and probing external systems that will periodically be unavailable or in error states. In the example above, intermittent S3 failures will merely delay the generation of a run request, rather than putting the system into a bad state.

Modeling cross-team dependencies with asset-based sensors

We can also write sensors that trigger pipeline runs based on updates to data assets produced or modified by executions of other Dagster pipelines. Assets are tracked in Dagster's asset catalog, which lets users connect pipeline executions to the assets they create or modify. Sensors that fire based on updates to this catalog allow the asset catalog to be used as a control plane for cross-pipeline dependencies.

While there are many technical advantages to explicitly expressing cross-pipeline dependencies in this way, we think the biggest wins will be organizational. Today, teams whose processes depend on each other face a choice between implicitly encoding cross-team dependencies in temporal schedules ("the other team should have ingested the flat file by 4 AM every morning...") or muddying responsibility by building monolithic pipelines whose components are maintained by different teams.

With asset-based sensors, teams can explicitly encode their pipelines' dependencies on assets created by their partner teams and trigger their pipelines only when those assets are updated. They can also layer their business logic on top of these triggers, perhaps rate-limiting updates because of cost constraints or different SLAs. (There are a lot of reasons a team might want to execute their pipelines conditionally on, but not immediately after, another team's computations are complete.)
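Cross-team triggering can be pictured as a cursor advancing over the asset catalog's event stream. This standalone sketch (plain Python; the names and event shape are hypothetical, not the Dagster API) triggers runs only for materializations of a given asset that arrived since the last tick:

```python
def asset_sensor_tick(catalog_events, asset_key, cursor):
    """Return (run_requests, new_cursor) for events after the cursor.

    catalog_events: list of (event_id, asset_key) tuples, ordered by id.
    """
    new_events = [(eid, key) for eid, key in catalog_events
                  if key == asset_key and eid > cursor]
    if not new_events:
        return [], cursor  # nothing new since last tick: skip
    run_requests = [f"run-for-event-{eid}" for eid, _ in new_events]
    return run_requests, new_events[-1][0]

# The upstream team materialized "raw_table" twice; the downstream
# team's cursor has already consumed event 1.
events = [(1, "raw_table"), (2, "other_asset"), (3, "raw_table")]
runs, cursor = asset_sensor_tick(events, "raw_table", cursor=1)
assert runs == ["run-for-event-3"]
assert cursor == 3
```

Business logic like rate limiting slots naturally into this function: a team could, for instance, return at most one run request per tick regardless of how many new materializations arrived.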

Two teams interacting using the asset catalog to trigger their pipelines

This approach to structuring cross-team relationships is in line with emerging thinking around the data mesh, which envisions teams organized along business domains using assets as their interface.

Check out our documentation for more details on defining schedules and sensors.

Cloud-native deployments

We've done a lot of work over this release to harden our out-of-the-box Kubernetes deployment options. In this release, the Dagster + Kubernetes integration has gone from a prototype to a mature, battle-tested implementation, running in production on spot instances at some of the largest users of Dagster. Our work hardening Kubernetes deployments ensures that there's a cloud-native, scalable, well-tested, cost-effective reference option available for Dagster users.

This integration makes it possible to execute each step of a Dagster pipeline in its own Kubernetes Job, providing both isolation and horizontal scalability up to the limits of Kubernetes cluster size.

Our Helm chart is now easier to configure and deploy, and we've made big investments in observability and reliability. You can view Kubernetes interactions in the structured event log and use Dagit to help you understand what's happening in your deployment. New defaults in the Helm chart will give you graceful degradation and failure recovery out of the box.

Dagster remains a pluggable system that can be (and is) deployed on many different infrastructures. As you might expect, the Kubernetes integration builds on top of the standard integration points -- the RunLauncher and Executor abstractions -- that we expect users to leverage to serve their own specific infrastructure needs. It's important that Dagster be extensible and adaptable to the needs of users with specific infrastructure requirements or preferences, and that there be a clear pathway to scale on a standard deployment substrate. So while we enable out-of-the-box deployment to Kubernetes, we do not require Kubernetes.

We've also added new monitoring tools to Dagit to make it easier to monitor the health of a deployment. This is especially important as we add more components, like the scheduler daemon, to the system. With this in mind, we've added an "Instance Status" page to our beloved Dagit UI to give you better visibility into various processes in your Dagster instance. Whether you're running Dagster locally, on a VM, or on Kubernetes, the "Instance Status" page is extremely useful for viewing the health of repository locations, daemon processes, schedules, sensors, and linkable configuration.

Monitoring the status of a Dagster instance in the Dagit UI

In this example, a user has pushed code with a syntax error in one of their repositories, the live-api-data-warehouse-demo. Instead of bringing down the system, our monitoring simply notes that it couldn't successfully load that particular repository location. These are some of the fruits of process-level isolation for user code and standardized APIs. For larger deployments (multi-developer and multi-team), isolation guarantees are especially important as insulation against the effects of localized errors.

We're increasingly excited about large-scale deployments of Dagster where the system allows a separation of concerns between a data platform team or persona — responsible for keeping the Dagster orchestration cluster deployed and running — and the analysts and other personas who write Dagster pipelines and consume data assets it produces. We think this kind of self-service is the future of data teams of all sizes. We've found that many of the teams thinking this way today are also moving to Kubernetes, and we're glad to be able to support that transition with a mature deployment path. Check out the documentation for more details.

I/O management

One of the core promises of Dagster is that the developer can write solids that express pure business logic, separated from concerns that are specific to the execution environment. Code structured in this way is easier to develop and test locally, swapping heavy production dependencies for lighter test systems, or even fixtures and mocks. This leads to dramatically shorter feedback loops for developers, which in turn leads to order-of-magnitude improvements in productivity. Code written in this way is also easier to reuse in different contexts and more portable when it becomes necessary to swap out a third-party system.
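The underlying pattern is ordinary dependency injection: the business logic depends on an interface, and the environment decides which implementation satisfies it. A minimal sketch of the idea in plain Python (hypothetical names, independent of Dagster's resource API):

```python
class ProdWarehouse:
    """Production implementation: would talk to a real warehouse."""
    def query(self, sql):
        raise NotImplementedError("would hit the real warehouse over the network")

class FakeWarehouse:
    """Test stand-in: answers every query from an in-memory table."""
    def __init__(self, rows):
        self.rows = rows

    def query(self, sql):
        return self.rows

def count_users(warehouse):
    """Pure business logic: knows nothing about which backend it talks to."""
    return len(warehouse.query("SELECT * FROM users"))

# In tests, swap in the fake and get an instant feedback loop.
assert count_users(FakeWarehouse([{"id": 1}, {"id": 2}])) == 2
```

Dagster's resource system applies this pattern to pipelines: the same solid runs against a mock in tests and the real service in production, selected by mode.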

While Dagster's resource system has always made it possible to swap out external dependencies, in previous releases, Dagster's persistence subsystem wasn't sufficiently flexible to handle some critical use cases. Specifically, our persistence subsystem assumed homogeneous storage across executions, assumed that users would always want to create immutable data on every run, offered little control over the layout of the produced data, and was confusing to customize.

In 0.10.0, we're introducing a new, streamlined primitive, the I/O manager, which can be attached directly to outputs and is swappable when you execute pipelines in different modes. This gives you fine-grained control over persistence, without the need to change any solid business logic to support execution in different environments such as dev, test, and prod. Here, the teal boxes are handled by an I/O manager.

IO managers handle the storage of values passed between solids

This is especially compelling for teams that work with dependency graphs that operate on data frames (such as Pandas or PySpark pipelines). Each node in such a graph can now be expressed in terms of pure business logic, leaving I/O as a separate concern.

from dagster import ModeDefinition, pipeline, solid
from pyspark.sql import Row
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

@solid(required_resource_keys={"spark"})
def make_people(context):
    schema = StructType(
        [StructField("name", StringType()), StructField("age", IntegerType())]
    )
    rows = [Row(name="Thom", age=51), Row(name="Jonny", age=48)]
    return context.resources.spark.createDataFrame(rows, schema)

@solid
def filter_over_50(_context, people_df):
    return people_df.filter(people_df["age"] > 50)

@pipeline(
    mode_defs=[
        ModeDefinition(resource_defs={"io_manager": local_parquet_store, ...})
    ]
)
def filter_people_pipeline():
    filter_over_50(make_people())

This is a contrived example with only two nodes of compute. But you'll notice that the object passed between the two solids is a data frame, and the compute within the solid bodies operates directly on that data frame, with no I/O handling code anywhere. Persistence (in this case, to a local parquet file) is handled by the I/O manager resource defined on the pipeline mode.

import os

from dagster import IOManager, io_manager

class LocalParquetStore(IOManager):
    def _get_path(self, context):
        return os.path.join(context.run_id, context.step_key, context.name)

    def handle_output(self, context, obj):
        obj.write.parquet(self._get_path(context))

    def load_input(self, context):
        return context.resources.spark.read.parquet(
            self._get_path(context.upstream_output)
        )

@io_manager(required_resource_keys={"spark"})
def local_parquet_store(_context):
    return LocalParquetStore()

In this example, we're persisting the data frames output by both solids as parquet files on local disk. Providing alternative I/O managers allows for fine-grained control. For example, we could provide a pure in-memory I/O manager for tests and an I/O manager that writes parquet to S3 in production. We could even decide, on a per-output basis, to persist some data in S3 and some data in Snowflake to mitigate cost, with no changes to the core business logic.
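For instance, the in-memory variant mentioned above only needs the same two methods. Here is a standalone sketch of that idea (plain Python mirroring the handle_output/load_input shape of the IOManager interface, keyed by step and output name; illustrative, not production code):

```python
class InMemoryIOManagerSketch:
    """Stores solid outputs in a dict instead of writing parquet files."""

    def __init__(self):
        self.values = {}

    def handle_output(self, step_key, name, obj):
        # Record the output under a (step, output-name) address.
        self.values[(step_key, name)] = obj

    def load_input(self, step_key, name):
        # Hand a downstream solid the upstream value, untouched.
        return self.values[(step_key, name)]

mgr = InMemoryIOManagerSketch()
mgr.handle_output("make_people", "result", [{"name": "Thom", "age": 51}])
loaded = mgr.load_input("make_people", "result")
assert loaded[0]["age"] == 51
```

Because the solid bodies never touch storage directly, swapping this in for the parquet store requires no changes to the pipeline's business logic, only to the mode definition.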

This is a big leap forward for the Dagster programming model and should make it easier than ever to write reusable solids and pipelines that seamlessly interoperate with the wide range of persistence models desirable in dev, test, and prod. To learn more, check out the new I/O manager docs.

Conclusion

Our 0.10.0 release includes both architectural changes and the maturation of our core technology, especially around deployment and operations.

We're excited about changes to the core programming model, like I/O managers, that should make it easier than ever to write testable solids and pipelines, even in the presence of complex persistence requirements.

We think our new scheduling architecture is a fundamentally better way to schedule periodic computations, and we're very excited to see what new applications our users come up with for reliable event-based scheduling with sensors. We believe that these sensors will provide a natural cross-team interface point using our asset catalog as a control plane. We also project that continuously evaluated, reliable sensors, combined with a modern execution architecture, are a good basis for building low-latency batching operations for near-real-time analytics and machine learning.

And we're excited to continue supporting our largest users with a Kubernetes deployment that provides observability, robust user code isolation guarantees, and a great horizontal scalability story for large-scale use cases (including near-real-time).

Please check out our changelog for a fuller discussion of all the changes that have landed in this release, visit our GitHub, and join our Slack to learn more!