Factory Patterns in Python | Dagster Blog

September 4, 202312 minute read

Factory Patterns in Python

We explore design patterns — reusable solutions to common problems in software design — as used in data engineering, specifically factory patterns in Python.
Elliot Gunn
Name
Elliot Gunn
Handle
@elliot


Factory Patterns in Python Programming

In this series, you've learned about Python best practices in data engineering and how to build more robust and scalable software. Today, we’ll take on a more advanced programming concept called design patterns, which are reusable solutions to common problems in software design. You are likely to encounter these in a data engineering project.

We’ll look at what design patterns are, why use them, and dive deeper into one pattern in particular: the factory pattern. We'll also talk about why data engineers would benefit from using the factory pattern.

Factory patterns elegantly simplify the creation of data connectors and make existing data infrastructure more extensible. This is especially helpful for data engineers, who handle a wide variety of data sources with different processing needs.

In this part of the course, we’ll look at how factory patterns achieve reusability, scalability, and maintainability in data engineering.

Table of contents

Design patterns 101

Design patterns in Python work as templates that can be applied to recurring tasks or problems, and are therefore very useful in data engineering. For a Python data engineer, design patterns offer structured and efficient solutions to recurring challenges in data processing and integration tasks. They also provide a shared vocabulary that facilitates clearer communication among team members, leading to more consistent and collaborative software design.

Types of design patterns

Design patterns in Python, and in programming in general, are typically considered an intermediate to advanced concept because they typically require an understanding of programming principles, object-oriented design, and the ability to recognize and abstract recurring problems in larger, more complex systems (i.e. code architecture). Python's design patterns typically fall into three types:

  1. Creational patterns: These include factory patterns and can be used to instantiate and manage database connections, ensuring that whether you're connecting to a SQL database or a NoSQL store, the process is streamlined and consistent
  2. Structural patterns: Provide guidance on organizing and linking different components, and can be invaluable when integrating diverse data sources. For instance, you might use an adapter pattern to harmonize data from a legacy system with a modern analytics platform, ensuring seamless data flow
  3. Behavioral patterns: Offers strategies for effective communication and interaction between objects. For example, an observer pattern can be employed to monitor changes in datasets: imagine a scenario where a data ingestion process notifies multiple downstream processing tasks whenever new data arrives.

Why use design patterns in data engineering?

Design patterns offer distinct advantages for data engineering tasks that echo the core principles of software design:

  1. Declarative: With design patterns, data engineers can define the end goal or what they want to achieve, without being bogged down in the nitty-gritty of how a program should execute. By defining what you want, the underlying logic takes care of the 'how'. This abstraction simplifies the process.
  2. Reusable: Think of design patterns as blueprints. Just as architectural plans can be used to construct various buildings, these patterns can be applied across diverse pipelines or projects, ensuring that your effort isn't confined to just one solution but can be leveraged multiple times.
  3. Consistent: The data world is vast and varied. By employing design patterns, data engineers ensure that data assets, irrespective of their source or application, adhere to a consistent structure and behavior. This makes data operations predictable and reduces anomalies, leading to more robust data systems.

By integrating design patterns like asset factories into data engineering workflows— from data extraction, analyzing data, data transformation, etc. —we pave the way for smoother operations, reduced errors, and more efficient systems, ensuring that data is managed and optimized for best results.

Each design pattern is used to prevent specific problems as your project scales. Today, we'll focus on the factory pattern, which is used to build multiple similar things to promote centralizing configuration, standardizing testing, and allowing flexibility while adhering to consistency.

How do factory patterns work?

Factory patterns are categorized as creational because they create objects in Python programming. They return different objects based on certain conditional statements or parameters.

Separation of object creation from main application

Think of the Factory Pattern as a specialized department in a company that only focuses on producing certain products. This department takes care of all the details of manufacturing, and the rest of the company simply requests the product when needed without worrying about how it's made.

Similarly, the Factory Pattern takes care of all the details of creating specific objects. The rest of your application doesn't need to know how these objects are created or what parameters they require. It simply asks the "factory" to produce the object and trusts it to handle the rest. This separation makes your code cleaner and easier to understand.

Factory patterns in Python

In Python, implementing factory patterns is particularly streamlined, thanks to its dynamic typing and first-class functions. You can return different classes or even functions from a factory function without much boilerplate.

Also, many Python libraries and frameworks leverage the factory pattern, or factory-like patterns, even if it's not explicit or exactly the same. For instance, an ORM (Object-Relational Mapping libraries) like SQLAlchemy uses factories to create database session objects. SQLAlchemy's sessionmaker() can be likened to a factory pattern as it produces new session instances, serving as the primary interface for database communication.

