
April 30, 2024 · 5 minute read

Unlocking Flexible Pipelines: Customizing the Asset Decorator

Daniel Gafni
@danielgafni

Data engineers often encounter scenarios where they need to work with multiple similar data assets.

These assets might differ in small details, like:

  • how the input table is filtered
  • which transformation is applied
  • which entities are included in the data
  • which machine learning model is applied

But share similarities such as:

  • using standard naming conventions
  • producing text and metadata logs
  • having the same columns

At first, it’s easy to make a similar asset by copying and pasting the existing logic while only modifying the unique parts. But what if there are dozens of such assets? How do we keep our code maintainable and DRY (Don't Repeat Yourself)?

Asset Factories to the rescue!

Asset Factories

The Asset Factory is a design pattern used to generate multiple similar asset definitions. The idea is to define a function that produces assets based on some (varying) input parameters. This factory encapsulates the common logic for creating the assets, while allowing the specific details to be customized for each asset. Asset factories offer a systematic approach to generating these assets dynamically, reducing redundancy and enhancing code maintainability.

Let's say we have a machine learning model which predicts client churn probability for the next month. We would like to maintain tables for users belonging to different behavioral groups, such as reliable users or users with a high risk of churning.

Here is a simple example of an Asset Factory which achieves this goal:

from dagster import asset, AssetsDefinition
import polars as pl


def build_churning_users_asset(
    min_churn_proba: float, max_churn_proba: float
) -> AssetsDefinition:
    # asset names cannot contain dots, so encode the float thresholds as percentages
    name = f"users_churn_between_{round(min_churn_proba * 100)}_{round(max_churn_proba * 100)}"

    @asset(name=name)
    def _asset(users: pl.DataFrame) -> pl.DataFrame:
        return users.filter(
            pl.col("predicted_churn_proba") > min_churn_proba,
            pl.col("predicted_churn_proba") <= max_churn_proba,
        )

    return _asset


# create multiple assets
churning_users_assets = [
    build_churning_users_asset(l, h) for l, h in [(0.0, 0.3), (0.3, 0.5), (0.5, 1.0)]
]

We now have three different assets which differ only in their filtering thresholds.
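
To register these generated assets, we can pass them to our code location like any hand-written asset. Here is a minimal sketch, assuming the upstream users asset is defined elsewhere:

from dagster import Definitions


# the generated assets are regular AssetsDefinition objects,
# so they can be registered alongside hand-written ones
defs = Definitions(assets=[users, *churning_users_assets])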

The codebase can quickly turn into chains of such generated assets, with multiple factories passing assets from one to another. It then becomes convenient to take the upstream AssetsDefinition as a factory argument, because it carries a lot of useful information, such as the asset key, group, and partitioning.

For example, imagine another users-like asset, which may be partitioned and come from a different data source:

from dagster import StaticPartitionsDefinition


@asset(
    key=["users", "special"],
    partitions_def=StaticPartitionsDefinition(["A", "B"]),
    group_name="special",
)
def special_users() -> pl.DataFrame:
    ...
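
To see why passing the upstream definition around is so convenient, here is roughly what we can read off it (a quick sketch; the printed reprs are approximate):

print(special_users.key)                 # AssetKey(["users", "special"])
print(special_users.partitions_def)      # the StaticPartitionsDefinition from above
print(special_users.group_names_by_key)  # {AssetKey(["users", "special"]): "special"}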

A generalized churn-filtering factory may look something like this:

from dagster import AssetIn


def build_churning_users_asset(
    upstream_asset: AssetsDefinition, min_churn_proba: float, max_churn_proba: float
) -> AssetsDefinition:
    # encode the float thresholds without dots, as before
    prefix = f"users_churn_between_{round(min_churn_proba * 100)}_{round(max_churn_proba * 100)}"

    @asset(
        # prepend the original key with a new prefix
        # to clearly separate our generated assets
        key=[prefix] + list(upstream_asset.key.path),
        # we might want to keep the generated asset in the same group
        group_name=upstream_asset.group_names_by_key[upstream_asset.key],
        # and we definitely want the same partitioning
        partitions_def=upstream_asset.partitions_def,
        ins={"upstream": AssetIn(upstream_asset.key)},
    )
    def _asset(upstream: pl.DataFrame) -> pl.DataFrame:
        return upstream.filter(
            pl.col("predicted_churn_proba") > min_churn_proba,
            pl.col("predicted_churn_proba") <= max_churn_proba,
        )

    return _asset


churn_thresholds = [(0.0, 0.3), (0.3, 0.5), (0.5, 1.0)]
assets_for_churn_filtering = [users, special_users]

churning_users_assets = [
    build_churning_users_asset(a, l, h)
    for a in assets_for_churn_filtering
    for l, h in churn_thresholds
]

Hurray! We were able to build quite a general asset factory. It can be applied to any upstream asset and will carry over properties such as partitioning and group name.

As always with factories, it works well until it doesn't.

One thing this factory cannot generalize over is the input data. Because the upstream dependencies are hardcoded into the decorated function's signature, we cannot change their number or names when calling the factory.

For example, imagine we are calculating data science features over some data. Some may be statistical and self-contained, but others may require additional machine learning models to be applied. Features may also depend on each other. This makes feature assets form a complex DAG, which can't be expressed by the above pattern, because we would need to dynamically define dependencies between features at factory call time.
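
To make the limitation concrete, here is the kind of API we would like to have, with a hypothetical build_feature_asset factory (the feature names and the deps argument are made up for illustration):

# hypothetical: each generated feature asset depends on a *different*
# set of upstream assets, which the factories above cannot express,
# since their inputs are fixed in the decorated function's signature
avg_session_length = build_feature_asset("avg_session_length", deps=["sessions"])
churn_score = build_feature_asset(
    "churn_score", deps=["avg_session_length", "model_scores"]
)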

Here is how we solve it: we write more factories. Just joking.

Extending the @asset decorator

Luckily for us, a better solution exists, and it's called functools.wraps.

By using it, we can not only dynamically pass upstream feature dependencies to Dagster, but also keep the same neat API by specifying them as function arguments.

In Python, decorators provide a powerful mechanism for modifying or extending the behavior of functions or methods. By leveraging functools.wraps, decorators can preserve the metadata and signature of the original function. Let's take a look at how it works:

import functools


def my_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    # these run at decoration time and show that `wrapper`
    # has inherited the metadata of the decorated function
    print(wrapper.__name__)
    print(wrapper.__doc__)

    return wrapper


@my_decorator
def say_hello(someone: str):
    """My docstring"""
    print(f"Hello, {someone}!")


# defining say_hello above already printed:
# say_hello
# My docstring

say_hello("World")  # prints: Hello, World!

The inner wrapper function now has the same .__doc__, .__name__, and, in fact, the same signature as our say_hello function! You can find more details in the functools documentation.
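
We can check the signature claim directly with the standard inspect module (inspect.signature follows the __wrapped__ attribute set by functools.wraps):

import inspect

# the wrapper reports the original function's signature
print(inspect.signature(say_hello))  # (someone: str)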

Notice how our decorator is also a factory. Actually, the @ syntax

@decorator
def fn(): ...

is just syntactic sugar for

def _fn(): ...

fn = decorator(_fn)  # factory call spotted! 

Now, let's use it to build a customized @asset decorator (factory).

import functools

from dagster import AssetExecutionContext, AssetIn, Config


def my_asset_decorator(
    # we can pass any of Dagster's `@asset` arguments from here;
    # this function can also take more complex objects as arguments, such as SourceAsset
    name: str | None = None,
    key: list[str] | None = None,
    group_name: str | None = None,
    io_manager_key: str | None = None,
    ins: dict[str, AssetIn] | None = None,
):
    def inner(
        compute_fn,
    ) -> AssetsDefinition:
        @asset(
            name=name,  # note: Dagster expects either `name` or `key`, not both
            key=key,
            io_manager_key=io_manager_key,
            group_name=group_name,
            ins=ins,
        )
        @functools.wraps(compute_fn)
        def _asset(
            context: AssetExecutionContext,
            config: Config,
            *args,
            **kwargs,
        ):
            # you can really do anything you want with *args and **kwargs here
            context.log.debug("Before calling user code...")
            result = compute_fn(context, config, *args, **kwargs)
            context.log.debug("After calling user code...")
            # maybe apply a standardized transformation to `result` here?
            # or log some metadata? Your choice!
            return result

        return _asset

    return inner


class MyConfig(Config):
    foo: str = "bar"


@my_asset_decorator(group_name="my_group", io_manager_key="my_io_manager")
def my_asset(context: AssetExecutionContext, config: MyConfig, upstream_1, upstream_2):
    return ...

This pattern is very general and powerful. It's possible to inject any code into the resulting asset, both from the factory and from the compute_fn decorated by my_asset_decorator. The asset can also take arbitrary Dagster Resources and upstream assets as inputs. The custom asset decorator can be made as complex and fine-grained as needed, potentially injecting additional asset dependencies or resources, or doing something crazy which I can't think of right now.
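
Coming back to the feature DAG example, here is a sketch of how dependent features could be declared with this decorator; the feature names are made up, and Dagster infers the upstream assets from the preserved function signatures:

@my_asset_decorator(group_name="features")
def avg_session_length(context: AssetExecutionContext, config: MyConfig, sessions):
    # `sessions` is picked up as an upstream asset from the signature
    return ...


@my_asset_decorator(group_name="features")
def churn_score(
    context: AssetExecutionContext, config: MyConfig, avg_session_length, model_scores
):
    # this feature depends on another generated feature plus a second input,
    # exactly the varying dependencies the earlier factories could not express
    return ...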

Conclusion

The pattern of extending the dagster.asset decorator by using functools.wraps is a powerful tool for creating reusable and flexible data pipelines. It allows for dynamic definition of dependencies between assets, which is crucial for complex data processing tasks, and empowers data engineers to craft flexible, maintainable data pipelines tailored to their specific needs. By embracing these techniques, teams can navigate complex data processing challenges with confidence, unlocking the full potential of Dagster's asset-based programming model.



We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a GitHub discussion. If you run into any bugs, let us know with a GitHub issue. And if you're interested in working with us, check out our open roles!
