Good Data at Good Eggs: Using Dagster to manage the data platform

Published on 2020-12-09

David Wallace
@davidjwallace

This is the fourth in a series of posts about putting Dagster into production at Good Eggs. Read the introduction, part one, and part two to learn more about our experience transforming our data platform with Dagster.

As your data platform expands to let more classes of users self-serve efficiently, you’ll probably find yourself managing accumulated tech debt across a collection of disparate tools with implicit dependencies on each other.

At Good Eggs, we enthusiastically adopted dbt and Mode Analytics to let our SQL-speaking analysts build and test analytics pipelines in the data warehouse and reports on top of warehouse tables.

But over time, we found that as the focus of our analysts’ attention inevitably moved from one business problem to another, unused dbt models and forgotten Mode reports were beginning to accumulate.

This kind of debt may seem innocuous at first, but it’s hard to discern what’s important when you have to sift through hundreds of unused, irrelevant, or obsolete dbt models and Mode reports. They clutter up your interface and impose cognitive load, and in the case of dbt models, they consume expensive Snowflake credits and developer time every time they run, even when no downstream reports make use of them.

We found ourselves looking for a way to automate what would otherwise be a time-consuming manual curation process of culling atrophied dbt models and Mode reports.

Enter the metapipeline

A view of our metapipeline's structure

We realized that we could use Dagster not just to orchestrate the computations that produced assets in our various systems, but also to directly manage the tools that end users of our data platform lived in.

So we decided to build what we internally refer to as a “metapipeline” to help us clean up our unused dbt and Mode artifacts. We think this is a pretty neat example of how to automate and productionize a complex, cross-tool workflow.

This pipeline starts by using the Mode Analytics API to build a graph of all the reports and queries that are defined in Mode. We want to build a graph that looks like the following:

Schematic graph of a Mode report and queries
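
As a rough illustration of the shape of that graph, the sketch below builds a small adjacency mapping from report nodes to query nodes. The `Report` and `Query` records, their fields, and the `report:`/`query:` node names are hypothetical stand-ins, not the Mode API's response format.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Query:
    token: str    # hypothetical identifier for a Mode query
    raw_sql: str  # text of the query definition


@dataclass(frozen=True)
class Report:
    token: str                 # hypothetical identifier for a Mode report
    queries: Tuple[Query, ...]  # the queries defined in this report


def build_mode_graph(reports: List[Report]) -> Dict[str, List[str]]:
    """Return an adjacency mapping: report node -> list of query nodes."""
    graph: Dict[str, List[str]] = {}
    for report in reports:
        report_node = f"report:{report.token}"
        graph[report_node] = []
        for query in report.queries:
            query_node = f"query:{query.token}"
            graph[report_node].append(query_node)
            graph.setdefault(query_node, [])  # queries start out as leaf nodes
    return graph
```

Keeping the graph as plain node names makes it easy to attach edges from other systems later, which is what the rest of the pipeline relies on.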

We do this using a composite solid, which lets us encapsulate complexity by assembling subsets of solids into larger units.

The composite solid that constructs our Mode graph

We then iterate through this graph to find “stale” Mode reports (in our case, reports that haven’t been run in the last 100 days or have low usage), archive them, and mark them for future deletion:

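from datetime import datetime

import pytz
from dagster import InputDefinition, List, Output, OutputDefinition, solid

# Assumed imports for this snippet; ModeReport is a project-specific Dagster
# type whose definition is not shown.

@solid(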
    description=(
        "A solid that generates a list of Mode Reports that are candidates for archival."
    ),
    input_defs=[
        InputDefinition(
            name="reports",
            description="A list of non-archived Mode Reports to inspect.",
            dagster_type=List[ModeReport],
        ),
    ],
    output_defs=[
        OutputDefinition(
            name="reports",
            description="A list of Mode Reports that have been deemed candidates for archival.",
            dagster_type=List[ModeReport],
        )
    ],
    tags={"kind": "mode"},
)
def filter_archive_candidates(context, reports: List[ModeReport]) -> List[ModeReport]:
    archive_candidates = []
    # Compute the reference time once so every report is compared against the same instant.
    now = datetime.now(pytz.UTC)
    for report in reports:
        days_since_last_run = (now - report.last_run_at).days
        days_since_created = (now - report.created_at).days
        # Low usage: few runs and views on a report that is more than 60 days old.
        low_usage = report.runs_count < 5 and report.view_count < 5 and days_since_created > 60
        if days_since_last_run > 100 or low_usage:
            archive_candidates.append(report)

    yield Output(value=archive_candidates, output_name="reports")

This entire process is automated by our metapipeline:

Fragment of our metapipeline that archives and deletes stale Mode reports.

Next, we introspect the text of all active Mode query definitions to see which queries refer to artifacts created by dbt models.
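
A minimal sketch of that introspection step, assuming dbt models are materialized as relations in a known schema and that queries reference them as `schema.model_name`. The schema name `analytics`, the model names, and the helper `find_dbt_references` are hypothetical; a real SQL parser would be more robust than a regular expression.

```python
import re
from typing import Iterable, Set


def find_dbt_references(raw_sql: str, model_names: Iterable[str],
                        schema: str = "analytics") -> Set[str]:
    """Return the dbt model names that a query's SQL text refers to."""
    known = {name.lower() for name in model_names}
    # Match `schema.identifier`, case-insensitively, on word boundaries.
    pattern = re.compile(
        rf"\b{re.escape(schema)}\.([a-z_][a-z0-9_]*)\b", re.IGNORECASE
    )
    referenced = {match.group(1).lower() for match in pattern.finditer(raw_sql)}
    # Keep only identifiers that correspond to known dbt models.
    return referenced & known
```

Intersecting with the set of known model names filters out relations in the same schema that dbt does not manage.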

When we find a query that depends on a dbt model, we augment our existing dbt graph with an edge between the Mode query node and the dbt model node. Using this new “holistic” graph, we are able to discern which dbt models have no path, direct or transitive, to downstream data consumables, and thus might be good candidates for deprecation.

This is what this augmented graph looks like:

Mode graph augmented with dbt model dependencies.
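
The reachability check can be sketched as follows, assuming the augmented graph is stored as a mapping from each node to its direct downstream nodes, with dbt models prefixed `model:` and Mode queries prefixed `query:`. The node naming and the function names are illustrative, not the pipeline's actual code.

```python
from typing import Dict, List, Set


def reaches_consumer(node: str, graph: Dict[str, List[str]]) -> bool:
    """True if any node downstream of `node` is a Mode query."""
    stack = list(graph.get(node, []))
    seen: Set[str] = set()
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        if current.startswith("query:"):
            return True
        stack.extend(graph.get(current, []))
    return False


def find_deprecation_candidates(graph: Dict[str, List[str]]) -> List[str]:
    """dbt models with no direct or transitive path to a consumer."""
    return sorted(
        node for node in graph
        if node.startswith("model:") and not reaches_consumer(node, graph)
    )
```

A staging model that only feeds other unused models is flagged along with them, because the search follows edges transitively rather than looking only at direct children.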

Our pipeline then automatically creates a new branch of the git repository containing our dbt models, with the deprecation candidates removed. Finally, the pipeline opens a pull request in GitHub for manual review of the cleanup process.

Fragment of our metapipeline that creates pull requests to delete stale dbt models.

After this pipeline has run, pull requests are opened in GitHub for manual confirmation:

Example pull request from the metapipeline.

Similarly, we use the metapipeline to automate the maintenance of dbt exposures. Exposures are a relatively new dbt facility that lets users manually specify external downstream dependencies of dbt models.

While this is a powerful tool (we can now, for example, tell dbt to run all the models that are upstream of some other Dagster pipeline), maintaining exposures can be an arduous, manual process.

But since our metapipeline has already composed a graph expressing the relationships between all dbt nodes and all downstream consumer nodes, we've been able to automate that work away: when the metapipeline runs, it just opens a PR with updated exposure definitions.

Example pull request to maintain dbt exposures.
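
For illustration, the sketch below renders an exposure definition from one consumer node and its upstream models. It follows dbt's documented exposure properties (`name`, `type`, `owner`, `depends_on`), but the helper `render_exposure`, the owner email, and the example names are hypothetical, and the YAML is assembled by hand only to keep the example dependency-free.

```python
from typing import List


def render_exposure(name: str, owner_email: str, models: List[str],
                    exposure_type: str = "dashboard") -> str:
    """Render one dbt exposure entry as a YAML string."""
    lines = [
        "version: 2",
        "",
        "exposures:",
        f"  - name: {name}",
        f"    type: {exposure_type}",
        "    owner:",
        f"      email: {owner_email}",
        "    depends_on:",
    ]
    # Sort for stable output so regenerated files produce small diffs.
    lines += [f"      - ref('{model}')" for model in sorted(models)]
    return "\n".join(lines) + "\n"


# Hypothetical usage: one Mode report that reads from two dbt models.
print(render_exposure("weekly_sales_report", "data@example.com",
                      ["orders", "customers"]))
```

Deterministic ordering matters here because the output is committed to git: a stable rendering means the pull request only shows exposures whose dependencies actually changed.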

“Meta-analysis” with the metapipeline

This work was a meaningful step forward in being able to programmatically manage the complexity and relevancy of our data platform, but we wanted to be sure that we were having a tangible impact on operations.

Are we successfully controlling the size of our dbt project? Are there dbt models whose runtimes we should be concerned about? Are there specific dbt models where optimization could have the highest impact on our project?

As the final step of our pipeline, within a Jupyter notebook, we perform a set of analyses geared towards helping us tackle these questions.

In general, we prefer not to run notebooks in production, but in this case, it’s the easiest way to create the visualization we need. We wrap the notebook in a Dagster solid using Dagstermill, a library built on top of Dagster and papermill that lets notebooks be parameterized and orchestrated alongside all the other steps of our pipeline.

As part of this notebook, we output a visualization to Slack helping us monitor the overall size of our dbt project by visualizing how many distinct dbt models we run each day. This has enabled us to put numbers behind the impact that our programmatic dbt model deprecation is having:

The output notebook.

Additionally, we want to be able to routinely identify “important” dbt models, whose optimization could have a particularly high impact on the performance of our project.

To gauge a dbt model’s “importance” within the project, we use a composition of various graph centrality measures. This helps us identify which models may be acting as bottlenecks within the graph.

Schematic of the factors that determine a node's 'importance'.

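import networkx.algorithms as nxa
import pandas as pd
from dagster import InputDefinition, OutputDefinition, solid

# Assumed imports for this snippet; DiGraph and NodesDataFrame are
# project-specific Dagster types whose definitions are not shown.

@solid(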
    description=(
        "Calculates various centrality measures for nodes within a networkx "
        "DiGraph object and returns a flattened dataframe with one line per "
        "node in the graph."
    ),
    input_defs=[InputDefinition(name="digraph", dagster_type=DiGraph)],
    output_defs=[OutputDefinition(name="df", dagster_type=NodesDataFrame)],
    tags={"kind": "networkx"},
)
def generate_enriched_graph_dataframe(context, digraph: DiGraph) -> NodesDataFrame:
    dc = nxa.centrality.degree_centrality(digraph)
    idc = nxa.centrality.in_degree_centrality(digraph)
    odc = nxa.centrality.out_degree_centrality(digraph)
    cc = nxa.centrality.closeness_centrality(digraph)
    bc = nxa.centrality.betweenness_centrality(digraph)
    pr = nxa.link_analysis.pagerank_alg.pagerank(digraph)

    nodes = []
    for node, metadata in digraph.nodes.items():
        record = (
            node, metadata, dc[node], idc[node], odc[node], cc[node],
            bc[node], pr[node],
        )
        nodes.append(record)

    columns = [
        "node",
        "metadata",
        "degree_centrality",
        "in_degree_centrality",
        "out_degree_centrality",
        "closeness_centrality",
        "betweenness_centrality",
        "pagerank",
    ]

    df = pd.DataFrame.from_records(nodes, columns=columns)
    return df
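
How the individual measures are combined is a judgment call. One simple possibility, shown here purely as an assumption and not as the scoring our pipeline uses, is to min-max normalize each measure and average the results. The functions `min_max` and `composite_importance` and the example values are hypothetical.

```python
from typing import Dict, List


def min_max(values: Dict[str, float]) -> Dict[str, float]:
    """Rescale a node -> score mapping to the range [0, 1]."""
    low, high = min(values.values()), max(values.values())
    if high == low:  # all nodes tie on this measure
        return {node: 0.0 for node in values}
    return {node: (v - low) / (high - low) for node, v in values.items()}


def composite_importance(measures: List[Dict[str, float]]) -> Dict[str, float]:
    """Average the normalized measures for each node."""
    normalized = [min_max(m) for m in measures]
    nodes = normalized[0].keys()
    return {
        node: sum(m[node] for m in normalized) / len(normalized)
        for node in nodes
    }


# Hypothetical centrality values for three dbt models.
betweenness = {"stg_orders": 0.0, "orders": 0.5, "order_items": 1.0}
pagerank = {"stg_orders": 0.2, "orders": 0.6, "order_items": 0.2}
scores = composite_importance([betweenness, pagerank])
most_important = max(scores, key=scores.get)
```

Normalizing first keeps a measure with a large numeric range from dominating the average; weighting the measures differently would be a natural variation.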

Once candidate models are identified, they are automatically flagged to data stakeholders and are subsequently prioritized for optimization work.

We hope you’ve enjoyed reading about our experience with Dagster so far. Tune in next week for our fifth post on effective development with Dagster.