
Parallelize

Boost the execution speed of large data processing jobs by breaking them into many smaller concurrent tasks.

A definition of parallelization

Parallelization refers to the breaking down of data processing tasks into smaller units of work that can be executed concurrently, either on a single machine with multiple processing units or across distributed computing resources. It aims to enhance the efficiency and speed of data processing by leveraging parallel computing techniques.

Data engineers often deal with large volumes of data that require complex transformations, aggregations, or analysis. Parallelization enables them to distribute these tasks across multiple computing resources, such as CPU cores or machines, to achieve faster and more scalable data processing.

In Python, parallelization can be achieved in data pipelines using various techniques:

  1. Parallel Execution: Data engineers can split processing tasks into smaller chunks and execute them concurrently using multiprocessing, threading, or asynchronous programming. In CPython, multiprocessing is the usual choice for CPU-bound work because it can use multiple CPU cores, while threading and asynchronous programming mainly help with I/O-bound work such as reading files or calling APIs. Either way, the goal is to reduce the overall processing time.

  2. Distributed Computing: For larger datasets or computationally intensive operations, data engineers can employ distributed computing frameworks like Apache Spark, Dask, or Ray. These frameworks enable the execution of data processing tasks across a cluster of machines, providing scalability and fault tolerance.

  3. Batch Processing: Data engineers can use batch processing frameworks such as Apache Hadoop, or workflow orchestrators such as Apache Airflow, to run tasks in parallel within a predefined workflow. These tools schedule and distribute tasks across multiple workers or nodes, so that independent steps of a data pipeline can run at the same time.

  4. Data Parallelism: When dealing with data-intensive operations, data engineers can partition the data into smaller subsets and perform parallel computations on each subset. This approach is particularly useful for tasks like data aggregation, map-reduce operations, or parallel database operations.
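The first and fourth techniques can be illustrated with the standard library alone. The following is a minimal sketch, not a production pattern: the helper names (chunked, chunk_revenue, parallel_revenue) and the sample rows are hypothetical. It partitions a list of (price, quantity) pairs into chunks, processes the chunks concurrently, and combines the partial results.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(values, chunk_size):
    """Split a list into consecutive chunks of at most chunk_size items."""
    return [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]


def chunk_revenue(chunk):
    """Work done on one chunk: sum price * quantity over its (price, quantity) pairs."""
    return sum(price * quantity for price, quantity in chunk)


def parallel_revenue(rows, chunk_size=2, max_workers=4):
    """Partition rows, process the chunks concurrently, then combine the partial sums."""
    chunks = chunked(rows, chunk_size)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        partial_sums = list(pool.map(chunk_revenue, chunks))
    return sum(partial_sums)


# Hypothetical (price, quantity) pairs, used only for illustration
rows = [(10, 1), (20, 2), (30, 3), (40, 4), (50, 5)]
total = parallel_revenue(rows)
print(total)
```

A thread pool is used here so the sketch runs unchanged in any environment. For CPU-bound work, concurrent.futures.ProcessPoolExecutor exposes the same map interface and is usually the better fit, though on some platforms it needs the calling code to sit under an if __name__ == "__main__": guard.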

An example of process parallelization in Python using Dask

In the following steps, we will use Python's built-in os and uuid modules, pandas for data manipulation, NumPy for generating random values, and Dask for parallel processing.

First, let's create a directory named sales_data, and populate it with twelve CSV files representing sales data for each month:

import os
import pandas as pd
import numpy as np
from uuid import uuid4

# Function to create a directory if it doesn't already exist
def create_directory(directory_name):
    os.makedirs(directory_name, exist_ok=True)

# Create the directory
directory_name = "sales_data"
create_directory(directory_name)

# Function to generate random sales data
def create_sales_data():
    return pd.DataFrame({
        "item_id": [uuid4() for _ in range(1000)],
        "price": np.random.rand(1000) * 100,
        "quantity": np.random.randint(1, 10, size=1000),
    })

# Generate and save the sales data to CSV for each month
for month in range(1, 13):
    sales_data = create_sales_data()
    sales_data.to_csv(f"{directory_name}/sales_data_{month}.csv", index=False)

Now that we have our data, we can write a Dask process to read all the CSV files, compute the total sales (price * quantity) for each item sold each month, and then calculate the total sales over the entire period.

import dask.dataframe as dd

# Construct a Dask DataFrame from all the CSV files
ddf = dd.read_csv(f"{directory_name}/*.csv")

# Calculate the total sales for each item by multiplying price with quantity
ddf['total_sales'] = ddf['price'] * ddf['quantity']

# Compute the total sales over the entire period
total_sales = ddf['total_sales'].sum().compute()

print(f"Total sales over the period: ${total_sales:,.2f}")

In this script, the dask.dataframe.read_csv function constructs a Dask DataFrame representing all of the data in the CSV files. This doesn't load the full dataset yet (Dask reads only a small sample to infer column types); instead, it lazily represents the data as computations to perform.

The line ddf['total_sales'] = ddf['price'] * ddf['quantity'] adds a new column to the DataFrame that represents the total sales for each item. Again, this doesn't perform any computations yet.

Finally, total_sales = ddf['total_sales'].sum().compute() triggers all of the lazy computations. The sum() call describes adding up the per-item sales to get the total for the entire period, and compute() executes the resulting task graph in parallel.

Comparing performance

Let's add some timing to our previous script to see the performance difference between Dask and Pandas.

We will use Python's built-in time module to measure the execution time of the Dask computation and the equivalent Pandas computation.

First, let's perform the computation with Dask and measure the time taken:

import time

# Dask
start_time = time.time()

ddf = dd.read_csv(f"{directory_name}/*.csv")
ddf['total_sales'] = ddf['price'] * ddf['quantity']
total_sales = ddf['total_sales'].sum().compute()

end_time = time.time()

dask_time = end_time - start_time

print(f"Total sales over the period (Dask): ${total_sales:,.2f}")
print(f"Time taken with Dask: {dask_time:.2f} seconds")

Now, let's do the same thing with Pandas:

# Pandas
start_time = time.time()

total_sales = 0
for month in range(1, 13):
    df = pd.read_csv(f"{directory_name}/sales_data_{month}.csv")
    df['total_sales'] = df['price'] * df['quantity']
    total_sales += df['total_sales'].sum()

end_time = time.time()

pandas_time = end_time - start_time

print(f"Total sales over the period (Pandas): ${total_sales:,.2f}")
print(f"Time taken with Pandas: {pandas_time:.2f} seconds")

Finally, let's compare the relative performance of each approach:

# Compare times: a ratio above 1 means Dask finished sooner than Pandas
speedup = pandas_time / dask_time
if speedup > 1:
    print(f"Dask was {speedup:.2f} times faster")
else:
    print(f"Dask was {1 / speedup:.2f} times slower")

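With this, you should be able to see the performance difference between Dask and Pandas. The output will look similar to the following, although the totals depend on the random data and the timings depend on your machine: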

Total sales over the period: $2,987,126.20
Total sales over the period (Dask): $2,987,126.20
Time taken with Dask: 0.02 seconds
Total sales over the period (Pandas): $2,987,126.20
Time taken with Pandas: 0.01 seconds
Dask was 1.78 times slower
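Timings of a few hundredths of a second are noisy, so a single measurement can be misleading. One common approach, sketched here with a hypothetical helper named best_of, is to repeat the measurement with time.perf_counter and keep the fastest run.

```python
import time


def best_of(func, repeats=5):
    """Call func() `repeats` times and return (last_result, fastest_seconds)."""
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = func()
        elapsed = time.perf_counter() - start
        best = min(best, elapsed)
    return result, best


# Hypothetical workload, used only to show the call pattern
result, seconds = best_of(lambda: sum(range(100_000)))
print(f"Result: {result}, fastest run: {seconds:.6f} seconds")
```

The Dask and Pandas computations could each be wrapped in a function and passed to a helper like this, which makes the comparison less sensitive to one-off effects such as disk caching.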

In general, Dask should be faster for larger datasets, especially those that don't fit into memory, as it can perform computations in parallel and load only parts of the data into memory at a time. But as we see in this example, for smaller datasets the overhead of parallelization can make Dask slower than Pandas. Adding more small files would not close this gap by itself: the task is simple, so the fixed cost of scheduling each additional partition adds up without much useful work to offset it. It is important to measure the relative performance of concurrent execution techniques in order to properly optimize your pipelines.

For example, you can try expanding the size of the dataset as follows, and eventually Dask becomes the more performant technique:

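import random
import string

# Helper assumed by this example (not defined in the original):
# returns one random alphanumeric string of the given length
def generate_random_string(length):
    return "".join(random.choices(string.ascii_letters + string.digits, k=length))

# Function to generate random sales data
def create_sales_data():
    return pd.DataFrame({
        "item_id": [uuid4() for _ in range(100000)],
        "price": np.random.rand(100000) * 100,
        "quantity": np.random.randint(1, 10, size=100000),
        # Each string below is generated once and repeated on every row
        "description": generate_random_string(12),
        "item_name": generate_random_string(255),
        "description2": generate_random_string(255),
        "store_id2": generate_random_string(6),
    })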

With the above dataset you might see results such as the ones below, and the gap in Dask's favor tends to grow along with the size of your datasets:

Time taken with Dask: 2.13 seconds
Time taken with Pandas: 3.08 seconds
Dask was 1.45 times faster

Here is the complete code for the 1,000-row example and the timing comparison above:

import os
import pandas as pd
import numpy as np
from uuid import uuid4
import dask.dataframe as dd
import time

# Function to create a directory if it doesn't already exist
def create_directory(directory_name):
    os.makedirs(directory_name, exist_ok=True)

# Create the directory
directory_name = "sales_data"
create_directory(directory_name)

# Function to generate random sales data
def create_sales_data():
    return pd.DataFrame({
        "item_id": [uuid4() for _ in range(1000)],
        "price": np.random.rand(1000) * 100,
        "quantity": np.random.randint(1, 10, size=1000),
    })

# Generate and save the sales data to CSV for each month
for month in range(1, 13):
    sales_data = create_sales_data()
    sales_data.to_csv(f"{directory_name}/sales_data_{month}.csv", index=False)


# Construct a Dask DataFrame from all the CSV files
ddf = dd.read_csv(f"{directory_name}/*.csv")

# Calculate the total sales for each item by multiplying price with quantity
ddf['total_sales'] = ddf['price'] * ddf['quantity']

# Compute the total sales over the entire period
total_sales = ddf['total_sales'].sum().compute()

print(f"Total sales over the period: ${total_sales:,.2f}")

# Dask
start_time = time.time()

ddf = dd.read_csv(f"{directory_name}/*.csv")
ddf['total_sales'] = ddf['price'] * ddf['quantity']
total_sales = ddf['total_sales'].sum().compute()

end_time = time.time()

dask_time = end_time - start_time

print(f"Total sales over the period (Dask): ${total_sales:,.2f}")
print(f"Time taken with Dask: {dask_time:.2f} seconds")


# Pandas
start_time = time.time()

total_sales = 0
for month in range(1, 13):
    df = pd.read_csv(f"{directory_name}/sales_data_{month}.csv")
    df['total_sales'] = df['price'] * df['quantity']
    total_sales += df['total_sales'].sum()

end_time = time.time()

pandas_time = end_time - start_time

print(f"Total sales over the period (Pandas): ${total_sales:,.2f}")
print(f"Time taken with Pandas: {pandas_time:.2f} seconds")

# Compare times: a ratio above 1 means Dask finished sooner than Pandas
speedup = pandas_time / dask_time
if speedup > 1:
    print(f"Dask was {speedup:.2f} times faster")
else:
    print(f"Dask was {1 / speedup:.2f} times slower")
