Orchestrate Meltano Jobs with Dagster | Dagster Blog

April 4, 20236 minute read

Orchestrate Meltano Jobs with Dagster

Meltano provides 550 connectors and tools, all of which can be configured and orchestrated straight from Dagster.
Fraser Marlow
Name
Fraser Marlow
Handle
@frasermarlow

In this blog post we will explore orchestrating the popular ingestion solution Meltano from inside Dagster.

By executing the commands from within Dagster, we get to take full advantage of the solution's other capabilities such as scheduling, dependency management, end-to-end testing, partitioning and more.

Meltano is one of dozens of integrations for Dagster, and the complete list can be found here.

Note: This tutorial was updated in May 2023 to include version updates.
It was last tested on: Dagster, version 1.2.6 and Meltano, version 2.16.1.

Contents

An introduction to Meltano

The origins: Singer.io

Back in the late 2010’s, a dozen companies and open source projects popped up aiming to solve the problem of ELt in the new world of SaaS - namely, how to easily ingest data from a dozen SaaS sources into a centralized warehouse for analysis, typically to support a Business Analytics use case.

One popular open-source project for ELt was Singer, a specification with a simple premise: you could write any data extraction program to pull data from a source (say, a simple Python program using requests) and write any data loading program to push your data into a destination like mySQL or Redshift (or later Snowflake, Databricks, Azure Synapse, Duck DB…). As long as your data extraction program (called a ‘tap’) and your data loading program (called a ‘target’) could write/read in a serialized JSON format that met the Singer standard, you could pipe the data over ‘stdout’ from the tap to the target with a simple command.

Meltano

The Singer.io framework provided some other capabilities, such as configuring a catalog.py for selecting what data to replicate, a STATE JSON map for persisting information between invocations of a tap, and a config file that contains the parameters needed to pull data from the source (such as credentials).

Once this standard was established, any member of the data community could submit a tap or target of their choice to the collective open-source library.

I wrote an introduction guide to using Singer a while back that you can find here.

Enter Meltano

The in-house data team at GitLab adopted the Singer spec, and built an internal framework for better managing their custom taps and targets. The project was branded Meltano, which became an open-source project. It launched publicly in 2018 and became independent of GitLab in 2021.

Meltano

Building on the original Singer specification, Meltano added an SDK for building new integrations, a configuration wrapper, and an integrations Hub to support the community of Singer users. At the time of writing, the Meltano Hub offers over 550 integrations and the company is prepping the launch of its cloud service.

Meltano

As such, Meltano is an interesting open-source tool that Dagster users may be interested in integrating into their pipelines.

Project overview

For simplicity, we will work off the Meltano tutorial example, which involves ingesting data from GitHub and storing it in a dockerized Postgres database. We will then add to this by orchestrating this pipeline with Dagster.

Meltano

Upon completing the installation steps, your project files will look like the folder structure below. There are four key files to be aware of:

Meltano
Dagster’s __init__.py file (inside the Dagster project subfolder). This is where we will be making our main code changes for Dagster, but note that in a typical Dagster project, we would organize our code in a more structured fashion.

Dagster’s setup.py where we will specify our Python dependencies before installing Dagster. In this tutorial our only dependency is the dagster-meltano library.


Meltano’s .env file where any sensitive configuration values get stored.

Meltano’s meltano.yml file. This is the main configuration file for the Meltano instance, and each `meltano config` command will make updates to this file.

Interested in trying Dagster Cloud for Free?
Enterprise orchestration that puts developer experience first. Serverless or hybrid deployments, native branching, and out-of-the-box CI/CD.

Setting up a Meltano project (a bash cheatsheet)

The Meltano splashscreen in terminal during install

To set up Meltano, you can either follow the four-part tutorial, or, if you would rather zip through that, you will find below the commands you need if working locally on Mac (or you can grab the bash script here). Both the tutorial and the script below should get you to the same place:

To use the script:

  1. Install and boot up Docker desktop on Mac or install Docker on Linux
  2. Execute the set of commands listed below. Note that I provided a set of variables in the shell script, and I will refer back throughout the tutorial, including in some commands. You can use whichever variables make sense for your project, but they are:
  • PROJECT: The top-level folder for our project.
  • GITHUB_TOKEN: a Personal Access Token (Classic) with minimum access permissions.
  • REPOS_TO_IMPORT: a list of Github repositories you want to pull data for.
  • START_DATE: the date of the earliest data you want to extract.
  • DOCKERCONTAINERNAME: a unique name for the docker container.
  • POSTGRES_USER: a new user for your postgres database.
  • POSTGRES_PASSWORD: the postgres user's password.
  • DATABASE: an arbitrary name for the database.
  • ENVIRONMENT: our Meltano environment.