A use of factory patterns in data engineering using Python: example

Python's built-in features, like decorators, can be used to enhance the Factory Pattern. For example, a decorator can be used to register a class with a factory, thereby extending the capabilities of the factory without explicitly modifying it.

Imagine a common scenario in data engineering: a data pipeline for manipulating data from different file formats: CSV, JSON, and XML files. Depending on the file type, different parsing steps should be applied.

We'll use a simple dictionary as our "registry" for these file parsers and functions as our factories.

First, we’ll define the parsing functions:

import csv
import json
import xml.etree.ElementTree as ET

def parse_csv(file_path):
    with open(file_path, mode='r') as file:
        reader = csv.reader(file)
        return list(reader)

def parse_json(file_path):
    with open(file_path, mode='r') as file:
        return json.load(file)

def parse_xml(file_path):
    tree = ET.parse(file_path)
    root = tree.getroot()
    return root  # you'd typically add more logic to process the XML tree

Then, we’ll define a decorator to register these parsers:

PARSERS = {}

def register_parser(file_type):
    def decorator(fn):
        PARSERS[file_type] = fn
        return fn
    return decorator

We’ll register our parsers:

@register_parser('csv')
def csv_parser(file_path):
    return parse_csv(file_path)

@register_parser('json')
def json_parser(file_path):
    return parse_json(file_path)

@register_parser('xml')
def xml_parser(file_path):
    return parse_xml(file_path)

Finally, we will write a function to get the right parser, and use the factory to parse files:

def get_parser(file_type):
    return PARSERS.get(file_type)

data_csv = get_parser('csv')('data.csv')
data_json = get_parser('json')('data.json')
data_xml = get_parser('xml')('data.xml')

For a data engineer, handling multiple file formats is common, and being able to easily extend the system with new parsers (like XML, Parquet, etc.) is crucial. With this setup, a data engineer can easily extend the system to support new file types by simply defining a new parsing function and registering it with the decorator.

There’s no need to touch the existing factory logic, making it easy to maintain and extend. By using the factory pattern together with decorators, we can streamline this process and maintain cleaner, more modular code.

Data engineering with factory patterns

Data engineers or data scientists often use factory patterns for everyday tasks like batch processing to building real-time data streams and ETL pipelines.

For instance, imagine you have various types of data connections in your workflow, such as databases, files, or APIs. Instead of manually creating connections to each one, you can use a Factory Pattern to create the right connection for you, based on what you need at the time. Think of it as an assembly line that produces exactly what you need when you need it, without cluttering the rest of your code with unnecessary details.

In data pipelines

Let's consider a scenario where you need to connect to different types of databases, like MySQL and PostgreSQL. A Factory Pattern can be used to create the appropriate database connection based on a given input. Here's a simple example that illustrates this pattern:

Let’s first define the connection for each database:

import mysql.connector
import psycopg2

def connect_mysql(host, user, password, database):
    connection = mysql.connector.connect(
        host=host,
        user=user,
        password=password,
        database=database
    )
    return connection

def connect_postgresql(host, user, password, database):
    connection = psycopg2.connect(
        host=host,
        user=user,
        password=password,
        dbname=database
    )
    return connection

Next, we’ll define a decorator to register database connection:

DB_CONNECTIONS = {}

def register_db_connector(db_type):
    def decorator(fn):
        DB_CONNECTIONS[db_type] = fn
        return fn
    return decorator

Then, we will register the connection:

@register_db_connector('mysql')
def mysql_connector(host, user, password, database):
    return connect_mysql(host, user, password, database)

@register_db_connector('postgresql')
def postgresql_connector(host, user, password, database):
    return connect_postgresql(host, user, password, database)

Finally, we’ll write a function to get the right connector, and use the factory to get the appropriate database connection:

def get_db_connector(db_type):
    if db_type not in DB_CONNECTIONS:
        raise ValueError(f"Unsupported database type: {db_type}")
    return DB_CONNECTIONS[db_type]

# Example usage:
mysql_conn = get_db_connector('mysql')('localhost', 'user', 'password', 'mydb')
postgres_conn = get_db_connector('postgresql')('localhost', 'user', 'password', 'mydb')

With this setup, adding support for a new type of database connection in the future is simple. We first define the connection function and then register it using the decorator. No other parts need to be altered, demonstrating the maintainability and extensibility benefits of the factory pattern.

Factory patterns in real-world scenarios

Factories are particularly useful when:

  1. Dealing with external libraries or systems that may change over time, letting you isolate those changes
  2. Implementing plugins or extensions to a system
  3. Needing to control object instantiation for purposes like object pooling, lazy initialization, or logging

We’ll take a look at two examples of how factory assets work in the real world.

