趋近智
许多现代机器学习 (machine learning)应用,例如实时欺诈检测、个性化推荐和动态定价,都依赖于从持续到达的数据流中提取的特征。与从静态数据集定期计算的批处理特征不同,流式特征反映了系统或用户行为的最新状态,通常是在几秒或几分钟前发生的事件上进行计算的。将这些接近实时的特征整合到特征存储基础设施中,会带来特定的架构和计算难题。本节详细介绍了有效处理流数据以生成及时特征的模式和技术。
流式特征源自无边界数据源,通常是事件流,如用户点击、交易日志、物联网传感器读数或应用程序日志。使用它们的主要原因是低延迟。如果模型需要对事件实时做出反应,仅仅依赖于几小时甚至几分钟前计算的批处理特征通常是不够的。
请考虑以下场景:
融入流数据需要能够持续处理事件并以低延迟更新特征值的架构。
流数据如何流入特征存储系统并被其处理?存在多种模式,通常涉及专门的流处理引擎。
最简单的模型中,轻量级消费者可以直接从消息队列(如Kafka或Kinesis)读取数据,并将特征值直接写入低延迟的在线存储(例如Redis、Cassandra)。
这是更复杂的流式特征的普遍模式。一个专门的流处理引擎位于原始事件流和特征存储之间。
使用集成到特征存储的专用流处理器进行流式特征计算的数据流。
工作流程:
一些系统使用微批处理框架(如Spark Streaming的原始DStream API或频繁运行的自定义批处理作业)以小而离散的时间间隔(例如,每分钟)处理数据。
这些模式的选择取决于特征转换的复杂性、延迟要求、现有基础设施和团队专业知识。对于高级特征存储,使用专用流处理引擎的模式2通常是最灵活和可扩展的方法。
流式转换可分为无状态或有状态。
无状态转换: 这些转换独立地作用于每个事件,无需来自先前事件的信息(可能除了固定的查找数据)。示例包括:
有状态转换: 这些转换需要在多个事件之间维护状态,通常基于键(如用户ID)或时间窗口。这是流处理的能力和复杂性所在。示例包括:
实现有状态转换需要仔细考量:
在此背景下,流处理器的一个主要功能是更新特征存储目标:
UPDATE user_features SET click_count_5m = 12 WHERE user_id = 'abc')。数据库选择和数据建模会影响写入性能。Redis、DynamoDB或Cassandra等技术是常见的选择。确保从流更新的在线和离线视图之间的一致性可能具有挑战性,特别是对于复杂聚合或处理迟到数据时。这一方面将在第3章中更详细地讨论。
选择合适的技术很重要:
这是一个PyFlink示例,说明了数据流:从Kafka读取,执行一个简单的窗口计数,并打印(代表写入特征存储接收器)。
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
# 假设存在自定义的 FeatureStoreSink
# 1. 设置执行环境
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
# 配置检查点以实现容错(细节已省略)
# env.enable_checkpointing(...)
# 2. 配置Kafka源
kafka_props = {
'bootstrap.servers': 'kafka:9092',
'group.id': 'feature_processor_group'
}
# 假设JSON字符串类似:{"user_id": "xyz", "event_type": "click", "timestamp": 1678886400000}
kafka_consumer = FlinkKafkaConsumer(
'user_events',
SimpleStringSchema(),
kafka_props
)
# 根据JSON中的'timestamp'字段分配时间戳和水印(细节已省略)
# source_stream = env.add_source(kafka_consumer).assign_timestamps_and_watermarks(...)
source_stream = env.add_source(kafka_consumer)
# 3. 定义转换(示例:每个用户的5分钟点击计数)
def parse_event(json_string):
# 基本解析 - 实际实现需要错误处理
import json
try:
data = json.loads(json_string)
# 假设事件时间已在之前提取和分配
return (data.get('user_id'), 1) # (用户ID, 计数=1)
except:
return (None, 0)
feature_stream = source_stream \
.map(parse_event, output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.filter(lambda x: x[0] is not None) \
.key_by(lambda x: x[0]) \
.window(TumblingEventTimeWindows.of(Time.minutes(5))) \
.reduce(lambda a, b: (a[0], a[1] + b[1])) # 在窗口内求和计数
# 4. 汇入特征存储
# 在实际场景中,您将使用自定义的SinkFunction或连接器
# feature_store_sink = FeatureStoreSink(online_config=..., offline_config=...)
# feature_stream.add_sink(feature_store_sink)
# 仅用于演示,进行打印
feature_stream.print() # 替换为实际的Sink
# 5. 执行作业
env.execute("Streaming Feature Engineering Job")
此示例展示了按user_id对流进行键控,基于事件时间应用5分钟滚动窗口,并对每个用户在每个窗口内的计数求和。结果随后将发送到负责更新在线和离线存储的接收器。
成功实现流式特征需要处理一些操作复杂性:
处理流式特征是一项复杂的能力,它显著增强了特征存储的功能,使得机器学习 (machine learning)模型能够对近实时事件做出反应。尽管与批处理相比,它引入了操作复杂性,但掌握流处理引擎的集成能让您构建高度响应且富有影响力的机器学习系统。
这部分内容有帮助吗?
© 2026 ApX Machine LearningAI伦理与透明度•