By Daniel Gafni (@danielgafni)
Data engineers often encounter scenarios where they need to work with multiple similar data assets.
These assets might differ in small details, like:
- how the input table is filtered
- which transformation is applied
- which entities are included in the data
- which machine learning model is applied
But share similarities such as:
- using standard naming conventions
- producing text and metadata logs
- having the same columns
At first, it’s easy to make a similar asset by copying and pasting the existing logic while only modifying the unique parts. But what if there are dozens of such assets? How do we keep our code maintainable and DRY (Don't Repeat Yourself)?
Asset Factories to the rescue!
Asset Factories
The Asset Factory is a design pattern used to generate multiple similar asset definitions. The idea is to define a function which produces assets based on some (varying) input parameters. This factory encapsulates the common logic for creating the assets, while allowing the specific details to be customized for each asset. Asset factories offer a systematic approach to generating these assets dynamically, reducing redundancy and enhancing code maintainability.
Let's say we have a machine learning model which predicts client churn probability for the next month. We would like to maintain tables for groups of users belonging to different behavioral groups, such as reliable users or users with a high churn risk.
Here is a simple example of an Asset Factory which achieves this goal:
from dagster import asset, AssetsDefinition
import polars as pl


def build_churning_users_asset(
    min_churn_proba: float, max_churn_proba: float
) -> AssetsDefinition:
    # asset names may only contain letters, digits and underscores,
    # so we replace the dots coming from the float thresholds
    @asset(
        name=f"users_churn_between_{min_churn_proba}_{max_churn_proba}".replace(".", "_")
    )
    def _asset(users: pl.DataFrame) -> pl.DataFrame:
        return users.filter(
            pl.col("predicted_churn_proba") > min_churn_proba,
            pl.col("predicted_churn_proba") <= max_churn_proba,
        )

    return _asset


# create multiple assets
churning_users_assets = [
    build_churning_users_asset(l, h) for l, h in [(0.0, 0.3), (0.3, 0.5), (0.5, 1.0)]
]
We now have 3 different assets which only differ in filtering thresholds.
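These generated definitions still need an upstream users asset (the _asset function above already takes one as an input) and a place in a code location. Here is a minimal, hedged sketch of how that wiring could look; the users asset body is a placeholder:

from dagster import Definitions


# a hypothetical upstream asset producing the users table
# with a `predicted_churn_proba` column
@asset
def users() -> pl.DataFrame:
    ...


defs = Definitions(assets=[users, *churning_users_assets])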
The codebase can quickly turn into chains of such generated assets. Multiple factories will pass assets from one to another. Thus, it becomes convenient to take the upstream AssetsDefinition as a factory argument, because it contains a lot of useful information, such as the asset key, group, partitioning, etc.
For example, imagine another users-like asset, which may be partitioned and come from a different data source:
from dagster import StaticPartitionsDefinition


@asset(
    key=["users", "special"],
    partitions_def=StaticPartitionsDefinition(["A", "B"]),
    group_name="special",
)
def special_users() -> pl.DataFrame:
    ...
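As a quick, hedged sketch of why passing the AssetsDefinition around is so convenient, here is the metadata it carries for the asset we just defined:

# the AssetsDefinition returned by @asset exposes the metadata we care about
print(special_users.key)                 # the asset key: ["users", "special"]
print(special_users.partitions_def)      # the StaticPartitionsDefinition from above
print(special_users.group_names_by_key)  # maps the asset key to its group, "special"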
A generalized churn-filtering factory may look something like this:
from dagster import AssetIn


def build_churning_users_asset(
    upstream_asset: AssetsDefinition, min_churn_proba: float, max_churn_proba: float
) -> AssetsDefinition:
    # again, replace the dots so the key parts stay valid Dagster names
    prefix = f"users_churn_between_{min_churn_proba}_{max_churn_proba}".replace(".", "_")

    @asset(
        # prepend the original key with a new prefix
        # to clearly separate our generated assets
        key=[prefix, *upstream_asset.key.path],
        # we might want to keep the generated asset in the same group
        group_name=upstream_asset.group_names_by_key[upstream_asset.key],
        # and we definitely want the same partitioning
        partitions_def=upstream_asset.partitions_def,
        ins={"upstream": AssetIn(upstream_asset.key)},
    )
    def _asset(upstream: pl.DataFrame) -> pl.DataFrame:
        return upstream.filter(
            pl.col("predicted_churn_proba") > min_churn_proba,
            pl.col("predicted_churn_proba") <= max_churn_proba,
        )

    return _asset
churn_thresholds = [(0.0, 0.3), (0.3, 0.5), (0.5, 1.0)]
assets_for_churn_filtering = [users, special_users]

churning_users_assets = [
    build_churning_users_asset(a, l, h)
    for a in assets_for_churn_filtering
    for l, h in churn_thresholds
]
Hurray! We were able to build quite a general asset factory. It can be applied to any upstream asset and will carry over properties such as partitioning and group name.
As always with factories, it works well until it doesn't.
One thing this factory cannot generalize over is the input data. Because the set of inputs is hardcoded into the decorated function's signature, we cannot change which (or how many) upstream assets are consumed when calling the factory.
For example, imagine we are calculating data science features over some data. Some features may be statistical and self-contained, while others may require additional machine learning models to be applied. Features may also depend on each other. This makes feature assets form a complex DAG, which can't be expressed by the pattern above, because we would need to dynamically define dependencies between features at factory call time.
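To make this concrete, here is a purely hypothetical feature dependency spec (all names are made up): each generated feature asset would need a different set of inputs, which the fixed-signature factory above cannot express.

# hypothetical feature DAG: each feature asset depends on a different
# set of upstream assets, unknown to the factory author in advance
feature_deps = {
    "avg_session_length": ["sessions"],
    "sessions_per_week": ["sessions", "users"],
    "churn_score": ["avg_session_length", "sessions_per_week"],
}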
Here is how we solve it: we write more factories. Just joking.
Extending the @asset decorator
Luckily for us, a better solution exists, and it's called functools.wraps. By using it, we can not only dynamically pass the upstream feature dependencies to Dagster, but also keep the same neat API of specifying them as function arguments.
In Python, decorators provide a powerful mechanism for modifying or extending the behavior of functions or methods. By leveraging functools.wraps, decorators can preserve the metadata and signature of the original function. Let's take a look at how it works:
import functools


def my_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(wrapper.__name__)
        print(wrapper.__doc__)
        return func(*args, **kwargs)

    return wrapper


@my_decorator
def say_hello(someone: str):
    """My docstring"""
    print(f"Hello, {someone}!")
>>> say_hello("World")
say_hello
My docstring
Hello, World!
The inner wrapper function now has the same .__doc__, .__name__, and, in fact, the same signature as our say_hello function! You can find more details in the functools documentation.
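A quick way to see the signature preservation in action (a small sketch, relying on the fact that inspect.signature follows the __wrapped__ attribute set by functools.wraps):

import inspect

# the wrapper reports the original function's signature,
# which is exactly what Dagster will introspect later
print(inspect.signature(say_hello))  # prints: (someone: str)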
Notice how our decorator is also a factory. Actually, the @ syntax
@decorator
def fn(): ...
is just syntactic sugar for
def _fn(): ...
fn = decorator(_fn) # factory call spotted!
Now, let's use it to build a customized @asset decorator (factory).
import functools

from dagster import AssetExecutionContext, AssetIn, Config


def my_asset_decorator(
    # we can pass any of Dagster's `@asset` arguments from here;
    # this function can also take more complex objects as arguments, such as SourceAsset
    name: str | None = None,
    key: list[str] | None = None,
    group_name: str | None = None,
    io_manager_key: str | None = None,
    ins: dict[str, AssetIn] | None = None,
):
    def inner(
        compute_fn,
    ) -> AssetsDefinition:
        @asset(
            name=name,
            key=key,
            io_manager_key=io_manager_key,
            group_name=group_name,
            ins=ins,
        )
        @functools.wraps(compute_fn)
        def _asset(
            context: AssetExecutionContext,
            config: Config,
            *args,
            **kwargs,
        ):
            # you can really do anything you want with *args and **kwargs here
            context.log.debug("Before calling user code...")
            result = compute_fn(context, config, *args, **kwargs)
            context.log.debug("After calling user code...")
            # maybe apply a standardized transformation here?
            # or log some metadata? Your choice!
            return result

        return _asset

    return inner


class MyConfig(Config):
    foo: str = "bar"


@my_asset_decorator(group_name="my_group", io_manager_key="my_io_manager")
def my_asset(context: AssetExecutionContext, config: MyConfig, upstream_1, upstream_2):
    return ...
This pattern is very general and powerful. You get the idea: it's possible to inject any code into the resulting asset, both from the factory and from the compute_fn decorated by my_asset_decorator. It can also take arbitrary Dagster Resources and upstream assets as inputs. The custom asset decorator can be made as complex and fine-grained as needed, potentially injecting additional asset dependencies or resources, or doing something crazy I can't think of right now.
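As a hedged sketch of how the generated asset could be wired up (the upstream_1 and upstream_2 assets and the my_io_manager resource below are hypothetical placeholders), note that Dagster infers the upstream dependencies straight from the wrapped function's arguments:

from dagster import Definitions, FilesystemIOManager


# hypothetical upstream assets matching the argument names of my_asset;
# Dagster picks them up as dependencies thanks to functools.wraps
@asset
def upstream_1() -> str:
    return "foo"


@asset
def upstream_2() -> str:
    return "bar"


defs = Definitions(
    assets=[upstream_1, upstream_2, my_asset],
    resources={"my_io_manager": FilesystemIOManager()},
)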
Conclusion
The pattern of extending the dagster.asset decorator by using functools.wraps is a powerful tool for creating reusable and flexible data pipelines. It allows for the dynamic definition of dependencies between assets, which is crucial for complex data processing tasks, and empowers data engineers to craft flexible, maintainable pipelines tailored to their specific needs. By embracing these techniques, teams can navigate complex data processing challenges with confidence, unlocking the full potential of Dagster's asset-based programming model.
We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a Github discussion. If you run into any bugs, let us know with a Github issue. And if you're interested in working with us, check out our open roles!