趋近智
itertools 处理复杂序列__getattr__, __getattribute__)multiprocessing 模块concurrent.futures 实现高级并发数据预处理通常是机器学习流程中计算密集型的部分,尤其是在处理大规模数据集或复杂特征工程步骤时。许多预处理任务,例如逐行或逐列应用转换,本身就可以并行处理。应用前面讨论的并发技术可以大幅缩短此阶段耗时。
选取一个典型的数据预处理任务,并使用 Python 的 concurrent.futures 模块来加速,特别是运用 ProcessPoolExecutor 处理 CPU 密集型操作。
假设我们有一个表示为 Pandas DataFrame 的数据集,我们需要对其一列应用一个计算密集型函数,以生成一个新特征。我们的函数将模拟一些复杂计算。
import pandas as pd
import numpy as np
import time
import math
from concurrent.futures import ProcessPoolExecutor
# 生成一些样本数据
data_size = 1_000_000
df = pd.DataFrame({
'id': range(data_size),
'value_to_process': np.random.rand(data_size) * 100
})
# 定义一个模拟的复杂处理函数
def complex_feature_engineering(value):
"""模拟一个CPU密集型计算。"""
result = 0
for i in range(100): # 模拟计算工作
result += math.sqrt(abs(math.sin(value) * math.cos(value / (i + 1)))) * math.log(value + 1)
# 添加短暂休眠以模拟潜在的I/O或外部调用
# time.sleep(0.0001)
return result
print("样本 DataFrame 前几行:")
print(df.head())
print(f"\nDataFrame 大小:{len(df)} 行")
首先,我们使用 Pandas 的 apply 方法来实现标准的顺序方法。我们将计时以查看此过程需要多长时间。
# 使用pandas apply进行顺序执行
print("\n开始顺序处理...")
start_time_seq = time.time()
df['new_feature_seq'] = df['value_to_process'].apply(complex_feature_engineering)
end_time_seq = time.time()
sequential_duration = end_time_seq - start_time_seq
print(f"顺序处理完成。")
print(f"耗时:{sequential_duration:.2f} 秒")
print("\n包含顺序结果的 DataFrame 前几行:")
print(df[['id', 'value_to_process', 'new_feature_seq']].head())
在典型的多核机器上,对一百万行数据运行此函数将耗费相当长的时间,因为它在单个 CPU 核心上逐个处理每个值。
现在,让我们使用 ProcessPoolExecutor 来并行化此任务。该执行器将工作分配到多个进程,使 Python 能够绕过全局解释器锁 (GIL),处理像 complex_feature_engineering 函数这样的 CPU 密集型任务。
我们可以使用 executor.map 方法,它的工作方式类似于内置的 map 函数,但会并发执行调用。它将给定函数应用于可迭代对象(我们的 DataFrame 列)中的每个项,并返回一个迭代器,按输入顺序生成结果。
以块(分块)方式处理数据通常比将单独的行发送到工作进程更有效,因为这可以减少进程间通信 (IPC) 的开销。我们可以将 DataFrame 列分成块,并提交每个块进行处理。然而,为了 executor.map 的简单性,我们将直接在 Series 上映射。Pandas Series(和 NumPy 数组)通常是可序列化的,这使得它们可以被发送到工作进程。
# 使用 ProcessPoolExecutor 进行并行执行
# 确定最优工作进程数(通常与CPU核心数有关)
# 使用 None 通常默认为机器的处理器数量
num_workers = None # 或者设置为特定数字,例如 4
print(f"\n开始并行处理,使用 {num_workers or '默认'} 个工作进程...")
start_time_par = time.time()
# 将列转换为列表或NumPy数组,以获得可能更好的分块/序列化效果
# 这取决于执行器的实现细节。对于map,Series通常可以直接使用。
values_to_process = df['value_to_process'].values
results_par = []
# 使用 ProcessPoolExecutor 并行应用函数
# chunksize 可以根据任务持续时间与开销进行性能调整
# 较大的 chunksize 会减少开销,但可能导致负载不平衡。
# 让我们估算一个 chunksize。如果每个任务都很短,则更大的块更好。
# 如果 data_size 是 1M 且我们有 8 个工作进程,每个块可能是 1000-10000 个项。
estimated_chunksize = max(1, min(len(values_to_process) // (num_workers or os.cpu_count() or 1) // 4, 10000))
with ProcessPoolExecutor(max_workers=num_workers) as executor:
# map 并发地将函数应用于可迭代对象中的每个项
# 结果按输入可迭代对象的顺序返回
results_par = list(executor.map(complex_feature_engineering, values_to_process, chunksize=estimated_chunksize))
# 将结果重新赋值给 DataFrame
df['new_feature_par'] = results_par
end_time_par = time.time()
parallel_duration = end_time_par - start_time_par
print(f"并行处理完成。")
print(f"耗时:{parallel_duration:.2f} 秒")
print("\n包含并行结果的 DataFrame 前几行:")
# 验证结果是否一致(如果存在浮点精度差异则允许)
print(df[['id', 'new_feature_seq', 'new_feature_par']].head())
# 检查结果是否接近匹配
if 'new_feature_seq' in df.columns:
print("\n验证结果是否匹配(近似):")
print(np.allclose(df['new_feature_seq'], df['new_feature_par']))
# 计算加速比
speedup = sequential_duration / parallel_duration
print(f"\n加速因子:{speedup:.2f}x")
# 如果需要,清除顺序列
# del df['new_feature_seq']
让我们可视化这两种方法的耗时。
顺序数据预处理与并行数据预处理的执行时间比较。
您应该会发现并行方法能大幅减少执行时间,特别是在具有多个 CPU 核心的机器上。具体的加速效果取决于:
ProcessPoolExecutor 利用多个核心。可用的核心越多(在一定程度上),潜在的加速效果越好。complex_feature_engineering 函数需要足够计算密集,才能使并行化开销(IPC、进程创建)变得值得。如果函数执行速度极快,则开销可能会抵消收益。executor.map 中的 chunksize)等技术可以帮助缓解此问题。ProcessPoolExecutor 非常有效,因为每个进程都有自己的 Python 解释器和内存空间,从而绕开了 GIL。对于 I/O 密集型任务,ThreadPoolExecutor 或 asyncio 可能更适合,正如本章前面讨论的。executor.submit)比映射单个元素更有效率。n_jobs 参数,Dask/Ray 中的分布式计算,Polars 中的多线程)。在实现自定义解决方案之前,请调查您的工具是否已提供优化的并行处理能力。本实践演示了如何应用 ProcessPoolExecutor 来加速常见的 CPU 密集型数据预处理任务。通过了解您的工作负载性质(CPU 密集型 vs. I/O 密集型)和可用的工具,您可以有效地运用并发来加速您的机器学习流程。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造