#! /bin/bash
#
# A tutorial on running Meltano data ELT jobs from inside Dagster
# using the dagster-meltano library found at
# https://github.com/quantile-development/dagster-meltano
#
# Fraser Marlow [https://github.com/frasermarlow] - April 2023
#
# Ensure Docker is running locally before executing this script.
#
# ---------------------------------------------------------------

# SET PROJECT VARIABLES BELOW

PROJECT_FOLDER="dag-melt" # set a name for your project folder
MELTANO_PROJECT="meltano-project"
MELTANO_JOB="my-meltano-job"
GITHUB_TOKEN="ghp_abcdefghijklmnopqrstuvwxyz123456"  # set a standard GitHub token with minimum access permissions
REPOS_TO_IMPORT="['quantile-development/dagster-meltano','frasermarlow/tap-bls','dagster-io/fake-star-detector']"  # one or more Github repositories you want to pull data for
START_DATE="2021-12-31"  # the date of the earliest data you want to extract
DOCKER_CONTAINER_NAME="my-pg-container" # a unique name for the docker container
POSTGRES_USER="meltano"  # a new user for your postgres database
POSTGRES_PASSWORD="password"  # the postgres user's password
DATABASE="my-db-name"  # an arbitrary name for the database
ENVIRONMENT="dev"  # our Meltano environment, defaulting to 'dev' is fine.

###########################################################

# create project folder
mkdir $PROJECT_FOLDER
cd $PROJECT_FOLDER

# create project virtual environment
python3 -m venv venv
source venv/bin/activate

# install packages
pip install --upgrade pip
pip install meltano
meltano --version  # meltano, version 2.16.1
meltano init $MELTANO_PROJECT
cd $MELTANO_PROJECT

meltano environment add $ENVIRONMENT  # add a new environment if needed. Defaulting to 'dev' is fine.
export MELTANO_ENVIRONMENT=$ENVIRONMENT  # sets the environment as an env var

# install and configure the GitHub data extractor (tap)
meltano add extractor tap-github --variant=meltanolabs
meltano config tap-github set auth_token $GITHUB_TOKEN
meltano config tap-github set repositories $REPOS_TO_IMPORT
meltano config tap-github set start_date $START_DATE
meltano config tap-github # prints out the configuration settings

# select the streams we will import
meltano select tap-github commits url
meltano select tap-github commits sha
meltano select tap-github commits commit_timestamp

# Setup postgres database
docker run -p 5432:5432 --name $DOCKER_CONTAINER_NAME -e POSTGRES_USER=$POSTGRES_USER -e POSTGRES_PASSWORD=$POSTGRES_PASSWORD -e POSTGRES_DB=$DATABASE -e POSTGRES_INITDB_ARGS="--auth-host=md5 --auth-local=md5" -d postgres
docker container ls  # confirms the docker instance is up and running

## configure target-postgres loader
meltano add loader target-postgres --variant=meltanolabs
meltano config target-postgres set user $POSTGRES_USER
meltano config target-postgres set password $POSTGRES_PASSWORD
meltano config target-postgres set database $DATABASE
meltano config target-postgres set add_record_metadata True
meltano config target-postgres set host localhost

# create this same task as a job
meltano job add $MELTANO_JOB --tasks "tap-github target-postgres"

## Run the basic ingest job to check all is OK
meltano run tap-github target-postgres

# shut down the virtual environment
deactivate

Upon completion you should now have a new Docker container running locally, with a postgres database running on port 5432.

Meltano
Our Docker container as seen in Docker Desktop

The second-to-last command meltano run tap-github target-postgres executes the run, so our database should be populated. We can connect to it using any Postgres database management tool. Here for example, is the connection and the imported data using the free dbeaver:

Meltano
Connecting to our database
Meltano
Data imported from Github via Meltano

Setting up Dagster

Now that we have a basic Meltano E(t)L process set up let’s add Dagster to the mix.

From the commands above, we created our Meltano project in a folder called ~/dag-melt/ so we will now create a folder for our Dagster instance at ~/dag-melt/dagster. We will create a separate venv for Dagster, so we will deactivate first just in case you have the previous venv still active.

deactivate
mkdir ~/dag-melt/dagster && cd $_
python3 -m venv dag-venv
source dag-venv/bin/activate

Next we will install Dagster. In most cases this will simply involve

pip install dagster dagit

…but you should refer to https://docs.dagster.io/getting-started/install and follow the most recent instructions.

Once installed, we will scaffold a blank dagster project for demo purposes:

dagster project scaffold --name dag-melt-project
cd dag-melt-project
rehash

Add the dagster-meltano library as a required install item in the Dagster project setup.py:

install_requires=[
   "dagster",
   "dagster-meltano"
],

We can now install our dependencies and launch Dagster:

