“Config system.” It's a phrase that'll put a pit in the stomach of any programmer…
Being asked to “configure” in the first place is a tall order; I'd much rather not spend time figuring out the correct magic environment variables to set. But to also understand an entire “system”? Oof, no, thank you. I'd have better luck figuring out what's going on in the head of the squirrel outside my office window than in the head of some OSS programmer from years ago.
And yet, orchestration without interacting with external systems is not very useful, and interacting with external systems requires configuration. And so, it's essential to know how to coherently organize and structure how your orchestration framework works with external systems. If the orchestration framework you are building on top of has yet to think hard about the abstractions it uses to manage this, you will be missing out.
From Airflow to Dagster
When data teams are considering moving from Airflow to Dagster, they've likely invested time writing custom integrations for Airflow, and they've spent the time learning the Airflow configuration system. It might seem daunting to leave that work behind.
While the dagster-airflow library lets you port your Airflow DAGs to Dagster with zero code changes, it's best to write new integrations directly using Dagster's well-supported, modern abstractions. As you'll see below, the Dagster code is much more succinct, easier to understand, and easier to debug than the equivalent Airflow code.
Making it concrete
Talking about config systems in the abstract isn't very useful, so to try to illustrate the differences between Dagster and Airflow, we'll rewrite the Airflow Slack Integration in Dagster.
Let's start by rewriting the class definition and parameters for the Airflow Slack Hook as a Dagster Resource:
Airflow
from __future__ import annotations

import warnings
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Sequence

from slack_sdk import WebClient

from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.providers.slack.utils import ConnectionExtraConfig
from airflow.utils.log.secrets_masker import mask_secret

if TYPE_CHECKING:
    # Imported only for the type annotations used below.
    from slack_sdk.http_retry import RetryHandler
    from slack_sdk.web.slack_response import SlackResponse


class SlackHook(BaseHook):
    conn_name_attr = "slack_conn_id"
    default_conn_name = "slack_api_default"
    conn_type = "slack"
    hook_name = "Slack API"

    def __init__(
        self,
        token: str | None = None,
        slack_conn_id: str | None = None,
        base_url: str | None = None,
        timeout: int | None = None,
        proxy: str | None = None,
        retry_handlers: list[RetryHandler] | None = None,
        **extra_client_args: Any,
    ) -> None:
        if not token and not slack_conn_id:
            raise AirflowException("Either `slack_conn_id` or `token` should be provided.")
        if token:
            mask_secret(token)
            warnings.warn(
                "Provide token as hook argument deprecated by security reason and will be removed "
                "in a future releases. Please specify token in `Slack API` connection.",
                DeprecationWarning,
                stacklevel=2,
            )
        if not slack_conn_id:
            warnings.warn(
                "You have not set parameter `slack_conn_id`. Currently `Slack API` connection id optional "
                "but in a future release it will mandatory.",
                FutureWarning,
                stacklevel=2,
            )

        super().__init__()
        self._token = token
        self.slack_conn_id = slack_conn_id
        self.base_url = base_url
        self.timeout = timeout
        self.proxy = proxy
        self.retry_handlers = retry_handlers
        self.extra_client_args = extra_client_args
        if self.extra_client_args.pop("use_session", None) is not None:
            warnings.warn("`use_session` has no affect in slack_sdk.WebClient.", UserWarning, stacklevel=2)
Dagster
from dagster import ConfigurableResource
from pydantic import Field


class SlackResource(ConfigurableResource):
    token: str = Field(
        description="Slack API Token.",
    )
    base_url: str | None = Field(
        description="A string representing the Slack API base URL. If not set, the default WebClient BASE_URL is used (``https://www.slack.com/api/``).",
        default=None,
    )
    timeout: int | None = Field(
        description="The maximum number of seconds the client will wait to connect and receive a response from Slack. If not set, the default WebClient value is used.",
        default=None,
    )
    proxy: str | None = Field(
        description="Proxy to make the Slack API call.",
        default=None,
    )
The Pydantic attributes available in Dagster allow for simpler syntax and annotating parameters with descriptions that will be available for users to see in the Dagster UI. We’ll review the UI for both Airflow Connections and Dagster resources later.
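To make the idea of self-describing fields concrete, here is a minimal stdlib-only sketch. It uses dataclasses metadata as a stand-in for Pydantic's Field descriptions; SlackSettings and describe_fields are hypothetical names, and this is not how Dagster itself builds its UI.

```python
from dataclasses import dataclass, field, fields
from typing import Optional


@dataclass(frozen=True)
class SlackSettings:
    """Hypothetical settings class; each description lives next to its field."""

    token: str = field(metadata={"description": "Slack API Token."})
    timeout: Optional[int] = field(
        default=None,
        metadata={"description": "Seconds to wait for a response from Slack."},
    )
    proxy: Optional[str] = field(
        default=None,
        metadata={"description": "Proxy to make the Slack API call."},
    )


def describe_fields(cls) -> dict:
    """Collect field descriptions, as a UI could do without extra code."""
    return {f.name: f.metadata.get("description", "") for f in fields(cls)}


descriptions = describe_fields(SlackSettings)
```

Because the descriptions are plain data attached to the class, any tool can read them by introspection, which is the property the resource definition above relies on.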
Next, let's rewrite the Slack client definition:
Airflow
    @cached_property
    def client(self) -> WebClient:
        """Get the underlying slack_sdk.WebClient (cached)."""
        return WebClient(**self._get_conn_params())

    def get_conn(self) -> WebClient:
        """Get the underlying slack_sdk.WebClient (cached)."""
        return self.client

    def _get_conn_params(self) -> dict[str, Any]:
        """Fetch connection params as a dict and merge it with hook parameters."""
        conn = self.get_connection(self.slack_conn_id) if self.slack_conn_id else None
        conn_params: dict[str, Any] = {"retry_handlers": self.retry_handlers}

        if self._token:
            conn_params["token"] = self._token
        elif conn:
            if not conn.password:
                raise AirflowNotFoundException(
                    f"Connection ID {self.slack_conn_id!r} does not contain password (Slack API Token)."
                )
            conn_params["token"] = conn.password

        extra_config = ConnectionExtraConfig(
            conn_type=self.conn_type,
            conn_id=conn.conn_id if conn else None,
            extra=conn.extra_dejson if conn else {},
        )

        # Merge Hook parameters with Connection config
        conn_params.update(
            {
                "timeout": self.timeout or extra_config.getint("timeout", default=None),
                "base_url": self.base_url or extra_config.get("base_url", default=None),
                "proxy": self.proxy or extra_config.get("proxy", default=None),
            }
        )

        # Add additional client args
        conn_params.update(self.extra_client_args)
        if "logger" not in conn_params:
            conn_params["logger"] = self.log

        return {k: v for k, v in conn_params.items() if v is not None}

    @cached_property
    def token(self) -> str:
        warnings.warn(
            "`SlackHook.token` property deprecated and will be removed in a future releases.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._get_conn_params()["token"]

    def __get_token(self, token: Any, slack_conn_id: Any) -> str:
        warnings.warn(
            "`SlackHook.__get_token` method deprecated and will be removed in a future releases.",
            DeprecationWarning,
            stacklevel=2,
        )
        if token is not None:
            return token

        if slack_conn_id is not None:
            conn = self.get_connection(slack_conn_id)

            if not getattr(conn, "password", None):
                raise AirflowException("Missing token(password) in Slack connection")
            return conn.password

        raise AirflowException("Cannot get token: No valid Slack token nor slack_conn_id supplied.")
Dagster
    @cached_property
    def client(self) -> WebClient:
        """Get the underlying slack_sdk.WebClient (cached)."""
        # Only pass settings that SlackResource actually declares as fields.
        client_kwargs = {
            "token": self.token,
            "proxy": self.proxy,
        }
        # Leave base_url and timeout unset so WebClient applies its own defaults.
        if self.base_url is not None:
            client_kwargs["base_url"] = self.base_url
        if self.timeout is not None:
            client_kwargs["timeout"] = self.timeout
        return WebClient(**client_kwargs)
Hopefully, you are noticing a theme: Dagster requires less code to do the same thing. The Airflow connection abstraction requires a bunch of boilerplate and indirection in the Airflow code that we can remove when rewriting in Dagster, making the code more readable.
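The indirection in the Airflow version mostly comes from merging two sources of truth. The following is a simplified, stdlib-only sketch of that precedence rule, not the Airflow implementation; merge_conn_params and its arguments are hypothetical names.

```python
from __future__ import annotations

from typing import Any, Optional


def merge_conn_params(
    hook_params: dict[str, Any],
    connection_extra: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Prefer explicit hook arguments, fall back to stored connection extras."""
    extra = connection_extra or {}
    merged = {}
    for key in ("timeout", "base_url", "proxy"):
        # `or` mirrors the hook: a falsy hook value falls through to the extra.
        merged[key] = hook_params.get(key) or extra.get(key)
    # Drop unset values so the client can apply its own defaults.
    return {k: v for k, v in merged.items() if v is not None}


params = merge_conn_params(
    {"timeout": None, "proxy": "http://localhost:9000"},
    {"timeout": 30, "proxy": "http://ignored:1"},
)
```

In the Dagster version there is only one source, the resource's own fields, so this merge step disappears entirely.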
Now let's rewrite the actual API functions:
Airflow
    def call(self, api_method: str, **kwargs) -> SlackResponse:
        return self.client.api_call(api_method, **kwargs)

    def send_file(
        self,
        *,
        channels: str | Sequence[str] | None = None,
        file: str | Path | None = None,
        content: str | None = None,
        filename: str | None = None,
        filetype: str | None = None,
        initial_comment: str | None = None,
        title: str | None = None,
    ) -> SlackResponse:
        if not ((not file) ^ (not content)):
            raise ValueError("Either `file` or `content` must be provided, not both.")
        elif file:
            file = Path(file)
            with open(file, "rb") as fp:
                if not filename:
                    filename = file.name
                return self.client.files_upload(
                    file=fp,
                    filename=filename,
                    filetype=filetype,
                    initial_comment=initial_comment,
                    title=title,
                    channels=channels,
                )

        return self.client.files_upload(
            content=content,
            filename=filename,
            filetype=filetype,
            initial_comment=initial_comment,
            title=title,
            channels=channels,
        )
Dagster
    def call(self, api_method: str, **kwargs) -> SlackResponse:
        return self.client.api_call(api_method, **kwargs)

    def send_file(
        self,
        *,
        channels: str | Sequence[str] | None = None,
        file: str | Path | None = None,
        content: str | None = None,
        filename: str | None = None,
        filetype: str | None = None,
        initial_comment: str | None = None,
        title: str | None = None,
    ) -> SlackResponse:
        if not ((not file) ^ (not content)):
            raise ValueError("Either `file` or `content` must be provided, not both.")
        elif file:
            file = Path(file)
            with open(file, "rb") as fp:
                if not filename:
                    filename = file.name
                return self.client.files_upload(
                    file=fp,
                    filename=filename,
                    filetype=filetype,
                    initial_comment=initial_comment,
                    title=title,
                    channels=channels,
                )

        return self.client.files_upload(
            content=content,
            filename=filename,
            filetype=filetype,
            initial_comment=initial_comment,
            title=title,
            channels=channels,
        )
Yep, they're the same; the differences between Dagster and Airflow integrations almost entirely boil down to the abstractions and patterns each chose for managing configuration.
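One detail shared by both versions is the guard at the top of send_file, which is easy to misread. A small standalone sketch of the same exactly-one-of check follows; require_exactly_one is a hypothetical helper written for illustration and is not part of either codebase.

```python
from typing import Optional


def require_exactly_one(file: Optional[str], content: Optional[str]) -> str:
    """Return which argument was supplied, or raise if zero or both were."""
    # XOR on the "is missing" flags is True only when exactly one is missing.
    if not ((not file) ^ (not content)):
        raise ValueError("Exactly one of `file` or `content` must be provided.")
    return "file" if file else "content"


chosen = require_exactly_one(None, "hello world")
```

The guard rejects both the case where neither argument is given and the case where both are.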
What about the UI?
Airflow Hooks do not exist in the UI. Connections do, but they require some special code to render in an intuitive way. Dagster does not need this: it renders your Resource classes in the UI without extra work.
To get something similar in Airflow for our Slack hook, we need to add the following to the Airflow hook class:
    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """Returns dictionary of widgets to be added for the hook to handle extra values."""
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import IntegerField, StringField
        from wtforms.validators import NumberRange, Optional

        return {
            "timeout": IntegerField(
                lazy_gettext("Timeout"),
                widget=BS3TextFieldWidget(),
                validators=[Optional(strip_whitespace=True), NumberRange(min=1)],
                description="Optional. The maximum number of seconds the client will wait to connect "
                "and receive a response from Slack API.",
            ),
            "base_url": StringField(
                lazy_gettext("Base URL"),
                widget=BS3TextFieldWidget(),
                description="Optional. A string representing the Slack API base URL.",
            ),
            "proxy": StringField(
                lazy_gettext("Proxy"),
                widget=BS3TextFieldWidget(),
                description="Optional. Proxy to make the Slack API call.",
            ),
        }

    @classmethod
    @_ensure_prefixes(conn_type="slack")
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Returns custom field behaviour."""
        return {
            "hidden_fields": ["login", "port", "host", "schema", "extra"],
            "relabeling": {
                "password": "Slack API Token",
            },
            "placeholders": {
                "password": "xoxb-1234567890123-09876543210987-AbCdEfGhIjKlMnOpQrStUvWx",
                "timeout": "30",
                "base_url": "https://www.slack.com/api/",
                "proxy": "http://localhost:9000",
            },
        }
This overrides the default values in the connection UI with ones that make more sense for a Slack-specific connection. Note that the database still uses the password field; it is relabeled to “Slack API Token” only when the UI is rendered.
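To illustrate what such a field-behaviour dictionary amounts to, here is a toy, stdlib-only sketch of applying hidden fields and relabeling to a generic form. DEFAULT_FIELDS and render_form_labels are hypothetical names; Airflow's real rendering lives in its web UI and works differently in detail.

```python
from __future__ import annotations

from typing import Any

# Assumed set of generic connection form fields, for illustration only.
DEFAULT_FIELDS = ["host", "schema", "login", "password", "port", "extra"]


def render_form_labels(behaviour: dict[str, Any]) -> dict[str, str]:
    """Map each visible field name to the label a form would display."""
    hidden = set(behaviour.get("hidden_fields", []))
    relabeling = behaviour.get("relabeling", {})
    return {
        name: relabeling.get(name, name.capitalize())
        for name in DEFAULT_FIELDS
        if name not in hidden
    }


labels = render_form_labels(
    {
        "hidden_fields": ["login", "port", "host", "schema", "extra"],
        "relabeling": {"password": "Slack API Token"},
    }
)
```

Only the label changes; the underlying key is still password, which matches the note above about what is stored in the database.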
Authoring pipelines
Now that we’ve defined our Slack integrations, we can start using them in our pipelines. Below is what this looks like in Airflow and Dagster:
Airflow
import pendulum

from airflow.models.dag import DAG
from airflow.providers.slack.operators.slack import SlackAPIFileOperator

with DAG(
    dag_id="slack_example",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args={
        "slack_conn_id": "slack",
    },
    max_active_runs=1,
) as dag:
    slack_operator_ = SlackAPIFileOperator(
        task_id="slack_operator_example",
        channel="#random",
        content="hello world",
    )
Dagster
import os

from .slack_resource import SlackResource
from dagster import Definitions, job, op


@op
def send_slack_message(slack: SlackResource):
    slack.send_file(
        channels="#random",
        content="hello world",
    )


@job
def slack_job():
    send_slack_message()


definitions = Definitions(
    jobs=[slack_job],
    resources={
        "slack": SlackResource(
            token=os.getenv("SLACK_API_TOKEN", "fake_token"),
        ),
    },
)
It might seem like Airflow is more concise, but we haven’t yet talked about the SlackAPIFileOperator. Airflow Operators are another abstraction that normally wraps Airflow Hooks. Operators usually focus on implementing various orchestration lifecycle methods, but SlackAPIFileOperator implements only the single execute method:
from __future__ import annotations

import json
import warnings
from typing import Any, Sequence

from airflow.compat.functools import cached_property
from airflow.models import BaseOperator
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.utils.log.secrets_masker import mask_secret


class SlackAPIOperator(BaseOperator):
    """
    Base Slack Operator
    The SlackAPIPostOperator is derived from this operator.
    In the future additional Slack API Operators will be derived from this class as well.
    Only one of `slack_conn_id` and `token` is required.

    :param slack_conn_id: :ref:`Slack API Connection <howto/connection:slack>`
        which its password is Slack API token. Optional
    :param token: Slack API token (https://api.slack.com/web). Optional
    :param method: The Slack API Method to Call (https://api.slack.com/methods). Optional
    :param api_params: API Method call parameters (https://api.slack.com/methods). Optional
    :param client_args: Slack Hook parameters. Optional. Check airflow.providers.slack.hooks.SlackHook
    """

    def __init__(
        self,
        *,
        slack_conn_id: str | None = None,
        token: str | None = None,
        method: str | None = None,
        api_params: dict | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        if token:
            mask_secret(token)
        self.token = token
        self.slack_conn_id = slack_conn_id

        self.method = method
        self.api_params = api_params

    @cached_property
    def hook(self) -> SlackHook:
        """Slack Hook."""
        return SlackHook(token=self.token, slack_conn_id=self.slack_conn_id)

    def construct_api_call_params(self) -> Any:
        """
        Used by the execute function. Allows templating on the source fields
        of the api_call_params dict before construction

        Override in child classes.
        Each SlackAPIOperator child class is responsible for
        having a construct_api_call_params function
        which sets self.api_call_params with a dict of
        API call parameters (https://api.slack.com/methods)
        """
        raise NotImplementedError(
            "SlackAPIOperator should not be used directly. Chose one of the subclasses instead"
        )

    def execute(self, **kwargs):
        if not self.api_params:
            self.construct_api_call_params()
        self.hook.call(self.method, json=self.api_params)


class SlackAPIFileOperator(SlackAPIOperator):
    """
    Send a file to a slack channels
    Examples:

    .. code-block:: python

        # Send file with filename and filetype
        slack_operator_file = SlackAPIFileOperator(
            task_id="slack_file_upload_1",
            dag=dag,
            slack_conn_id="slack",
            channels="#general,#random",
            initial_comment="Hello World!",
            filename="/files/dags/test.txt",
            filetype="txt",
        )

        # Send file content
        slack_operator_file_content = SlackAPIFileOperator(
            task_id="slack_file_upload_2",
            dag=dag,
            slack_conn_id="slack",
            channels="#general",
            initial_comment="Hello World!",
            content="file content in txt",
        )

    :param channels: Comma-separated list of channel names or IDs where the file will be shared.
        If set this argument to None, then file will send to associated workspace. (templated)
    :param initial_comment: message to send to slack. (templated)
    :param filename: name of the file (templated)
    :param filetype: slack filetype. (templated) See: https://api.slack.com/types/file#file_types
    :param content: file content. (templated)
    :param title: title of file. (templated)
    :param channel: (deprecated) channel in which to sent file on slack name
    """

    template_fields: Sequence[str] = (
        "channels",
        "initial_comment",
        "filename",
        "filetype",
        "content",
        "title",
    )
    ui_color = "#44BEDF"

    def __init__(
        self,
        channels: str | Sequence[str] | None = None,
        initial_comment: str | None = None,
        filename: str | None = None,
        filetype: str | None = None,
        content: str | None = None,
        title: str | None = None,
        channel: str | None = None,
        **kwargs,
    ) -> None:
        if channel:
            warnings.warn(
                "Argument `channel` is deprecated and will removed in a future releases. "
                "Please use `channels` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            if channels:
                raise ValueError(f"Cannot set both arguments: channel={channel!r} and channels={channels!r}.")
            channels = channel

        self.channels = channels
        self.initial_comment = initial_comment
        self.filename = filename
        self.filetype = filetype
        self.content = content
        self.title = title
        super().__init__(method="files.upload", **kwargs)

    def execute(self, **kwargs):
        self.hook.send_file(
            channels=self.channels,
            # For historical reason SlackAPIFileOperator use filename as reference to file
            file=self.filename,
            content=self.content,
            initial_comment=self.initial_comment,
            title=self.title,
        )
Abstractions with lots of magic boilerplate are not free, and users often find it hard to tell what belongs in an Airflow Hook versus an Airflow Operator. Because of this, it's not uncommon to see Airflow users combine both into the Operator and forgo Hooks entirely. In Dagster, resources are the single abstraction for creating integrations.
The Airflow Config System
Airflow’s configuration is mostly based on four core abstractions: Connections, Variables, Hooks, and Operators.
- Connections are static, semi-structured configuration entries in the Airflow database. There are several ways to set them, but generally Connections can be understood as persistent configuration state.
- Variables are similar to Connections in that they are stored in the Airflow database and can be modified from the Airflow UI. They differ in that they are a generic key/value store and are not intended to be used in Hooks (but can be if you want).
- Hooks are an abstraction that uses the config state of Connections to define an API for interacting with an external system. They exist only as Python classes extending the BaseHook class.
- Operators are the user-facing orchestration primitive. They may depend on Hooks or Connections and are supposed to focus on the mechanics of orchestration, not on making the correct API calls to external systems (that's what Hooks are for). However, it's not uncommon to see Operators skip Hooks and combine the two concerns.
Where the Airflow config system gets spicy is its templating system: before a task runs, Airflow renders the Operator's templated fields with a Jinja-based templating engine. This lets DAG authors reference values stored in Airflow Connections and Variables directly in their Operator invocations, which in turn pass those values to Hooks that might also be fetching Connection and Variable values internally at execution time. Sound confusing? It is.
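To show why this layering is hard to follow, here is a toy, stdlib-only sketch of template rendering over stored configuration. It uses a regular expression as a stand-in for Jinja; VARIABLES, CONNECTIONS, and render are hypothetical names with made-up values, and only the placeholder style is borrowed from Airflow.

```python
import re

# Made-up stored configuration, standing in for rows in a metadata database.
VARIABLES = {"slack_channel": "#random"}
CONNECTIONS = {"slack": {"password": "xoxb-not-a-real-token"}}

_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template: str) -> str:
    """Replace {{ var.value.X }} and {{ conn.ID.FIELD }} placeholders."""

    def lookup(match: re.Match) -> str:
        parts = match.group(1).split(".")
        if len(parts) == 3 and parts[:2] == ["var", "value"]:
            return VARIABLES[parts[2]]
        if len(parts) == 3 and parts[0] == "conn":
            return CONNECTIONS[parts[1]][parts[2]]
        raise KeyError(match.group(1))

    return _PLACEHOLDER.sub(lookup, template)


rendered = render("Post to {{ var.value.slack_channel }}")
```

The string an author writes is not the value the task receives, so understanding a run means knowing both the template and the state of the store at render time.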
The Dagster Config System
With the 1.3 release of Dagster, a new Pythonic resource and config system has been mainlined, making it simpler than ever to define the config and parameters that integrations need. Even the pre-existing resource abstraction already provided a much simpler and more structured approach than what is available in Airflow.
To be reductive, Dagster’s resource system has three primary distinctions vs Airflow:
- It makes integration dependencies explicit rather than burying them in the class implementation of operator code.
- It does not dictate how configuration values are stored; resources can pull from environment variables or other external configuration stores.
- It handles the UI for you; there are no magic class methods or class properties to set to make the UI render your resource correctly.
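The first two points can be sketched without any framework. The following is a minimal illustration of explicit, signature-based resource injection with a value read from the environment; FakeSlack, send_greeting, and run are hypothetical names, and this is not Dagster's implementation.

```python
import inspect
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeSlack:
    """Hypothetical stand-in for a Slack resource; it sends nothing."""

    token: str

    def send(self, channel: str, text: str) -> str:
        return f"[{channel}] {text}"


def send_greeting(slack: FakeSlack) -> str:
    # The dependency is visible in the signature, not hidden inside a class.
    return slack.send("#random", "hello world")


def run(fn, resources: dict):
    """Inject only the resources the function names in its signature."""
    wanted = inspect.signature(fn).parameters
    missing = [name for name in wanted if name not in resources]
    if missing:
        raise KeyError(f"Missing resources: {missing}")
    return fn(**{name: resources[name] for name in wanted})


# The caller decides where the token comes from, here an environment variable.
resources = {"slack": FakeSlack(token=os.getenv("SLACK_API_TOKEN", "fake_token"))}
result = run(send_greeting, resources)
```

Swapping the real resource for a fake in tests then only requires passing a different dictionary to the runner.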
See for yourself
If you’d like to view and run both the Airflow and Dagster code in this post and evaluate them yourself you can do that with the example repository here. Dagster is continuing to be laser-focused on providing the best developer ergonomics possible and its config system is a key part of that.