许多现代机器学习应用,例如实时欺诈检测、个性化推荐和动态定价,都依赖于从持续到达的数据流中提取的特征。与从静态数据集定期计算的批处理特征不同,流式特征反映了系统或用户行为的最新状态,通常是在几秒或几分钟前发生的事件上进行计算的。将这些接近实时的特征整合到特征存储基础设施中,会带来特定的架构和计算难题。本节详细介绍了有效处理流数据以生成及时特征的模式和技术。流式特征的需求流式特征源自无边界数据源,通常是事件流,如用户点击、交易日志、物联网传感器读数或应用程序日志。使用它们的主要原因是低延迟。如果模型需要对事件实时做出反应,仅仅依赖于几小时甚至几分钟前计算的批处理特征通常是不够的。请考虑以下场景:欺诈检测: 识别欺诈性交易需要分析最近几分钟或几秒内的活动模式。实时竞价 (RTB): 决定广告展示的最佳出价需要反映用户最近浏览行为的特征。系统监控: 检测应用程序性能异常可能需要对以秒为单位的滑动时间窗口内的指标进行聚合。融入流数据需要能够持续处理事件并以低延迟更新特征值的架构。流式特征摄入的架构模式流数据如何流入特征存储系统并被其处理?存在多种模式,通常涉及专门的流处理引擎。模式1:直接摄入在线存储(复杂特征较少用)最简单的模型中,轻量级消费者可以直接从消息队列(如Kafka或Kinesis)读取数据,并将特征值直接写入低延迟的在线存储(例如Redis、Cassandra)。优点: 对于非常基本的特征(例如,用户最后已知位置)来说很简单。如果不需要复杂的处理,基础设施开销较低。缺点: 难以处理有状态转换(如窗口聚合)。将转换逻辑分散到许多小型消费者中。不利于填充离线存储以保持训练数据一致性。主要适用于事件负载本身就是特征值的“最新值”类型特征。模式2:流处理引擎集成(最常见)这是更复杂的流式特征的普遍模式。一个专门的流处理引擎位于原始事件流和特征存储之间。digraph G { rankdir=LR; node [shape=box, style=rounded, fontname="sans-serif", fontsize=10, color="#495057", fontcolor="#495057"]; edge [fontname="sans-serif", fontsize=9, color="#868e96", fontcolor="#868e96"]; subgraph cluster_source { label="数据源"; style=dashed; color="#adb5bd"; raw_stream [label="原始事件流\n(例如,Kafka, Kinesis)", shape=cylinder, color="#1c7ed6", fontcolor="#1c7ed6"]; } subgraph cluster_processing { label="处理"; style=dashed; color="#adb5bd"; stream_processor [label="流处理器\n(例如,Flink, Spark Streaming,\nKafka Streams, Dataflow)", color="#7048e8", fontcolor="#7048e8"]; } subgraph cluster_feature_store { label="特征存储"; style=dashed; color="#adb5bd"; online_store [label="在线存储\n(低延迟)", color="#12b886", fontcolor="#12b886"]; offline_store [label="离线存储\n(批处理/训练)", color="#f76707", fontcolor="#f76707"]; } raw_stream -> stream_processor [label=" 消费 "]; stream_processor -> online_store [label=" 更新特征 "]; stream_processor -> offline_store [label=" 持久化特征 "]; }使用集成到特征存储的专用流处理器进行流式特征计算的数据流。工作流程:摄入: 原始事件流入持久化消息队列(例如,Kafka、AWS Kinesis、Google Pub/Sub)。处理: 流处理引擎(例如,Apache Flink、Spark Streaming、Kafka Streams、Beam/Dataflow、ksqlDB)从队列消费事件。转换: 引擎执行必要的转换。这些可以是无状态的(过滤、映射)或有状态的(窗口聚合、会话化)。物化: 计算出的特征值被写入到:在线存储: 用于低延迟服务(例如,在Redis中更新用户的5分钟点击计数)。离线存储: 用于持久化、训练数据生成和一致性(例如,将聚合的特征值追加到S3中的Parquet文件或Delta Lake表中)。优点: 处理复杂的有状态转换。集中转换逻辑。同时填充在线和离线存储,有助于保持一致性。可扩展且容错(借助引擎能力)。缺点: 引入了另一个需要管理的系统组件。需要流处理框架方面的专业知识。与直接摄入相比,可能存在稍高的延迟(尽管通常可以忽略不计)。模式3:近实时微批处理一些系统使用微批处理框架(如Spark Streaming的原始DStream API或频繁运行的自定义批处理作业)以小而离散的时间间隔(例如,每分钟)处理数据。优点: 可以重用现有的批处理代码和基础设施。在某些情况下,与纯流式处理相比,状态管理更简单。缺点: 根据批处理间隔引入固有延迟。可能不适用于需要亚秒级新鲜度的应用。对于高吞吐量、低延迟任务,可能不如纯流式引擎的资源效率高。这些模式的选择取决于特征转换的复杂性、延迟要求、现有基础设施和团队专业知识。对于高级特征存储,使用专用流处理引擎的模式2通常是最灵活和可扩展的方法。实现流式转换流式转换可分为无状态或有状态。无状态转换: 这些转换独立地作用于每个事件,无需来自先前事件的信息(可能除了固定的查找数据)。示例包括:解析事件字段(例如,从JSON负载中提取用户ID)。过滤事件(例如,只保留“购买”事件)。简单的数据丰富(例如,使用广播查找表根据产品ID添加产品类别信息)。 这些通常在任何流处理框架中都很容易实现。有状态转换: 这些转换需要在多个事件之间维护状态,通常基于键(如用户ID)或时间窗口。这是流处理的能力和复杂性所在。示例包括:窗口聚合: 计算时间窗口内的指标(例如,过去10分钟内的平均交易金额)。这是一个基本的使用场景,将在“大规模时间窗口聚合”一节中详细介绍。流处理器提供了定义窗口(滚动、滑动、会话)和管理相关状态的机制。会话化: 将属于同一用户会话的事件分组,通常由不活动期间定义。模式检测: 识别特定的事件序列(例如,登录 -> 添加到购物车 -> 放弃)。计数不同项: 在时间窗口内近似计算唯一元素的数量(例如,HyperLogLog)。实现有状态转换需要仔细考量:状态管理: 状态存储在哪里以及如何存储(内存、磁盘、分布式文件系统)。像Flink这样的流处理器提供带检查点功能的后端存储以实现容错。时间语义: 使用事件时间(事件发生的时间)而非处理时间(事件被处理的时间)对于准确结果非常重要,特别是对于乱序或迟到数据。水印策略用于跟踪事件时间进度。容错性: 使用检查点或保存点等机制,确保在故障后能够准确恢复状态。从流更新在线和离线存储在此背景下,流处理器的一个主要功能是更新特征存储目标:在线存储: 更新必须是低延迟的。处理器通常对单个键执行点写入或更新(例如,UPDATE user_features SET click_count_5m = 12 WHERE user_id = 'abc')。数据库选择和数据建模会影响写入性能。Redis、DynamoDB或Cassandra等技术是常见的选择。离线存储: 目标是适合批处理和模型训练的持久化存储。处理后的特征通常以批处理方式(例如,每隔几分钟)写入到分布式存储(S3、GCS、ADLS、HDFS)上的Parquet或Delta Lake等列式格式。这会创建只追加的数据集,记录随着时间计算出的特征值,从而实现训练时的历史时间点查询。确保从流更新的在线和离线视图之间的一致性可能具有挑战性,特别是对于复杂聚合或处理迟到数据时。这一方面将在第3章中更详细地讨论。技术选择选择合适的技术很重要:流处理引擎:Apache Flink: 由于其原生的流式架构以及完善的状态管理和事件时间处理能力,常被青睐用于复杂的有状态处理、低延迟和高吞吐量场景。Spark Streaming: 成熟的引擎,如果您有现有的Spark生态系统,则尤其强大。使用微批处理(Structured Streaming)或旧的DStream API。与Spark ML和DataFrames API集成良好。Kafka Streams: 一个直接在Kafka内部构建流应用程序的库。与Kafka生态系统紧密集成,当Kafka是您架构的核心时适用。状态管理通过本地RocksDB实例和Kafka变更日志主题进行处理。云服务商服务: AWS Kinesis Data Analytics、Google Cloud Dataflow(使用Apache Beam)、Azure Stream Analytics 提供托管服务,可以减少运营开销,但与开源引擎相比,其功能集或灵活性可能有所不同。消息队列: Apache Kafka 是事实上的标准。AWS Kinesis、Google Pub/Sub和Azure Event Hubs是云原生替代方案。Pulsar是另一个有力的开源竞争者。特征存储框架: 像Feast或Tecton这样的工具通常提供连接器或SDK,旨在与流处理输出集成,简化根据框架定义结构将计算出的特征推送到在线和离线存储的过程。代码示例 (PyFlink)这是一个PyFlink示例,说明了数据流:从Kafka读取,执行一个简单的窗口计数,并打印(代表写入特征存储接收器)。from pyflink.common import Types from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic from pyflink.datastream.window import TumblingEventTimeWindows from pyflink.common.time import Time from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer from pyflink.common.serialization import SimpleStringSchema # 假设存在自定义的 FeatureStoreSink # 1. 设置执行环境 env = StreamExecutionEnvironment.get_execution_environment() env.set_stream_time_characteristic(TimeCharacteristic.EventTime) # 配置检查点以实现容错(细节已省略) # env.enable_checkpointing(...) # 2. 配置Kafka源 kafka_props = { 'bootstrap.servers': 'kafka:9092', 'group.id': 'feature_processor_group' } # 假设JSON字符串类似:{"user_id": "xyz", "event_type": "click", "timestamp": 1678886400000} kafka_consumer = FlinkKafkaConsumer( 'user_events', SimpleStringSchema(), kafka_props ) # 根据JSON中的'timestamp'字段分配时间戳和水印(细节已省略) # source_stream = env.add_source(kafka_consumer).assign_timestamps_and_watermarks(...) source_stream = env.add_source(kafka_consumer) # 3. 定义转换(示例:每个用户的5分钟点击计数) def parse_event(json_string): # 基本解析 - 实际实现需要错误处理 import json try: data = json.loads(json_string) # 假设事件时间已在之前提取和分配 return (data.get('user_id'), 1) # (用户ID, 计数=1) except: return (None, 0) feature_stream = source_stream \ .map(parse_event, output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \ .filter(lambda x: x[0] is not None) \ .key_by(lambda x: x[0]) \ .window(TumblingEventTimeWindows.of(Time.minutes(5))) \ .reduce(lambda a, b: (a[0], a[1] + b[1])) # 在窗口内求和计数 # 4. 汇入特征存储 # 在实际场景中,您将使用自定义的SinkFunction或连接器 # feature_store_sink = FeatureStoreSink(online_config=..., offline_config=...) # feature_stream.add_sink(feature_store_sink) # 仅用于演示,进行打印 feature_stream.print() # 替换为实际的Sink # 5. 执行作业 env.execute("Streaming Feature Engineering Job")此示例展示了按user_id对流进行键控,基于事件时间应用5分钟滚动窗口,并对每个用户在每个窗口内的计数求和。结果随后将发送到负责更新在线和离线存储的接收器。挑战与良好实践成功实现流式特征需要处理一些操作复杂性:迟到数据: 事件可能在其对应的时间窗口已处理完毕后才到达。请配置流处理器允许的迟到时间,并可能设置单独的机制来处理或丢弃非常迟到的数据。可扩展性: 确保您的流处理器、消息队列和特征存储接收器能够处理峰值负载。对数据流进行分区(例如,按用户ID)是必不可少的。容错性: 使用流处理器的检查点和状态后端机制,确保在故障后能正确恢复状态,最大程度地减少数据丢失或重复。监控: 对处理延迟(处理与实时相差多远)、吞吐量、错误率、资源利用率以及输出特征的数据质量指标实施全面监控。模式演进: 规划如何管理传入事件数据模式的变化,而不中断处理管道。模式注册表(如Confluent Schema Registry)可以提供帮助。成本管理: 有状态流处理,特别是涉及大量状态或高吞吐量时,可能会消耗大量资源。优化资源分配并选择适当的实例类型或托管服务层级。处理流式特征是一项复杂的能力,它显著增强了特征存储的功能,使得机器学习模型能够对近实时事件做出反应。尽管与批处理相比,它引入了操作复杂性,但掌握流处理引擎的集成能让您构建高度响应且富有影响力的机器学习系统。