pip install -e ".[dev]"
dagster dev

This should start the Dagster instance at https://localhost:3000

Arguably, it looks a bit empty right now, but it's up and running.

Meltano
A blank Dagster instance

Using the Dagster-Meltano library

Now that we have Meltano up and running, we can get to the good stuff: how to execute Meltano commands straight from Dagster. Let's explore some of the options for executing Meltano commands from Dagster.

Run config

When initiating a run in Dagster, we can pass along configuration variables at run time such as the location of the Meltano project. Look for the 'Launchpad' tab after clicking on the job name in the left nav.

Configuring a Dagster run in the launchpad wth Meltano project location
resources:
  meltano:
    config:
      project_dir: "full-path-to/the-meltano/project-folder"

If you fail to specify this, you will run into the error meltano run must be run inside a Meltano project.”

Side note: Injecting Env Variables

Meltano stores any env variables in a local .env file in the root of the Meltano project folder.

You can, however, pass such configuration variables along at runtime from the Dagster Launchpad as follows:

ops:
  tap_github_target_postgres:
    config:
      env:
        TARGET_POSTGRES_PASSWORD: 'password123xyz'

Three ways to run Meltano from Dagster

There are several techniques for triggering a Meltano run using the integration.

Note that Meltano tracks the STATE (for incremental replication), and subsequent invocations will not duplicate the import unless you explicitly ask it to by overriding it. You can do a full refresh (to ignore existing state) using meltano run tap target --full-refresh. You can also use meltano state clear <state_id> to delete the existing state as documented here.

Option 1: import jobs from Meltano

Our first option is to import jobs that have been defined in Meltano. For this we can simply use load_jobs_from_meltano_project() and point to our Meltano project.

Earlier, during the Meltano setup, we created a job for our Github->Postgres pipeline with the command

# create this same task as a job
meltano job add $MELTANO_JOB --tasks "tap-github target-postgres"

So we can now import that job (along with any other defined jobs).

Edit the file Dagster project __init__.py as follows, replacing the path with the one on your machine:

from dagster import Definitions, job, repository
from dagster_meltano import meltano_resource, meltano_run_op, load_jobs_from_meltano_project

meltano_jobs = load_jobs_from_meltano_project("/Users/<username>/PROJECT_FOLDER/MELTANO_PROJECT")

defs = Definitions(jobs=meltano_jobs)

For example, the path for me is /Users/frasermarlow/dag-melt/meltano-project.

Note that, since we are providing the path, this job requires no configuration. You can refresh the Dagster project, then click on the Launchpad tab, and click 'Launch Run'.

Running Meltano Option 1: import jobs from Meltano

Option 2: issue a meltano run command.

Now edit the file __init__.py and replace the file contents with the following:

from dagster import Definitions, job
from dagster_meltano import meltano_resource, meltano_run_op

@job(resource_defs={"meltano": meltano_resource})
def run_elt_job():
   tap_done = meltano_run_op("tap-github target-postgres")()

defs = Definitions(jobs=[run_elt_job])

Again, refresh the Dagster project, click on the Jon, click on Launchpad, add the configuration for the run as detailed in the "Run config" section above, and then click 'Launch run'.

Option 3: Issue a Meltano run job command:

Very similar to option 2, if you have a job that has been defined in Meltano you can simply run meltano_run_op("my-meltano-job-name")()

As the job executes you will see the Meltano command run:

Configuring a Dagster run in the launchpad wth Meltano project location

Running other Meltano commands

Now that we have demonstrated how to trigger a basic run, we can look at how to do any other configuration changes in Meltano. You can make any changes programmatically from Dagster using the meltano_command_op() function.

The meltano_resource will access the Meltano project location, prepend the meltano reference, and execute the command:

from dagster import job, Definitions
from dagster_meltano import meltano_resource, meltano_command_op

@job(resource_defs={"meltano": meltano_resource})
def meltano_command_job():
    meltano_command_op("config tap-github set repositories \"['dagster-io/hooli-data-eng-pipelines','dagster-io/fake-star-detector']\"","update_gh_repos")()

defs = Definitions(jobs=[meltano_command_job])

In conclusion

We hope this guide will be helpful to anybody looking to tap into Meltano's capabilities as part of a Dagster managed project. This guide covered the basics of getting a Meltano project running, and we encourage you to investigate further as there is a lot more capabilities under the hood.

References

The Meltano intro tutorial
The Dagster utility on the Meltano Hub
The Quantile README for the dagster-meltano library

Dagster and Meltano

The Dagster Labs logo

We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a Github discussion. If you run into any bugs, let us know with a Github issue. And if you're interested in working with us, check out our open roles!

Follow us:


Read more filed under
Blog post category for Integration. Integration