趋近智
本节提供一个动手练习,以巩固您在特征平台环境中实现复杂特征转换的理解。我们将处理简单的查找或聚合之外的场景,涉及多个数据源、自定义逻辑和时间敏感的计算,这反映了生产机器学习 (machine learning)系统中遇到的挑战。您将应用本章前面讨论的思想,例如转换流水线、流式特征处理和管理不同的计算周期。
我们将模拟一个常见情况:通过结合静态用户画像信息与实时交易活动来增强用户特征。我们的目标是计算能够捕获近期用户行为模式的特征,这些特征在欺诈检测或推荐系统等方面通常具有高预测性。
假设我们有两个主要数据源:
我们的目标是创建结合来自两个源的信息并需要非平凡计算的派生特征,例如:
该过程通常涉及定义一个特征视图或转换函数,以协调以下步骤:
计算组合流式和批处理特征的数据流。
尽管具体语法很大程度上取决于所选的特征平台框架(例如 Feast、Tecton、Vertex AI 特征平台、SageMaker 特征平台),但核心思想保持一致。让我们使用基于 Python 的伪代码进行说明,假设有一个特征平台客户端库。
首先,我们声明我们的源。这通常涉及将特征平台指向数据的位置和格式。
# 假设 'fs' 是一个已初始化的特征平台客户端
# 批处理源:存储在 S3 中的用户画像
user_profiles_source = fs.define_batch_source(
name="user_profiles_batch",
path="s3://my-data-bucket/user_profiles/",
format="parquet",
timestamp_field="profile_update_ts",
created_timestamp_column="account_created_ts"
)
# 流式源:来自 Kafka 的用户交易
user_transactions_source = fs.define_stream_source(
name="user_transactions_stream",
kafka_bootstrap_servers="kafka.example.com:9092",
topic="user_transactions",
format="json",
timestamp_field="event_timestamp"
)
我们在流式源上定义转换,以在特定时间窗口内计算聚合。
# 定义交易流上的聚合
# 假设模式类似 {'user_id': ..., 'event_timestamp': ..., 'amount': ...}
transaction_aggregations = fs.aggregate_stream(
source=user_transactions_source,
entity_key="user_id",
aggregations=[
fs.Aggregation(column="amount", function="mean", time_window="1h"),
fs.Aggregation(column="amount", function="mean", time_window="24h"),
fs.Aggregation(column="*", function="count", time_window="1h"),
fs.Aggregation(column="*", function="count", time_window="24h"),
],
# 定义如何处理延迟数据(例如,允许15分钟延迟)
event_timestamp_column="event_timestamp",
watermark_delay="15m"
)
# 输出可能是以下特征:
# amount_mean_1h, amount_mean_24h, transaction_count_1h, transaction_count_24h
现在,我们使用自定义转换函数将聚合的流特征与静态画像特征结合。此函数接收来自不同源的对应实体(用户)的数据,并执行最终计算。
# 定义所需的用户画像特征
user_profile_features = fs.select_features(
source=user_profiles_source,
features=["user_segment", "account_age_days"] # account_age_days 可能每日预计算
)
# 定义自定义转换逻辑
@fs.transformation(sources=[transaction_aggregations, user_profile_features])
def compute_advanced_user_features(agg_data, profile_data):
"""
结合交易聚合数据和用户画像数据。
参数:
agg_data (DataFrame/Dict): 来自流式源的聚合数据。
包含 'user_id'、'amount_mean_1h' 等。
profile_data (DataFrame/Dict): 来自批处理源的静态数据。
包含 'user_id'、'user_segment'、'account_age_days'。
返回:
DataFrame/Dict: 用户的最终计算特征。
"""
# 示例:计算近期消费与账户年龄的比率
# 处理可能的除零或数据缺失情况
if profile_data["account_age_days"] and profile_data["account_age_days"] > 0:
spending_per_day_1h = (agg_data.get("amount_mean_1h", 0) * 24) / profile_data["account_age_days"]
else:
spending_per_day_1h = 0.0 # 或者 None,根据下游要求
# 示例:计算活动突发性(过去一小时的计数与过去24小时的计数)
count_1h = agg_data.get("transaction_count_1h", 0)
count_24h = agg_data.get("transaction_count_24h", 0)
activity_burst_ratio = count_1h / count_24h if count_24h > 0 else 0.0
return {
"user_id": agg_data["user_id"], # 确保返回实体键
"avg_spending_1h": agg_data.get("amount_mean_1h"),
"avg_spending_24h": agg_data.get("amount_mean_24h"),
"transaction_count_1h": count_1h,
"transaction_count_24h": count_24h,
"spending_rate_1h": spending_per_day_1h,
"activity_burst_ratio": activity_burst_ratio,
"user_segment": profile_data.get("user_segment") # 传递画像特征
}
# 将此转换注册为特征视图或类似的功能
advanced_user_feature_view = fs.register_feature_view(
name="advanced_user_features",
entities=["user_id"],
transformation=compute_advanced_user_features,
# 定义物化目标(在线/离线存储)
online_store=my_online_store,
offline_store=my_offline_store,
ttl="90d" # 离线存储的存活时间
)
此示例使用 Python 函数,但许多特征平台允许 SQL 转换或与 Spark 或 Flink 等分布式计算框架集成,以大规模执行这些流水线。
实现复杂转换需要全面测试:
compute_advanced_user_features),使用模拟输入数据代表各种情况(例如,缺失画像数据、零交易、计算中的边缘情况)。现在,轮到您实现一组复杂转换了。假设您有权访问特征平台环境(这可以是使用 Docker 和 Feast 的本地设置,或云服务商的平台沙箱)。
提供数据(模拟):
user_profiles.csv:包含 user_id、account_created_ts、user_segment。user_transactions.csv:包含 user_id、event_timestamp、amount、transaction_type。您可以将其视为用于模拟流的批处理文件,或配置一个工具逐行喂入。您的任务:
user_profiles.csv 以根据参考日期计算 account_age_days。user_id 的以下特征:
transaction_count_1h:过去一小时的交易计数(流式聚合)。transaction_count_7d:过去7天的交易计数(流式聚合)。avg_amount_1h:过去一小时的平均交易金额(流式聚合)。activity_ratio_1h_vs_7d:比率 transaction_count_1h / (transaction_count_7d / 7.0)。适当处理除零情况(例如,返回 0 或 null)。这结合了多个流聚合。normalized_avg_amount_1h:avg_amount_1h 除以用户的 account_age_days(如果可用且大于0,否则适当处理)。这结合了流聚合和批处理特征。feature_store apply 或运行物化作业)。user_id,以检查特征是否可用且具有合理的值。"本练习将为您提供实践经验,包括定义数据源、实现涉及聚合和自定义逻辑的多步转换,并将这些计算集成到特征平台框架中,从而为您在系统中构建复杂的特征工程流水线做好准备。"
这部分内容有帮助吗?
© 2026 ApX Machine LearningAI伦理与透明度•