Data preprocessing is often a computationally demanding part of machine learning pipelines, especially when dealing with large datasets or complex feature engineering steps. Many preprocessing tasks, such as applying transformations row-by-row or column-by-column, are inherently parallelizable, and utilizing concurrency techniques can significantly reduce the time spent on this stage.

In this hands-on practical, we will take a typical data preprocessing task and accelerate it using Python's `concurrent.futures` module, specifically leveraging `ProcessPoolExecutor` for CPU-bound operations.

### Scenario: Complex Feature Engineering

Imagine we have a dataset represented as a Pandas DataFrame, and we need to apply a computationally intensive function to one of its columns to generate a new feature. Our function will simulate some complex calculations.

```python
import pandas as pd
import numpy as np
import time
import math
from concurrent.futures import ProcessPoolExecutor

# Generate some sample data
data_size = 1_000_000
df = pd.DataFrame({
    'id': range(data_size),
    'value_to_process': np.random.rand(data_size) * 100
})

# Define a mock complex processing function
def complex_feature_engineering(value):
    """Simulates a CPU-intensive calculation."""
    result = 0
    for i in range(100):
        # Simulate computational work
        result += math.sqrt(abs(math.sin(value) * math.cos(value / (i + 1)))) * math.log(value + 1)
    # Add a small sleep to simulate potential I/O or external calls
    # time.sleep(0.0001)
    return result

print("Sample DataFrame head:")
print(df.head())
print(f"\nDataFrame size: {len(df)} rows")
```

### Baseline: Sequential Execution

First, let's implement the standard sequential approach using Pandas' `apply` method and time how long it takes.

```python
# Sequential execution using pandas apply
print("\nStarting sequential processing...")
start_time_seq = time.time()

df['new_feature_seq'] = df['value_to_process'].apply(complex_feature_engineering)

end_time_seq = time.time()
sequential_duration = end_time_seq - start_time_seq

print("Sequential processing finished.")
print(f"Duration: {sequential_duration:.2f} seconds")
print("\nDataFrame head with sequential result:")
print(df[['id', 'value_to_process', 'new_feature_seq']].head())
```

On a typical multi-core machine, running this function over a million rows takes a noticeable amount of time, since each value is processed one after another on a single CPU core.

### Parallel Execution with ProcessPoolExecutor

Now, let's parallelize this task using `ProcessPoolExecutor`. This executor distributes the work across multiple processes, allowing Python to bypass the Global Interpreter Lock (GIL) for CPU-bound tasks like our `complex_feature_engineering` function.

We can use the `executor.map` method, which works like the built-in `map` function but executes the calls concurrently. It applies the given function to each item in the iterable (our DataFrame column) and returns an iterator yielding the results in the order the inputs were provided.

It's often more efficient to process data in chunks rather than sending individual rows to worker processes, as this reduces the overhead of inter-process communication (IPC). We could split our DataFrame column into chunks and submit each chunk for processing; however, for simplicity with `executor.map`, we'll directly map over the Series. Pandas Series (and NumPy arrays) are generally pickleable, allowing them to be sent to worker processes.
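Before wiring up the executor, it can be worth a quick sanity check that both the worker function and its inputs survive pickling. The snippet below is an optional sketch and not part of the timed comparison.

```python
# Optional check: ProcessPoolExecutor sends arguments (and a reference to the worker
# function) to child processes via pickle. Module-level functions and NumPy arrays
# pickle fine; lambdas and locally defined closures generally do not.
import pickle

pickle.dumps(complex_feature_engineering)         # OK: module-level function
pickle.dumps(df['value_to_process'].values[:5])   # OK: NumPy arrays are picklable

try:
    pickle.dumps(lambda v: v * 2)                 # lambdas cannot be pickled
except (pickle.PicklingError, AttributeError) as exc:
    print(f"Not picklable: {exc}")
```

With that out of the way, here is the parallel version: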
```python
# Parallel execution using ProcessPoolExecutor
import os  # needed below for os.cpu_count()

# Determine the number of workers (often related to CPU cores).
# Using None defaults to the number of processors on the machine.
num_workers = None  # or set a specific number, e.g., 4

print(f"\nStarting parallel processing with {num_workers or 'default'} workers...")
start_time_par = time.time()

# Convert the column to a NumPy array for potentially better chunking/serialization;
# for map, a Series often works directly as well.
values_to_process = df['value_to_process'].values

# chunksize can be tuned based on task duration versus overhead:
# a larger chunksize reduces IPC overhead but might lead to load imbalance.
# With 1M items and, say, 8 workers, roughly 1,000-10,000 items per chunk is reasonable.
estimated_chunksize = max(
    1,
    min(len(values_to_process) // (num_workers or os.cpu_count() or 1) // 4, 10000)
)

# Note: when running this as a script on Windows or macOS (spawn start method),
# this block should be guarded by `if __name__ == "__main__":`.
with ProcessPoolExecutor(max_workers=num_workers) as executor:
    # map applies the function to each item in the iterable concurrently;
    # results are returned in the order of the input iterable.
    results_par = list(executor.map(
        complex_feature_engineering,
        values_to_process,
        chunksize=estimated_chunksize,
    ))

# Assign results back to the DataFrame
df['new_feature_par'] = results_par

end_time_par = time.time()
parallel_duration = end_time_par - start_time_par

print("Parallel processing finished.")
print(f"Duration: {parallel_duration:.2f} seconds")

print("\nDataFrame head with parallel result:")
print(df[['id', 'new_feature_seq', 'new_feature_par']].head())

# Verify results are consistent (allowing for floating-point precision differences)
if 'new_feature_seq' in df.columns:
    print("\nVerifying results match (approx):")
    print(np.allclose(df['new_feature_seq'], df['new_feature_par']))

# Calculate speedup
speedup = sequential_duration / parallel_duration
print(f"\nSpeedup factor: {speedup:.2f}x")

# Clean up the sequential column if no longer needed
# del df['new_feature_seq']
```

### Performance Comparison

Let's visualize the time taken by both approaches.

*Figure: Data Preprocessing Execution Time. Bar chart of execution time in seconds for the sequential (about 12.5 s) and parallel (about 3.2 s) runs. Comparison of execution times for sequential versus parallel data preprocessing.*

### Discussion

You should observe a significant reduction in execution time with the parallel approach, especially on machines with multiple CPU cores. The exact speedup depends on:

- **Number of CPU cores:** `ProcessPoolExecutor` utilizes multiple cores; the more cores available (up to a point), the greater the potential speedup.
- **Task granularity:** The `complex_feature_engineering` function needs to be computationally intensive enough for the parallelization overhead (IPC, process creation) to be worthwhile. If the function were extremely fast, the overhead might negate the benefits.
- **Overhead:** Creating processes and transferring data between them (serialization and deserialization via pickling) incurs overhead. For very large datasets or complex objects, this can become a bottleneck. Techniques such as shared memory or processing data in larger chunks (the `chunksize` argument to `executor.map`) can help mitigate it.
- **The GIL:** Since our task is CPU-bound, `ProcessPoolExecutor` is effective because each process has its own Python interpreter and memory space, circumventing the GIL. For I/O-bound tasks, `ThreadPoolExecutor` or `asyncio` might be more appropriate, as discussed earlier in the chapter; a minimal thread-based sketch follows this list.
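To make that last point concrete, here is a minimal sketch of an I/O-bound counterpart, assuming a list of hypothetical URLs to download. It is illustrative only and not part of the timed comparison above.

```python
# Sketch: ThreadPoolExecutor for an I/O-bound task (hypothetical endpoints).
# Threads share memory, so there is no pickling overhead, and the GIL is
# released while each thread waits on network I/O.
from concurrent.futures import ThreadPoolExecutor
import urllib.request

urls = [f"https://example.com/records/{i}" for i in range(20)]  # hypothetical URLs

def fetch(url):
    """Download one resource; the thread blocks on I/O rather than the CPU."""
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read()

with ThreadPoolExecutor(max_workers=8) as executor:
    pages = list(executor.map(fetch, urls))
```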
### Approaches to Pipelines

- **Chunking:** For very large datasets that don't fit comfortably in memory, or when the per-element function is cheap (so dispatch overhead dominates), explicitly splitting data into larger chunks (e.g., splitting the DataFrame and using `executor.submit` for each chunk) can be more efficient than mapping individual elements; see the sketch at the end of this practical.
- **Memory usage:** Be mindful that creating multiple processes consumes more memory than threads, as each process has its own memory space. Ensure your machine has sufficient RAM.
- **Library support:** Many modern data science libraries (such as Scikit-learn, Dask, Ray, and Polars) have built-in support for parallel execution: the `n_jobs` parameter in Scikit-learn, distributed computing in Dask and Ray, multi-threading in Polars. Check whether your tools already offer optimized parallel processing before implementing custom solutions.
- **Error handling:** In production code, add error handling around the parallel execution logic to manage potential failures within worker processes.

This practical demonstrated how to apply `ProcessPoolExecutor` to accelerate a common CPU-bound data preprocessing task. By understanding the nature of your workload (CPU-bound vs. I/O-bound) and the available tools, you can effectively leverage concurrency to speed up your machine learning pipelines.
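For reference, here is a sketch of the chunk-based variant mentioned under "Chunking" above. It assumes the `df` and `complex_feature_engineering` definitions from this practical, and the number of chunks is an assumption you would tune for your own machine.

```python
# Sketch: chunk-level parallelism with executor.submit instead of per-element map.
# Each worker call processes a whole chunk, reducing inter-process communication.
import os
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def process_chunk(chunk):
    """Apply the feature function to every value in a chunk inside one worker call."""
    return [complex_feature_engineering(v) for v in chunk]

n_chunks = (os.cpu_count() or 1) * 4  # a few chunks per core helps balance the load
chunks = np.array_split(df['value_to_process'].values, n_chunks)

with ProcessPoolExecutor() as executor:
    # submit preserves our own ordering: futures[i] corresponds to chunks[i]
    futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
    chunk_results = [future.result() for future in futures]

# Flatten the per-chunk lists back into one column, in the original row order
df['new_feature_chunked'] = [value for chunk in chunk_results for value in chunk]
```

Compared with `executor.map` over individual values, this version trades a little bookkeeping for fewer, larger IPC transfers, which tends to pay off when the per-element function is cheap.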