Enabling Data Quality with Dagster and Great Expectations

July 8, 2024

Use Dagster and GX to improve data pipeline reliability without writing custom logic for data testing.

One of the most important considerations when building a data pipeline is ensuring that the data assets it produces are reliable, up-to-date, and trustworthy.

To that end, we strongly believe that data quality should be natively integrated into the data orchestrator. This makes the orchestrator the single pane of glass for your data assets, and it also lets the orchestrator stop erroneous data from propagating downstream to stakeholder-facing products, such as dashboards and reports.

While Asset Checks enable exactly this, you may often find yourself having to write a lot of custom logic and boilerplate code to codify the expectations you have of your data.

In this piece, we'll demonstrate how you can combine Great Expectations' extensive suite of data quality tests with Dagster's Asset Checks to improve the reliability of your data pipelines without having to write custom logic for testing your data.

The full code for this walkthrough can be found in the demo repo that we clone in the Setup section below.

Setup

To start off, we recommend creating a new virtual environment, cloning the demo repo to your computer, and then installing it as a package using pip. This will install Dagster, Great Expectations, and all the other dependencies you’ll need to follow along.

# Clone the repo and cd into the directory
git clone https://github.com/mjkanji/dagster-ge-demo.git
cd dagster-ge-demo

# Install the package
pip install -e ".[dev]"

Next, we’ll define a simple asset that fetches the infamous Titanic dataset from the internet, reads it using Pandas, and then writes it to disk as a CSV file.

import pandas as pd
from dagster import asset


@asset
def titanic():
    titanic_df = pd.read_csv(
        "https://github.com/datasciencedojo/datasets/raw/master/titanic.csv"
    )
    titanic_df.to_csv("titanic.csv", index=False)

Since we’re fetching this data from the internet, it’s possible that the source file may change without our knowledge. To ensure that the materialized asset aligns with our expectations, we may want to codify some expectations about the data.

For example, we may expect that the Survived column (which indicates if a passenger survived):

  • has no null values; and
  • that all of its values are either 0 or 1.
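
To make these two expectations concrete, here is a minimal, dependency-free sketch that evaluates them over a few hand-written values. The sample values and the helper name `summarize_survived` are illustrative assumptions and are not part of the demo repo; the actual checks in this post operate on the Pandas data frame.

```python
from typing import Optional

# Hypothetical sample standing in for the "Survived" column:
# one missing value (None) and one out-of-set value (2).
sample_survived: list[Optional[int]] = [0, 1, 1, None, 2, 0]


def summarize_survived(values: list[Optional[int]]) -> dict:
    """Evaluate both expectations for a 0/1 indicator column."""
    allowed = {0, 1}
    null_count = sum(1 for v in values if v is None)
    # Nulls are reported separately, so skip them when looking for bad values.
    invalid_values = [v for v in values if v is not None and v not in allowed]
    return {
        "null_count": null_count,
        "invalid_values": invalid_values,
        "has_no_nulls": null_count == 0,
        "has_valid_values": not invalid_values,
    }


summary = summarize_survived(sample_survived)
print(summary)
```

Keeping the two expectations separate means a failure can be attributed to missing data or to unexpected values, rather than to a single combined pass/fail flag.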

We can use Dagster’s Asset Checks feature to codify these expectations. An example implementation for the first test is shown below. Since we want to ensure that downstream assets are not materialized if our source contains potentially erroneous data, we’ll also set blocking=True.

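from dagster import (
    AssetCheckExecutionContext,
    AssetCheckResult,
    AssetCheckSeverity,
    asset_check,
)


@asset_check(asset=titanic, blocking=True)
def target_has_no_nulls(context: AssetCheckExecutionContext):
    titanic_df = pd.read_csv("titanic.csv")
    # Count the missing values in the Survived column
    num_null_survived = titanic_df["Survived"].isna().sum()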

    return AssetCheckResult(
        passed=bool(num_null_survived == 0),
        severity=AssetCheckSeverity.ERROR,
        metadata={
            "null_count": int(num_null_survived),
        },
    )

Please see the docs on Asset Checks for a detailed breakdown of the Asset Checks API.

In this case, we’re defining the logic for the check ourselves and using Dagster’s metadata logging to keep track of how the number of null values changes over time.

While this is a simple test, defining custom logic for every expectation we have for every asset quickly becomes tedious and substantially increases the amount of code we have to write and maintain. For large data pipelines, it may even require building utility functions and abstractions to reduce duplicated code for similar types of tests, adding even more work to the data team’s docket.

Great Expectations to the Rescue

Thankfully, the team at Great Expectations (GX) have already built an extensive suite of expectations for data quality checks (with a large number of additional checks implemented by the community) that we can build on top of, allowing us to focus on simply defining our expectations, without having to build the underlying implementation.

Before we rewrite the above test to use Great Expectations, let’s go over the GreatExpectationsResource, a simple resource that allows us to avoid having to duplicate some GX-related boilerplate in our asset checks:

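from dagster import ConfigurableResource
from great_expectations.data_context import EphemeralDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)


class GreatExpectationsResource(ConfigurableResource):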
    def get_validator(self, asset_df: pd.DataFrame):
        project_config = DataContextConfig(
            store_backend_defaults=InMemoryStoreBackendDefaults()
        )
        data_context = EphemeralDataContext(project_config=project_config)
        data_source = data_context.sources.add_pandas(name="my_pandas_datasource")
        asset_name = "asset_check_df"
        suite_name = "asset_check_expectation_suite"
        data_asset = data_source.add_dataframe_asset(name=asset_name)
        batch_request = data_asset.build_batch_request(dataframe=asset_df)
        data_context.add_or_update_expectation_suite(suite_name)
        validator = data_context.get_validator(
            batch_request=batch_request, expectation_suite_name=suite_name
        )
        return validator

Note that while GX has a rich ecosystem of abstractions for reading data from multiple sources, defining a reusable set of expectations, and persisting GX-related configuration and validation results, we are opting to forgo many of these features in favor of a simpler setup.

As such, we’re using an Ephemeral Data Context and reading the data from memory as a Pandas data frame (instead of using a GX data source). This is very similar to using Great Expectations interactively. To persist any important information (such as the validation results and related metadata), we rely on Dagster’s metadata logging.

Users with a deeper investment in GX or those wishing to use GX Cloud alongside Dagster may need to customize the setup to make use of GX’s more advanced features.

Next, let’s look at how we can rewrite the Asset Check from before using a GX expectation:

@asset_check(asset=titanic, blocking=True)
def ge_target_has_no_nulls(
    context: AssetCheckExecutionContext, ge_resource: GreatExpectationsResource
):
    titanic_df = pd.read_csv("titanic.csv")
    validator = ge_resource.get_validator(titanic_df)
    validation_result = validator.expect_column_values_to_not_be_null(column="Survived")

    return AssetCheckResult(
        passed=validation_result.success,
        severity=AssetCheckSeverity.ERROR,
        metadata=validation_result.result,
    )

Note that instead of defining the implementation for the null check ourselves, we’re using the expect_column_values_to_not_be_null method from Great Expectations.

We additionally use the ExpectationValidationResult.success attribute to determine if the check passed and pass this along to the AssetCheckResult. We also use the ExpectationValidationResult.result attribute to log metadata, such as the number of rows that failed the test. This is a dictionary with metadata about the expectation; for example:

{
    'element_count': 891,
    'unexpected_count': 0,
    'unexpected_percent': 0.0,
    'partial_unexpected_list': []
}

In this case, GX has tested 891 rows and none of the rows failed the test.

By capturing this information using Dagster, we keep a log of any changes over time. Dagster will also automatically visualize this metadata for you in the Plots section of the Asset Check’s details (or Dagster Insights, if you’re using Dagster+).

For example, in the screenshot above, we can see that the last materialization had 112 rows which did not meet our expectations, allowing us to quickly identify anomalous events and debug them accordingly.
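
If you would rather log only the numeric fields of the result (the ones that make sense as a time series), a small helper could filter the dictionary before it is passed to `metadata`. This is a minimal sketch under stated assumptions: the helper name `numeric_metadata` is hypothetical, and the input is the example result dictionary shown earlier rather than live GX output.

```python
from numbers import Real


def numeric_metadata(result: dict) -> dict:
    """Keep only int/float entries from a validation result dictionary."""
    return {
        key: value
        for key, value in result.items()
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(value, Real) and not isinstance(value, bool)
    }


# The example result dictionary from this post.
example_result = {
    "element_count": 891,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
}

plottable = numeric_metadata(example_result)
print(plottable)
```

Inside an asset check, the filtered dictionary would take the place of `validation_result.result` in the `metadata` argument.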

Defining Multiple Expectations

Recall that we had other expectations that we wanted to test. While we could simply replicate the above example and replace the validation_result definition to use a different expectation, this would be a rather verbose solution with a lot of code duplication.

Below, we’ll show two patterns you can use to quickly and succinctly define asset checks for multiple GX expectations.

Using the multi_asset_check

The first option is to use the multi_asset_check decorator; this allows you to define multiple checks using a single op/function. An example of this, with checks for both of our expectations, is shown below:

from dagster import AssetCheckSpec, multi_asset_check


@multi_asset_check(
    specs=[
        AssetCheckSpec(name="multicheck_target_has_no_nulls", asset=titanic),
        AssetCheckSpec(name="multicheck_target_has_valid_values", asset=titanic),
    ]
)
def ge_multiple_checks(
    context: AssetCheckExecutionContext, ge_resource: GreatExpectationsResource
):
    titanic_df = pd.read_csv("titanic.csv")
    validator = ge_resource.get_validator(titanic_df)

    validation_result = validator.expect_column_values_to_not_be_null(column="Survived")
    yield AssetCheckResult(
        passed=validation_result.success,
        severity=AssetCheckSeverity.ERROR,
        metadata=validation_result.result,
        check_name="multicheck_target_has_no_nulls",
    )

    validation_result = validator.expect_column_values_to_be_in_set(
        column="Survived", value_set={0, 1}
    )
    yield AssetCheckResult(
        passed=validation_result.success,
        severity=AssetCheckSeverity.ERROR,
        metadata=validation_result.result,
        check_name="multicheck_target_has_valid_values",
    )

In this way, you can create a single op for each asset and then define and yield each of the expectations sequentially.
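
The sequential pattern above is ordinary generator behaviour: one function yields several named results. The sketch below mimics it in plain Python and drives the loop from a mapping of check names to predicates. `CheckResult` and `run_checks` are stand-ins invented for illustration; they are not Dagster classes or functions.

```python
from dataclasses import dataclass
from typing import Callable, Iterator


@dataclass
class CheckResult:
    """Stand-in for a check result; not Dagster's AssetCheckResult."""

    check_name: str
    passed: bool


def run_checks(
    values: list, checks: dict[str, Callable[[list], bool]]
) -> Iterator[CheckResult]:
    """Yield one named result per check, in the order the checks are defined."""
    for name, predicate in checks.items():
        yield CheckResult(check_name=name, passed=predicate(values))


checks = {
    "has_no_nulls": lambda vals: all(v is not None for v in vals),
    "has_valid_values": lambda vals: all(v in {0, 1} for v in vals if v is not None),
}

# One missing value: the null check fails, the value-set check still passes.
results = list(run_checks([0, 1, None, 1], checks))
for result in results:
    print(result)
```

Because each result carries its own name, a failure in one check does not hide the outcome of the others.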

Using the Factory Pattern to Generate Asset Checks with GX Expectations

Alternatively, you can use the factory pattern to further reduce the boilerplate for yielding individual AssetCheckResult objects by automatically generating the asset checks from a common specification. Thankfully, Great Expectations already has an ExpectationConfiguration class that’s perfect for this job.

Our common format for the specs will be a dictionary that maps the names of the asset checks (i.e., what’s passed to the name argument of the @asset_check decorator) to their corresponding ExpectationConfiguration. For our two tests from earlier, the specs would be as follows:

from great_expectations.core import ExpectationConfiguration

specs = {
    "factory_target_has_no_nulls": ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "Survived"},
    ),
    "factory_target_has_valid_values": ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={"column": "Survived", "value_set": {0, 1}},
    ),
}

You can find a wider array of tests in the Expectations Gallery on the GX website, alongside the arguments each test requires.
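
As an illustration of how the specs could grow, the sketch below describes a third check as plain data. The check name `factory_pclass_has_valid_values` and the choice of the `Pclass` column (passenger class, 1 to 3) are assumptions made for this example. Plain dictionaries are used so that the snippet runs without GX installed; with the GX version used in this post, each entry would presumably be turned into an `ExpectationConfiguration` with the same `expectation_type` and `kwargs` arguments.

```python
# Hypothetical extra spec, written as plain data rather than GX objects.
extra_specs = {
    "factory_pclass_has_valid_values": {
        "expectation_type": "expect_column_values_to_be_in_set",
        "kwargs": {"column": "Pclass", "value_set": {1, 2, 3}},
    },
}


def missing_spec_keys(specs: dict) -> dict:
    """Return, per check name, any required keys missing from its spec."""
    required = {"expectation_type", "kwargs"}
    problems = {}
    for name, spec in specs.items():
        missing = required - spec.keys()
        if missing:
            problems[name] = sorted(missing)
    return problems


problems = missing_spec_keys(extra_specs)
print(problems)
```

A quick shape check like this catches typos in the specification before any asset checks are generated from it.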

Next, we define a factory for generating the asset checks from these specs as follows:

def make_ge_asset_check(
    expectation_name: str,
    expectation_config: ExpectationConfiguration,
    asset: CoercibleToAssetKey | AssetsDefinition | SourceAsset,
):
    @asset_check(
        asset=asset,
        name=expectation_name,
        blocking=True,
        compute_kind="great_expectations",
    )
    def _asset_check(
        context: AssetCheckExecutionContext, ge_resource: GreatExpectationsResource
    ):
        titanic_df = pd.read_csv("titanic.csv")
        validator = ge_resource.get_validator(titanic_df)
        validation_result = expectation_config.validate(validator)

        return AssetCheckResult(
            passed=validation_result.success,  # type: ignore
            severity=AssetCheckSeverity.ERROR,
            metadata=validation_result.result,
        )

    return _asset_check

And defining our asset checks is then reduced to a simple list comprehension:

titanic_checks = [make_ge_asset_check(k, v, titanic) for k, v in specs.items()]

Adding new expectations to our asset is now as simple as adding a new ExpectationConfiguration to the specs dictionary, making the code significantly less verbose.
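
A side benefit of generating the checks through a factory function, rather than defining them inline in a loop, is that each generated function captures its own name and configuration. The plain-Python sketch below shows the late-binding pitfall that the factory avoids. The names `make_check` and `spec_values` are illustrative assumptions and do not appear in the demo repo.

```python
def make_check(name, allowed):
    """Factory: each call creates a new scope that binds name and allowed."""

    def _check(values):
        return name, all(v in allowed for v in values)

    return _check


spec_values = {"binary": {0, 1}, "classes": {1, 2, 3}}

# Each generated function remembers the arguments it was created with.
factory_checks = [make_check(n, a) for n, a in spec_values.items()]

# Functions defined inline in a loop look up n and a when they are called,
# so after the loop they all see the values from the final iteration.
inline_checks = []
for n, a in spec_values.items():

    def _inline(values):
        return n, all(v in a for v in values)

    inline_checks.append(_inline)

factory_names = [check([1])[0] for check in factory_checks]
inline_names = [check([1])[0] for check in inline_checks]
print(factory_names)  # ['binary', 'classes']
print(inline_names)  # ['classes', 'classes']
```

The same reasoning would apply to a decorated function defined inside a loop, which is one reason to keep the decorator inside a factory.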

We can also define checks for other assets by creating a new specs dictionary for each asset, provided the factory is adapted to load that asset’s data rather than the hard-coded titanic.csv.

Conclusion

Dagster’s Asset Checks are a great tool for ensuring the reliability of your data and stopping malformed or erroneous data from flowing downstream.

Using them alongside Great Expectations’ expansive library of expectations works incredibly well and can help you significantly speed up the adoption of data quality checks in your data pipelines.
