趋近智
itertools 处理复杂序列__getattr__, __getattribute__)multiprocessing 模块concurrent.futures 实现高级并发构建一个实用的、可复用的机器学习数据管道组件涉及应用高级 Python 构造。这些构造包括用于内存高效迭代的生成器、用于可靠资源处理的上下文管理器以及用于更清晰数据转换的函数式编程模式。
我们的目标是创建一个组件,它能够从源(如文件)读取数据,应用一系列数据清洗和转换步骤,并按需提供处理好的数据,供下游使用,同时保持内存高效。
设想我们需要一个满足以下要求的组件:
此组件可作为机器学习管道的早期阶段,将数据输入到特征提取器或模型训练程序中,而无需将整个数据集加载到内存中。
让我们思考本章介绍的技巧如何帮助满足这些要求:
__enter__/__exit__ 方法的类来实现,或者更简单地使用 contextlib 模块中的 @contextmanager 装饰器。map 或生成器表达式可以按顺序将这些函数应用于数据流。让我们一步步构建这个组件。为求简洁,我们将采用基于函数的方法并结合 contextlib。
我们从一个接受文件路径并返回生成器的函数开始。我们使用 contextlib.contextmanager 来管理文件资源。
import csv
from contextlib import contextmanager
@contextmanager
def file_reader_context(file_path, **kwargs):
"""一个用于打开文件并提供文件行的上下文管理器。"""
try:
# 在进入上下文时打开文件
f = open(file_path, 'r', **kwargs)
print(f"DEBUG: 已打开文件: {file_path}")
yield f # 文件句柄被提供给 'with' 代码块
except FileNotFoundError:
print(f"错误: 未找到文件: {file_path}")
# 选项:重新抛出异常、返回空迭代器或以不同方式处理
yield iter([]) # 错误时提供空迭代器
except Exception as e:
print(f"打开或读取文件 {file_path} 时发生错误: {e}")
yield iter([]) # 其他错误时提供空迭代器
finally:
# 确保在退出上下文时关闭文件
if 'f' in locals() and f:
f.close()
print(f"DEBUG: 已关闭文件: {file_path}")
# 直接使用上下文管理器的例子
# with file_reader_context('data.csv') as file_handle:
# for line in file_handle:
# print(line.strip())
这个 file_reader_context 使用 @contextmanager 来管理文件的生命周期。它提供文件句柄 f,该句柄可以在 with 语句中进行迭代。其中包含了 FileNotFoundError 的错误处理。
让我们定义一些简单的清洗和转换示例函数。在实际场景中,这些函数会更复杂。
def strip_whitespace(row):
"""从列表中每个元素中移除前导/尾随空格。"""
return [str(item).strip() for item in row]
def convert_to_float(row, column_indices):
"""尝试将特定列转换为浮点数,并处理错误。"""
processed_row = list(row) # 创建一个可变副本
for index in column_indices:
if 0 <= index < len(processed_row):
try:
processed_row[index] = float(processed_row[index])
except (ValueError, TypeError):
# 处理转换错误,例如设置为 None 或 NaN
processed_row[index] = None
return processed_row
def filter_incomplete_rows(row, expected_length):
"""过滤掉列数不符合预期的行。"""
return row if len(row) == expected_length else None
请注意,filter_incomplete_rows 对于应丢弃的行返回 None。我们稍后需要处理这种情况。
现在,让我们将文件读取器上下文和处理函数组合到我们的主管道组件函数中。这个函数将接受文件路径和处理函数列表。
import csv
from contextlib import contextmanager
from functools import partial # 对于需要额外参数的函数很有用
# 假设 file_reader_context、strip_whitespace、convert_to_float、
# filter_incomplete_rows 已如上定义
def data_processing_pipeline(file_path, processing_steps, delimiter=','):
"""
创建一个生成器,读取文件并应用处理步骤。
参数:
file_path (str): 输入 CSV 文件的路径。
processing_steps (list): 要按顺序应用的函数列表。
delimiter (str): CSV 文件的分隔符。
提供:
处理后的数据行(例如,列表)。
"""
print(f"文件处理管道已启动: {file_path}")
with file_reader_context(file_path, encoding='utf-8') as file_handle:
# 创建一个 CSV 读取器生成器
reader = csv.reader(file_handle, delimiter=delimiter)
# 使用生成器表达式链接处理步骤
data_stream = reader
for step_func in processing_steps:
# 将每个处理函数应用于数据流
# 注意:map 将函数应用于 data_stream 提供的每个项
data_stream = map(step_func, data_stream)
# 添加最后的过滤步骤,以移除由过滤器引入的 None 值
processed_stream = (item for item in data_stream if item is not None)
# 从完全处理的数据流中提供各项
yield from processed_stream
print(f"文件处理管道已完成: {file_path}")
data_processing_pipeline 函数展示了几个重要理念:
with 语句中使用了 file_reader_context 以确保资源安全。csv.reader,csv.reader 本身就是一个迭代器。map 迭代地应用处理步骤。每个 map 对象都是一个迭代器,它根据前一步骤的请求惰性地处理数据项。这创建了一个处理链,而无需为整个数据集存储中间结果。(item for item in data_stream if item is not None) 过滤掉了可能由 filter_incomplete_rows 等过滤函数引入的任何 None 值。yield from 高效地提供最终处理流中的所有数据项。让我们定义一系列处理步骤并使用我们的组件。我们将使用 functools.partial 预先配置 convert_to_float 和 filter_incomplete_rows 等函数及其特定参数(column_indices、expected_length)。
from functools import partial
# 定义处理序列
# 假设我们的 CSV 有 3 列,并且我们想将第 1 列和第 2 列(索引从 0 开始)转换为浮点数。
processing_pipeline_steps = [
strip_whitespace,
partial(convert_to_float, column_indices=[1, 2]),
partial(filter_incomplete_rows, expected_length=3)
]
# 示例:创建一个用于测试的虚拟 CSV 文件
dummy_csv_content = """ header1, header2, header3
value1 , 1.0 , 2.5
value2 , 3.5 , invalid
value3,4.2, 5.1, extra_col
value4 , 6.0 , 7.8
"""
dummy_file_path = 'sample_data.csv'
with open(dummy_file_path, 'w') as f:
f.write(dummy_csv_content)
# 运行管道
processed_data_generator = data_processing_pipeline(
dummy_file_path,
processing_pipeline_steps
)
# 消费生成器中的数据
print("\n消费处理后的数据:")
try:
# 如果需要,跳过标题行(可作为另一个步骤添加)
header = next(processed_data_generator)
print(f"已跳过标题行: {header}")
for processed_row in processed_data_generator:
print(f"处理后的行: {processed_row}")
except StopIteration:
print("没有数据生成(或只找到标题行)。")
except Exception as e:
print(f"处理过程中发生错误: {e}")
# 清理虚拟文件(可选)
import os
#os.remove(dummy_file_path)
预期输出:
文件处理管道已启动: sample_data.csv
DEBUG: 已打开文件: sample_data.csv
消费处理后的数据:
已跳过标题行: ['header1', 'header2', 'header3']
处理后的行: ['value1', 1.0, 2.5]
处理后的行: ['value2', 3.5, None] # 'invalid' 因 convert_to_float 错误处理变为 None
处理后的行: ['value4', 6.0, 7.8] # 含有 'extra_col' 的行被过滤掉
DEBUG: 已关闭文件: sample_data.csv
文件处理管道已完成: sample_data.csv
这个实践示例说明了如何结合生成器、上下文管理器和函数式模式来构建一个灵活且内存高效的数据处理组件。
map 和生成器表达式定义的数据处理链逐项流动。data_processing_pipeline 函数是通用的。它可以处理不同的文件和不同组的处理函数。file_reader_context)、各个处理步骤(小型函数)以及管道编排(data_processing_pipeline)分离,使代码更易于理解、测试和修改。partial 允许预先配置函数,使 processing_pipeline_steps 列表整洁易读。可以轻松添加新步骤或重新排序现有步骤。此组件是一个构建单元。你可以通过添加更精细的错误日志记录、支持不同文件格式,或者集成并行处理技巧(课程后续会讲到)来进一步增强它,以处理 CPU 密集型转换。然而,其核心结构使用了高级 Python 构造来构建更好的机器学习管道。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造