趋近智
机器学习 (machine learning)模型在部署到生产环境时常会因为训练-服务偏差而面临性能下降。这种现象发生在推理 (inference)时所用的数据分布或特征计算逻辑与训练模型所用的历史数据不同时。特征存储在流处理架构中作为核心界面,通过保证离线训练环境和在线推理环境之间的一致性来解决这种差异。
为了满足快速响应的查询和处理大容量历史分析这些相互竞争的需求,特征存储通常采用双数据库架构。Flink 作为计算引擎,同时填充这两个存储层。
在这种架构中,Flink 流水线作为同步机制。当事件流经流时,Flink 计算聚合特征(例如 clicks_last_5_minutes),并执行双写操作。
数据流显示 Flink 将算出的特征同步到在线存储供即时推理,并同步到离线存储供历史训练数据使用。
写入在线存储时的主要工程难题是保持高吞吐量 (throughput)而不阻塞 Flink 算子。对像 Redis 这样的数据库的每个事件进行同步调用,会将流水线吞吐量限制到网络往返的响应时间。
为了缓解此问题,必须使用 Flink 的异步 I/O API。这使得流处理算子能够处理对外部存储的多个并发请求。结果的顺序由 Flink 保持,确保对相同键的更新以正确的顺序应用。
设计在线存储的写入器时,通常使用“更新或插入”(Upsert)语义。由于在线存储代表当前状态,旧值会被覆盖。
其中 是新特征值, 是 Flink 内部状态(例如窗口累加器),而 是触发事件。
以下代码说明了如何实现一个异步函数来更新 Redis 特征存储。这种方式采用 AsyncFunction 接口,将数据库响应时间与流处理吞吐量解耦。
public class RedisFeatureUpdater extends RichAsyncFunction<FeaturePayload, String> {
private transient RedisClient redisClient;
private transient StatefulRedisConnection<String, String> connection;
@Override
public void open(Configuration parameters) {
redisClient = RedisClient.create("redis://localhost:6379");
connection = redisClient.connect();
}
@Override
public void asyncInvoke(FeaturePayload input, ResultFuture<String> resultFuture) {
// 异步更新特征向量
RedisAsyncCommands<String, String> async = connection.async();
// 实体 ID,值:序列化后的特征向量
CompletionStage<String> future = async.set(
"feature:" + input.getEntityId(),
input.toJson()
);
future.whenComplete((res, error) -> {
if (error != null) {
resultFuture.completeExceptionally(error);
} else {
resultFuture.complete(Collections.singleton(res));
}
});
}
}
对离线存储的一项主要要求是,能够在任何特定历史时间戳重建特征状态。这被称为时间旅行。如果一个模型根据过去一小时的交易数量预测欺诈,训练数据必须在欺诈标签生成的那一刻准确反映该计数,而不是一天结束时的计数。
当 Flink 写入离线存储时,它必须追加记录而不是更新它们。离线存储中的每条记录应包含:
离线存储本质上成为一个只追加的特征变更记录。当数据科学家生成训练数据集时,他们执行“as-of join”逻辑:
此查询选择实体 的最新特征值,该值必须严格早于或等于观察到目标变量的时间 。
整合特征存储引入了数据新鲜度与系统繁杂程度之间的权衡。在基于批处理的特征工程中,特征定期更新(例如每日),导致一种“锯齿形”新鲜度模式,在这种模式下,数据逐渐变得陈旧,直到下一次批处理运行。流处理整合创造了一条接近平坦的新鲜度线,使事件时间与特征可用性之间的差异最小化。
然而,写入远程存储会引入网络响应时间。优化 Flink 写入器常涉及缓冲写入或使用流水线技术。
特征新鲜度对比。批处理更新随时间推移而下降,造成现实与模型视图之间的差异。流处理更新保持接近零的响应时间。
在分布式系统中,事件常会乱序到达。Flink 的水印机制在内部处理聚合的精确性,但更新外部特征存储需要特定的策略,以防止“僵尸”数据,即旧事件覆盖在线存储中的新值。
处理这种同步有两种核心策略:
last_updated_timestamp 列。写入器函数使用条件写入(比较并交换)。仅当传入记录的时间戳大于已存储的时间戳时才应用更新。INCRBY),或存储在查询时求和的原始聚合数据。这能够抵御乱序,但要求读取路径(推理 (inference)服务)执行最终计算。对于离线存储,延迟数据问题较小,因为它是一个只追加的记录。那里的难题是纯粹的分析问题:确保“as-of join”查询准确考虑数据何时可用于推理,以避免训练期间的数据泄露。
这部分内容有帮助吗?
© 2026 ApX Machine LearningAI伦理与透明度•