将搭建一个数据管道,摄取原始数据,应用一组已定义的转换,并将生成的特征保存到结构化的、基于文件的特征存储中。这个过程为特征建立了一个单一、可靠的真实数据来源,构成了对抗训练-服务偏差的第一道防线。目标是集中特征创建逻辑,确保转换函数(我们称之为 $f$)为训练数据 ($x_{train}$) 和服务数据 ($x_{serve}$) 产生相同输出。我们的管道将是一个Python脚本,它可以手动执行,作为cron作业调度,或集成到KubeFlow或Airflow等更大的编排系统中。尽管为了简单起见我们将使用pandas库,但其核心逻辑设计为可移植到Spark或Ray等分布式框架。场景:网约车行程时长特征假设我们正在构建一个模型来预测出租车行程的时长。我们的原始数据源是Parquet文件集合,每个文件包含已完成行程的记录。我们的任务是摄取这些数据并计算一组模型将用于训练的特征。原始数据模式:trip_id: 行程的唯一标识符。pickup_datetime_utc: 行程开始时的时间戳。dropoff_datetime_utc: 行程结束时的时间戳。passenger_count: 乘客数量。start_location_id: 上车区域的标识符。目标特征:trip_duration_minutes: 行程总时长,以分钟计。hour_of_day: 行程开始的小时(0-23)。day_of_week: 行程开始的星期几(0=周一,6=周日)。is_weekend: 一个布尔标志,表示行程是否在周末开始。步骤1:定义核心转换逻辑本次练习最重要的一部分是将特征工程逻辑分离到其自己的可重用模块中。这种分离实现了不同环境之间的一致性。让我们创建一个名为 feature_transforms.py 的文件。# feature_transforms.py import pandas as pd def compute_trip_features(df: pd.DataFrame) -> pd.DataFrame: """ 根据原始数据DataFrame计算行程相关特征。 此函数包含我们特征的权威逻辑。 它可以被我们的批处理管道、流处理作业、 或在线推理服务导入使用。 """ # 确保日期时间列格式正确 df['pickup_datetime_utc'] = pd.to_datetime(df['pickup_datetime_utc']) df['dropoff_datetime_utc'] = pd.to_datetime(df['dropoff_datetime_utc']) # 特征1:行程时长 duration = df['dropoff_datetime_utc'] - df['pickup_datetime_utc'] df['trip_duration_minutes'] = duration.dt.total_seconds() / 60.0 # 特征2:一天中的小时 df['hour_of_day'] = df['pickup_datetime_utc'].dt.hour # 特征3:星期几 df['day_of_week'] = df['pickup_datetime_utc'].dt.dayofweek # 周一=0,周日=6 # 特征4:是否为周末 df['is_weekend'] = (df['pickup_datetime_utc'].dt.dayofweek >= 5) # 选择并只返回特征列和键 feature_df = df[[ 'trip_id', 'pickup_datetime_utc', 'trip_duration_minutes', 'hour_of_day', 'day_of_week', 'is_weekend' ]].copy() # pickup_datetime_utc 保留作为事件时间戳,以确保时间点准确性 return feature_df此函数是自包含且可测试的。它接收一个带有原始模式的DataFrame,并返回一个只包含主键 (trip_id)、事件时间戳和计算特征的新DataFrame。步骤2:创建摄取脚本接下来,我们创建编排摄取过程的脚本。该脚本将处理读取原始数据,应用来自 feature_transforms.py 的转换逻辑,并将输出写入结构化位置。我们将按日期构建输出目录,这是数据湖中管理时间序列数据的常见模式。创建一个名为 ingest_features.py 的文件。# ingest_features.py import argparse import pandas as pd from pathlib import Path from feature_transforms import compute_trip_features def run_ingestion(raw_data_path: str, feature_store_path: str): """ 为单个原始数据文件运行特征摄取管道。 """ print(f"正在从以下路径读取原始数据: {raw_data_path}") raw_df = pd.read_parquet(raw_data_path) print("正在计算特征...") features_df = compute_trip_features(raw_df) # 使用上车时间戳的日期来划分数据 # 在实际系统中,您需要处理跨多天的数据 event_date = features_df['pickup_datetime_utc'].iloc[0].strftime('%Y-%m-%d') output_dir = Path(feature_store_path) / f"event_date={event_date}" output_dir.mkdir(parents=True, exist_ok=True) output_path = output_dir / "features.parquet" print(f"正在将特征写入: {output_path}") features_df.to_parquet(output_path, index=False) print("摄取完成。") if __name__ == "__main__": parser = argparse.ArgumentParser(description="特征摄取管道") parser.add_argument( "--raw-data-path", type=str, required=True, help="原始输入数据Parquet文件的路径。" ) parser.add_argument( "--feature-store-path", type=str, required=True, help="输出特征存储的根路径。" ) args = parser.parse_args() run_ingestion(args.raw_data_path, args.feature_store_path)特征摄取流程我们刚刚定义的过程可以可视化为一个简单的、从原始数据源到结构化特征存储的定向流程。digraph G { rankdir=TB; node [shape=box, style="rounded,filled", fontname="Helvetica", fillcolor="#e9ecef"]; edge [fontname="Helvetica"]; subgraph cluster_source { label = "原始数据源"; style = "filled,dashed"; fillcolor = "#a5d8ff40"; raw_data [label="raw_trips_2023-10-26.parquet", shape=note, fillcolor="#bac8ff"]; } subgraph cluster_process { label = "摄取过程"; style = "filled,dashed"; fillcolor = "#96f2d740"; ingest_script [label="ingest_features.py", fillcolor="#b2f2bb"]; transform_logic [label="feature_transforms.py\n(共享逻辑)", fillcolor="#fab005"]; } subgraph cluster_store { label = "基于文件的特征存储"; style = "filled,dashed"; fillcolor = "#ffc9c940"; feature_store [label="features.parquet", shape=cylinder, fillcolor="#ffc9c9"]; partition [label="event_date=2023-10-26", shape=folder, fillcolor="#ffd8a8"]; } raw_data -> ingest_script [label=" 读取"]; ingest_script -> transform_logic [label=" 导入并调用"]; ingest_script -> partition [label=" 写入到"]; partition -> feature_store; }摄取管道读取原始数据,应用共享转换逻辑模块,并将分区特征数据写入特征存储。步骤3:运行管道要执行此管道,您首先需要创建一些示例原始数据。# 创建一个用于测试的示例原始数据文件 import pandas as pd import numpy as np data = { 'trip_id': range(100, 105), 'pickup_datetime_utc': pd.to_datetime(['2023-10-26 08:05:00', '2023-10-26 08:10:00', '2023-10-26 18:30:00', '2023-10-28 11:00:00', '2023-10-28 23:15:00']), 'dropoff_datetime_utc': pd.to_datetime(['2023-10-26 08:25:00', '2023-10-26 08:22:00', '2023-10-26 18:45:00', '2023-10-28 11:18:00', '2023-10-28 23:55:00']), 'passenger_count': [1, 2, 1, 3, 1], 'start_location_id': [10, 12, 5, 22, 50] } sample_df = pd.DataFrame(data) sample_df.to_parquet('raw_trips.parquet')现在,您可以从终端运行摄取脚本。我们将指定一个名为 feature_store 的目录作为输出位置。# 创建输出目录 mkdir -p feature_store # 运行摄取脚本 python ingest_features.py \ --raw-data-path raw_trips.parquet \ --feature-store-path feature_store运行脚本后,您的目录结构将如下所示:. ├── feature_store/ │ └── event_date=2023-10-26/ │ └── features.parquet ├── raw_trips.parquet ├── ingest_features.py └── feature_transforms.py注意: 我们的简单脚本按第一条记录的日期划分所有记录。生产系统需要处理跨多天的数据,按日期分组并写入相应的分区。让我们检查输出以验证结果:import pandas as pd # 加载我们刚刚创建的特征数据 output_df = pd.read_parquet('feature_store/event_date=2023-10-26/features.parquet') print(output_df)输出将是一个整洁的、经过特征工程处理的表格: trip_id pickup_datetime_utc trip_duration_minutes hour_of_day day_of_week is_weekend 0 100 2023-10-26 08:05:00 20.0 8 3 False 1 101 2023-10-26 08:10:00 12.0 8 3 False 2 102 2023-10-26 18:30:00 15.0 18 3 False 3 103 2023-10-28 11:00:00 18.0 11 5 True 4 104 2023-10-28 23:15:00 40.0 23 5 True从基本管道到生产系统您已成功构建了一个特征摄取管道。这种简单结构是机器学习生产级数据系统的组成部分。防止偏差: 这里的核心成果是 feature_transforms.py 模块。训练管道将从 feature_store 目录读取数据。在线推理服务在接收到新的行程请求时,会导入并调用完全相同的 compute_trip_features 函数来处理输入数据,以实时生成特征。这确保了 $f(x_{train})$ 和 $f(x_{serve})$ 是相同的。版本控制: 下一个合理的步骤是将整个目录置于版本控制之下。使用DVC等工具,您可以追踪 feature_transforms.py、ingest_features.py 和生成的 feature_store 数据的版本。这为您提供了完全的可复现性。可伸缩性: 如果 raw_trips.parquet 增长到数TB,基于pandas的脚本将无法运行。然而,compute_trip_features 中的逻辑几乎可以直接转换为Spark的用户定义函数 (UDF) 或Ray的远程任务。架构模式保持不变;只有执行引擎发生改变。本次练习表明,一个良好设计的数据系统优先考虑其转换逻辑的集中化和可重用性。通过这样做,您可以为整个机器学习生命周期构建一个可靠的根基。