Data preprocessing is often a computationally demanding part of machine learning pipelines, especially when dealing with large datasets or complex feature engineering steps. Many preprocessing tasks, such as applying transformations row-by-row or column-by-column, are inherently parallelizable. Applying the concurrency techniques discussed earlier can significantly reduce the time spent on this stage.
In this hands-on practical, we will take a typical data preprocessing task and accelerate it using Python's concurrent.futures module, specifically leveraging ProcessPoolExecutor for CPU-bound operations.
Imagine we have a dataset represented as a Pandas DataFrame, and we need to apply a computationally intensive function to one of its columns to generate a new feature. Our function will simulate some complex calculations.
import pandas as pd
import numpy as np
import time
import math
import os
from concurrent.futures import ProcessPoolExecutor
# Generate some sample data
data_size = 1_000_000
df = pd.DataFrame({
    'id': range(data_size),
    'value_to_process': np.random.rand(data_size) * 100
})
# Define a mock complex processing function
def complex_feature_engineering(value):
    """Simulates a CPU-intensive calculation."""
    result = 0
    for i in range(100):  # Simulate computational work
        result += math.sqrt(abs(math.sin(value) * math.cos(value / (i + 1)))) * math.log(value + 1)
    # Add a small sleep to simulate potential I/O or external calls
    # time.sleep(0.0001)
    return result
print("Sample DataFrame head:")
print(df.head())
print(f"\nDataFrame size: {len(df)} rows")
First, let's implement the standard sequential approach using Pandas' apply method. We'll time how long this takes.
# Sequential execution using pandas apply
print("\nStarting sequential processing...")
start_time_seq = time.time()
df['new_feature_seq'] = df['value_to_process'].apply(complex_feature_engineering)
end_time_seq = time.time()
sequential_duration = end_time_seq - start_time_seq
print(f"Sequential processing finished.")
print(f"Duration: {sequential_duration:.2f} seconds")
print("\nDataFrame head with sequential result:")
print(df[['id', 'value_to_process', 'new_feature_seq']].head())
On a typical multi-core machine, running this function over a million rows will take a noticeable amount of time, as it processes each value one after another on a single CPU core.
Now, let's parallelize this task using ProcessPoolExecutor. This executor distributes the work across multiple processes, allowing Python to bypass the Global Interpreter Lock (GIL) for CPU-bound tasks like our complex_feature_engineering function.
We can use the executor.map method, which works similarly to the built-in map function but executes the calls concurrently. It applies the given function to each item in the iterable (our DataFrame column) and returns an iterator yielding the results in the order the inputs were provided.
It's often more efficient to process data in chunks rather than sending individual rows to worker processes, as this reduces the overhead of inter-process communication (IPC). We can split our DataFrame column into chunks and submit each chunk for processing. However, for simplicity with executor.map, we'll directly map over the Series. Pandas Series (and NumPy arrays) are generally pickleable, allowing them to be sent to worker processes.
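For reference, here is a minimal sketch of the chunk-based alternative just described, using executor.submit and numpy.array_split. It assumes the df and complex_feature_engineering definitions from above; the helper names process_chunk and parallel_in_chunks and the default of 32 chunks are illustrative choices, not part of the main example, which continues with executor.map below.
# Hypothetical chunk-based variant: submit one task per chunk instead of per row
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def process_chunk(chunk):
    # Apply the per-value function to a whole chunk inside one worker call
    return [complex_feature_engineering(v) for v in chunk]

def parallel_in_chunks(values, n_chunks=32, max_workers=None):
    chunks = np.array_split(values, n_chunks)  # roughly equal-sized pieces
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
        results = []
        for future in futures:  # iterate in submission order to preserve row order
            results.extend(future.result())
    return results

# Example usage:
# df['new_feature_chunked'] = parallel_in_chunks(df['value_to_process'].values)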
# Parallel execution using ProcessPoolExecutor
# Determine the optimal number of workers (often related to CPU cores)
# Using None usually defaults to the number of processors on the machine
num_workers = None # Or set to a specific number, e.g., 4
print(f"\nStarting parallel processing with {num_workers or 'default'} workers...")
start_time_par = time.time()
# Convert the column to a list or NumPy array for potentially better chunking/serialization
# depending on the executor implementation details. For map, a Series often works directly.
values_to_process = df['value_to_process'].values
results_par = []
# Use ProcessPoolExecutor to parallelize the function application
# chunksize can be tuned for performance based on task duration vs overhead
# A larger chunksize reduces overhead but might lead to load imbalance.
# Let's estimate a chunksize. If each task is very short, larger chunks are better.
# If data_size is 1M and we have 8 workers, maybe 1000-10000 items per chunk.
estimated_chunksize = max(1, min(len(values_to_process) // (num_workers or os.cpu_count() or 1) // 4, 10000))
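# Note: when running as a script on platforms that start workers with "spawn"
# (Windows, macOS), wrap the block below in an `if __name__ == "__main__":` guard.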
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    # map applies the function to each item in the iterable concurrently
    # Results are returned in the order of the input iterable
    results_par = list(executor.map(complex_feature_engineering, values_to_process, chunksize=estimated_chunksize))
# Assign results back to the DataFrame
df['new_feature_par'] = results_par
end_time_par = time.time()
parallel_duration = end_time_par - start_time_par
print(f"Parallel processing finished.")
print(f"Duration: {parallel_duration:.2f} seconds")
print("\nDataFrame head with parallel result:")
# Verify results are consistent (allowing for potential float precision differences if any)
print(df[['id', 'new_feature_seq', 'new_feature_par']].head())
# Check if results match closely
if 'new_feature_seq' in df.columns:
    print("\nVerifying results match (approx):")
    print(np.allclose(df['new_feature_seq'], df['new_feature_par']))
# Calculate speedup
speedup = sequential_duration / parallel_duration
print(f"\nSpeedup factor: {speedup:.2f}x")
# Cleanup the sequential column if needed
# del df['new_feature_seq']
Let's visualize the time taken by both approaches.
{"data": [{"x": ["Sequential", "Parallel"], "y": [sequential_duration, parallel_duration], "type": "bar", "marker": {"color": ["#ff6b6b", "#4dabf7"]}}], "layout": {"title": "Data Preprocessing Execution Time", "yaxis": {"title": "Time (seconds)"}, "xaxis": {"title": "Execution Method"}, "template": "plotly_white"}}
Comparison of execution times for sequential versus parallel data preprocessing.
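If you want to produce a similar chart yourself, here is a minimal sketch using matplotlib (assumed to be installed; it is not part of the code above), reusing the timing variables recorded earlier:
import matplotlib.pyplot as plt

# Assumes sequential_duration and parallel_duration from the timing code above
methods = ["Sequential", "Parallel"]
durations = [sequential_duration, parallel_duration]

plt.bar(methods, durations, color=["#ff6b6b", "#4dabf7"])
plt.xlabel("Execution Method")
plt.ylabel("Time (seconds)")
plt.title("Data Preprocessing Execution Time")
plt.show()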
You should observe a significant reduction in execution time with the parallel approach, especially on machines with multiple CPU cores. The exact speedup depends on several factors:
- Number of CPU cores: ProcessPoolExecutor utilizes multiple cores. The more cores available (up to a point), the greater the potential speedup.
- Cost of the function: The complex_feature_engineering function needs to be computationally intensive enough for the parallelization overhead (IPC, process creation) to be worthwhile. If the function were extremely fast, the overhead might negate the benefits. Tuning the chunk size (the chunksize argument of executor.map) can help mitigate this.
- CPU-bound vs. I/O-bound work: ProcessPoolExecutor is effective for CPU-bound tasks because each process has its own Python interpreter and memory space, circumventing the GIL. For I/O-bound tasks, ThreadPoolExecutor or asyncio might be more appropriate, as discussed earlier in the chapter.
- Chunking strategy: Sending larger chunks of work to each process (for example, calling executor.submit for each chunk) can be more efficient than mapping individual elements.
- Built-in library parallelism: Many libraries already offer their own parallelism (the n_jobs parameter in Scikit-learn, distributed computing in Dask/Ray, multi-threading in Polars). Investigate if your tools already offer optimized parallel processing capabilities before implementing custom solutions.
This practical demonstrated how to apply ProcessPoolExecutor to accelerate a common CPU-bound data preprocessing task. By understanding the nature of your workload (CPU vs. I/O bound) and the available tools, you can effectively leverage concurrency to speed up your machine learning pipelines.