Good Data at Good Eggs: Correctness and reliability for data infrastructure

Published on 2020-10-01

David Wallace (@davidjwallace)

This is the second in a series of posts about putting Dagster into production at Good Eggs. Read the introduction for an overview of our experience transforming our data platform with Dagster.

Where does data come from? At Good Eggs, some of our most important data comes from the field. Our warehouse managers use Google Sheets, for instance, to report on our various staffing partner labor metrics. The data from these Google Sheets feeds the downstream modeling that powers our logistics operations, and it’s crucial that it be correct. Since all data entry is prone to error, we’d like to catch those errors as quickly as possible, and definitely before bad data makes its way into our decision support.

Checking data quality on ingest with custom types

Good Eggs Data Ingest Pipeline

Let’s look at how we manage our data ingest from Google Sheets using Dagster.

Each node in the pipeline here is what Dagster calls a solid: a functional unit of computation that produces and consumes data assets.

In our pipeline, a solid reads a range of values from a Google Sheet and reshapes them into a Pandas DataFrame with the given column types.

We then type check the data against a custom data frame type as a way of running quality checks on the data. In Dagster, type checks can run arbitrary code and associate their results with structured metadata. Here, we use a type factory from the dagster_pandas library to construct a custom data frame type, which runs a set of checks on the data frame and its columns. This process guarantees invariants, like that values for certain columns are non-null or belong to certain sets of categorical values. We also inject structured metadata (generated by our own compute_dataframe_summary_statistics function) so that summary statistics about the data appear in Dagit and in longitudinal views.

from dagster import EventMetadataEntry

def compute_dataframe_summary_statistics(dataframe):
   return [
       EventMetadataEntry.text(
           str(len(dataframe)), "n_rows", "Number of rows seen in the dataframe"
       ),
       EventMetadataEntry.md(
           dataframe.head().to_markdown(),
           "head",
           "A markdown preview of the first 5 rows.",
       ),
       ...
   ]

  
TransportationTempLaborDataFrame = create_dagster_pandas_dataframe_type(
   name="TransportationTempLaborDataFrame",
   columns=[
       PandasColumn.datetime_column(
           name="date",
           min_datetime=START_DATE,
           max_datetime=datetime.utcnow(),
           non_nullable=True,
       ),
       PandasColumn.string_column(name="team", non_nullable=True),
       PandasColumn.numeric_column(name="hours", min_value=0, non_nullable=True),
       PandasColumn.categorical_column(
           name="location", categories={"sfbay", "MAR1"}, non_nullable=True
       ),
       ...,
   ],
   dataframe_constraints=[UniqueColumnsConstraint(columns=["date", "location"])],
   event_metadata_fn=compute_dataframe_summary_statistics,
)
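Stripped of Dagster's machinery, the column checks this type enforces amount to a handful of assertions over the frame. Here is a minimal sketch in plain pandas (the function name, example data, and error messages are our own illustration, not the dagster_pandas implementation):

```python
from typing import List

import pandas as pd

def check_labor_dataframe(df: pd.DataFrame) -> List[str]:
    """Return human-readable violations; an empty list means the frame is clean."""
    violations = []
    # Non-nullable columns: any NaN is a data-entry error.
    for col in ("date", "team", "hours", "location"):
        if df[col].isna().any():
            violations.append(f"column '{col}' contains nulls")
    # Numeric bound: hours worked must be non-negative.
    if (df["hours"] < 0).any():
        violations.append("column 'hours' contains negative values")
    # Categorical membership: locations must come from a known set.
    if not df["location"].isin({"sfbay", "MAR1"}).all():
        violations.append("column 'location' contains unknown categories")
    # Uniqueness across (date, location), as in UniqueColumnsConstraint.
    if df.duplicated(subset=["date", "location"]).any():
        violations.append("duplicate (date, location) rows")
    return violations
```

The factory packages checks like these as a first-class Dagster type, so they run automatically on every pipeline execution rather than living in ad hoc validation scripts.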

Like all other Dagster objects, custom types are self-documenting, so we get a nice view of the custom type in Dagit. This helps with discoverability for our analysts and anyone coming to the system fresh.

Good Eggs Custom Data Frame Type

If our type checks fail, we conditionally execute solids that notify us in Slack and via email of the ingest failures, so we can take corrective action quickly. Emails are sent to select users, e.g., the operations team.
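The routing logic here is conceptually simple. A rough sketch (the function and the injected `post_to_slack` and `email_ops_team` callables are hypothetical stand-ins for our real notification solids):

```python
def handle_type_check(passed, violations, post_to_slack, email_ops_team):
    """Route a type-check result: notify on failure, report whether ingest may proceed.

    `post_to_slack` and `email_ops_team` are injected callables standing in
    for our real notification solids (hypothetical names).
    """
    if passed:
        return True  # downstream ingest solids may run
    message = "Ingest halted: " + "; ".join(violations)
    post_to_slack(message)    # broadcast to everyone, with links to the run
    email_ops_team(message)   # targeted email, e.g., to the operations team
    return False
```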

Good Eggs Validation Failure Email

Slack messages surface operational failures to everyone quickly, and include links to the pipeline run and the source data to help debug.

Good Eggs Slack Failure

If our type checks succeed, we know we can move ahead with data ingest. We conditionally execute solids that write the data to S3, then to Snowflake. Then, we trigger downstream dbt models, dbt tests, and Jupyter notebooks.
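The happy path, in other words, is a gated sequence: nothing is written anywhere until the checks come back clean. A minimal sketch of that sequencing, with hypothetical callables standing in for the solids:

```python
def run_ingest(df, find_violations, write_to_s3, copy_to_snowflake, run_dbt):
    """Gate the write/load/model steps behind the quality checks.

    All callables are hypothetical stand-ins for pipeline solids.
    """
    if find_violations(df):
        return False              # violations found: halt before anything is written
    key = write_to_s3(df)         # stage the validated data in S3
    copy_to_snowflake(key)        # load the staged data into the warehouse
    run_dbt()                     # trigger downstream dbt models and tests
    return True
```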

For us, failing before bad data is ingested to the warehouse is a big win. It’s a little bit like the value provided by database schema constraints, but in a system (Google Sheets) with no concept of schema or typing. You can’t commit strings to an integer column in a database, and you shouldn’t be able to load a form with values of the wrong type either.

We recognized the value of being able to impose strong typing at will on any of our inputs or outputs when we first started to evaluate data orchestrators, and this was one of the factors in our choice of Dagster over Airflow.

We’ve introduced several dozen of these custom data frame types into our data ingest process. Thanks to automated data quality tests, a process that used to take days is now effectively immediate: our maximum latency is about an hour. And there’s no chance of polluting downstream data artifacts, because when our data isn’t clean, computation simply halts.


We also yield structured metadata as part of our type check process. Being able to view this metadata in the Dagster logs before ingestion has real benefits for the human in the loop. For example, we yield structured metadata with the head and tail of the data in a Google Sheet. This is viewable directly in Dagit and is often enough for a human to diagnose a data quality problem.
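Generating those previews is cheap with pandas. A rough sketch of the idea (function name ours; we render markdown in production via to_markdown, but to_string is used here to avoid the optional tabulate dependency):

```python
import pandas as pd

def head_and_tail_previews(df: pd.DataFrame, n: int = 5) -> dict:
    """Return text previews of the first and last `n` rows, suitable for
    attaching as structured metadata to a type-check result."""
    return {
        "head": df.head(n).to_string(),   # first n rows
        "tail": df.tail(n).to_string(),   # last n rows
        "n_rows": len(df),                # total row count
    }
```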

Good Eggs Data Frame Tail

Dagster’s support for custom data types, and ability to surface structured metadata in Dagit, means less downstream breakage on data ingest and better recovery when bad data makes it through.

We hope you’ve enjoyed reading about our experience with Dagster so far. Tune in next week for our third post: Enabling multi-persona self-service.