The Unreasonable Effectiveness of Data Pipeline Smoke Tests

October 19, 2022

Data practitioners waste time writing unit tests to catch bugs they could have caught with smoke tests.
In this post, we’ll discuss a powerful technique for speeding up data pipeline development: the data pipeline smoke test.  You write your smoke test just once: you don’t need to write a test for every newly derived data asset.  It can complete in a few seconds and exercise every transformation inside your data pipeline.

The idea of the data pipeline smoke test is to automatically run all your data transformations on empty or synthetic data.  When defining your data pipeline, you include metadata on your data assets - e.g., column schemas - that determines how to mock them in downstream transformations.

The test verifies that:

  • The code in each transformation follows the rules of the data processing language that it’s written in.
  • Each transformation can handle the type of data produced by upstream transformations.

Data pipeline smoke tests can be written for data transformations in a variety of frameworks, e.g. Pandas, SQL, Spark, or Dask.  Data orchestrators like Dagster help you write data pipeline smoke tests by modeling the dependencies between data assets and hosting metadata about the assets that can be used to stub them out.

The code shown in the examples in this post is available on GitHub, here.

Data transformations: a minefield of Stupid Mistakes

Programming data transformations is a minefield of stupid mistakes.

Here are some examples:

  • Trying to access a column that’s missing from the table produced by the upstream step in the pipeline.
  • Trying to access a column that’s missing from the dataframe produced in the line of code right above.
  • Trying to select from a table that doesn’t exist.
  • Trying to call a function that doesn’t exist.
  • Forgetting to include a required argument to a function.
  • Trying to perform arithmetic on a string column.
  • Trying to access the fourth dimension of a tensor with only three dimensions.

If you have never made one of these mistakes, please stop reading this post and found a coaching service for us mortals.  For the rest of us, read on.

When confronted with one of these Stupid Mistakes, fixing the problem is almost always trivial.  But, unless you have an eidetic memory for the Pandas documentation and the columns in every table in your data warehouse, spotting them with the naked eye is nigh impossible.

We often discover these mistakes in one of a few ways:

  • We manually test our code. Depending on the tools we’re using, this often requires deploying our code to a cluster, getting access to data, waiting a long time to chug through an actual dataset, and manually inspecting the outputs.
  • We painstakingly write a unit test. This is What We Are Told To Do, but it’s often a waste of time. For transformations that operate on tables with many columns, most of the effort in writing the unit test goes into enumerating all the input and output columns, not verifying business logic. These tests then slow down further development because, when we want to make a small change, like pulling in a new column, we end up needing to change ten different sites in our code.
  • We deploy our code to production and catch the mistakes when the pipeline fails. I’ve been there, and I’m not here to judge you.

Debugging is costly

Long development cycles are costly - both in developer time and compute costs.

If you can catch these mistakes in five seconds instead of five minutes, you can often develop almost 60 times faster.  This sounds hyperbolic - of course, there are fixed costs like waiting for a SparkSession to boot up, plus the time spent actually modifying code to fix the error - but, in my subjective experience, the feeling of developing with smoke tests actually approaches this speedup.  There is just a monumental amount of wasted time in the typical data transformation development loop.

Even better, you can avoid accidentally breaking your pipelines in production - the smoke test makes it cheap to get broad test coverage, which means you can catch more errors.

How to write a data pipeline smoke test

The idea of the data pipeline smoke test is to exercise the business logic behind every transformation in our data pipeline, without writing a specialized test for every step.  It runs every step on empty data, or synthetic data, to find Stupid Mistakes.
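To see why empty data is enough to surface these mistakes, note that errors like referencing a missing column depend only on a table's shape, not on its contents. A minimal pandas sketch (not from the pipeline above) of the idea:

```python
from pandas import DataFrame, Series

# An empty frame that still carries the schema of an upstream asset
upstream = DataFrame({"pop2019": Series(dtype="int"), "change": Series(dtype="string")})

# A Stupid Mistake: referencing a column the upstream step doesn't produce.
# pandas raises a KeyError immediately, even though the frame has zero rows.
try:
    upstream["pop2020"]
    caught = False
except KeyError:
    caught = True
```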

It’s primarily useful for pipelines that are focused on data transformation, i.e., that have heavy business logic. It’s less useful for pipelines that primarily involve data movement, because the smoke test stubs out external systems as much as possible.  If the data pipeline’s correctness depends primarily on the details of its interactions with these external systems, the test won’t be effective at catching the relevant bugs.

We’ll first cover how to smoke-test pipelines that exclusively include Python data transformations, and then we’ll cover how to smoke-test pipelines that include SQL transformations as well.

Smoke-testing pipelines of Python data transformations

If your data transformations are written using Pandas, PySpark, or vanilla Python data structures, then your smoke test can execute entirely in memory with no external infrastructure.

Here’s a basic data pipeline, defined in Dagster, where the data transformations are written using Pandas.  Dagster models data pipelines as graphs of software-defined assets: each @asset-decorated function represents a data asset that is produced by the pipeline and may be consumed by other assets within the pipeline.

raw_country_populations is a “source asset”, i.e. a data asset that’s used by the pipeline but not produced by the pipeline.

Here’s the code for the pipeline:

from dagster import SourceAsset, asset
from pandas import DataFrame

raw_country_populations = SourceAsset("raw_country_populations")

@asset
def country_populations(raw_country_populations) -> DataFrame:
    country_populations = raw_country_populations.copy()
    country_populations["change"] = (
        country_populations["change"].str.rstrip("%").str.replace("−", "-").astype("float") / 100.0
    )
    return country_populations

@asset
def continent_stats(country_populations: DataFrame) -> DataFrame:
    result = country_populations.groupby("continent").agg({"pop2019": "sum", "change": "mean"})
    return result

@asset
def country_stats(country_populations: DataFrame, continent_stats: DataFrame) -> DataFrame:
    result = country_populations.join(continent_stats, on="continent", lsuffix="_continent")
    result["continent_pop_fraction"] = result["pop2019"] / result["pop2019_continent"]
    return result

Python transformations smoke test step 1: stub out your source data

To test a data transformation, you need to feed it input data that has the right shape.  Typically, data transformations will depend on the existence of particular columns with particular data types, so the input needs to include those columns with those data types.

For data assets that are produced inside your data pipeline, you can feed the produced values to the downstream data transformations that depend on them.  But for data assets that are produced outside your pipeline, you need to make mock versions of them for your smoke test.

The easiest way to do this is as follows:

  • For every source data asset, specify its columns and column data types.
  • Automatically construct empty data frames with those columns and column data types.

Here’s an example of how we might specify the columns and data types of a source data asset, using Dagster:

from dagster import TableSchema

raw_country_populations = SourceAsset(
    "raw_country_populations",
    metadata={
        "column_schema": TableSchema.from_name_type_dict(
            {
                "country": "string",
                "continent": "string",
                "region": "string",
                "pop2018": "int",
                "pop2019": "int",
                "change": "string",
            }
        ),
    },
)

This metadata isn’t only useful for smoke testing.  It also documents the data, and tools like Dagster can show it in their web interfaces to help stakeholders and developers understand what the data they’re working with is supposed to look like.

Here’s a function that constructs an empty DataFrame using the column schema metadata. We’ll use this function in the next section.

def empty_dataframe_from_column_schema(column_schema: TableSchema) -> DataFrame:
    return DataFrame(
        {column.name: Series(dtype=column.type) for column in column_schema.columns}
    )
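To see the mechanism at work without a Dagster installation, the same construction can be written against a plain name-to-dtype dict, and the change-parsing logic from country_populations above runs cleanly on the resulting zero-row frame:

```python
from pandas import DataFrame, Series

# A plain dict standing in for the TableSchema metadata above
schema = {"country": "string", "continent": "string", "change": "string"}
raw = DataFrame({name: Series(dtype=dtype) for name, dtype in schema.items()})

# The change-percentage parsing from country_populations runs on zero rows,
# so schema-level mistakes would surface here without touching real data
out = raw.copy()
out["change"] = out["change"].str.rstrip("%").str.replace("−", "-").astype("float") / 100.0
```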

Optionally, you can specify this column schema metadata for data assets in the middle of the pipeline too. This makes it possible to run the smoke test on individual data assets without running all the upstream transformations that produce them.

If you want to get fancier and more realistic with the data that your test consumes, you can use a library like Pandera or Faker to build synthetic data.
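As a rough sketch of that fancier option, here's what a hand-rolled generator of synthetic rows for raw_country_populations might look like, using the standard library's random module in place of a dedicated library like Faker (the value ranges are invented for illustration):

```python
import random

from pandas import DataFrame

def synthetic_country_populations(n: int = 5) -> DataFrame:
    # Columns match the raw_country_populations schema declared above;
    # the values themselves are arbitrary but correctly typed
    continents = ["Asia", "Europe", "Africa", "Oceania"]
    return DataFrame(
        {
            "country": [f"Country {i}" for i in range(n)],
            "continent": [random.choice(continents) for _ in range(n)],
            "region": [f"Region {i % 3}" for i in range(n)],
            "pop2018": [random.randint(1_000, 1_000_000) for _ in range(n)],
            "pop2019": [random.randint(1_000, 1_000_000) for _ in range(n)],
            "change": [f"{random.uniform(-2, 2):.2f}%" for _ in range(n)],
        }
    )

df = synthetic_country_populations()
```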

Python transformations smoke test step 2: stub out your I/O

The data pipeline smoke test should avoid clobbering production data.  If it can store data in memory instead of in persistent storage, it can also run faster.  Finally, you want to return your empty or synthetic data instead of trying to read actual source data, which could be large or inaccessible from your test environment.  To accomplish all of this, you need to stub out your I/O.

Because we defined our pipeline using Dagster, we can swap out the Dagster I/O manager that we use in production with a custom one written for this test:

from dagster import InMemoryIOManager

class SmokeIOManager(InMemoryIOManager):
    def load_input(self, context):
        if context.asset_key not in context.step_context.job_def.asset_layer.asset_keys:
            column_schema = context.upstream_output.metadata["column_schema"]
            return empty_dataframe_from_column_schema(column_schema)
        else:
            return super().load_input(context)

It stores data in memory, except when loading from a source asset, in which case it uses the empty_dataframe_from_column_schema function we defined above to synthesize data.

Python transformations smoke test step 3: run your pipeline in-memory

Now we can run our pipeline by invoking Dagster’s materialize function to materialize all the data assets to memory.

from dagster import load_assets_from_current_module, materialize

assets = load_assets_from_current_module()

materialize(assets, resources={"io_manager": SmokeIOManager()})

If any of the data transformations in your pipeline include a Stupid Mistake, an error will be raised, and you can track down why.

Smoke-testing pipelines with dbt / SQL

Python data transformations can be executed entirely within the process running a test, but data transformations written in SQL typically rely on a database that lives outside the test process.

If your data pipeline includes SQL transformations, your smoke test will need access to such a database. To avoid interfering with your production database, ideally you’ll spin up a database that’s scoped to your test run.

For the examples in this section, we’ll assume that our SQL transformations are defined using dbt, and that we’re using the dagster-dbt integration to connect them with our Python transformations:

from dagster_dbt import load_assets_from_dbt_project

DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project")
DBT_PROFILES_DIR = file_relative_path(__file__, "../dbt_project/config")

raw_country_populations = SourceAsset(
    "raw_country_populations",
    metadata={
        "column_schema": TableSchema.from_name_type_dict(
            {
                "country": "string",
                "continent": "string",
                "region": "string",
                "pop2018": "int",
                "pop2019": "int",
                "change": "string",
            }
        ),
    },
)

@asset
def country_stats(country_populations: DataFrame, continent_stats: DataFrame) -> DataFrame:
    result = country_populations.join(continent_stats, on="continent", lsuffix="_continent")
    result["continent_pop_fraction"] = result["pop2019"] / result["pop2019_continent"]
    return result

dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, DBT_PROFILES_DIR)

Full Python code is available here, and the dbt project is available here.

Stubbing out your database

You’ll want to use a separate database from your production database so you’re not overwriting your production tables with empty data.

For our example, we’ll set the separate database in an environment variable so both Dagster and dbt can pick it up:

export SNOWFLAKE_SMOKE_TEST_DATABASE="SANDY_SMOKE_TEST_DATABASE"

To make use of the database in dbt, we can make a dbt target that reads / writes from whatever database is set via an environment variable:

dbt_project:
  target: smoke_test
  outputs:
    smoke_test:
      type: snowflake
      database: "{{ env_var('SNOWFLAKE_SMOKE_TEST_DATABASE') }}"
      ...

If your pipeline also includes Python transformations, you’ll probably want to store their outputs and look for their inputs in your database, so they can interoperate with your SQL transformations.

In our example, we do that by using a Snowflake I/O manager instead of an in-memory I/O manager, again pointing at our smoke test database:

from dagster_snowflake_pandas import SnowflakePandasIOManager

snowflake_config = {
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "database": os.environ["SNOWFLAKE_DATABASE"],
}

io_manager = SnowflakePandasIOManager(
    **snowflake_config
)
Stubbing your source data

You can define the shape of your source data assets using the same approach as in the “in-memory” section above: with SourceAssets.

However, to read empty / synthetic versions of these source assets in downstream transformations, you’ll need a slightly different approach than in the in-memory section.  That’s because, when transformations happen in SQL, you don’t have control over how data is loaded - it happens inside the database.

Instead, at the start of your smoke test, you can write empty / synthetic data to your database:

conn = snowflake.connector.connect(**snowflake_config)

for source_asset in source_assets:
    db_name = snowflake_config["database"]
    table_name = f"{db_name}.public.{source_asset.key.path[-1]}"
    columns_str = ", ".join(
        [
            f"{column.name} {column.type}"
            for column in source_asset.metadata["column_schema"].schema.columns
        ]
    )
    conn.cursor().execute(f"CREATE OR REPLACE TABLE {table_name} ({columns_str})")
Running your pipeline

Then, you can run your pipeline using your stubbed database:

from dagster import load_assets_from_current_module, materialize
from dagster_dbt import DbtCliClientResource

assets = load_assets_from_current_module()

materialize(assets, resources={
    "io_manager": io_manager, 
    "dbt": DbtCliClientResource(
        project_dir=DBT_PROJECT_DIR,
        profiles_dir=DBT_PROFILES_DIR,
    )
})

Conclusion

So there you have it: the data pipeline smoke test.  You can run it during local development to speed up your iteration time, as well as in CI to stop bugs from making it into production.  You can smoke-test individual transformations, all transformations that are downstream of a change you made, or your entire pipeline.

It can take some effort to set up at the beginning, but once you’ve done so, it automatically covers new transformations that you add to your pipeline.

Final note: if you want to support the Dagster Open Source project, be sure to star our GitHub repo.

Happy testing!

