Dagster Data Engineering Glossary: Multiprocessing in Python

Optimize execution time with multiple parallel processes.

A definition of Multiprocessing in Python

Multiprocessing refers to a paradigm in which multiple processes execute different tasks simultaneously in order to reduce execution time and improve the efficiency of a pipeline, particularly when dealing with large volumes of data or heavy compute requirements. In Python, this is achieved with the built-in multiprocessing module, which provides process-based parallelism.
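
As a minimal sketch of what process-based parallelism looks like, the snippet below starts two worker processes that run the same function in parallel; the task function and its labels are illustrative placeholders, not part of any particular API:

import multiprocessing

def task(name):
    # A stand-in for real work; each call runs in its own process.
    print(f"Running task for {name}")

if __name__ == "__main__":
    # Start two independent processes, each with its own memory space.
    processes = [
        multiprocessing.Process(target=task, args=(label,))
        for label in ("first", "second")
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()  # wait for both processes to finish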

Here are some important points about multiprocessing in Python:

  1. Each process in a multiprocessing environment runs in its own memory space, which means they don't share global variables and run independently. This is advantageous in data processing tasks as it prevents data corruption due to simultaneous read/write operations from different processes.

  2. The module provides functionality for distributing input data across different processes (data parallelism), pooling processes to limit the total number of active processes, synchronizing and sharing state between processes (see the sketch after this list), and more.

  3. Python multiprocessing is particularly useful in data engineering where CPU-intensive tasks like data cleaning, data transformation, etc., need to be performed on large datasets. By distributing these tasks among different CPUs or cores, multiprocessing can significantly reduce execution time.

  4. Multiprocessing comes with its own overheads, such as inter-process communication and process creation time. For small datasets, or for tasks where the processing time is less than the overhead, multiprocessing will not provide significant benefits and may even slow down execution.
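
To make point 2 concrete, here is a minimal sketch of synchronizing shared state between processes using multiprocessing.Value and its lock; the counter name and worker count are illustrative choices:

import multiprocessing

def increment(counter, n):
    for _ in range(n):
        # The lock prevents two processes from updating the value at once.
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    # A shared integer ('i') visible to all child processes.
    counter = multiprocessing.Value('i', 0)
    workers = [
        multiprocessing.Process(target=increment, args=(counter, 1000))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)  # 4000: no increments lost to race conditions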

Example of multiprocessing in Python

Let's look at an example of a fan-out process with multiprocessing. We will generate a list of numbers and use the multiprocessing module in Python to spawn a number of worker processes. Each worker process will apply a function to a subset of the list, allowing us to utilize multiple cores for our computation.

We start a number of worker processes equal to the number of cores available on the machine (multiprocessing.cpu_count()). Each worker process pulls numbers from an input queue, applies a function to each, and puts its results on a shared output queue. Once all the numbers have been processed, the main process retrieves the results from the output queue.

After processing the numbers with multiprocessing, the script also processes them using a single process and compares the execution times.

import multiprocessing
import os
import time

def generate_data_file():
    with open('numbers.txt', 'w') as f:
        for i in range(100000):
            f.write(f"{i}\n")

def worker(input_queue, output_queue):
    print(f"Worker process id: {os.getpid()}")
    result = []
    for number in iter(input_queue.get, 'STOP'):
        result.append(number**2)  # some computational task
    output_queue.put(result)

def single_process(numbers):
    return [number**2 for number in numbers]

def fan_out_process():
    # Generate data file
    generate_data_file()

    # Read data from file
    with open('numbers.txt', 'r') as f:
        numbers = [int(line.strip()) for line in f]

    num_workers = multiprocessing.cpu_count()

    # Create queues
    input_queue = multiprocessing.Queue()
    output_queue = multiprocessing.Queue()

    # Start worker processes
    for _ in range(num_workers):
        multiprocessing.Process(target=worker, args=(input_queue, output_queue)).start()

    # Send data to worker processes
    for number in numbers:
        input_queue.put(number)

    # Signal worker processes to stop when they're done
    for _ in range(num_workers):
        input_queue.put('STOP')

    # Collect results
    results = []
    for _ in range(num_workers):
        results.extend(output_queue.get())

    return results

if __name__ == "__main__":
    # Measure time taken using multiprocessing
    start_time = time.time()
    results = fan_out_process()
    multi_proc_time = time.time() - start_time
    print(f"Computation with multiprocessing took {multi_proc_time:.2f} seconds")
    print(f"First 10 results with multiprocessing: {results[:10]}")

    # Measure time taken using single process
    start_time = time.time()
    with open('numbers.txt', 'r') as f:
        numbers = [int(line.strip()) for line in f]
    results = single_process(numbers)
    single_proc_time = time.time() - start_time
    print(f"Computation with single process took {single_proc_time:.2f} seconds")
    print(f"First 10 results with single process: {results[:10]}")

    # Compare times
    time_difference = single_proc_time / multi_proc_time
    if time_difference > 1:
        print(f"Multiprocessing was {time_difference:.2f} times faster")
    else:
        print(f"Multiprocessing was {1/time_difference:.2f} times slower")

Note that the time difference will vary based on the hardware, operating system, and the specifics of the task being performed. In general, for CPU-bound tasks with a substantial per-item cost, you can expect a speedup roughly proportional to the number of cores on your machine; as the output below shows, a task as cheap as our squaring operation does not meet that bar.

Here is the output on my machine:

Worker process id: 9700
Worker process id: 9704
Worker process id: 9703
Worker process id: 9699
Worker process id: 9705
Worker process id: 9708
Worker process id: 9701
Worker process id: 9702
Worker process id: 9706
Worker process id: 9707
Computation with multiprocessing took 1.28 seconds
First 10 results with multiprocessing: [1610361, 1635841, 1661521, 1687401, 1713481, 1739761, 1766241, 1792921, 1819801, 1846881]
Computation with single process took 0.03 seconds
First 10 results with single process: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Multiprocessing was 38.11 times slower

Why do we see different results?

The first thing that might jump out at you is that the first 10 results are different for multiprocessing and single-process execution. You may (or may not) see this if you execute the code locally. So what is happening here?

When you use a queue to receive the output of each worker process, the order in which the results are placed in the queue depends on the timing and scheduling of the processes. The operating system determines the execution order of the processes, and it may vary between different runs of the program or on different systems.

If you need to maintain the order of the results, you can include additional information in the output, such as an index or a timestamp, and sort the results based on that information after retrieving them from the queue. Alternatively, you can use the Pool class from the multiprocessing module, whose map function guarantees the order of the results, as shown in the sketch below.
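
As a minimal sketch of the Pool approach, reusing the numbers.txt file generated above, Pool.map splits the input across worker processes but reassembles the results in the original input order:

import multiprocessing

def square(n):
    return n ** 2

if __name__ == "__main__":
    # Reuse the data file generated by the fan-out example above.
    with open('numbers.txt', 'r') as f:
        numbers = [int(line.strip()) for line in f]

    # Pool.map distributes the work across worker processes but
    # guarantees results come back in input order.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(square, numbers)

    print(results[:10])  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]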

So in the case above, the first 10 results added to the queue were not from the worker processing the first numbers in the sequence. To test this, you can reduce the number of items in the range in the line for i in range(100000):. With fewer than 100 items or so, you will likely see the same result for both approaches. You will also notice that as there are fewer numbers to crunch, the overhead of multiprocessing increasingly dominates the relative difference in execution time between these methods.

Why multiprocessing can be slower

OK, so in our example, multiprocessing was almost forty times SLOWER than single-process execution. What happened?

While multiprocessing in Python allows us to utilize multiple cores and therefore speed up CPU-bound tasks, there are several factors that can cause multiprocessing to be slower than single-threaded execution:

  1. Overhead of Inter-Process Communication (IPC): In Python, different processes don't share memory space, so any communication between them (using Queues or Pipes, as in this example) requires pickling and unpickling data, which can be time-consuming, especially for large datasets. (The sketch after this list shows one way to reduce this cost by sending data in chunks.)

  2. Overhead of Process Creation: Creating a new process is more resource-intensive than creating a new thread. If the number of processes being created is large, or if processes are being created and destroyed frequently, this overhead can become significant.

  3. The GIL (Global Interpreter Lock) does not affect multiprocessing: The GIL is a mechanism in the CPython interpreter that allows only one thread to execute Python bytecode at a time, even on a multi-core system. However, the GIL does not prevent multiple processes from running on different cores at the same time. This means that multiprocessing can utilize multiple cores, but only at the cost of the overheads mentioned above.

  4. The nature of the task: If the task being performed by each process is not CPU-bound (i.e., it doesn't require a lot of computational power), the overhead of IPC and process creation can outweigh the benefits of running tasks in parallel. In our case, squaring a number is such a cheap operation that the overheads of multiprocessing far exceed the time it takes to simply square all the numbers in a single process, as the measurements above show.
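
To illustrate point 1, one way to reduce the IPC cost in the fan-out example above is to put whole chunks of numbers on the queue rather than one number at a time, so each transfer pickles many values at once. This is a sketch, and the chunk size is an arbitrary choice you would tune for your workload:

import multiprocessing

CHUNK_SIZE = 10000  # arbitrary; tune for your workload

def chunk_worker(input_queue, output_queue):
    result = []
    # Each queue item is now a whole list of numbers, so far fewer
    # pickle/unpickle round-trips are needed per worker.
    for chunk in iter(input_queue.get, 'STOP'):
        result.extend(n ** 2 for n in chunk)
    output_queue.put(result)

if __name__ == "__main__":
    numbers = list(range(100000))
    num_workers = multiprocessing.cpu_count()
    input_queue = multiprocessing.Queue()
    output_queue = multiprocessing.Queue()

    for _ in range(num_workers):
        multiprocessing.Process(
            target=chunk_worker, args=(input_queue, output_queue)
        ).start()

    # Send chunks instead of individual numbers.
    for i in range(0, len(numbers), CHUNK_SIZE):
        input_queue.put(numbers[i:i + CHUNK_SIZE])
    for _ in range(num_workers):
        input_queue.put('STOP')

    results = []
    for _ in range(num_workers):
        results.extend(output_queue.get())
    print(len(results))  # 100000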

To get the benefits of multiprocessing, you generally want to use it for CPU-bound tasks that take a significant amount of time to run. For IO-bound tasks (such as downloading web pages), threading or asynchronous IO can be more efficient. For quick tasks that are dominated by neither CPU nor IO, running them in a single process may be the most efficient approach.
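
As a sketch of the kind of workload where multiprocessing does pay off, the cpu_heavy function below is artificially expensive; its per-call cost is large enough to dominate the IPC and process-creation overhead. The loop bound is arbitrary, and actual timings will vary by machine:

import multiprocessing
import time

def cpu_heavy(n):
    # Artificially expensive CPU-bound work: the per-call cost is
    # large enough to dominate IPC and process-creation overhead.
    return sum((n + i) * (n + i) % 97 for i in range(500_000))

if __name__ == "__main__":
    inputs = list(range(32))

    start = time.time()
    serial = [cpu_heavy(n) for n in inputs]
    print(f"Serial: {time.time() - start:.2f}s")

    start = time.time()
    with multiprocessing.Pool() as pool:
        parallel = pool.map(cpu_heavy, inputs)
    print(f"Parallel: {time.time() - start:.2f}s")

    assert serial == parallel  # Pool.map preserves input order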

Before designing your final architecture, be sure to test these approaches and research threading as an alternative, particularly for IO-bound workloads.
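
As a minimal sketch of the threading alternative for IO-bound work (the URL below is a placeholder; substitute your real endpoints), concurrent.futures.ThreadPoolExecutor lets several downloads overlap their network wait time without the cost of spawning processes:

from concurrent.futures import ThreadPoolExecutor
import urllib.request

# Placeholder URL repeated for illustration; use real endpoints in practice.
URLS = ["https://example.com"] * 5

def fetch(url):
    # Threads spend most of their time waiting on the network, during
    # which the GIL is released, so they overlap effectively.
    with urllib.request.urlopen(url, timeout=10) as response:
        return url, len(response.read())

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=5) as executor:
        for url, size in executor.map(fetch, URLS):
            print(f"{url}: {size} bytes")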

