Building Cost Effective AI Pipelines with OpenAI, LangChain, and Dagster

May 8, 2024

Leverage the power of LLMs while keeping the costs in check using the Dagster OpenAI integration.

Now that LLMs are becoming more mature and accessible, many organizations are looking to integrate them into common business practices such as technical support bots, online real-time help, and other knowledge-base-related tasks.

While the premise of LLMs is remarkable, and the use cases are many, the cost of operating such processes is also becoming apparent.

In the following piece, we share an approach to leveraging the power of LLMs while keeping the costs in check, by building an AI pipeline on top of Dagster’s new OpenAI integration.

We hope this tutorial will inspire you to build novel AI-powered processes without breaking the bank.

Just over a year ago, we shared our initial steps in creating a support bot, utilizing Dagster, GPT-3, and LangChain. Today, as Large Language Models (LLMs) continue to mature and evolve, we’re entering the next chapter in our adventure, incorporating the latest updates from GPT-4 and Dagster, particularly the new dagster-openai integration.

If you'd prefer to follow along with the project, you can do so via our GitHub repo here.

Recap of Initial Setup

Our journey began with leveraging Dagster to streamline the fetching and indexing of GitHub documentation, thereby enabling our support bot to respond to user queries with precision.

Prerequisites

This is not a beginner’s tutorial: we will assume you are familiar with Python, comfortable working with the command line, and have a basic familiarity with Dagster.

To run this project, you need:

  • Python 3 installed locally
  • An OpenAI API key
  • A number of Python dependencies, which can be installed as follows:
### Install the Dagster dependencies
pip install dagster dagster-aws dagster-cloud dagster-openai
### Install the LangChain dependencies
pip install faiss-cpu langchain langchain-community langchain-openai

Our initial setup started with extracting raw documents from GitHub repositories:

@asset
def source_docs():
    return list(get_github_docs("dagster-io", "dagster"))

Then, we chunked these documents and stored their embeddings in a Faiss index:

@asset
def search_index(source_docs):
    source_chunks = []
    splitter = CharacterTextSplitter(separator=" ", chunk_size=1024, chunk_overlap=0)
    for source in source_docs:
        for chunk in splitter.split_text(source.page_content):
            source_chunks.append(Document(page_content=chunk, metadata=source.metadata))
    search_index = FAISS.from_documents(source_chunks, OpenAIEmbeddings())
    with open("search_index.pickle", "wb") as f:
        pickle.dump(search_index.serialize_to_bytes(), f)
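For intuition, the packing that CharacterTextSplitter performs can be sketched in plain Python. This is a simplified illustration of the idea, not LangChain's actual implementation (which also handles overlap and separator edge cases):

```python
def split_text(text, separator=" ", chunk_size=1024):
    """Greedily pack separator-delimited pieces into chunks of at most
    chunk_size characters -- a simplified sketch of what LangChain's
    CharacterTextSplitter does (ignoring overlap, which we set to 0)."""
    pieces = text.split(separator)
    chunks, current, length = [], [], 0
    for piece in pieces:
        # +1 accounts for the separator re-inserted between pieces
        added = len(piece) + (1 if current else 0)
        if current and length + added > chunk_size:
            chunks.append(separator.join(current))
            current, length = [], 0
            added = len(piece)
        current.append(piece)
        length += added
    if current:
        chunks.append(separator.join(current))
    return chunks
```

Each chunk stays under the size limit, so each one fits comfortably within the embedding model's context.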

Finally, we used a vector space search engine with LangChain to improve the support bot’s efficiency:

def print_answer(question):
    with open("search_index.pickle", "rb") as f:
        serialized_search_index = pickle.load(f)

    search_index = FAISS.deserialize_from_bytes(serialized_search_index, OpenAIEmbeddings())
    print(
        chain(
            {
                "input_documents": search_index.similarity_search(question, k=4),
                "question": question,
            },
            return_only_outputs=True,
        )["output_text"]
    )

This infrastructure laid the groundwork for a robust Q&A mechanism. It leverages the power of OpenAI's models to interpret user questions based on our documentation, alongside crafting cost-effective prompts with LangChain.
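Under the hood, similarity_search boils down to nearest-neighbor search over embedding vectors. A brute-force sketch of the idea in plain Python (FAISS does the same thing with optimized indexes; the toy vectors below stand in for real embeddings):

```python
import math

def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def similarity_search(query_vec, doc_vecs, k=4):
    """Return the indices of the k documents most similar to the query,
    by brute-force cosine similarity -- a sketch of what the FAISS
    index does far more efficiently at scale."""
    ranked = sorted(
        range(len(doc_vecs)),
        key=lambda i: cosine_similarity(query_vec, doc_vecs[i]),
        reverse=True,
    )
    return ranked[:k]
```

The k=4 in print_answer above corresponds to retrieving the four closest chunks to feed into the prompt.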

New Challenges with AI Pipelines

With the growth of our system and the introduction of new models and AI capabilities, gaining a deeper understanding of our pipelines became crucial, especially in terms of optimizing our use of OpenAI services. We aimed to develop more cost-effective strategies, with a focus on:

  • Enhanced Visibility into OpenAI Usage: Understanding the cost implications of running our support bot and other AI-driven features was our primary concern. The task of monitoring usage across different models and APIs, while developing complex AI pipelines, highlighted the need for a more efficient tracking system. We sought a solution that would allow us to manage our OpenAI interactions with greater ease and precision.
  • Cost Control: As we leveraged LLMs for more complex tasks such as generating completions and embeddings, concerns about escalating costs arose. A mechanism to effectively control and predict costs became essential.

In short, we wanted a straightforward way to compare OpenAI models based on actual usage data, enabling us to make informed decisions that optimize both performance and cost-efficiency.
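As a rough illustration of the kind of comparison this enables: given the token counts logged per materialization, estimating a request's cost is a small calculation. The per-1K-token prices below are placeholders for illustration, not OpenAI's actual rates:

```python
# Illustrative per-1K-token prices -- placeholders, not OpenAI's actual rates.
PRICES = {
    "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
    "gpt-4": {"input": 0.03, "output": 0.06},
}

def estimate_cost(model, prompt_tokens, completion_tokens):
    """Estimate one request's cost from the usage counts that get
    logged as asset metadata."""
    price = PRICES[model]
    return (prompt_tokens / 1000) * price["input"] + (
        completion_tokens / 1000
    ) * price["output"]
```

Summing this over materializations gives a per-asset, per-model cost breakdown to compare against response quality.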

Introducing the dagster-openai Integration

In our journey to refine AI pipelines, we're excited to share that we've made our work with OpenAI APIs available to the broader community through an open-source dagster-openai integration. Paired with Dagster's software-defined assets, this integration not only facilitates seamless interactions with OpenAI APIs but also enhances transparency by automatically logging OpenAI usage metadata within asset metadata. For detailed insights, explore our guide on leveraging Dagster with OpenAI.

@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    with openai.get_client(context) as client:
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Say this is a test."}],
        )

This integration's standout feature is its capacity to amplify visibility into OpenAI API utilization through Dagster Insights. By systematically logging usage metadata, we're now equipped to conduct a more granular analysis of our OpenAI model engagement. This not only aids in cost optimization but also empowers us to make more data-informed decisions when evaluating and comparing models.


The integration also introduces a method named with_usage_metadata designed for logging usage data from any OpenAI endpoint. We'll dive into the specifics shortly.
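The underlying pattern is a wrapper that records token usage around each endpoint call. Here is a simplified, library-free sketch of that idea (the stub response shape and the with_usage_logging name are our own for illustration; see the dagster-openai docs for with_usage_metadata's real signature):

```python
import functools

def with_usage_logging(log, func):
    """Wrap an API call so token usage is recorded on every invocation.
    A simplified sketch of the idea behind with_usage_metadata; `log`
    here is just a list, standing in for Dagster's metadata machinery."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        response = func(*args, **kwargs)
        usage = getattr(response, "usage", None)
        if usage is not None:
            log.append({
                "prompt_tokens": usage.prompt_tokens,
                "completion_tokens": usage.completion_tokens,
            })
        return response
    return wrapper
```

Because the wrapper only reads the response's usage field, it can sit in front of any endpoint that reports token counts.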

Refining Our Approach with the New Integration

Refining our approach with the new integration allowed us to enhance our project's performance and cost-efficiency significantly:

  • Visibility and Control: Leveraging the integration, we monitored our OpenAI usage within the Dagster ecosystem, gaining detailed insights into, and alerting on, our operational practices.
  • Efficient Performance Comparison: By utilizing the logged metadata, we efficiently compared model performances, which facilitated our decision-making process.

Let’s update our original code! As mentioned in the initial setup, we’ve got:

@asset
def search_index(source_docs):
    source_chunks = []
    splitter = CharacterTextSplitter(separator=" ", chunk_size=1024, chunk_overlap=0)
    for source in source_docs:
        for chunk in splitter.split_text(source.page_content):
            source_chunks.append(Document(page_content=chunk, metadata=source.metadata))
    search_index = FAISS.from_documents(source_chunks, OpenAIEmbeddings())
    with open("search_index.pickle", "wb") as f:
        pickle.dump(search_index.serialize_to_bytes(), f)
        

With the new integration, only a few lines of code need to change: instead of calling the API directly, we use the client from our OpenAI resource, which automatically records API usage data into our asset catalog.

@asset(compute_kind="OpenAI")
def search_index(context: AssetExecutionContext, openai: OpenAIResource, source_docs: List[Any]):
    source_chunks = []
    splitter = CharacterTextSplitter(separator=" ", chunk_size=1024, chunk_overlap=0)
    for source in source_docs:
        for chunk in splitter.split_text(source.page_content):
            source_chunks.append(Document(page_content=chunk, metadata=source.metadata))

    with openai.get_client(context) as client:
        search_index = FAISS.from_documents(
            source_chunks, OpenAIEmbeddings(client=client.embeddings)
        )

    return search_index.serialize_to_bytes()
    

In addition, we labeled the “compute kind” more descriptively. This small yet impactful tag improves the readability of our asset graph and makes it easier to understand, at a glance, the computations involved in each asset.

Now let’s run this asset.

We get instant metadata added to the asset materialization. Even better, when running this pipeline with Dagster+, we get nicely aggregated charts.

Next, let’s move on to print_answer and update it into a completion asset, using OpenAIResource together with LangChain:

@asset(compute_kind="OpenAI")
def completion(context: AssetExecutionContext, openai: OpenAIResource, search_index: Any):
    question = "What can I use Dagster for?"
    search_index = FAISS.deserialize_from_bytes(search_index, OpenAIEmbeddings())
    with openai.get_client(context) as client:
        chain = load_qa_with_sources_chain(OpenAI(client=client.completions, temperature=0))
        context.log.info(
            chain(
                {
                    "input_documents": search_index.similarity_search(question, k=4),
                    "question": question,
                },
                return_only_outputs=True,
            )["output_text"]
        )

Now, all the steps show up in the asset graph.

With just a few code modifications, we've enabled a pipeline that automatically logs usage to our asset metadata. This establishes a solid foundation for tracking and optimizing our support bot’s cost and performance over time.

Let’s try it with the simple question “What is Dagster?”. It works just as before, but with more insight into the usage.

Now, onto the fun parts.

Addressing Complex Queries with Enhanced Documentation

Currently, our naive support bot gives a pretty generic answer for a specific question like “How can I use dbt with Dagster?”.

To enhance responses to complex queries like this, we can incorporate a wider range of documentation. For example, for specific topics such as the dagster-dbt integration, we could look up the relevant guides, API docs, and GitHub discussions.

To do so, we need to modify our code to fetch documents from various sources.

def get_github_docs(repo_owner, repo_name, category, archive_name="master"):
    with tempfile.TemporaryDirectory() as d:
        # The archive name can be a branch, tag or commit.
        r = requests.get(f"https://github.com/{repo_owner}/{repo_name}/archive/{archive_name}.zip")
        z = zipfile.ZipFile(io.BytesIO(r.content))
        z.extractall(d)
        root_path = pathlib.Path(os.path.join(d, f"{repo_name}-{archive_name}"))
        docs_path = root_path.joinpath("docs/content", category)
        markdown_files = list(docs_path.glob("*.md*")) + list(docs_path.glob("*/*.md*"))
        for markdown_file in markdown_files:
            with open(markdown_file, "r") as f:
                relative_path = markdown_file.relative_to(root_path)
                github_url = f"https://github.com/{repo_owner}/{repo_name}/blob/{archive_name}/{relative_path}"
                yield Document(page_content=f.read(), metadata={"source": github_url})

Then, we can update the asset to fetch the source docs:

@asset(compute_kind="GitHub")
def source_docs(context: AssetExecutionContext):
    docs = []
    for category in ["guides", "integrations"]:
        docs += list(get_github_docs("dagster-io", "dagster", category))
    return docs

Now, let’s try again with “How can I use dbt with Dagster?”. This response is looking much more relevant than before.

Optimizing Performance and Costs

However, as we expand the sources for our pipeline, costs could become a concern.

The good news is that all the usage data is automatically logged in our pipeline. Taking a quick look, the cost is growing a bit. Let’s dive in!

Seeing a cost increase prompts us to optimize. We know that some documentation segments are updated less frequently than others, so we can update the embeddings and indexes only as needed for each section.

Dagster's native partitioning feature enables us to organize source docs by category efficiently. Here’s how we can adjust our setup:

We start by defining a StaticPartitionsDefinition. Let’s assign each documentation category as a separate partition:

from dagster import (
    StaticPartitionsDefinition,
)

docs_partitions_def = StaticPartitionsDefinition(["guides", "integrations"])

Then, we partition our assets:

@asset(compute_kind="GitHub", partitions_def=docs_partitions_def)
def source_docs(context: AssetExecutionContext):
    return list(get_github_docs("dagster-io", "dagster", context.partition_key))

@asset(compute_kind="OpenAI", partitions_def=docs_partitions_def)
def search_index(context: AssetExecutionContext, openai: OpenAIResource, source_docs):
    source_chunks = []
    splitter = CharacterTextSplitter(separator=" ", chunk_size=1024, chunk_overlap=0)
    for source in source_docs:
        context.log.info(source)
        for chunk in splitter.split_text(source.page_content):
            source_chunks.append(Document(page_content=chunk, metadata=source.metadata))

    with openai.get_client(context) as client:
        search_index = FAISS.from_documents(
            source_chunks, OpenAIEmbeddings(client=client.embeddings)
        )

    return search_index.serialize_to_bytes()

Although our completion asset is not partitioned, it depends on the newly partitioned search_index asset. Using a partition mapping will allow the completion asset to depend on all partitions of search_index:

@asset(
    compute_kind="OpenAI",
    ins={
        "search_index": AssetIn(partition_mapping=AllPartitionMapping()),
    },
)
def completion(
    context: AssetExecutionContext, 
    openai: OpenAIResource, 
    search_index: Dict[str, Any]
):
    merged_index: Any = None
    for index in search_index.values():
        curr = FAISS.deserialize_from_bytes(index, OpenAIEmbeddings())
        if not merged_index:
            merged_index = curr
        else:
            merged_index.merge_from(curr)
    question = "What can I use Dagster for?"
    with openai.get_client(context) as client:
        chain = load_qa_with_sources_chain(OpenAI(client=client.completions, temperature=0))
        context.log.info(
            chain(
                {
                    "input_documents": merged_index.similarity_search(question, k=4),
                    "question": question,
                },
                return_only_outputs=True,
            )["output_text"]
        )

Now, we can see in our asset graph that the source_docs and search_index assets have two partitions each.

You can now refresh the partitions you need, and only when you need them!

By using partitions, we can now selectively update our embeddings and search index only when actual changes occur. This approach minimizes unnecessary computations, and effectively reduces costs.
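The same skip-unchanged-work idea can be pushed further by keying off document content. Here is a sketch of that approach (the caching scheme below is our own illustration, not part of the integration):

```python
import hashlib

def embed_if_changed(docs, cache, embed_fn):
    """Re-embed a document only when its content hash changes.
    `cache` maps content hashes to previously computed embeddings;
    `embed_fn` is the (expensive) embedding call. A sketch of the
    skip-unchanged-work idea that partitioning gives us per category."""
    results = []
    for doc in docs:
        key = hashlib.sha256(doc.encode("utf-8")).hexdigest()
        if key not in cache:
            cache[key] = embed_fn(doc)
        results.append(cache[key])
    return results
```

With this, a second materialization over a mostly unchanged corpus pays only for the documents that actually changed.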

Leveraging Different OpenAI Models with LangChain

OK, now let’s explore different models to optimize our responses for higher ROI.

We can parameterize our pipeline to select the best OpenAI model based on the query. Here’s how to do it:

First, we can define configurations to allow specifying the OpenAI model and the question it should address:

class OpenAIConfig(Config):
    model: str
    question: str

Then, let's incorporate this config into the completion asset:

@asset(
    compute_kind="OpenAI",
    ins={
        "search_index": AssetIn(partition_mapping=AllPartitionMapping()),
    },
)
def completion(
    context: AssetExecutionContext,
    openai: OpenAIResource,
    config: OpenAIConfig,
    search_index: Dict[str, Any],
):
  ...
  model = ChatOpenAI(client=client.chat.completions, model=config.model, temperature=0)

Note: To apply different models dynamically, we use LangChain Expression Language (LCEL), which facilitates declarative composition of chains.

This is a declarative way to truly compose chains, with streaming, batch, and async support out of the box. You can use all the same existing LangChain constructs to create them.
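The `prompt | model | output_parser` syntax works because each LCEL runnable overloads the pipe operator to compose steps. A minimal pure-Python sketch of that composition pattern (without LCEL's streaming, batching, or async support; the lambdas stand in for a real prompt, model, and parser):

```python
class Step:
    """A minimal pipeable step: `a | b` returns a step that runs a,
    then feeds its output to b -- the composition pattern behind
    LCEL's `prompt | model | output_parser`."""
    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other):
        return Step(lambda x: other.fn(self.fn(x)))

    def invoke(self, x):
        return self.fn(x)

prompt = Step(lambda q: f"Answer concisely: {q}")
model = Step(lambda p: p.upper())  # stand-in for the LLM call
output_parser = Step(lambda s: s.strip())

chain = prompt | model | output_parser
```

Because the model is just one step in the pipe, swapping in a different ChatOpenAI instance per run config requires no other changes to the chain.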

So now, our full code looks like:

@asset(
    compute_kind="OpenAI",
    ins={
        "search_index": AssetIn(partition_mapping=AllPartitionMapping()),
    },
)
def completion(
    context: AssetExecutionContext,
    openai: OpenAIResource,
    config: OpenAIConfig,
    search_index: Dict[str, Any],
):
    merged_index: Any = None
    for index in search_index.values():
        curr = FAISS.deserialize_from_bytes(index, OpenAIEmbeddings())
        if not merged_index:
            merged_index = curr
        else:
            merged_index.merge_from(curr)
    with openai.get_client(context) as client:
        prompt = stuff_prompt.PROMPT
        model = ChatOpenAI(client=client.chat.completions, model=config.model, temperature=0)
        summaries = " ".join(
            [
                SUMMARY_TEMPLATE.format(content=doc.page_content, source=doc.metadata["source"])
                for doc in merged_index.similarity_search(config.question, k=4)
            ]
        )
        context.log.info(summaries)
        output_parser = StrOutputParser()
        chain = prompt | model | output_parser
        context.log.info(chain.invoke({"summaries": summaries, "question": config.question}))

Then, we can manually launch a run using the Launchpad, asking “How can I use dbt with Dagster?”.

So far, our pipeline has been kicked off manually. In production, we want to automate responses to new queries without manual intervention.

For the purposes of this demo, we store incoming questions as JSON files in a designated folder:

{
  "model": "gpt-3.5-turbo",
  "question": "How can I use dbt with Dagster?"
}
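Before handing a file's contents to a run, it's worth validating the payload. A small sketch using only the standard library, based on the model/question fields in the example above:

```python
import json

REQUIRED_FIELDS = {"model", "question"}

def load_question_config(raw):
    """Parse and validate one question file's JSON payload, using the
    model/question fields from the example above. Raises ValueError on
    a missing field so the sensor can skip (or report) bad files."""
    config = json.loads(raw)
    missing = REQUIRED_FIELDS - config.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return config
```

Failing fast here keeps a malformed file from launching a run that errors halfway through a paid API call.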

Here’s how we can set up a sensor in Dagster to automatically materialize answers when a new question comes in:

question_job = define_asset_job(
    name="question_job",
    selection=AssetSelection.keys(["completion"]),
)

@sensor(job=question_job)
def question_sensor(context):
    PATH_TO_QUESTIONS = os.path.join(os.path.dirname(__file__), "../../", "data/questions")

    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []

    for filename in os.listdir(PATH_TO_QUESTIONS):
        file_path = os.path.join(PATH_TO_QUESTIONS, filename)
        if filename.endswith(".json") and os.path.isfile(file_path):
            last_modified = os.path.getmtime(file_path)

            current_state[filename] = last_modified

            if filename not in previous_state or previous_state[filename] != last_modified:
                with open(file_path, "r") as f:
                    request_config = json.load(f)

                    runs_to_request.append(
                        RunRequest(
                            run_key=f"adhoc_request_{filename}_{last_modified}",
                            run_config={"ops": {"completion": {"config": {**request_config}}}},
                        )
                    )

    return SensorResult(run_requests=runs_to_request, cursor=json.dumps(current_state))

Note: This setup is primarily designed to be digestible for demo purposes. In a practical scenario, it'd be more efficient to aggregate queries in a queue or database and batch them into single runs, rather than initiating a run for each question.
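If we did batch, the grouping itself is simple. A sketch of splitting pending questions into fixed-size batches so each run answers several (the batch size is an arbitrary choice):

```python
def batch_questions(questions, batch_size=10):
    """Group pending questions into fixed-size batches so one run can
    answer several at a time, instead of launching a run per question
    -- a sketch of the batching strategy suggested above."""
    return [
        questions[i : i + batch_size]
        for i in range(0, len(questions), batch_size)
    ]
```

A sensor could then emit one RunRequest per batch rather than one per file.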

The sensor detects our question file and launches a run to materialize our completion asset, giving us the answer.

Future Directions: Advancing AI Pipeline Efficiency and Effectiveness

In this post, we've laid the groundwork for optimizing AI pipelines, emphasizing cost management and enhanced developer productivity.

Key takeaways include:

  • The rollout of our new dagster-openai integration, which enables seamless interaction with OpenAI APIs and out-of-the-box usage tracking.
  • Leveraging LangChain to dynamically declare and swap between different models.
  • Utilizing features of a modern orchestrator (Dagster), such as Insights and metadata tracking, to improve developer productivity.

Looking ahead, we plan to further leverage Dagster and OpenAI's capabilities for model performance comparison and overall productivity enhancement. This includes, but is not limited to:

  • Expanding the use of Dagster Cloud’s Insights to monitor crucial metrics more comprehensively.
  • Leveraging Dagster’s data catalog for better management as we onboard more sources, such as integrating Slack history to improve response quality.
  • Ensuring data freshness and reliability to uphold the quality of our support bot infrastructure, which involves dealing with bad or fake sources.

Have feedback or questions? Start a discussion in Slack or Github.

