October 19, 2022 • 8 minute read
The Unreasonable Effectiveness of Data Pipeline Smoke Tests
By Sandy Ryza (@s_ryz)
Data practitioners waste time writing unit tests to catch bugs they could have caught with smoke tests.
In this post, we’ll discuss a powerful technique for speeding up data pipeline development: the data pipeline smoke test. You write your smoke test just once: you don’t need to write a test for every newly derived data asset. It can complete in a few seconds and exercises every transformation inside your data pipeline.
The idea of the data pipeline smoke test is to automatically run all your data transformations on empty or synthetic data. When defining your data pipeline, you include metadata on your data assets - e.g., column schemas - that determines how to mock them in downstream transformations.
The test verifies that:
- The code in each transformation follows the rules of the data processing language that it’s written in.
- Each transformation can handle the type of data produced by upstream transformations.
Data pipeline smoke tests can be written for data transformations in a variety of frameworks, e.g. Pandas, SQL, Spark, or Dask. Data orchestrators like Dagster help you write data pipeline smoke tests by modeling the dependencies between data assets and hosting metadata about the assets that can be used to stub them out.
The code shown in the examples in this post is available on GitHub, here.
Data transformations: a minefield of Stupid Mistakes
Programming data transformations is a minefield of stupid mistakes.
Here are some examples:
- Trying to access a column that’s missing from the table produced by the upstream step in the pipeline.
- Trying to access a column that’s missing from the dataframe produced in the line of code right above.
- Trying to select from a table that doesn’t exist.
- Trying to call a function that doesn’t exist.
- Forgetting to include a required argument to a function.
- Trying to perform arithmetic on a string column.
- Trying to access the fourth dimension of a tensor with only three dimensions.
If you have never made one of these mistakes, please stop reading this post and found a coaching service for us mortals. For the rest of us, read on.
When confronted with one of these Stupid Mistakes, fixing the problem is almost always trivial. But, unless you have an eidetic memory for the Pandas documentation and the columns in every table in your data warehouse, spotting them with the naked eye is nigh impossible.
We often discover these mistakes in one of a few ways:
- We manually test our code. Depending on the tools we’re using, this often requires deploying our code to a cluster, getting access to data, waiting a long time to chug through an actual dataset, and manually inspecting the outputs.
- We painstakingly write a unit test. This is What We Are Told To Do, but it’s often a waste of time. For transformations that operate on tables with many columns, most of the effort in writing the unit test goes into enumerating all the input and output columns, not verifying business logic. These tests then slow down further development because, when we want to make a small change, like pulling in a new column, we end up needing to change ten different sites in our code.
- We deploy our code to production and catch the mistakes when the pipeline fails. I’ve been there, and I’m not here to judge you.
Debugging is costly
Long development cycles are costly - both in developer time and compute costs.
If you can catch these mistakes in five seconds instead of five minutes, you can often develop almost 60 times faster. This sounds hyperbolic - of course, there are fixed costs like waiting for a SparkSession to boot up, plus the time spent actually modifying code to fix the error - but, in my subjective experience, the feeling of developing with smoke tests actually approaches this speedup. There is just a monumental amount of wasted time in the typical data transformation development loop.
Even better, you can avoid accidentally breaking your pipelines in production - the smoke test makes it cheap to get broad test coverage, which means you can catch more errors.
How to write a data pipeline smoke test
The idea of the data pipeline smoke test is to exercise the business logic behind every transformation in your data pipeline, without writing a specialized test for every step. It runs every step on empty or synthetic data to find Stupid Mistakes.
It’s primarily useful for pipelines that are focused on data transformation, i.e., that have heavy business logic. It’s less useful for pipelines that primarily involve data movement, because the smoke test stubs out external systems as much as possible. If the data pipeline’s correctness depends primarily on the details of its interactions with those external systems, then the test won’t be effective at catching the bugs that matter.
We’ll first cover how to smoke-test pipelines that exclusively include Python data transformations, and then we’ll cover how to smoke-test pipelines that include SQL transformations as well.
Smoke-testing pipelines of Python data transformations
If your data transformations are written using Pandas, PySpark, or vanilla Python data structures, then your smoke test can execute entirely in memory with no external infrastructure.
Here’s a basic data pipeline, defined in Dagster, where the data transformations are written using Pandas. Dagster models data pipelines as graphs of software-defined assets: each @asset-decorated function represents a data asset that is produced by the pipeline and may be consumed by other assets within it. raw_country_populations is a “source asset”, i.e. a data asset that’s used by the pipeline but not produced by it.
Here’s a visual representation of the pipeline from Dagster’s web UI, along with the code:
from dagster import SourceAsset, asset
from pandas import DataFrame

raw_country_populations = SourceAsset("raw_country_populations")


@asset
def country_populations(raw_country_populations) -> DataFrame:
    country_populations = raw_country_populations.copy()
    country_populations["change"] = (
        country_populations["change"].str.rstrip("%").str.replace("−", "-").astype("float") / 100.0
    )
    return country_populations


@asset
def continent_stats(country_populations: DataFrame) -> DataFrame:
    result = country_populations.groupby("continent").agg({"pop2019": "sum", "change": "mean"})
    return result


@asset
def country_stats(country_populations: DataFrame, continent_stats: DataFrame) -> DataFrame:
    result = country_populations.join(continent_stats, on="continent", lsuffix="_continent")
    result["continent_pop_fraction"] = result["pop2019"] / result["pop2019_continent"]
    return result
Python transformations smoke test step 1: stub out your source data
To test a data transformation, you need to feed it input data that has the right shape. Typically, data transformations will depend on the existence of particular columns with particular data types, so the input needs to include those columns with those data types.
For data assets that are produced inside your data pipeline, you can feed the produced values to the downstream data transformations that depend on them. But for data assets that are produced outside your pipeline, you need to make mock versions of them for your smoke test.
The easiest way to do this is as follows:
- For every source data asset, specify its columns and column data types.
- Automatically construct empty data frames with those columns and column data types.
Here’s an example of how we might specify the columns and data types of a source data asset, using Dagster:
from dagster import TableSchema

raw_country_populations = SourceAsset(
    "raw_country_populations",
    metadata={
        "column_schema": TableSchema.from_name_type_dict(
            {
                "country": "string",
                "continent": "string",
                "region": "string",
                "pop2018": "int",
                "pop2019": "int",
                "change": "string",
            }
        ),
    },
)
This metadata isn’t only useful for smoke testing. It also documents the data, and tools like Dagster can show it in their web interfaces to help stakeholders and developers understand what the data they’re working with is supposed to look like.
Here’s a function that constructs an empty DataFrame using the column schema metadata. We’ll use this function in the next section.
from pandas import Series


def empty_dataframe_from_column_schema(column_schema: TableSchema) -> DataFrame:
    return DataFrame(
        {column.name: Series(dtype=column.type) for column in column_schema.columns}
    )
Optionally, you can specify this column schema metadata for data assets in the middle of the pipeline too. This makes it possible to run the smoke test on individual data assets without running all the upstream transformations that produce them.
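For example, you could attach the same kind of column_schema metadata to the country_populations asset itself. This is a sketch, not part of the original pipeline code; the column types mirror the source asset, with change becoming a float after the transformation above:
@asset(
    metadata={
        "column_schema": TableSchema.from_name_type_dict(
            {
                "country": "string",
                "continent": "string",
                "region": "string",
                "pop2018": "int",
                "pop2019": "int",
                "change": "float",
            }
        )
    }
)
def country_populations(raw_country_populations) -> DataFrame:
    ...  # same transformation as above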
If you want to get fancier and more realistic with the data that your test consumes, you can use a library like Pandera or Faker to build synthetic data.
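As a rough sketch of what that could look like with Faker, here's a hypothetical synthetic_dataframe_from_column_schema helper that fills a few rows with plausible-looking values instead of leaving the frame empty:
from faker import Faker

fake = Faker()


def synthetic_dataframe_from_column_schema(column_schema: TableSchema, num_rows: int = 5) -> DataFrame:
    # map each column type to a fake value; extend this for the types in your own schemas
    def fake_value(column_type: str):
        if column_type == "int":
            return fake.random_int(min=0, max=1_000_000)
        elif column_type == "float":
            return fake.pyfloat(min_value=-1.0, max_value=1.0)
        else:
            return fake.word()

    return DataFrame(
        {
            column.name: [fake_value(column.type) for _ in range(num_rows)]
            for column in column_schema.columns
        }
    )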
Python transformations smoke test step 2: stub out your I/O
The data pipeline smoke test should avoid clobbering production data. It can also run faster if it stores data in memory instead of in persistent storage. And it needs to return your empty or synthetic data instead of trying to read actual source data, which could be large or inaccessible from your test environment. To accomplish all of this, you need to stub out your I/O.
Because we defined our pipeline using Dagster, we can swap out the Dagster I/O manager that we use in production with a custom one written for this test:
from dagster import InMemoryIOManager


class SmokeIOManager(InMemoryIOManager):
    def load_input(self, context):
        if context.asset_key not in context.step_context.job_def.asset_layer.asset_keys:
            column_schema = context.upstream_output.metadata["column_schema"]
            return empty_dataframe_from_column_schema(column_schema)
        else:
            return super().load_input(context)
It stores data in memory, except when it's loading a source asset, in which case it uses the empty_dataframe_from_column_schema function we defined above to synthesize data.
Python transformations smoke test step 3: run your pipeline in-memory
Now we can run our pipeline by invoking Dagster's materialize function to materialize all the data assets in memory.
from dagster import load_assets_from_current_module, materialize
assets = load_assets_from_current_module()
materialize(assets, resources={"io_manager": SmokeIOManager()})
If any of the data transformations in your pipeline include Stupid Mistakes, an error will be raised, and you can track down why.
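To run the smoke test in CI, you can wrap it in a test function. Here's a minimal sketch using pytest, reusing the assets and SmokeIOManager defined above:
def test_pipeline_smoke():
    # materialize raises on step failure by default; the assert is a final sanity check
    result = materialize(assets, resources={"io_manager": SmokeIOManager()})
    assert result.success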
Smoke testing pipelines with dbt / SQL
Python data transformations can be executed entirely within the process running a test, but data transformations written in SQL typically rely on a database that lives outside the test process.
If your data pipeline includes SQL transformations, your smoke test will need access to such a database. To avoid interfering with your production database, ideally you’ll spin up a database that’s scoped to your test run.
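One way to do that, sketched here under the assumption that you're on Snowflake and have credentials available in environment variables, is to create a uniquely named database at the start of the test run and drop it when you're done:
import os
import uuid

import snowflake.connector

# create a throwaway database scoped to this test run
db_name = f"SMOKE_TEST_{uuid.uuid4().hex[:8].upper()}"
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
)
conn.cursor().execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")

# make the database visible to the rest of the smoke test via the env var used below
os.environ["SNOWFLAKE_SMOKE_TEST_DATABASE"] = db_name

# ... run the smoke test ...

conn.cursor().execute(f"DROP DATABASE IF EXISTS {db_name}")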
For the examples in this section, we'll assume that our SQL transformations are defined using dbt, and that we're using the dagster-dbt integration to connect them with our Python transformations:
from dagster import SourceAsset, TableSchema, asset, file_relative_path
from dagster_dbt import load_assets_from_dbt_project
from pandas import DataFrame

DBT_PROJECT_DIR = file_relative_path(__file__, "../dbt_project")
DBT_PROFILES_DIR = file_relative_path(__file__, "../dbt_project/config")

raw_country_populations = SourceAsset(
    "raw_country_populations",
    metadata={
        "column_schema": TableSchema.from_name_type_dict(
            {
                "country": "string",
                "continent": "string",
                "region": "string",
                "pop2018": "int",
                "pop2019": "int",
                "change": "string",
            }
        ),
    },
)


@asset
def country_stats(country_populations: DataFrame, continent_stats: DataFrame) -> DataFrame:
    result = country_populations.join(continent_stats, on="continent", lsuffix="_continent")
    result["continent_pop_fraction"] = result["pop2019"] / result["pop2019_continent"]
    return result


dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_DIR, DBT_PROFILES_DIR)
Full Python code is available here, and the dbt project is available here.
Stubbing out your database
You’ll want to use a separate database from your production database so you’re not overwriting your production tables with empty data.
For our example, we’ll set the separate database in an environment variable so that both Dagster and dbt know about it:
export SNOWFLAKE_SMOKE_TEST_DATABASE="SANDY_SMOKE_TEST_DATABASE"
To make use of the database in dbt, we can add a dbt target that reads from and writes to whatever database the environment variable specifies:
dbt_project:
  target: smoke_test
  outputs:
    smoke_test:
      type: snowflake
      database: "{{ env_var('SNOWFLAKE_SMOKE_TEST_DATABASE') }}"
      ...
If your pipeline also includes Python transformations, you’ll probably want to store their outputs and look for their inputs in your database, so they can interoperate with your SQL transformations.
In our example, we do that by using a Snowflake I/O manager instead of an in-memory I/O manager, again pointing at our smoke test database:
import os

from dagster_snowflake_pandas import SnowflakePandasIOManager

snowflake_config = {
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "database": os.environ["SNOWFLAKE_SMOKE_TEST_DATABASE"],
}

io_manager = SnowflakePandasIOManager(**snowflake_config)
Stubbing your source data
You can define the shape of your source data assets using the same approach as in the “in-memory” section above: with SourceAssets.
However, to read empty / synthetic versions of these source assets in downstream transformations, you’ll need to take a slightly different approach than the approach in the in-memory section. That’s because, when transformations are happening in SQL, you don’t have control over how data is loaded - it happens inside the database.
Instead, at the start of your smoke test, you can write empty / synthetic data to your database:
import snowflake.connector

# source_assets is the list of SourceAssets in the pipeline, e.g. [raw_country_populations]
conn = snowflake.connector.connect(**snowflake_config)
for source_asset in source_assets:
    db_name = snowflake_config["database"]
    table_name = f"{db_name}.public.{source_asset.key.path[-1]}"
    columns_str = ", ".join(
        [
            f"{column.name} {column.type}"
            for column in source_asset.metadata["column_schema"].schema.columns
        ]
    )
    conn.cursor().execute(f"CREATE OR REPLACE TABLE {table_name} ({columns_str})")
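If you'd rather smoke-test against a synthetic row in each source table than against an empty one, you could follow the CREATE with an INSERT built from placeholder values. In this sketch, default_value_for_type is a hypothetical helper that maps column types to dummy SQL literals:
def default_value_for_type(column_type: str) -> str:
    # hypothetical mapping from column types to dummy SQL literals
    return "0" if column_type in ("int", "float") else "''"


for source_asset in source_assets:
    db_name = snowflake_config["database"]
    table_name = f"{db_name}.public.{source_asset.key.path[-1]}"
    columns = source_asset.metadata["column_schema"].schema.columns
    values_str = ", ".join(default_value_for_type(column.type) for column in columns)
    conn.cursor().execute(f"INSERT INTO {table_name} VALUES ({values_str})")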
Running your pipeline
Then, you can run your pipeline using your stubbed database:
from dagster import load_assets_from_current_module, materialize
from dagster_dbt import DbtCliClientResource

assets = load_assets_from_current_module()
materialize(
    assets,
    resources={
        "io_manager": io_manager,
        "dbt": DbtCliClientResource(
            project_dir=DBT_PROJECT_DIR,
            profiles_dir=DBT_PROFILES_DIR,
        ),
    },
)
Conclusion
So there you have it: the data pipeline smoke test. You can run it during local development to speed up your iteration time, as well as in CI to stop bugs from making it into production. You can smoke-test individual transformations, all transformations that are downstream of a change you made, or your entire pipeline.
It can take some effort to set up at the beginning, but once you’ve done so, it automatically covers new transformations that you add to your pipeline.
Final note: if you want to support the Dagster Open Source project, be sure to star our GitHub repo.
Happy testing!
We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a GitHub discussion. If you run into any bugs, let us know with a GitHub issue. And if you're interested in working with us, check out our open roles!