While the `threading` and `multiprocessing` modules provide the fundamental building blocks for concurrency and parallelism in Python, managing threads or processes directly, handling communication, and ensuring proper cleanup can become complex. The `concurrent.futures` module, introduced in Python 3.2, offers a higher-level interface for asynchronously executing callables, simplifying the management of thread and process pools. It abstracts away much of the boilerplate code associated with creating, managing, and joining threads or processes. The core idea revolves around the `Executor` abstract base class, which provides methods to execute calls asynchronously. Two concrete implementations are provided: `ThreadPoolExecutor` for thread-based concurrency and `ProcessPoolExecutor` for process-based parallelism.
At the heart of the `concurrent.futures` module are the `Executor` subclasses and the `Future` object. You instantiate an executor (either `ThreadPoolExecutor` or `ProcessPoolExecutor`) and then submit tasks to it.

- `Executor.submit(fn, *args, **kwargs)`: Schedules the callable `fn` to be executed as `fn(*args, **kwargs)` and returns a `Future` object representing the execution of the callable.
- `Future` object: Represents a computation that may or may not have completed yet. It acts as a placeholder for the result. You can query its status, add callbacks, and retrieve the result or any exception raised during execution.

The basic workflow using `concurrent.futures` is: submit a task to an `Executor`, receive a `Future` object immediately, and later retrieve the result from the `Future`, as shown in the sketch below.
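The following minimal sketch illustrates this workflow; the `add` function is a placeholder task chosen purely for illustration:

```python
import concurrent.futures

def add(a, b):
    """A trivial placeholder task."""
    return a + b

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(add, 2, 3)  # returns a Future immediately
    print(future.done())    # may print False: the task might still be running
    print(future.result())  # blocks until the task finishes, then prints 5
```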
Key methods of the `Future` object include:

- `done()`: Returns `True` if the call was successfully cancelled or finished running.
- `result(timeout=None)`: Returns the value returned by the call. If the call hasn't finished, this method waits for up to `timeout` seconds. If the call raised an exception, this method raises the same exception. If the future was cancelled, it raises `CancelledError`.
- `exception(timeout=None)`: Returns the exception object raised by the call. If the call completed without raising, it returns `None`. If the call hasn't finished, it waits in the same way as `result()`.
- `add_done_callback(fn)`: Attaches a callable `fn` that will be called with the future object as its only argument when the future is cancelled or finishes running.

These behaviors are demonstrated in the sketch below.
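This short sketch, using a hypothetical `slow_divide` task, shows how the methods behave for both a successful call and one that raises:

```python
import concurrent.futures
import time

def slow_divide(a, b):
    """Hypothetical task used to demonstrate Future methods."""
    time.sleep(0.5)
    return a / b

def report(future):
    # Called with the finished Future as its only argument
    print(f"Callback fired, exception: {future.exception()}")

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    ok = executor.submit(slow_divide, 10, 2)
    bad = executor.submit(slow_divide, 1, 0)
    bad.add_done_callback(report)

    print(ok.result(timeout=5))  # waits up to 5 seconds, then prints 5.0
    print(bad.exception())       # returns the ZeroDivisionError without raising it
    try:
        bad.result()             # re-raises the stored exception
    except ZeroDivisionError as exc:
        print(f"Caught: {exc}")
```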
As discussed previously, threads in CPython are suitable for I/O-bound tasks because the GIL is released during blocking I/O operations (such as network requests and disk reads/writes). `ThreadPoolExecutor` manages a pool of threads and reuses them for submitted tasks, avoiding the overhead of creating a new thread for each one.
Consider fetching multiple datasets or configuration files from different URLs. This is primarily an I/O-bound operation.
```python
import concurrent.futures
import requests
import time
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(message)s')

URLS = [
    'https://raw.githubusercontent.com/scikit-learn/scikit-learn/main/sklearn/datasets/data/iris.csv',
    'https://raw.githubusercontent.com/scikit-learn/scikit-learn/main/sklearn/datasets/data/diabetes_data.csv.gz',
    'https://raw.githubusercontent.com/scikit-learn/scikit-learn/main/sklearn/datasets/data/wine_data.csv'
]

def download_data(url):
    """Downloads data from a URL and returns its size."""
    try:
        logging.info(f"Starting download: {url.split('/')[-1]}")
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        logging.info(f"Finished download: {url.split('/')[-1]}, Size: {len(response.content)}")
        return url, len(response.content)
    except requests.exceptions.RequestException as e:
        logging.error(f"Error downloading {url}: {e}")
        return url, None

# Use context manager for automatic shutdown
start_time = time.time()
results = {}

# max_workers determines the number of threads in the pool
with concurrent.futures.ThreadPoolExecutor(max_workers=3, thread_name_prefix='Downloader') as executor:
    # Submit tasks and store Future objects
    future_to_url = {executor.submit(download_data, url): url for url in URLS}

    # Process results as they complete
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            url_res, data_size = future.result()
            if data_size is not None:
                results[url_res] = data_size
                logging.info(f"Result obtained for {url_res.split('/')[-1]}")
            else:
                logging.warning(f"Download failed for {url_res.split('/')[-1]}")
        except Exception as exc:
            logging.error(f'{url} generated an exception: {exc}')

end_time = time.time()
print(f"\nDownloaded data sizes: {results}")
print(f"Total time using ThreadPoolExecutor: {end_time - start_time:.2f} seconds")
```
In this example, `ThreadPoolExecutor` manages multiple download tasks concurrently. Because `requests.get` involves waiting for network responses (I/O), the threads release the GIL while blocked, allowing other threads to run and leading to faster overall completion than sequential downloads. We use `concurrent.futures.as_completed` to process results as soon as they become available.
For CPU-bound tasks in CPython, where the GIL prevents true parallel execution of Python bytecode across multiple CPU cores using threads, `ProcessPoolExecutor` is the preferred choice. It manages a pool of separate processes, each with its own Python interpreter and memory space, thus bypassing the GIL limitation. Data submitted to, and results returned from, tasks executed by `ProcessPoolExecutor` must be pickleable, because inter-process communication involves serialization.
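As a quick illustration of this constraint (a hypothetical sketch, not from the original text), submitting a lambda fails because lambdas cannot be pickled:

```python
import concurrent.futures

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        # Lambdas (and other non-pickleable callables) cannot be sent to a worker process
        future = executor.submit(lambda x: x * 2, 21)
        try:
            print(future.result())
        except Exception as exc:
            # Typically a pickle.PicklingError explaining the lambda can't be serialized
            print(f"Task could not be serialized: {exc}")
```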
Let's adapt a hypothetical CPU-intensive feature calculation task.
```python
import concurrent.futures
import time
import math
import os

# A dummy CPU-intensive function
def compute_heavy_feature(item_id, data_value):
    """Simulates a complex calculation."""
    pid = os.getpid()
    print(f"[PID {pid}] Processing item {item_id}...")
    result = 0
    for i in range(int(data_value * 1e6)):  # Simulate work
        result += math.sqrt(i) * math.sin(i)
    print(f"[PID {pid}] Finished item {item_id}.")
    return item_id, result

data_to_process = {
    1: 5.1,
    2: 6.3,
    3: 4.8,
    4: 7.2,
    5: 5.5,
    6: 6.8
}

# Ensure the driver code runs only in the main script, not in spawned workers
if __name__ == "__main__":  # Important for multiprocessing on some OS
    start_time = time.time()
    results = {}

    # Use context manager for automatic shutdown
    # By default, ProcessPoolExecutor uses os.cpu_count() workers
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # Submit tasks: function, arg1, arg2, ...
        future_to_item = {executor.submit(compute_heavy_feature, item_id, value): item_id
                          for item_id, value in data_to_process.items()}

        # Retrieve results as they complete
        for future in concurrent.futures.as_completed(future_to_item):
            item_id = future_to_item[future]
            try:
                item_res, computed_value = future.result()
                results[item_res] = computed_value
                print(f"Main: Result received for item {item_res}")
            except Exception as exc:
                print(f'Main: Item {item_id} generated an exception: {exc}')

    end_time = time.time()
    print(f"\nComputed feature results (first 10 chars): { {k: str(v)[:10] + '...' for k, v in results.items()} }")
    print(f"Total time using ProcessPoolExecutor: {end_time - start_time:.2f} seconds")
```
Note: The `if __name__ == "__main__":` guard is important when using `multiprocessing` (which `ProcessPoolExecutor` relies on) in scripts, especially on Windows, to prevent issues related to process spawning.
Here, `ProcessPoolExecutor` creates multiple independent Python processes. Each `compute_heavy_feature` call runs in a separate process, allowing multiple calculations to proceed in parallel on different CPU cores and significantly reducing the total computation time for CPU-bound workloads.
For simpler cases where you want to apply the same function to multiple items in an iterable, `Executor.map` provides an interface similar to the built-in `map` function. It applies the function to each item and returns an iterator that yields the results in the order of the original iterable.
```python
import concurrent.futures
import time
import os

def square_number(x):
    pid = os.getpid()
    # print(f"[PID {pid}] Squaring {x}")
    time.sleep(0.1)  # Simulate some work
    return x * x

numbers = range(10)

if __name__ == "__main__":
    start_time = time.time()

    # Use ProcessPoolExecutor for potentially CPU-bound work
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # map returns an iterator yielding results in order
        results_iterator = executor.map(square_number, numbers)
        # Consume the iterator to get results
        results = list(results_iterator)

    end_time = time.time()
    print(f"Original numbers: {list(numbers)}")
    print(f"Squared numbers: {results}")
    print(f"Total time using map: {end_time - start_time:.2f} seconds")
```
`map` is convenient but less flexible than `submit`. It yields results in order, and an exception raised by any task is re-raised when the iterator reaches that task's result, so handling failures requires a `try...except` block around the iteration. `submit` combined with `as_completed` allows processing results as they finish, which can be more efficient when tasks have varying durations. The sketch below contrasts the two approaches.
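The following sketch, using a hypothetical `may_fail` task, shows that `map` stops yielding at the first failure while `as_completed` lets each future be handled independently:

```python
import concurrent.futures

def may_fail(x):
    """Hypothetical task that fails for one particular input."""
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10

if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # With map, the exception re-raises during iteration and later results are lost
        try:
            for result in executor.map(may_fail, [1, 2, 3]):
                print(f"map result: {result}")
        except ValueError as exc:
            print(f"map stopped at first error: {exc}")

        # With submit/as_completed, each Future is handled independently
        futures = [executor.submit(may_fail, x) for x in [1, 2, 3]]
        for future in concurrent.futures.as_completed(futures):
            try:
                print(f"submit result: {future.result()}")
            except ValueError as exc:
                print(f"submit error handled: {exc}")
```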
Both `ThreadPoolExecutor` and `ProcessPoolExecutor` implement the context manager protocol. Using a `with` statement ensures that the executor is properly shut down (calling `executor.shutdown(wait=True)`) when the block is exited, even if errors occur. This is the recommended way to use executors, as it guarantees that resources (threads or processes) are cleaned up correctly. The equivalent manual pattern is sketched below.
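For reference, this is roughly what the `with` statement does for you; managing shutdown manually like this is rarely necessary:

```python
import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
try:
    future = executor.submit(pow, 2, 10)
    print(future.result())  # 1024
finally:
    # The with statement performs this call automatically on exit
    executor.shutdown(wait=True)
```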
The choice between `ThreadPoolExecutor` and `ProcessPoolExecutor` hinges primarily on the nature of your workload and the constraints of the GIL:

- `ThreadPoolExecutor`: Best suited for I/O-bound tasks (network requests, file operations) where threads spend significant time waiting, allowing the GIL to be released. It has lower overhead than process creation.
- `ProcessPoolExecutor`: Necessary for CPU-bound tasks (heavy numerical computations, complex data transformations in pure Python) where you need true parallelism across multiple CPU cores. It overcomes the GIL limitation but incurs higher overhead due to process creation and inter-process communication (pickling).

The `concurrent.futures` module provides a significant simplification for managing concurrent operations in Python. By offering a unified interface through `ThreadPoolExecutor` and `ProcessPoolExecutor`, it allows developers to easily apply threading or multiprocessing based on task characteristics, accelerating ML workflows that involve I/O or intensive computation. Remember to consider the task type (I/O-bound vs. CPU-bound) and the overhead associated with each approach when selecting the appropriate executor.