Example 1: Scraping Wikipedia

Factory patterns are particularly useful in web scraping when you need to scrape different types of pages but want to maintain a consistent interface for all of them. Let’s look at how factory patterns can be used to scrape Wikipedia’s table of countries and dependencies by population:

First, make sure to install the following:

pip install requests
pip install beautifulsoup4

Then, define functions to scrape different tables from Wikipedia. Let's assume Wikipedia might have multiple tables representing this data in different formats. One table might be standard, while another might be mobile-optimized.

from bs4 import BeautifulSoup
import requests

def scrape_standard_table(url):
    page = requests.get(url)
    soup = BeautifulSoup(page.content, 'html.parser')

    # Assuming the first table on the page is the one of interest
    table = soup.find_all("table")[0]

    rows = table.find_all("tr")
    data = []
    for row in rows[1:]:  # skipping the header row
        columns = row.find_all("td")
        country = columns[0].get_text(strip=True)
        population = columns[1].get_text(strip=True)
        data.append((country, population))
    return data

def scrape_mobile_table(url):
    page = requests.get(url)
    soup = BeautifulSoup(page.content, 'html.parser')

    # Mobile tables might be different, for the sake of example let's assume they're div-based
    table_div = soup.find("div", {"class": "mobile-table"})
    rows = table_div.find_all("div", {"class": "row"})
    data = []
    for row in rows:
        country = row.find("div", {"class": "country"}).get_text(strip=True)
        population = row.find("div", {"class": "population"}).get_text(strip=True)
        data.append((country, population))
    return data

Then, we’ll define a decorator to register scraping functions:

SCRAPERS = {}

def register_scraper(scraper_type):
    def decorator(fn):
        SCRAPERS[scraper_type] = fn
        return fn
    return decorator

Now we’ll register our scraping functions:

@register_scraper('standard')
def standard_scraper(url):
    return scrape_standard_table(url)

@register_scraper('mobile')
def mobile_scraper(url):
    return scrape_mobile_table(url)

Finally, we’ll write our function to get the right scraper and use the factory to get data from Wikipedia:

def get_scraper(scraper_type):
    if scraper_type not in SCRAPERS:
        raise ValueError(f"Unsupported scraper type: {scraper_type}")
    return SCRAPERS[scraper_type]

# Example usage:
url = "https://en.wikipedia.org/wiki/List_of_countries_and_dependencies_by_population"
data_standard = get_scraper('standard')(url)
# data_mobile = get_scraper('mobile')(url)  # if you had a mobile URL

In this example, adding support for scraping different formats in the future (like different table structures in Wikipedia) is simple: define the scraping function and then register it using the decorator. This ensures that the scraping code remains modular and easy to extend without modifying existing logic.

Example 2: Data orchestration

Dagster is a data orchestrator that provides a single pane of glass for the different stages of data processing, from ingestion to machine learning. Dagster helps schedule and observe a broad range of data engineering tools, with Python as it’s programming language. It is used extensively by data engineers and data scientists in a range of applications like data science, data analytics, big data, machine learning, etc.

One of the features of Dagster is the ability to manage assets, which are the outputs of data computations. An asset represents a piece of data or a computed result that has value and is worth tracking. This could be a table in a database, a file on disk, a model artifact, etc. Assets are often the output of a pipeline.

Asset factories are a feature in Dagster that allows users to declaratively define how assets are produced. They can be thought of as templates for creating assets by defining the inputs, outputs, and computations required to produce an asset.

Let's walk through a simple example where we'll refactor an existing block of code by applying the factory pattern in Dagster to generate a group of assets.

Before we start, make sure to install Dagster:

pip install dagster dagster-webserver

Let’s assume that we are part of the data engineering team for a non-profit. We have some existing code that queries the API of our donor platform and writes the result to a file (CSV or JSON). It currently looks like this:

from dagster import asset
import requests
import csv

@asset
def volunteers():
    result = requests.get('www.donorplatform.org/api/v1/volunteers')
    with open('volunteers.csv', 'w') as f:
        writer = csv.writer(f)
        writer.writerows(result)

@asset
def donations():
    result = requests.get('www.donorplatform.org/api/v2/donations')
    with open('donations.csv', 'w') as f:
        writer = csv.writer(f)
        writer.writerows(result)

@asset
def donors():
    result = requests.get('www.donorplatform.org/api/v1/donors')
    with open('donors.json', 'w') as f:
        f.write(result)

Our operations team has recently expanded their use of their donor platform and is asking us to run data extraction from 50 new API endpoints. This gets unruly and difficult to manage as you know that it’ll take a long time, data engineers will deviate in how they’ retrieve data, and it’ll be hard to test.

