June 7, 2024 | 8 minute read

Running Singer on Dagster

Singer Taps and Targets are popular data movement tools. Here is how (and why) you run them in Dagster.
Fraser Marlow (@frasermarlow)

Dagster offers many embedded data movement and ingestion options, and we recently summarized them in this blog post. However, Dagster’s flexible framework makes it easy to run any ELT process.

In this post, we will look at how to set up a Singer ELT pipeline using a Singer Tap and Target, and how to leverage the components of Dagster’s orchestration system. This post should be valuable for data engineers with an existing Singer tap/target who want to control these from within Dagster.

We should also note that Singer Taps are supported by Airbyte, and that we offer a Meltano integration, so you have a wide range of choices. The tutorial that follows is probably the most lightweight approach.

Pros and Cons: Running Singer Taps Within Dagster

By invoking the Tap and Target from within Dagster, we gain several advantages:

  1. You can make Singer an integral part of your scheduled pipelines.
  2. You can add metadata to your Singer taps and targets.
  3. Because Dagster pickles the JSON payload rather than piping it directly into the Target, you can retry on failure, send the Tap payload to multiple targets, or debug a target without constantly hitting the upstream API.

Of course, there are a few drawbacks to consider. Singer is already not the most performant ingestion solution (due, in part, to the serialization/deserialization over stdin/stdout). By adding the Dagster steps, we increase latency and introduce additional storage requirements, because the full Tap payload is held and stored between steps. Keep this in mind if you use Singer to move large amounts of data or if speed is a big concern.
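To make the third advantage concrete, here is a minimal plain-Python sketch (not Dagster code) of what becomes possible once the Tap output is captured as a string instead of being piped straight into a single Target. The function name send_to_targets and the two stand-in target commands are assumptions made for illustration; in a Dagster setup the same effect would come from pointing more than one Target asset at the same Tap asset.

```python
import subprocess
import sys


def send_to_targets(payload, target_commands):
    """Feed one captured payload to each command's stdin and collect stdout."""
    outputs = []
    for command in target_commands:
        result = subprocess.run(
            command, input=payload, capture_output=True, text=True
        )
        outputs.append(result.stdout)
    return outputs


# Two stand-in "targets" (hypothetical): tiny Python one-liners that read stdin.
count_lines = [sys.executable, "-c",
               "import sys; print(len(sys.stdin.readlines()))"]
upper_case = [sys.executable, "-c",
              "import sys; print(sys.stdin.read().upper(), end='')"]

if __name__ == "__main__":
    captured = '{"type": "RECORD"}\n{"type": "STATE"}\n'
    # The upstream API is hit zero times here: we only replay the captured text.
    for output in send_to_targets(captured, [count_lines, upper_case]):
        print(output, end="")
```

The trade-off is the one described above: the whole payload sits in memory (and, in Dagster, in storage) instead of streaming through a pipe.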

Before We Start

This post assumes that you are somewhat familiar with Dagster’s core concepts. If you are not, we recommend the Dagster Essentials course over at Dagster University. We will not cover the basic installation of Dagster here, but plenty of guides exist, and it is covered in the Dagster University course.

We also assume you are familiar with Singer Taps and Targets and that you have set up and configured one of each and can run them with a command such as:

singer-tap --config config.json | singer-target
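As a quick refresher, a Tap writes newline-delimited JSON messages to stdout, and the Target reads them from stdin. The sketch below parses a small hand-written sample (the users stream and its fields are made up for illustration) and lists the message types, which in the Singer specification are SCHEMA, RECORD, and STATE.

```python
import json
from typing import List

# Hand-written sample of Tap output: one JSON message per line.
SAMPLE_TAP_OUTPUT = "\n".join([
    '{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], '
    '"schema": {"properties": {"id": {"type": "integer"}}}}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 1}}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 2}}',
    '{"type": "STATE", "value": {"users": 2}}',
])


def message_types(payload: str) -> List[str]:
    """Return the "type" of every Singer message in the payload, in order."""
    return [
        json.loads(line)["type"]
        for line in payload.splitlines()
        if line.strip()  # ignore blank lines
    ]


if __name__ == "__main__":
    print(message_types(SAMPLE_TAP_OUTPUT))
    # ['SCHEMA', 'RECORD', 'RECORD', 'STATE']
```

This string of JSON lines is exactly what the Tap asset will hand to the Target asset later in the post.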

Overview of the Setup

Here is a breakdown of our setup:

  1. We will create a reusable Dagster @resource for the Tap and one for the Target.
  2. We will create two assets that leverage the @resource and invoke the Tap and Target.
  3. We will configure our code location's Definitions.
  4. We will add the assets to a job.
  5. We will watch the pipeline execute in Dagster's UI.

We will create a dedicated code location for this exercise, but you can add the resources and assets to an existing code location. In this example, we will use a singer folder:

(1) singer/__init__.py: home of our Definitions.
(2) singer/assets/singer.py: creates two assets based on the @resource, one for the Tap and one for the Target. We will also grab some metadata while we are at it.
(3) singer/jobs/__init__.py: sets up our two assets as a simple Dagster job.
(4) singer/resources/__init__.py: defines a @resource for the Tap and one for the Target.

Setting Singer up on Dagster

Creating the Singer Resource

In the example below, we create a SingerTapResource and a SingerTargetResource class. Each class stores the parameters it needs and runs the Tap or Target command as a subprocess when its run() method is called.

# singer/resources/__init__.py

from dagster import InitResourceContext
import subprocess

class SingerTapResource:
    def __init__(self, context: InitResourceContext, tap_executable: str, config_path: str, catalog_path=None, state_path=None):
        if check_config_path(config_path):
            self.tap_executable = tap_executable
            self.config_path = config_path
            self.catalog_path = catalog_path
            self.state_path = state_path
            self.context = context

    def run(self):
        command = [self.tap_executable, '--config', self.config_path]
        if self.catalog_path:
            command.extend(['--catalog', self.catalog_path])
        if self.state_path:
            command.extend(['--state', self.state_path])

        tap_result = subprocess.Popen(command, stdout=subprocess.PIPE, text=True)
        output, errors = tap_result.communicate()
        return output

class SingerTargetResource:
    def __init__(self, target_executable: str, config_path: str, context: InitResourceContext):
        if check_config_path(config_path):
            self.target_executable = target_executable
            self.config_path = config_path
            self.context = context

    def run(self, singer_tap_dataset):
        command = [self.target_executable]
        if self.config_path:
            command.extend(['--config', self.config_path])

        target_input = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
        output, errors = target_input.communicate(input=singer_tap_dataset)

        return output

Note that these objects make use of a helper function to ensure that the config.json file is present:

from dagster import DagsterExecutionInterruptedError
import os

def check_config_path(config_path):
    if not isinstance(config_path, str):
        raise TypeError(f"config_path must be a string, got {type(config_path).__name__}")

    if not os.path.isfile(config_path):
        raise DagsterExecutionInterruptedError(f"Config file not found at {config_path}")

    if not config_path.endswith('.json'):
        raise DagsterExecutionInterruptedError(f"Config file at {config_path} is not a .json file")

    return True

Now we can declare the @resource:

from dagster import resource, InitResourceContext, Field, String

@resource(
    config_schema={
        "tap_executable": Field(String, description="Path to the tap executable"),
        "config_path": Field(String, description="Path to the config.json file"),
        "catalog_path": Field(String, is_required=False, description="Path to the catalog.json file"),
        "state_path": Field(String, is_required=False, description="Path to the state.json file"),
    }
)
def singer_tap_resource(init_context: InitResourceContext) -> SingerTapResource:
    return SingerTapResource(
        tap_executable=init_context.resource_config["tap_executable"],
        config_path=init_context.resource_config["config_path"],
        catalog_path=init_context.resource_config.get("catalog_path"),
        state_path=init_context.resource_config.get("state_path"),
        context=init_context
    )


@resource(
    config_schema={
        "target_executable": Field(String, description="Path to the target executable"),
        "config_path": Field(String, description="Path to the config.json file")
    }
)
def singer_target_resource(init_context: InitResourceContext) -> SingerTargetResource:
    return SingerTargetResource(
        target_executable=init_context.resource_config["target_executable"],
        config_path=init_context.resource_config["config_path"],
        context=init_context
    )

All told, our singer/resources/__init__.py file looks as follows:

# singer/resources/__init__.py

from dagster import resource, InitResourceContext, Field, String, DagsterExecutionInterruptedError
import subprocess
import os

def check_config_path(config_path):
    if not isinstance(config_path, str):
        raise TypeError(f"config_path must be a string, got {type(config_path).__name__}")

    if not os.path.isfile(config_path):
        raise DagsterExecutionInterruptedError(f"Config file not found at {config_path}")

    if not config_path.endswith('.json'):
        raise DagsterExecutionInterruptedError(f"Config file at {config_path} is not a .json file")

    return True

class SingerTapResource:
    def __init__(self, context: InitResourceContext, tap_executable: str, config_path: str, catalog_path=None, state_path=None):
        if check_config_path(config_path):
            self.tap_executable = tap_executable
            self.config_path = config_path
            self.catalog_path = catalog_path
            self.state_path = state_path
            self.context = context

    def run(self):
        command = [self.tap_executable, '--config', self.config_path]
        if self.catalog_path:
            command.extend(['--catalog', self.catalog_path])
        if self.state_path:
            command.extend(['--state', self.state_path])

        tap_result = subprocess.Popen(command, stdout=subprocess.PIPE, text=True)
        output, errors = tap_result.communicate()
        return output

class SingerTargetResource:
    def __init__(self, target_executable: str, config_path: str, context: InitResourceContext):
        if check_config_path(config_path):
            self.target_executable = target_executable
            self.config_path = config_path
            self.context = context

    def run(self, singer_tap_dataset):
        command = [self.target_executable]
        if self.config_path:
            command.extend(['--config', self.config_path])

        target_input = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
        output, errors = target_input.communicate(input=singer_tap_dataset)

        return output


@resource(
    config_schema={
        "tap_executable": Field(String, description="Path to the tap executable"),
        "config_path": Field(String, description="Path to the config.json file"),
        "catalog_path": Field(String, is_required=False, description="Path to the catalog.json file"),
        "state_path": Field(String, is_required=False, description="Path to the state.json file"),
    }
)
def singer_tap_resource(init_context: InitResourceContext) -> SingerTapResource:
    return SingerTapResource(
        tap_executable=init_context.resource_config["tap_executable"],
        config_path=init_context.resource_config["config_path"],
        catalog_path=init_context.resource_config.get("catalog_path"),
        state_path=init_context.resource_config.get("state_path"),
        context=init_context
    )


@resource(
    config_schema={
        "target_executable": Field(String, description="Path to the target executable"),
        "config_path": Field(String, description="Path to the config.json file")
    }
)
def singer_target_resource(init_context: InitResourceContext) -> SingerTargetResource:
    return SingerTargetResource(
        target_executable=init_context.resource_config["target_executable"],
        config_path=init_context.resource_config["config_path"],
        context=init_context
    )
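One thing the run() methods above do not do is look at the exit code of the subprocess, so a Tap that crashes halfway still returns whatever it had printed up to that point. If you want a failed Tap or Target to fail the Dagster step (and so benefit from retries), a helper along these lines is one option. The name run_checked is hypothetical, and this is a sketch rather than part of the original resources; note that it captures stderr instead of letting it flow through to the logs.

```python
import subprocess
import sys


def run_checked(command, stdin_text=None):
    """Run a command, return its stdout, and raise if it exits non-zero."""
    result = subprocess.run(
        command, input=stdin_text, capture_output=True, text=True
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"{command[0]} exited with code {result.returncode}: "
            f"{result.stderr.strip()}"
        )
    return result.stdout


if __name__ == "__main__":
    # Stand-in for a Tap: a Python one-liner that prints one line.
    print(run_checked([sys.executable, "-c", "print('ok')"]), end="")

    # Stand-in for a crashing Tap: exits with code 3.
    try:
        run_checked([sys.executable, "-c", "import sys; sys.exit(3)"])
    except RuntimeError as err:
        print(f"caught: {err}")
```

Inside SingerTapResource.run() you could then return run_checked(command), and inside SingerTargetResource.run() return run_checked(command, stdin_text=singer_tap_dataset).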

Define the Dagster Assets

Tapping into the resources we have defined, we will now create an asset for our Tap and an asset for our Target (which takes the Tap as a dependency).

# singer/assets/singer.py

from dagster import asset, Output, AssetExecutionContext

@asset(required_resource_keys={"singer_tap"}, tags={"compute_kind": "tap"}, group_name="singer_taps")
def singer_tap_dataset(context: AssetExecutionContext) -> Output[str]:
    """
    Call the Singer tap and retrieve the data
    :return: JSON payload from the tap
    :rtype: str
    """
    singer_tap = context.resources.singer_tap
    output = singer_tap.run()  # Use the resource to run the tap
    output_length = len(output)  # size of the payload in characters

    return Output(value=output, metadata={"output_length": output_length})

@asset(required_resource_keys={"singer_target"}, tags={"compute_kind": "target"}, group_name="singer_targets")
def singer_target_dataset(context: AssetExecutionContext, singer_tap_dataset) -> Output[str]:
    """
    Call the Singer target and submit the data
    :return:
    """
    singer_target = context.resources.singer_target
    output = singer_target.run(singer_tap_dataset)  # Use the resource to run the target
    return Output(value=output)

You will notice that we collect some metadata on the tap. In this case, it is simply the length, in characters, of the JSON payload (metadata={"output_length": output_length}).

We can build on this by introducing a simple count of the number of STREAMS and RECORDS used in the Tap. We can build this in as a separate function:

import json
from collections import defaultdict


def count_streams_and_records(json_payload):
    # Split the payload into individual JSON objects
    json_lines = json_payload.strip().split("\n")

    # Initialize counters
    streams_count = set()
    records_count = defaultdict(int)

    # Process each JSON object
    for line in json_lines:
        # Skip blank lines so an empty payload does not break json.loads
        if not line.strip():
            continue
        obj = json.loads(line)

        # Count unique streams
        if 'stream' in obj:
            streams_count.add(obj['stream'])

        # Count records per stream
        if obj.get('type') == 'RECORD' and 'stream' in obj:
            records_count[obj['stream']] += 1

    # Number of unique streams
    num_streams = len(streams_count)

    return num_streams, dict(records_count)
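To see what the function returns, here is a small self-contained run against a hand-written payload. The users and orders streams are invented for the example, and the function is repeated in condensed form (it also skips blank lines) only so the snippet runs on its own.

```python
import json
from collections import defaultdict


def count_streams_and_records(json_payload):
    # Condensed variant of the function above, repeated so this runs standalone.
    streams, records = set(), defaultdict(int)
    for line in json_payload.strip().split("\n"):
        if not line.strip():
            continue
        obj = json.loads(line)
        if "stream" in obj:
            streams.add(obj["stream"])
        if obj.get("type") == "RECORD" and "stream" in obj:
            records[obj["stream"]] += 1
    return len(streams), dict(records)


sample = "\n".join([
    '{"type": "SCHEMA", "stream": "users", "schema": {}, "key_properties": ["id"]}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 1}}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 2}}',
    '{"type": "SCHEMA", "stream": "orders", "schema": {}, "key_properties": ["id"]}',
    '{"type": "STATE", "value": {}}',
])

if __name__ == "__main__":
    print(count_streams_and_records(sample))  # (2, {'users': 2})
```

Note that orders is counted as a stream because it has a SCHEMA message, but it does not appear in the per-stream dictionary because it has no RECORD messages.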

Let’s add this to our singer_tap_dataset asset. Our singer.py file now looks like this:

from dagster import asset, Output, AssetExecutionContext
import json
from collections import defaultdict

def count_streams_and_records(json_payload):
    # Split the payload into individual JSON objects
    json_lines = json_payload.strip().split("\n")

    # Initialize counters
    streams_count = set()
    records_count = defaultdict(int)

    # Process each JSON object
    for line in json_lines:
        # Skip blank lines so an empty payload does not break json.loads
        if not line.strip():
            continue
        obj = json.loads(line)

        # Count unique streams
        if 'stream' in obj:
            streams_count.add(obj['stream'])

        # Count records per stream
        if obj.get('type') == 'RECORD' and 'stream' in obj:
            records_count[obj['stream']] += 1

    # Number of unique streams
    num_streams = len(streams_count)

    return num_streams, dict(records_count)

@asset(required_resource_keys={"singer_tap"}, tags={"compute_kind": "tap"}, group_name="singer_taps")
def singer_tap_dataset(context: AssetExecutionContext) -> Output[str]:
    """
    Call the Singer tap and retrieve the data
    :return: JSON payload from the tap
    :rtype: str
    """
    singer_tap = context.resources.singer_tap
    output = singer_tap.run()  # Use the resource to run the tap
    output_length = len(output)
    num_streams, records_per_stream = count_streams_and_records(output)

    return Output(
        value=output,
        metadata={
            "output_length": output_length,
            "count_of_streams": num_streams,
            "records_per_stream": records_per_stream,
        },
    )

@asset(required_resource_keys={"singer_target"}, tags={"compute_kind": "target"}, group_name="singer_targets")
def singer_target_dataset(context: AssetExecutionContext, singer_tap_dataset) -> Output[str]:
    """
    Call the Singer target and submit the data
    :return:
    """
    singer_target = context.resources.singer_target
    output = singer_target.run(singer_tap_dataset)  # Use the resource to run the target
    return Output(value=output)
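The Target asset returns whatever the Target printed to stdout. By Singer convention, a Target writes the state values it has persisted to stdout, one JSON document per line, and the last line is the one to pass back to the Tap through --state on the next run. The sketch below assumes your Target follows that convention and prints nothing else to stdout; the function names latest_state and save_state are hypothetical.

```python
import json
import os
import tempfile


def latest_state(target_output):
    """Return the last non-empty stdout line parsed as JSON, or None."""
    lines = [line for line in target_output.splitlines() if line.strip()]
    if not lines:
        return None
    # Raises json.JSONDecodeError if the Target printed something else last.
    return json.loads(lines[-1])


def save_state(target_output, state_path):
    """Write the latest state to state_path; leave the file alone if none."""
    state = latest_state(target_output)
    if state is not None:
        with open(state_path, "w") as f:
            json.dump(state, f)
    return state


if __name__ == "__main__":
    fake_target_output = '{"users": 1}\n{"users": 2}\n'
    path = os.path.join(tempfile.mkdtemp(), "state.json")
    print(save_state(fake_target_output, path))  # {'users': 2}
```

Pointing state_path in the Tap resource at the same file would let the next run pick up where this one stopped.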

Configuring the Code Location Definitions

Next, we set up our code location definitions in singer/__init__.py. You will need to replace the resource parameters to match the tap (“my-tap”) and the target (“my-target”) that you plan on using. Note that it is a best practice to invoke both the Tap and the Target from within their respective virtual environments (see the Singer documentation for details), which is why each executable path below points into its own virtualenv.

# singer/__init__.py

from dagster import load_assets_from_modules, Definitions
from .assets import singer
from .resources import singer_tap_resource, singer_target_resource

singer_assets = load_assets_from_modules([singer])
all_jobs = [run_singer_job]  # run_singer_job is defined and imported in the next section

defs = Definitions(
    assets=[*singer_assets],
    jobs=all_jobs,
    resources={
        "singer_tap": singer_tap_resource.configured({
            "tap_executable": "/Users/frasermarlow/.virtualenvs/my-tap/bin/my-tap",
            "config_path": "/Users/frasermarlow/singer/my-tap-config/config.json",
            "catalog_path": "/Users/frasermarlow/singer/my-tap-config/catalog.json"
        }),
        "singer_target": singer_target_resource.configured({
            "target_executable": "/Users/frasermarlow/.virtualenvs/my-target/bin/target-csv",
            "config_path": "/Users/frasermarlow/singer/my-target/config.json",
        })
    }
)
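The absolute paths above are specific to one machine. If you would rather not hard-code them, one option is to build the config dictionary from environment variables before passing it to .configured(). This is a plain-Python sketch; the variable names SINGER_TAP_EXECUTABLE, SINGER_TAP_CONFIG, and SINGER_TAP_CATALOG, as well as the function name, are assumptions made for illustration.

```python
import os


def singer_tap_config(env=None):
    """Build the tap resource config dict from environment variables."""
    env = os.environ if env is None else env
    config = {
        # Required keys: a missing variable raises KeyError early.
        "tap_executable": env["SINGER_TAP_EXECUTABLE"],
        "config_path": env["SINGER_TAP_CONFIG"],
    }
    # catalog_path is optional in the resource schema, so set it only if given.
    if env.get("SINGER_TAP_CATALOG"):
        config["catalog_path"] = env["SINGER_TAP_CATALOG"]
    return config


if __name__ == "__main__":
    example_env = {
        "SINGER_TAP_EXECUTABLE": "/path/to/venvs/my-tap/bin/my-tap",
        "SINGER_TAP_CONFIG": "/path/to/my-tap-config/config.json",
    }
    print(singer_tap_config(example_env))
```

In singer/__init__.py this would be used as singer_tap_resource.configured(singer_tap_config()), with an equivalent function for the target.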

Set Up a Simple Job

Finally, to help make this easier to trigger, let’s wrap the two assets into a Dagster job:

# singer/jobs/__init__.py

from dagster import AssetSelection, define_asset_job

all_assets = AssetSelection.all()

run_singer_job = define_asset_job(
    name="run_singer_job",
    selection=all_assets
)

And remember to import this into singer/__init__.py:

from .jobs import run_singer_job

Running Our Singer Pipeline in the Dagster UI

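Let’s boot up Dagster and see our assets in the UI. The -m flag expects a module name rather than a path, so run the command from the directory that contains the singer folder (the home directory in this example):

dagster dev -m singer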

Here we see the two assets rendered with their dependency modeled:

The assets are automatically added to our asset catalog…

… and, after a few materializations, the metadata is plotted for us, giving us nicer observability over the data flowing through our pipeline.

Metadata that is not in a numeric series, such as the names of the streams and the record counts, is still captured in the asset metadata.

Leveraging More of Dagster

The setup shown above might be adequate for your use case, but if you want to tap into more Dagster capabilities, you can now leverage advanced scheduling, sensors, data quality checks, and more.
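As one illustration of a data quality check, the Singer specification expects a SCHEMA message for a stream before any RECORD messages for that stream. The plain-Python function below looks for violations in a captured Tap payload; it is a sketch of logic you might call from a Dagster check on the Tap asset, and the name streams_missing_schema and the sample payloads are hypothetical.

```python
import json


def streams_missing_schema(payload):
    """Return streams whose first RECORD arrived before any SCHEMA message."""
    seen_schema = set()
    missing = []
    for line in payload.splitlines():
        if not line.strip():
            continue  # ignore blank lines
        message = json.loads(line)
        stream = message.get("stream")
        if message.get("type") == "SCHEMA":
            seen_schema.add(stream)
        elif (
            message.get("type") == "RECORD"
            and stream not in seen_schema
            and stream not in missing
        ):
            missing.append(stream)
    return missing


if __name__ == "__main__":
    good = "\n".join([
        '{"type": "SCHEMA", "stream": "users", "schema": {}, "key_properties": []}',
        '{"type": "RECORD", "stream": "users", "record": {"id": 1}}',
    ])
    bad = good + '\n{"type": "RECORD", "stream": "orders", "record": {"id": 9}}'
    print(streams_missing_schema(good))  # []
    print(streams_missing_schema(bad))   # ['orders']
```

An empty list means the payload passes; anything else names the streams to investigate before the data reaches the Target.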

Wrapping Up

Setting up a Singer Tap and Target is straightforward in Dagster and lets us weave these into our pipelines.

Using this combination can help you achieve better integration and observability, richer metadata tracking, and more.

