趋近智
设计生产级的推理 (inference)管道需要超越静态脚本,并将模型执行直接融入数据流。我们将构建一个欺诈检测管道,它会接收原始交易事件,计算每个用户的交易金额滚动平均值,并用这个派生特征来对照预训练 (pre-training)模型对交易进行评分。这种架构最大限度地减少延迟,通过消除与外部REST API相关的网络跳转。
该架构包含三个不同阶段,在一个Flink作业中执行,以确保数据局部性。首先,源操作符从Kafka消费交易事件。其次,一个keyed进程函数维护特征工程所需的状态。最后,经过丰富的数据记录传递给包含嵌入 (embedding)式模型的推理 (inference)操作符。
数据流优先考虑局部性。数据分区在
KeyBy阶段发生一次,使得特征生成和推理可以在同一线程中进行,无需重新洗牌。
为了发现异常,模型需要特定用户最近10个事件的平均交易金额。与批处理中这是一种SQL聚合不同,在Flink中,我们使用KeyedState来管理它。
我们实现了一个KeyedProcessFunction。这个函数持有一个ListState,用于存储最近交易值的滑动窗口。当一个新事件到来时,我们会追加该值,将窗口裁剪到大小,并计算平均值 。
该实现必须严格管理状态大小,以防止长时间运行的作业出现内存泄漏。
public class FeatureExtraction extends KeyedProcessFunction<String, Transaction, FeatureVector> {
private transient ListState<Double> lastNAmounts;
private static final int WINDOW_SIZE = 10;
@Override
public void open(Configuration parameters) {
ListStateDescriptor<Double> descriptor =
new ListStateDescriptor<>("last-n-amounts", Double.class);
lastNAmounts = getRuntimeContext().getListState(descriptor);
}
@Override
public void processElement(Transaction value, Context ctx, Collector<FeatureVector> out) throws Exception {
Iterable<Double> currentHistory = lastNAmounts.get();
List<Double> amounts = new ArrayList<>();
// 添加现有历史记录
if (currentHistory != null) {
for (Double amt : currentHistory) {
amounts.add(amt);
}
}
// 添加当前值并维护窗口大小
amounts.add(value.getAmount());
if (amounts.size() > WINDOW_SIZE) {
amounts.remove(0);
}
// 更新状态
lastNAmounts.update(amounts);
// 计算平均值
double sum = 0;
for (Double amt : amounts) sum += amt;
double avg = sum / amounts.size();
// 发出丰富向量
out.collect(new FeatureVector(value.getUserId(), value.getAmount(), avg));
}
}
这段代码确保每个推理 (inference)请求都包含最新的上下文 (context),无需查询外部特征存储,将延迟从10-50毫秒(网络RTT)降低到亚毫秒级的内存访问。
对于推理 (inference)阶段,我们使用RichMapFunction。这里重要的工程模式是,在open()生命周期方法中加载模型,而不是每个事件加载。加载模型(例如,500MB的XGBoost或TensorFlow图)是一项开销大的I/O操作。
我们使用JPMML库来实现模型互操作性,假设数据科学团队以PMML格式导出模型。这将训练框架与Flink运行时解耦。
public class EmbeddedInference extends RichMapFunction<FeatureVector, Prediction> {
private transient Evaluator evaluator;
@Override
public void open(Configuration parameters) throws Exception {
// 每个并行实例只加载一次模型
File modelFile = getRuntimeContext().getDistributedCache().getFile("fraud_model.pmml");
evaluator = new LoadingModelEvaluatorBuilder().load(modelFile).build();
evaluator.verify();
}
@Override
public Prediction map(FeatureVector features) {
Map<FieldName, FieldValue> arguments = new LinkedHashMap<>();
// 将特征映射到模型输入
arguments.put(new FieldName("amount"), features.getAmount());
arguments.put(new FieldName("avg_amount"), features.getAvgAmount());
// 执行推理
Map<FieldName, ?> results = evaluator.evaluate(arguments);
Double score = (Double) results.get(new FieldName("probability_fraud"));
return new Prediction(features.getUserId(), score, System.currentTimeMillis());
}
}
这种模式确保,如果您以20的并行度运行,正好有20个模型实例加载到集群的内存中。
在设计实时AI架构时,嵌入式执行和外部服务调用之间的选择决定了系统的吞吐量 (throughput)上限。外部调用会引入网络序列化开销和连接管理成本。
下面的图表展示了吞吐量增加时延 (latency)迟的特点。嵌入式方法在CPU饱和之前保持稳定的延迟,而外部方法由于连接池耗尽和网络饱和而迅速性能下降。
嵌入式推理 (inference)提供可预测的低延迟,但将模型生命周期与应用程序管道耦合。外部API解耦了生命周期,但会引入显著的延迟惩罚。
在Flink任务管理器内部运行繁重的计算(推理 (inference))需要仔细的资源调整。如果模型推理需要10毫秒,而事件每5毫秒到达,反压将向上游传播,最终会减慢Kafka消费者的速度。
为了缓解这种情况,您必须专门设置推理操作符的并行度,以匹配模型的计算成本:
如果单个核心每秒可以处理100次预测,并且您需要每秒处理5,000个事件,您必须在推理操作符上配置setParallelism(50)。
此外,如果您在推理阶段必须与外部系统(例如用于黑名单的侧向查找)交互,请使用Async I/O。然而,对于此处演示的核心预测逻辑,在MapFunction中进行同步执行是更优选择,以保持严格有序的处理和简化的错误处理。
这部分内容有帮助吗?
© 2026 ApX Machine LearningAI伦理与透明度•