本节提供一个动手练习,以巩固您在特征平台环境中实现复杂特征转换的理解。我们将处理简单的查找或聚合之外的场景,涉及多个数据源、自定义逻辑和时间敏感的计算,这反映了生产机器学习系统中遇到的挑战。您将应用本章前面讨论的思想,例如转换流水线、流式特征处理和管理不同的计算周期。我们将模拟一个常见情况:通过结合静态用户画像信息与实时交易活动来增强用户特征。我们的目标是计算能够捕获近期用户行为模式的特征,这些特征在欺诈检测或推荐系统等方面通常具有高预测性。场景:实时用户活动特征假设我们有两个主要数据源:用户画像: 一个批处理源(例如,以 Parquet 文件形式存储在 S3 或 GCS 等数据湖中),包含相对静态的用户信息,例如账户创建日期、用户细分和可能的人口统计数据。此数据不常更新(例如,每日更新)。用户交易: 一个流式源(例如,Kafka、Kinesis),提供有关用户购买、登录或其他交互的实时事件。每个事件包含用户ID、时间戳、交易金额和事件类型。我们的目标是创建结合来自两个源的信息并需要非平凡计算的派生特征,例如:时间窗口聚合: 过去1小时、24小时和7天内的平均交易金额。频率计数: 过去1小时和24小时内的交易数量。组合特征: 一个比率,比较用户近期活动(例如,过去一小时的平均交易金额)与其历史细分群体的平均值或特定画像属性。数据流该过程通常涉及定义一个特征视图或转换函数,以协调以下步骤:定义源: 指定批处理源(用户画像)和流式源(用户交易)。处理流: 对传入的交易流应用转换。这通常涉及时间窗口聚合。查找批处理数据: 对于正在处理的每个实体(用户ID),从批处理源中获取相关的静态特征。组合与计算: 将聚合的流特征与批处理特征连接。应用自定义 Python 函数或类 SQL 表达式来计算最终的派生特征。物化: 配置特征平台将计算出的特征写入在线存储(用于低延迟服务)和离线存储(用于训练数据生成和分析)。digraph G { rankdir=LR; node [shape=box, style=filled, fillcolor="#dee2e6", fontname="Arial"]; edge [fontname="Arial"]; subgraph cluster_sources { label = "数据源"; style=filled; fillcolor="#e9ecef"; node [fillcolor="#ced4da"]; batch_source [label="用户画像\n(批处理源)", shape=cylinder]; stream_source [label="用户交易\n(流式源)", shape=cylinder]; } subgraph cluster_compute { label = "特征平台计算"; style=filled; fillcolor="#e9ecef"; node [fillcolor="#a5d8ff"]; stream_agg [label="时间窗口\n聚合"]; lookup [label="批处理特征\n查找"]; combine [label="连接与自定义\n转换"]; } subgraph cluster_storage { label = "特征存储"; style=filled; fillcolor="#e9ecef"; node [fillcolor="#b2f2bb", shape=cylinder]; online_store [label="在线存储"]; offline_store [label="离线存储"]; } stream_source -> stream_agg [label=" 事件 "]; stream_agg -> combine [label=" 聚合\n特征 "]; batch_source -> lookup [label=" 静态\n特征 "]; lookup -> combine; combine -> online_store [label=" 物化 "]; combine -> offline_store [label=" 物化 "]; }计算组合流式和批处理特征的数据流。实现转换尽管具体语法很大程度上取决于所选的特征平台框架(例如 Feast、Tecton、Vertex AI 特征平台、SageMaker 特征平台),但核心思想保持一致。让我们使用基于 Python 的伪代码进行说明,假设有一个特征平台客户端库。1. 定义数据源首先,我们声明我们的源。这通常涉及将特征平台指向数据的位置和格式。# 假设 'fs' 是一个已初始化的特征平台客户端 # 批处理源:存储在 S3 中的用户画像 user_profiles_source = fs.define_batch_source( name="user_profiles_batch", path="s3://my-data-bucket/user_profiles/", format="parquet", timestamp_field="profile_update_ts", created_timestamp_column="account_created_ts" ) # 流式源:来自 Kafka 的用户交易 user_transactions_source = fs.define_stream_source( name="user_transactions_stream", kafka_bootstrap_servers="kafka.example.com:9092", topic="user_transactions", format="json", timestamp_field="event_timestamp" )2. 定义时间窗口聚合我们在流式源上定义转换,以在特定时间窗口内计算聚合。# 定义交易流上的聚合 # 假设模式类似 {'user_id': ..., 'event_timestamp': ..., 'amount': ...} transaction_aggregations = fs.aggregate_stream( source=user_transactions_source, entity_key="user_id", aggregations=[ fs.Aggregation(column="amount", function="mean", time_window="1h"), fs.Aggregation(column="amount", function="mean", time_window="24h"), fs.Aggregation(column="*", function="count", time_window="1h"), fs.Aggregation(column="*", function="count", time_window="24h"), ], # 定义如何处理延迟数据(例如,允许15分钟延迟) event_timestamp_column="event_timestamp", watermark_delay="15m" ) # 输出可能是以下特征: # amount_mean_1h, amount_mean_24h, transaction_count_1h, transaction_count_24h3. 定义组合源的自定义转换现在,我们使用自定义转换函数将聚合的流特征与静态画像特征结合。此函数接收来自不同源的对应实体(用户)的数据,并执行最终计算。# 定义所需的用户画像特征 user_profile_features = fs.select_features( source=user_profiles_source, features=["user_segment", "account_age_days"] # account_age_days 可能每日预计算 ) # 定义自定义转换逻辑 @fs.transformation(sources=[transaction_aggregations, user_profile_features]) def compute_advanced_user_features(agg_data, profile_data): """ 结合交易聚合数据和用户画像数据。 参数: agg_data (DataFrame/Dict): 来自流式源的聚合数据。 包含 'user_id'、'amount_mean_1h' 等。 profile_data (DataFrame/Dict): 来自批处理源的静态数据。 包含 'user_id'、'user_segment'、'account_age_days'。 返回: DataFrame/Dict: 用户的最终计算特征。 """ # 示例:计算近期消费与账户年龄的比率 # 处理可能的除零或数据缺失情况 if profile_data["account_age_days"] and profile_data["account_age_days"] > 0: spending_per_day_1h = (agg_data.get("amount_mean_1h", 0) * 24) / profile_data["account_age_days"] else: spending_per_day_1h = 0.0 # 或者 None,根据下游要求 # 示例:计算活动突发性(过去一小时的计数与过去24小时的计数) count_1h = agg_data.get("transaction_count_1h", 0) count_24h = agg_data.get("transaction_count_24h", 0) activity_burst_ratio = count_1h / count_24h if count_24h > 0 else 0.0 return { "user_id": agg_data["user_id"], # 确保返回实体键 "avg_spending_1h": agg_data.get("amount_mean_1h"), "avg_spending_24h": agg_data.get("amount_mean_24h"), "transaction_count_1h": count_1h, "transaction_count_24h": count_24h, "spending_rate_1h": spending_per_day_1h, "activity_burst_ratio": activity_burst_ratio, "user_segment": profile_data.get("user_segment") # 传递画像特征 } # 将此转换注册为特征视图或类似的功能 advanced_user_feature_view = fs.register_feature_view( name="advanced_user_features", entities=["user_id"], transformation=compute_advanced_user_features, # 定义物化目标(在线/离线存储) online_store=my_online_store, offline_store=my_offline_store, ttl="90d" # 离线存储的存活时间 )此示例使用 Python 函数,但许多特征平台允许 SQL 转换或与 Spark 或 Flink 等分布式计算框架集成,以大规模执行这些流水线。测试与验证实现复杂转换需要全面测试:单元测试: 隔离地测试自定义转换函数(在我们的示例中为 compute_advanced_user_features),使用模拟输入数据代表各种情况(例如,缺失画像数据、零交易、计算中的边缘情况)。集成测试: 使用加载到批处理和流式源测试版本中的样本数据,测试端到端流水线。验证特征是否正确计算并物化到目标存储中。数据验证: 纳入数据验证步骤(正如第3章所讨论的),以检查计算出的特征的分布和范围,确保它们在服务或用于训练之前符合预期。动手任务现在,轮到您实现一组复杂转换了。假设您有权访问特征平台环境(这可以是使用 Docker 和 Feast 的本地设置,或云服务商的平台沙箱)。提供数据(模拟):user_profiles.csv:包含 user_id、account_created_ts、user_segment。user_transactions.csv:包含 user_id、event_timestamp、amount、transaction_type。您可以将其视为用于模拟流的批处理文件,或配置一个工具逐行喂入。您的任务:设置数据源: 配置您的特征平台环境,以识别提供的 CSV 文件为批处理和/或流式源。您可能需要预处理 user_profiles.csv 以根据参考日期计算 account_age_days。实现转换: 使用您所选特征平台框架提供的工具,实现逻辑以计算每个 user_id 的以下特征:transaction_count_1h:过去一小时的交易计数(流式聚合)。transaction_count_7d:过去7天的交易计数(流式聚合)。avg_amount_1h:过去一小时的平均交易金额(流式聚合)。activity_ratio_1h_vs_7d:比率 transaction_count_1h / (transaction_count_7d / 7.0)。适当处理除零情况(例如,返回 0 或 null)。这结合了多个流聚合。normalized_avg_amount_1h:avg_amount_1h 除以用户的 account_age_days(如果可用且大于0,否则适当处理)。这结合了流聚合和批处理特征。注册与物化: 定义一个特征视图(或类似的功能),其中包含这些转换。配置它将特征物化到在线和离线目标(如果您的环境支持)。验证:触发计算(例如,使用 feature_store apply 或运行物化作业)。查询在线存储中几个特定的 user_id,以检查特征是否可用且具有合理的值。查询或检查离线存储,以验证历史特征值是否正确生成。"本练习将为您提供实践经验,包括定义数据源、实现涉及聚合和自定义逻辑的多步转换,并将这些计算集成到特征平台框架中,从而为您在系统中构建复杂的特征工程流水线做好准备。"