Community Memo: Pythonic Config and Resources | Dagster Blog

April 3, 2023 | 5 minute read

Major ergonomic improvements are coming to Dagster's config and resources systems, including a Pydantic frontend.
By Nick Schrock (@schrockn) and Ben Pankow

Resources and config have been fundamental parts of Dagster's API since the beginning. The config system enables assets and ops to vary their execution at runtime without redeploying code, while the resource system is used to model the external dependencies that users' assets and ops interact with. Whether building out a platform for the first time or working on an existing Dagster project, practitioners will find themselves interacting with these APIs frequently.

The initial version of Dagster was released with support for Python 2, in an era before widespread adoption of Python 3. The core set of APIs, including resources and config, was built with compatibility in mind, avoiding features such as native Python 3 type annotations and dataclasses, which did not yet have wide adoption.

With our 1.3 release, we are mainlining a set of APIs, introduced experimentally in 1.2, that take advantage of these language features. These changes bring much-needed ergonomic improvements to the config and resource systems and set the stage for future work. Existing resource and config APIs will continue to be supported for the foreseeable future. However, we plan to move our docs, examples, and integrations to the newly introduced APIs alongside the 1.3 release, scheduled for April 19th.

The new Pythonic config and resources system aims to make Dagster easier to learn and more natural for modern Python users. In particular, these changes will:

  • Remove the need to interact with the Dagster config type system
  • Provide strong type hinting and type checking out-of-the-box
  • Eliminate or simplify many of the existing config and resource APIs
  • Make it easier to access resources in ops, assets, schedules, and sensors
  • Greatly improve observability for how resources are configured and used in a project

This blog post is for existing Dagster users, and will outline the changes coming in 1.3 and some of the motivations behind them.

Ergonomic improvements to config

Since its initial release, developers have used the Dagster config type system to specify the config schemas for their pipelines, assets, and resources. For new users, learning the Dagster config type system can be a substantial hurdle.

To remedy this, op, asset, and resource config can now be defined as Python classes with typed attributes, with Pydantic used under the hood to model and validate the data.

Before:

from dagster import build_op_context, op

@op(config_schema={"greeting": str, "name": str})
def greet(context):
    # Config values are looked up by string key on the context
    greeting = context.op_config["greeting"]
    name = context.op_config["name"]
    print(f"{greeting}, {name}!")

greet(build_op_context(config={"greeting": "Hello", "name": "Dagster"}))

After:

from dagster import Config, op

class GreetingConfig(Config):
    greeting: str
    name: str

@op
def greet(config: GreetingConfig):
    # Config fields are plain, typed attributes
    print(f"{config.greeting}, {config.name}!")

greet(config=GreetingConfig(greeting="Hello", name="Dagster"))

This new approach eliminates the need for new users to interact with the Dagster config type system and provides strong type hinting and type checking out-of-the-box.
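
To make the validation benefit concrete, here is a minimal, stdlib-only sketch of the idea. It is not how Dagster implements config (Dagster delegates to Pydantic, which does far more than this), and `TypedConfig` is a hypothetical base class invented for the illustration. The point is only that a typed class can reject bad config when it is constructed, before any op body runs.

```python
from dataclasses import dataclass, fields


@dataclass
class TypedConfig:
    """Hypothetical base class: checks each field against its annotation."""

    def __post_init__(self) -> None:
        for field in fields(self):
            value = getattr(self, field.name)
            # Only plain classes (str, int, ...) are supported in this sketch.
            if not isinstance(value, field.type):
                raise TypeError(
                    f"{field.name} must be {field.type.__name__}, "
                    f"got {type(value).__name__}"
                )


@dataclass
class GreetingConfig(TypedConfig):
    greeting: str
    name: str


def greet(config: GreetingConfig) -> str:
    # Attribute access replaces string-keyed dictionary lookups.
    return f"{config.greeting}, {config.name}!"


message = greet(GreetingConfig(greeting="Hello", name="Dagster"))
```

With string-keyed lookups, a mistyped key or value only surfaces when the op runs. Here, `GreetingConfig(greeting="Hello", name=42)` fails at construction, and an editor or type checker can flag a misspelled attribute.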

The resource system is also being given a facelift: resources are now defined as classes, with config defined as Pydantic attributes.

Before:

class MyDBResource:
    def __init__(self, connection_uri: str):
        self._connection_uri = connection_uri

    def query(self, query: str):
        return create_engine(self._connection_uri).query(query)

@resource(config_schema={"connection_uri": str})
def my_db(context: InitResourceContext):
    return MyDBResource(context.resource_config["connection_uri"])

After:

class MyDBResource(ConfigurableResource):
    connection_uri: str

    def query(self, query: str):
        return create_engine(self.connection_uri).query(query)
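
The same class-based shape can be tried without Dagster or a database server. The sketch below is an assumption-laden analogue rather than Dagster code: `SqliteResource` is a hypothetical name, it uses a stdlib dataclass in place of `ConfigurableResource`, and `path` plays the role of `connection_uri`.

```python
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass
class SqliteResource:
    """Hypothetical stand-in for MyDBResource, backed by stdlib sqlite3."""

    path: str  # plays the role of connection_uri

    def query(self, sql: str) -> List[Tuple[Any, ...]]:
        # Open a connection per call and close it once rows are fetched.
        with closing(sqlite3.connect(self.path)) as conn:
            return conn.execute(sql).fetchall()


db = SqliteResource(path=":memory:")
rows = db.query("SELECT 1 + 1")
```

Configuration and behavior live on one object, so there is no separate factory function to keep in sync with the class.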

Instead of the clunky StringSource API, values can be specified from the environment using a simpler EnvVar notation:

defs = Definitions(
    assets=...,
    resources={
        "my_db": MyDBResource(connection_uri=EnvVar("DB_URI"))
    }
)
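
One general reason to prefer a dedicated marker over reading the environment inline is that the lookup can be deferred until the value is needed. This memo does not describe when Dagster resolves `EnvVar`, so the following is only a stdlib sketch of the general pattern; `LazyEnvVar` and `DBSettings` are hypothetical names.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class LazyEnvVar:
    """Hypothetical marker: records which variable to read, not its value."""

    name: str

    def resolve(self) -> str:
        try:
            return os.environ[self.name]
        except KeyError:
            raise RuntimeError(
                f"environment variable {self.name!r} is not set"
            ) from None


@dataclass
class DBSettings:
    connection_uri: LazyEnvVar

    def resolved_uri(self) -> str:
        # The environment is consulted only here, at the point of use.
        return self.connection_uri.resolve()


# Constructing the settings does not require DB_URI to exist yet.
settings = DBSettings(connection_uri=LazyEnvVar("DB_URI"))
os.environ["DB_URI"] = "sqlite:///example.db"
uri = settings.resolved_uri()
```

Because the marker stores the variable name and not the secret, the definition itself can be displayed or logged without exposing the value.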

While the existing config and resource format will continue to be supported for the foreseeable future, we believe these new changes are a major step forward in usability. These examples provide just a brief glimpse into the changes. For more details, see our GitHub community discussion.

Top-level resources

In their initial form, resources were bound at the job level. With the introduction of software-defined assets, resources were bound to individual assets as needed. Over time, the tension between the mental model of resources as high-level attachment points to external services and the low-level, component-by-component binding APIs became apparent. Many users found themselves binding a single database or API client repeatedly to each job or asset.

With Dagster 1.1’s introduction of the Definitions API, resources can now be bound at the top level of a project and are automatically attached to the jobs and assets that request them. Users no longer need to thread resources throughout their project, and swapping in a different resource for staging or testing is easier.
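
Because binding happens in one place, choosing resources per environment reduces to choosing one dictionary. The sketch below uses only the standard library; `FooResource`, its `base_url` field, the example URLs, and `resources_for` are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class FooResource:
    base_url: str  # hypothetical setting that differs per environment


RESOURCES_BY_ENV: Dict[str, Dict[str, FooResource]] = {
    "prod": {"foo_resource": FooResource(base_url="https://api.example.com")},
    "staging": {"foo_resource": FooResource(base_url="https://staging.example.com")},
}


def resources_for(env: str) -> Dict[str, FooResource]:
    """Return the single resource mapping to bind for the given environment."""
    try:
        return RESOURCES_BY_ENV[env]
    except KeyError:
        raise ValueError(f"unknown environment: {env!r}") from None


staging = resources_for("staging")
```

In a Dagster project, a mapping like this would be what gets passed as the `resources` argument of `Definitions`, so jobs and assets stay unchanged across environments.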

We are also making it easier to access resources in the body of an op or asset, with the ability to request resources via parameters instead of through the context object:

Before:

@op(required_resource_keys={"foo_resource"})
def hello_world(context):
    foo_resource = context.resources.foo_resource
    ...

@job(resource_defs={"foo_resource": FooResource()})
def hello_world_job():
    hello_world()

@asset(required_resource_keys={"foo_resource"})
def my_asset(context):
    foo_resource = context.resources.foo_resource
    ...

@repository
def my_repo():
    return [
        hello_world_job,
        *with_resources(
            [my_asset], 
            resource_defs={"foo_resource": FooResource()}
        )
    ]

After:

@op
def hello_world(foo_resource: FooResource):
    ...

@job
def hello_world_job():
    hello_world()

@asset
def my_asset(foo_resource: FooResource):
    ...

defs = Definitions(
    assets=[my_asset],
    jobs=[hello_world_job],
    resources={
        "foo_resource": FooResource()
    }
)
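
For intuition about how parameter-based access can work, the sketch below matches a function's parameter names against the keys of a resource dictionary. It is a simplified, stdlib-only illustration and not Dagster's implementation; `call_with_resources` and the `ping` method are hypothetical.

```python
import inspect
from typing import Any, Callable, Mapping


class FooResource:
    def ping(self) -> str:
        return "pong"


def call_with_resources(fn: Callable[..., Any], resources: Mapping[str, Any]) -> Any:
    """Call fn, supplying each parameter from the resource bound under that name."""
    wanted = list(inspect.signature(fn).parameters)
    missing = [name for name in wanted if name not in resources]
    if missing:
        raise KeyError(f"no resource bound for: {', '.join(missing)}")
    # Only the resources the function asks for are passed in.
    return fn(**{name: resources[name] for name in wanted})


def hello_world(foo_resource: FooResource) -> str:
    return foo_resource.ping()


result = call_with_resources(
    hello_world,
    {"foo_resource": FooResource(), "unused": object()},
)
```

The function declares what it needs in its signature, so the requirement is visible to type checkers and the function can be called directly in a test with a fake resource.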

Note that resources will be automatically bound to jobs in Dagster 1.3. Users on Dagster 1.2 can opt in to this behavior using the BindResourcesToJobs wrapper:

defs = Definitions(
    assets=[my_asset],
    jobs=BindResourcesToJobs([hello_world_job]),
    resources={
        "foo_resource": FooResource()
    }
)

Resources in the UI

Resources’ new status as first-class components of the Dagster system is represented by their new role in the UI. Developers can view resources and their configuration as well as where they are used directly from the interface.

An example Airbyte resource in the resources UI.

With the introduction of resources as top-level objects in the interface, Dagster becomes a window into your data platform. For the typical data platform, which knits together a variety of tools, having a central directory of not only your data assets but also the services they rely on is invaluable. Developers can track down which assets rely on which external services and drill down into the configuration of those services to see which accounts, databases, or config options are being used.

Resources in schedules and sensors

Until now, the use of resources has been limited to full-fledged job runs or asset materializations. Sensors and schedules, which often rely on the same external APIs as jobs, could not easily take advantage of the pluggability of resources. Resources can now be used in a schedule or sensor function just as they would be in a job or asset, making it much easier for schedules and sensors to call out to external services and to be put under test.

@sensor(job=parse_file_job)
def my_sensor(s3: S3Resource):
    for filename in s3.get_files():
        yield RunRequest(run_key=filename)

# A test double that returns a fixed list of files
class MockS3:
    def get_files(self):
        return ["foo", "bar", "baz"]

runs = list(my_sensor(s3=MockS3()))
assert len(runs) == 3

Learning more

We’re excited to share these changes with the community, and we hope you’ll give them a try. If you’re interested in learning more about the new resource system before its full release with 1.3, check out the experimental config and resources docs and leave feedback or ask questions in our Slack.