Using our new python skills, let’s apply a factory pattern for this to solve these problems.

First, let’s define what can be configured. There are three parts that need to be customized for every asset:

  1. The name of the asset
  2. The endpoint it queries
  3. The file type the result is saved as

Therefore, we’ll define a JSON object to centralize the possible configurations of the assets.

specs = [
    {
        'name': 'volunteers',
        'endpoint': 'v1/volunteers',
        'file_type': 'csv'
    },
    {
        'name': 'donations',
        'endpoint': 'v2/donations',
        'file_type': 'csv'
    },
    {
        'name': 'donors',
        'endpoint': 'v1/donors',
        'file_type': 'json'
    }
]

Then, we’ll define and generalize our asset function to take a spec and produce an asset

spec = specs[0] # take a single spec as reference while building

@asset(name=spec['name'])
def generic_asset():
    result = requests.get(f'www.donorplatform.org/api/{spec["endpoint"]}')
    with open(f'{spec["name"]}.f{spec["file_type"]}', 'w') as f:
        if spec["file_type"] == 'csv':
            writer = csv.writer(f)
            writer.writerows(result)
        elif spec["file_type"] == 'json':
            f.write(result)

Finally, let’s wrap our generic asset in a function that will serve as a factory to generate all of our assets. The function will take a spec and apply it to the asset for us.

def generate_donor_platform_asset(spec):
    @asset(name=spec['name'])
    def _asset():
        result = requests.get(f'www.donorplatform.org/api/{spec["endpoint"]}')
        with open(f'{spec["name"]}.f{spec["file_type"]}', 'w') as f:
            if spec["file_type"] == 'csv':
                writer = csv.writer(f)
                writer.writerows(result)
            elif spec["file_type"] == 'json':
                f.write(result)

    return _asset

With your factory, you only need to define additional specs to produce more assets. Here is how it might be used in production:

from dagster import Definitions, asset
import requests
import csv

specs = [
    {'name': 'volunteers', 'endpoint': 'v1/volunteers', 'file_type': 'csv'},
    {'name': 'donations', 'endpoint': 'v2/donations', 'file_type': 'csv'},
    {'name': 'donors', 'endpoint': 'v1/donors', 'file_type': 'json'},
    {'name': 'projects', 'endpoint': 'v1/projects', 'file_type': 'json'},
    {'name': 'fundraisers', 'endpoint': 'v1/fundraisers', 'file_type': 'csv'},
]

def generate_donor_platform_asset(spec):
    @asset(name=spec['name'])
    def _asset():
        result = requests.get(f'www.donorplatform.org/api/{spec["endpoint"]}')
        with open(f'{spec["name"]}.f{spec["file_type"]}', 'w') as f:
            if spec["file_type"] == 'csv':
                writer = csv.writer(f)
                writer.writerows(result)
            elif spec["file_type"] == 'json':
                f.write(result)

    return _asset


defs = Definitions(assets=[generate_donor_platform_asset(spec) for spec in specs])

If you run dagster dev, you’ll have access to Dagster’s UI and asset graph on localhost:3000. As you add more specs to the list and reload your definitions, you’ll see more assets generated.

This is a basic data engineering example of getting started with asset factories in Dagster. As you delve deeper into Dagster, you can explore more advanced features. Dagster also provides integrations with many databases and data systems. This makes it easy to use asset factories to produce assets in a variety of formats and locations. You can use an asset factory to produce a file on a cloud storage system, execute SQL, or train a machine learning model.

Hence, asset factories in a python project provide the same three benefits::

  1. Declarative: Asset factories allow you to specify what you want to produce without having to write the detailed logic for how to produce it
  2. Reusable: Since asset factories are templates, they can be reused across different pipelines or projects
  3. Consistent: Using asset factories ensures that assets are produced in a consistent manner, regardless of where or how they are used

Conclusion

Factory patterns are a valuable tool for a data engineer, especially when dealing with different types of data sources or complex object creation. It simplifies your code and makes it more reusable, scalable, and maintainable.

As one of the most popular programming languages, factory patterns are very useful to master when using Python for data engineering. While these also apply in other languages, you are most likely to encounter Python as general purpose programming language behind most popular tools for data engineering tasks.

In our next chapter, we explore Write-Audit-Publish, a design pattern frequently used in ETL to ensure data quality and reliability.


The Dagster Labs logo

We're always happy to hear your feedback, so please reach out to us! If you have any questions, ask them in the Dagster community Slack (join here!) or start a Github discussion. If you run into any bugs, let us know with a Github issue. And if you're interested in working with us, check out our open roles!

Follow us:


Read more filed under
Blog post category for Python Guide. Python Guide