Designing a production-grade inference pipeline requires moving past static scripts and integrating model execution directly into the data flow. We will construct a fraud detection pipeline that ingests raw transaction events, calculates a rolling average of transaction amounts per user, and uses this derived feature to score the transaction against a pre-trained model. This architecture minimizes latency by eliminating network hops associated with external REST APIs.
The architecture consists of three distinct stages executed within a single Flink job to ensure data locality. First, the source operator consumes transaction events from Kafka. Second, a keyed process function maintains the state required for feature engineering. Finally, the enriched record passes to an inference operator containing the embedded model.
The data flow prioritizes locality: data partitioning happens once at the keyBy stage, allowing feature generation and inference to occur in the same thread without reshuffling.
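Before looking at each operator, a minimal sketch of how the three stages are wired together may help. It builds on the FeatureExtraction and EmbeddedInference classes defined below and a Transaction POJO with a getUserId() accessor; the TransactionDeserializer, broker address, topic, and sink are placeholders, not part of the pipeline code in this section.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Assumed Kafka source; TransactionDeserializer is a placeholder DeserializationSchema for the payload.
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("transactions")
        .setGroupId("fraud-detection")
        .setValueOnlyDeserializer(new TransactionDeserializer())
        .build();

DataStream<Transaction> transactions =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "transactions");

DataStream<Prediction> predictions = transactions
        .keyBy(Transaction::getUserId)        // the single shuffle: partition by user
        .process(new FeatureExtraction())     // stateful feature engineering (defined below)
        .map(new EmbeddedInference())         // embedded model scoring, chained into the same task
        .name("embedded-inference");

predictions.print();                          // placeholder sink; replace with a Kafka or alerting sink

env.execute("fraud-detection");

Because the process and map operators share the same parallelism and are connected by a forward edge, Flink chains them into one task, which is what keeps feature generation and inference on the same thread.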
To detect anomalies, the model requires the average transaction amount over the last 10 events for each user. Unlike batch processing, where this would be a simple SQL aggregation, in Flink we maintain it with keyed state.
We implement a KeyedProcessFunction that holds a ListState storing the sliding window of recent transaction amounts. When a new event arrives, we append the value, trim the window to WINDOW_SIZE (10 events), and calculate the mean.
The implementation must strictly manage state size to prevent memory leaks in long-running jobs.
public class FeatureExtraction extends KeyedProcessFunction<String, Transaction, FeatureVector> {

    private static final int WINDOW_SIZE = 10;

    private transient ListState<Double> lastNAmounts;

    @Override
    public void open(Configuration parameters) {
        ListStateDescriptor<Double> descriptor =
                new ListStateDescriptor<>("last-n-amounts", Double.class);
        lastNAmounts = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void processElement(Transaction value, Context ctx, Collector<FeatureVector> out) throws Exception {
        // Rebuild the per-user history from keyed state
        List<Double> amounts = new ArrayList<>();
        Iterable<Double> currentHistory = lastNAmounts.get();
        if (currentHistory != null) {
            for (Double amt : currentHistory) {
                amounts.add(amt);
            }
        }

        // Add the current amount and trim to the last WINDOW_SIZE values
        amounts.add(value.getAmount());
        if (amounts.size() > WINDOW_SIZE) {
            amounts.remove(0);
        }

        // Persist the trimmed window back to state
        lastNAmounts.update(amounts);

        // Calculate the rolling mean
        double sum = 0;
        for (Double amt : amounts) {
            sum += amt;
        }
        double avg = sum / amounts.size();

        // Emit the enriched feature vector
        out.collect(new FeatureVector(value.getUserId(), value.getAmount(), avg));
    }
}
This code ensures that every inference request includes the most up-to-date context without querying an external feature store, reducing latency from 10-50ms (network RTT) to sub-millisecond memory access.
For the inference stage, we use a RichMapFunction. The critical engineering pattern here is loading the model during the open() lifecycle method rather than once per event: loading a model (e.g., a 500 MB XGBoost or TensorFlow graph) is an expensive I/O operation.
We use the JPMML library for model interoperability, assuming the data science team exports the model in PMML format. This decouples the training framework from the Flink runtime.
public class EmbeddedInference extends RichMapFunction<FeatureVector, Prediction> {

    private transient Evaluator evaluator;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Load the model once per parallel instance from Flink's distributed cache
        File modelFile = getRuntimeContext().getDistributedCache().getFile("fraud_model.pmml");
        evaluator = new LoadingModelEvaluatorBuilder().load(modelFile).build();
        evaluator.verify();
    }

    @Override
    public Prediction map(FeatureVector features) {
        // Raw feature values keyed by the PMML input field names
        Map<String, Object> rawInputs = new LinkedHashMap<>();
        rawInputs.put("amount", features.getAmount());
        rawInputs.put("avg_amount", features.getAvgAmount());

        // Prepare arguments: JPMML validates and converts each raw value against its field definition
        // (FieldName-based API, as in JPMML-Evaluator 1.5.x)
        Map<FieldName, FieldValue> arguments = new LinkedHashMap<>();
        for (InputField inputField : evaluator.getInputFields()) {
            FieldName name = inputField.getName();
            arguments.put(name, inputField.prepare(rawInputs.get(name.getValue())));
        }

        // Execute inference and decode the PMML result objects into plain Java values
        Map<FieldName, ?> results = evaluator.evaluate(arguments);
        Map<String, ?> decoded = EvaluatorUtil.decodeAll(results);
        Double score = (Double) decoded.get("probability_fraud");

        return new Prediction(features.getUserId(), score, System.currentTimeMillis());
    }
}
This pattern ensures that if you run with a parallelism of 20, exactly 20 instances of the model are loaded into memory across the cluster.
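For the getDistributedCache() lookup in open() to succeed, the PMML file must be registered on the execution environment when the job is assembled. A minimal sketch, assuming the model sits on a shared filesystem (the HDFS path is a placeholder):

// Ship the model file to every TaskManager; the name must match the key used in EmbeddedInference.open().
env.registerCachedFile("hdfs:///models/fraud_model.pmml", "fraud_model.pmml");

Flink copies the registered file to each TaskManager before the tasks start, so open() reads it from local disk rather than over the network.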
When architecting real-time AI, the decision between embedded execution and external service calls dictates the system's throughput ceiling. External calls introduce network serialization overhead and connection management costs.
As throughput increases, the embedded approach maintains stable latency until CPU saturation, whereas the external approach degrades rapidly due to connection pool exhaustion and network saturation.
Embedded inference offers predictable low latency but couples the model lifecycle with the application pipeline. External APIs decouple the lifecycle but introduce significant latency penalties.
Running heavy computation (inference) inside the Flink TaskManager requires careful resource tuning. If the model inference takes 10ms and events arrive every 5ms, backpressure will propagate upstream, eventually slowing down the Kafka consumer.
To mitigate this, you must size the parallelism of the inference operator to match the computational cost of the model:
If a single core can handle 100 predictions/second and you need to process 5,000 events/second, you must configure setParallelism(50) on the inference operator.
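Applied to the wiring sketched earlier, that sizing lands on the map operator itself; a sketch under the numbers above:

        .map(new EmbeddedInference())
        .setParallelism(50)                   // 5,000 events/s ÷ 100 predictions/s per core
        .name("embedded-inference")

Bear in mind that giving the inference operator a parallelism different from the upstream keyed operator prevents the two from being chained, so the enriched records cross a task boundary before scoring; weigh that exchange against the data-locality goal discussed earlier.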
Additionally, use Async I/O if you must interact with external systems (like a side lookup for a blacklist) during the inference phase. However, for the core prediction logic demonstrated here, synchronous execution within the MapFunction is preferred to maintain strictly ordered processing and simplified error handling.
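If you do need such a side lookup, Flink's Async I/O operator keeps the pipeline from blocking on it. Below is a minimal sketch, assuming a hypothetical non-blocking BlacklistClient whose isBlacklisted() call returns a CompletableFuture<Boolean> and a getUserId() accessor on Prediction; neither is part of the pipeline code above.

public class BlacklistCheck extends RichAsyncFunction<Prediction, Prediction> {

    private transient BlacklistClient client;   // hypothetical non-blocking client

    @Override
    public void open(Configuration parameters) {
        client = BlacklistClient.connect("blacklist-service:6379");   // hypothetical factory method
    }

    @Override
    public void asyncInvoke(Prediction input, ResultFuture<Prediction> resultFuture) {
        client.isBlacklisted(input.getUserId())
              .thenAccept(flagged -> {
                  // A real job would adjust the score or route flagged users; here we simply forward the record.
                  resultFuture.complete(Collections.singleton(input));
              });
    }
}

// Wiring: at most 100 in-flight lookups, 50 ms timeout, results emitted as they complete.
DataStream<Prediction> checked = AsyncDataStream.unorderedWait(
        predictions, new BlacklistCheck(), 50, TimeUnit.MILLISECONDS, 100);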