July 8, 2024 · 6 minute read

Enabling Data Quality with Dagster and Great Expectations

Use Dagster and GX to improve data pipeline reliability without writing custom logic for data testing.
Muhammad Jarir Kanji
@muhammad

One of the most important considerations when building a data pipeline is ensuring that the data assets it produces are reliable, up-to-date, and trustworthy.

To that end, we strongly believe that data quality should be natively integrated into the data orchestrator, which not only ensures that the data orchestrator is the single pane of glass for your data assets but also allows the orchestrator to stop erroneous data from being propagated downstream to stakeholder-facing products, such as dashboards and reports.

While Dagster's Asset Checks enable exactly this, you may often find yourself writing a lot of custom logic and boilerplate code to codify the expectations you have of your data.

In this piece, we'll demonstrate how you can combine Great Expectations' extensive suite of data quality tests with Dagster's Asset Checks to improve the reliability of your data pipelines without having to write custom logic for testing your data.

The full code for this walkthrough can be found here.

Setup

To start off, we recommend creating a new virtual environment, then cloning the demo repo to your computer and installing it as a package using pip. This will install Dagster, Great Expectations, and all the other dependencies you’ll need to follow along.

# Clone the repo and cd into the directory
git clone https://github.com/mjkanji/dagster-ge-demo.git
cd dagster-ge-demo

# Install the package
pip install -e ".[dev]"

Next, we’ll define a simple asset that fetches the infamous Titanic dataset from the internet, reads it using Pandas, and then writes it to disk as a CSV file.

import pandas as pd

from dagster import asset


@asset
def titanic():
    # Download the Titanic dataset and persist it to disk as a CSV file
    titanic_df = pd.read_csv(
        "https://github.com/datasciencedojo/datasets/raw/master/titanic.csv"
    )
    titanic_df.to_csv("titanic.csv", index=False)

Since we’re fetching this data from the internet, it’s possible that the source file may change without our knowledge. To ensure that the materialized asset aligns with our expectations, we may want to codify some expectations about the data.

For example, we may expect that the Survived column (which indicates if a passenger survived):

  • has no null values; and
  • contains only the values 0 and 1.

We can use Dagster’s Asset Checks feature to codify these expectations. An example implementation for the first test is shown below. Since we want to ensure that downstream assets are not materialized if our source contains potentially erroneous data, we’ll also set blocking=True.

@asset_check(asset=titanic, blocking=True)
def target_has_no_nulls(context: AssetCheckExecutionContext):
    titanic_df = pd.read_csv("titanic.csv")
    num_null_survived = titanic_df["Survived"].isna().sum()

    return AssetCheckResult(
        passed=bool(num_null_survived == 0),
        severity=AssetCheckSeverity.ERROR,
        metadata={
            "null_count": int(num_null_survived),
        },
    )

Please see the Asset Checks documentation for a detailed breakdown of the API.

In this case, we’re defining the logic for the check by ourselves and also using Dagster’s robust metadata logging capabilities to keep track of how the number of null values changes over time.

While this is a simple test, defining custom logic for every single expectation we have for every asset we create can easily get tedious and substantially increase the amount of code we may have to write and maintain. For large data pipelines, it may even require building utility functions and abstractions to reduce the duplication of code for similar patterns and types of tests, adding even more work to the data team’s docket.
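To make that concrete, here is a rough sketch of what hand-coding our second expectation (that all values of Survived are 0 or 1) might look like; the check name and metadata key below are illustrative, but note how much of the first check's structure it repeats:

@asset_check(asset=titanic, blocking=True)
def target_has_valid_values(context: AssetCheckExecutionContext):
    titanic_df = pd.read_csv("titanic.csv")
    # Count rows whose Survived value falls outside the allowed set {0, 1}
    invalid_count = int((~titanic_df["Survived"].isin([0, 1])).sum())

    return AssetCheckResult(
        passed=invalid_count == 0,
        severity=AssetCheckSeverity.ERROR,
        metadata={"invalid_count": invalid_count},
    )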

Great Expectations to the Rescue

Thankfully, the team at Great Expectations (GX) has already built an extensive suite of expectations for data quality checks (with a large number of additional checks implemented by the community) that we can build on top of, allowing us to focus on simply defining our expectations without having to build the underlying implementation.

Before we rewrite the above test to use Great Expectations, let’s go over the GreatExpectationsResource, a simple resource that allows us to avoid having to duplicate some GX-related boilerplate in our asset checks:

import pandas as pd
from dagster import ConfigurableResource
from great_expectations.data_context import EphemeralDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)


class GreatExpectationsResource(ConfigurableResource):
    def get_validator(self, asset_df: pd.DataFrame):
        # Use an ephemeral, in-memory data context so no GX configuration is
        # persisted to disk; Dagster's metadata logging handles persistence.
        project_config = DataContextConfig(
            store_backend_defaults=InMemoryStoreBackendDefaults()
        )
        data_context = EphemeralDataContext(project_config=project_config)
        # Register the in-memory DataFrame as a Pandas data source and asset
        data_source = data_context.sources.add_pandas(name="my_pandas_datasource")
        asset_name = "asset_check_df"
        suite_name = "asset_check_expectation_suite"
        data_asset = data_source.add_dataframe_asset(name=asset_name)
        batch_request = data_asset.build_batch_request(dataframe=asset_df)
        # Create (or update) a throwaway expectation suite and return a validator
        data_context.add_or_update_expectation_suite(suite_name)
        validator = data_context.get_validator(
            batch_request=batch_request, expectation_suite_name=suite_name
        )
        return validator

Note that while GX has a rich ecosystem of abstractions for reading data from multiple sources, defining reusable sets of expectations, and persisting GX-related configuration and validation results, we are opting to forgo many of these features in favor of a simpler setup.

As such, we’re using an Ephemeral Data Context and reading the data from memory as a Pandas data frame (instead of using a GX data source). This is very similar to using Great Expectations interactively, while we rely on Dagster’s metadata logging to persist any important information (such as the validation results and related metadata).

Users with a deeper investment in GX or those wishing to use GX Cloud alongside Dagster may need to customize the setup to make use of GX’s more advanced features.

Next, let’s look at how we can rewrite the Asset Check from before using a GX expectation:

@asset_check(asset=titanic, blocking=True)
def ge_target_has_no_nulls(
    context: AssetCheckExecutionContext, ge_resource: GreatExpectationsResource
):
    titanic_df = pd.read_csv("titanic.csv")
    validator = ge_resource.get_validator(titanic_df)
    validation_result = validator.expect_column_values_to_not_be_null(column="Survived")

    return AssetCheckResult(
        passed=validation_result.success,
        severity=AssetCheckSeverity.ERROR,
        metadata=validation_result.result,
    )

Note that instead of defining the implementation for the null check ourselves, we’re using the expect_column_values_to_not_be_null method from Great Expectations.

We additionally use the ExpectationValidationResult.success attribute to determine if the check passed and pass this along to the AssetCheckResult. We also use the ExpectationValidationResult.result attribute to log metadata, such as the number of rows that failed the test. This is a dictionary with metadata about the expectation; for example:

{
    'element_count': 891,
    'unexpected_count': 0,
    'unexpected_percent': 0.0,
    'partial_unexpected_list': []
}

In this case, GX has tested 891 rows and none of the rows failed the test.

By capturing this information using Dagster, we can have a log of any changes over time. And Dagster will also automatically visualize this metadata for you in the Plots section of the Asset Check’s details (or Dagster Insights, if you’re using Dagster+).

For example, in the screenshot above, we can see that the last materialization had 112 rows which did not meet our expectations, allowing us to quickly identify anomalous events and debug them accordingly.
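Before moving on, note that the asset, the asset checks, and the resource all need to be registered with Dagster. A minimal sketch of that wiring, assuming the names used in this post (your project's Definitions will likely include more), might look like:

from dagster import Definitions

defs = Definitions(
    assets=[titanic],
    asset_checks=[target_has_no_nulls, ge_target_has_no_nulls],
    # The resource key must match the `ge_resource` argument name in the checks
    resources={"ge_resource": GreatExpectationsResource()},
)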

Defining Multiple Expectations

Recall that we had other expectations that we wanted to test. While we could simply replicate the above example and replace the validation_result definition to use a different expectation, this would be a rather verbose solution with a lot of code duplication.

Below, we’ll show two patterns you can use to quickly and succinctly define asset checks for multiple GX expectations.

Using the multi_asset_check Decorator

The first option is to use the multi_asset_check decorator; this allows you to define multiple checks using a single op/function. An example of this, with checks for both of our expectations, is shown below:

@multi_asset_check(
    specs=[
        AssetCheckSpec(name="multicheck_target_has_no_nulls", asset=titanic),
        AssetCheckSpec(name="multicheck_target_has_valid_values", asset=titanic),
    ]
)
def ge_multiple_checks(
    context: AssetCheckExecutionContext, ge_resource: GreatExpectationsResource
):
    titanic_df = pd.read_csv("titanic.csv")
    validator = ge_resource.get_validator(titanic_df)

    validation_result = validator.expect_column_values_to_not_be_null(column="Survived")
    yield AssetCheckResult(
        passed=validation_result.success,
        severity=AssetCheckSeverity.ERROR,
        metadata=validation_result.result,
        check_name="multicheck_target_has_no_nulls",
    )

    validation_result = validator.expect_column_values_to_be_in_set(
        column="Survived", value_set={0, 1}
    )
    yield AssetCheckResult(
        passed=validation_result.success,
        severity=AssetCheckSeverity.ERROR,
        metadata=validation_result.result,
        check_name="multicheck_target_has_valid_values",
    )

In this way, you can create a single op for each asset and then define and yield each of the expectations sequentially.

Using the Factory Pattern to Generate Asset Checks with GX Expectations

Alternatively, you can use the factory pattern to further reduce the boilerplate of yielding individual AssetCheckResult objects by automatically generating the asset checks from a common specification. Thankfully, Great Expectations already has an ExpectationConfiguration class that’s perfect for this job.

Our common format for the specs will be a dictionary that maps the names of the asset checks (i.e., what’s passed to the name argument of the @asset_check decorator) to their corresponding ExpectationConfiguration. For our two tests from earlier, the specs would be as follows:

specs = {
    "factory_target_has_no_nulls": ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "Survived"},
    ),
    "factory_target_has_valid_values": ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={"column": "Survived", "value_set": {0, 1}},
    ),
}

You can find a wider array of tests in the Expectations Gallery on the GX website, alongside the arguments each test requires.
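For example, if we also wanted to assert that passenger ages fall within a plausible range, we could add another entry to the specs dictionary using the expect_column_values_to_be_between expectation (the check name and bounds below are illustrative):

specs["factory_target_has_plausible_ages"] = ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_between",
    # Illustrative bounds; adjust to whatever range you consider plausible
    kwargs={"column": "Age", "min_value": 0, "max_value": 100},
)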

Next, we define a factory for generating the asset checks from these specs as follows:

def make_ge_asset_check(
    expectation_name: str,
    expectation_config: ExpectationConfiguration,
    asset: CoercibleToAssetKey | AssetsDefinition | SourceAsset,
):
    @asset_check(
        asset=asset,
        name=expectation_name,
        blocking=True,
        compute_kind="great_expectations",
    )
    def _asset_check(
        context: AssetCheckExecutionContext, ge_resource: GreatExpectationsResource
    ):
        titanic_df = pd.read_csv("titanic.csv")
        validator = ge_resource.get_validator(titanic_df)
        validation_result = expectation_config.validate(validator)

        return AssetCheckResult(
            passed=validation_result.success,  # type: ignore
            severity=AssetCheckSeverity.ERROR,
            metadata=validation_result.result,
        )

    return _asset_check

And defining our asset checks is then reduced to a simple list comprehension:

titanic_checks = [make_ge_asset_check(k, v, titanic) for k, v in specs.items()]

Adding new expectations to our asset is now as simple as adding a new ExpectationConfiguration to the specs dictionary, making the code significantly less verbose.

We can also define checks for other assets by creating a new specs dictionary for each asset.
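For illustration, a hypothetical second asset (say, passengers_cleaned, also written to a CSV) would get its own specs dictionary and list comprehension. Note that the factory above hardcodes "titanic.csv", so reusing it across assets would also require passing in the CSV path (or a loader) as an extra argument:

# `passengers_cleaned` is a hypothetical asset; the factory would also need to be
# generalized to read that asset's CSV instead of the hardcoded "titanic.csv".
passengers_cleaned_specs = {
    "factory_fare_is_non_negative": ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "Fare", "min_value": 0},
    ),
}

passengers_cleaned_checks = [
    make_ge_asset_check(k, v, passengers_cleaned)
    for k, v in passengers_cleaned_specs.items()
]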

Conclusion

Dagster’s Asset Checks are a great tool for ensuring the reliability of your data and stopping malformed or erroneous data from flowing downstream.

Using them alongside Great Expectations’ expansive library of expectations works incredibly well and can help you significantly speed up the adoption of data quality checks in your data pipelines.



We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a GitHub discussion. If you run into any bugs, let us know with a GitHub issue. And if you're interested in working with us, check out our open roles!
