Unlocking Flexible Pipelines: Customizing the Asset Decorator

April 30, 2024

Use Asset Factories within Dagster to streamline data asset creation, promote code reusability, and keep data engineering workflows maintainable.

Data engineers often encounter scenarios where they need to work with multiple similar data assets.

These assets might differ in small details, like:
- how the input table is filtered
- which transformation is applied
- which entities are included in the data
- which machine learning model is applied

But they share similarities such as:
- using standard naming conventions
- producing text and metadata logs
- having the same columns

At first, it’s easy to make a similar asset by copying and pasting the existing logic while only modifying the unique parts. But what if there are dozens of such assets? How do we keep our code maintainable and DRY (Don't Repeat Yourself)?

Asset Factories to the rescue!

Asset Factories

The Asset Factory is a design pattern used to generate multiple similar assets definitions. The idea is to define a function which produces assets based on some (varying) input parameters. This factory encapsulates the common logic for creating the assets, while allowing the specific details to be customized for each asset. Asset factories offer a systematic approach to generate these assets dynamically, reducing redundancy and enhancing code maintainability.

Let's say we have a machine learning model that predicts client churn probability for the next month. We would like to maintain tables for groups of users who belong to different behavioral groups, such as reliable users or users with a high churn risk.

Here is a simple example of an Asset Factory which achieves this goal:

from dagster import asset, AssetsDefinition
import polars as pl


def build_churning_users_asset(
    min_churn_proba: float, max_churn_proba: float
) -> AssetsDefinition:
    # asset names may only contain letters, digits, and underscores,
    # so drop the decimal points from the thresholds
    suffix = f"{min_churn_proba}_{max_churn_proba}".replace(".", "_")

    @asset(name=f"users_churn_between_{suffix}")
    def _asset(users: pl.DataFrame) -> pl.DataFrame:
        return users.filter(
            pl.col("predicted_churn_proba") > min_churn_proba,
            pl.col("predicted_churn_proba") <= max_churn_proba,
        )

    return _asset


# create multiple assets
churning_users_assets = [
    build_churning_users_asset(l, h) for l, h in [(0.0, 0.3), (0.3, 0.5), (0.5, 1.0)]
]

We now have 3 different assets which only differ in filtering thresholds.
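
To make these generated assets visible to Dagster, register them like any other asset. Here is a minimal sketch, assuming the upstream users asset is defined elsewhere:

from dagster import Definitions

defs = Definitions(
    # the upstream `users` asset plus the three generated downstream assets
    assets=[users, *churning_users_assets],
)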

The codebase can quickly turn into chains of such generated assets, with multiple factories passing assets from one to another. Thus, it becomes convenient to take the upstream AssetsDefinition as a factory argument, because it contains a lot of useful information, such as the asset key, group, partitioning, etc.

For example, imagine another users-like asset, which may be partitioned and come from a different data source:

from dagster import StaticPartitionsDefinition


@asset(
    key=["users", "special"],
    partitions_def=StaticPartitionsDefinition(["A", "B"]),
    group_name="special",
)
def special_users() -> pl.DataFrame:
    ...

A generalized churn-filtering factory may look something like this:

from dagster import AssetIn, SourceAsset


def build_churning_users_asset(
    upstream_asset: SourceAsset, min_churn_proba: float, max_churn_proba: float
) -> AssetsDefinition:
    # as before, keep the key components free of decimal points
    prefix = f"users_churn_between_{min_churn_proba}_{max_churn_proba}".replace(
        ".", "_"
    )

    @asset(
        # prepend the original key with a new prefix
        # to clearly separate our generated assets
        key=[prefix] + list(upstream_asset.key.path),
        group_name=upstream_asset.group_name,  # we might want to keep the generated asset in the same group
        partitions_def=upstream_asset.partitions_def,  # and we definitely want the same partitioning
        ins={"upstream": AssetIn(upstream_asset.key)},
    )
    def _asset(upstream: pl.DataFrame) -> pl.DataFrame:
        return upstream.filter(
            pl.col("predicted_churn_proba") > min_churn_proba,
            pl.col("predicted_churn_proba") <= max_churn_proba,
        )

    return _asset


churn_thresholds = [(0.0, 0.3), (0.3, 0.5), (0.5, 1.0)]
assets_for_churn_filtering = [users, special_users]

churning_users_assets = [
    build_churning_users_asset(a, l, h)
    for a in assets_for_churn_filtering
    for l, h in churn_thresholds
]

Hurray! We were able to build quite a general asset factory. It can be applied to any upstream asset and will carry over properties such as partitioning and group name.

As always with factories, it works well until it doesn't.

One thing this factory cannot generalize over is the input data. Because the upstream assets are hard-coded into the compute function's signature, we cannot change them when calling the factory.

For example, imagine we are calculating data science features over some data. Some may be statistical and self-contained, while others may require additional machine learning models to be applied. Features may also depend on each other. This makes feature assets form a complex DAG, which can't be expressed by the pattern above, because we would need to define dependencies between features dynamically at factory call time.
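
For concreteness, a hypothetical feature layout (names invented for illustration) might look like the sketch below; expressing these edges with the factory above would require hard-coding a separate compute function signature for every feature:

# hypothetical feature DAG: each feature lists the features it depends on
feature_dependencies = {
    "avg_session_length": [],               # plain statistic over raw data
    "churn_score": ["avg_session_length"],  # requires an ML model
    "retention_segment": ["churn_score"],   # derived from the model output
}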

Here is how we solve it: we write more factories. Just joking.

Extending the @asset decorator

Luckily for us, a better solution exists, and it's called functools.wraps.

By using it, we can not only dynamically pass the upstream feature dependencies to Dagster, but also keep the same neat API of specifying them as function arguments.

In Python, decorators provide a powerful mechanism for modifying or extending the behavior of functions or methods. By leveraging functools.wraps, decorators can preserve the metadata and signature of the original function. Let's take a look at how it works:

import functools


def my_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    print(wrapper.__name__)
    print(wrapper.__doc__)

    return wrapper


@my_decorator
def say_hello(someone: str):
    """My docstring"""
    print(f"Hello, {someone}!")

# applying the decorator prints the metadata copied onto the wrapper:
# say_hello
# My docstring

>>> say_hello("World")
Hello, World!

The inner wrapper function now has the same .__doc__, .__name__, and, in fact, the same signature as our say_hello function! You can find more details in the functools.wraps documentation.
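
As a quick check (a minimal sketch reusing the say_hello example above), we can confirm that the preserved signature is what signature-inspecting tools, such as Dagster's @asset, will see:

import inspect

# functools.wraps sets wrapper.__wrapped__ = func, and inspect.signature
# follows __wrapped__ by default, so the decorated function still reports
# the original parameters instead of (*args, **kwargs)
print(inspect.signature(say_hello))  # (someone: str)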

Notice how our decorator is also a factory. Actually, the @ syntax

@decorator
def fn(): ...

is just syntactic sugar for

def fn(): ...

fn = decorator(fn)  # factory call spotted!

Now, let's use it to build a customized @asset decorator (factory).

import functools
from dagster import AssetExecutionContext, AssetIn, AssetsDefinition, Config, SourceAsset, asset


def my_asset_decorator(
    # we can pass any Dagster's `@asset` arguments from here
    # this function can also take more complex objects as arguments, such as SourceAsset
    name: str | None = None,
    key: list[str] | None = None,
    group_name: str | None = None,
    io_manager_key: str | None = None,
    ins: dict[str, AssetIn] | None = None,
):
    def inner(
        compute_fn,
    ) -> AssetsDefinition:
        @asset(
            name=name,
            key=key,
            io_manager_key=io_manager_key,
            group_name=group_name,
            ins=ins,
        )
        @functools.wraps(compute_fn)
        def _asset(
            context: AssetExecutionContext,
            config: Config,
            *args,
            **kwargs,
        ):
            # you can really do anything you want with *args and **kwargs here
            context.log.debug("Before calling user code...")
            result = compute_fn(context, config, *args, **kwargs)
            context.log.debug("After calling user code...")
            # maybe apply a standardized transformation here?
            # or log some metadata? Your choice!
            return result

        return _asset

    return inner


class MyConfig(Config):
    foo: str = "bar"


@my_asset_decorator(group_name="my_group", io_manager_key="my_io_manager")
def my_asset(context: AssetExecutionContext, config: MyConfig, upstream_1, upstream_2):
    return ...

This pattern is very general and powerful. You get the idea: it's possible to inject any code into the resulting asset, both from the factory and from the compute_fn decorated by my_asset_decorator. It can also take arbitrary Dagster Resources and upstream Assets as input. The custom asset decorator can be made as complex and fine-grained as needed, potentially injecting additional asset dependencies or resources, or doing something crazy I can't think of right now.
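
For illustration, here is a minimal sketch of wiring upstream dependencies through the custom decorator explicitly via ins; the asset keys and the user_id join column are made up for this example:

@my_asset_decorator(
    group_name="my_group",
    io_manager_key="my_io_manager",
    # map the compute function's arguments to explicit upstream asset keys
    ins={
        "churning": AssetIn(["users_churn_between_0_3_0_5"]),
        "special": AssetIn(["users", "special"]),
    },
)
def churning_special_users(
    context: AssetExecutionContext,
    config: MyConfig,
    churning: pl.DataFrame,
    special: pl.DataFrame,
) -> pl.DataFrame:
    # hypothetical join key; any per-asset logic can live here
    return churning.join(special, on="user_id")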

Conclusion

The pattern of extending the dagster.asset decorator by using functools.wraps is a powerful tool for creating reusable and flexible data pipelines. It allows for dynamic definition of dependencies between assets, which is crucial for complex data processing tasks, and it empowers data engineers to craft flexible, maintainable data pipelines tailored to their specific needs. By embracing these techniques, teams can navigate complex data processing challenges with confidence, unlocking the full potential of Dagster's asset-based programming model.

