Hard-coded business logic in stream processing applications creates significant operational bottlenecks. Changing a fraud detection threshold or updating a marketing campaign rule in a production environment typically requires recompiling the code, stopping the job, and redeploying the updated artifact. This process disrupts service availability and increases the latency between a business decision and its enforcement.

To eliminate downtime during logic updates, we use the Broadcast State pattern. This pattern allows a control stream to distribute configuration data to all parallel instances of a function, ensuring that every operator has access to the latest rules. In this practical section, we will implement a `KeyedBroadcastProcessFunction` that evaluates financial transactions against a dynamic set of rules ingested from a separate Kafka topic.

## The Broadcast State Pattern

The architecture involves two distinct streams: a high-throughput stream of events (transactions) and a low-throughput stream of rules (configurations). The event stream is partitioned by a key, such as `accountId`, to ensure that all transactions for a specific user are processed by the same parallel task. The rule stream is broadcast, meaning every record in this stream is replicated to every downstream task.

The `KeyedBroadcastProcessFunction` binds these two streams. It maintains a local state for the rules, which is updated whenever a control message arrives.
When a transaction arrives, the function reads the current rules from this local state and evaluates the transaction.

The following diagram illustrates the data flow and state distribution across parallel tasks.

```dot
digraph G {
  rankdir=TB;
  bgcolor="transparent";
  node [style=filled, shape=rect, fontname="Arial", fontsize=10];
  edge [fontname="Arial", fontsize=9];
  SourceTx [label="Transaction Source\n(Partitioned)", fillcolor="#a5d8ff", color="#1c7ed6"];
  SourceRules [label="Rule Source\n(Broadcast)", fillcolor="#b2f2bb", color="#37b24d"];
  Task1 [label="Task 1\n[Rule State: {A, B}]", fillcolor="#bac8ff", color="#4263eb"];
  Task2 [label="Task 2\n[Rule State: {A, B}]", fillcolor="#bac8ff", color="#4263eb"];
  Sink [label="Alert Sink", fillcolor="#ffc9c9", color="#f03e3e"];
  SourceTx -> Task1 [label="Key: User 1-100"];
  SourceTx -> Task2 [label="Key: User 101-200"];
  SourceRules -> Task1 [style=dashed, label="Broadcast"];
  SourceRules -> Task2 [style=dashed, label="Broadcast"];
  Task1 -> Sink;
  Task2 -> Sink;
}
```

*Architecture of the dynamic rule evaluation pipeline. The rule stream is replicated to all tasks to ensure consistency.*

## Defining the State Descriptors

We first define the schema for our data. We assume a `Transaction` class containing an ID, an amount, and a timestamp. The `Rule` class contains a `ruleId`, a `maxThreshold`, and a status (active or disabled).

To store the rules within the function, we use a `MapStateDescriptor`. This descriptor identifies the broadcast state and allows the DataStream API to manage it correctly.

```java
// Define the MapStateDescriptor for the rules
// Key: String (the rule ID), Value: Rule (the rule object)
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
    "RulesBroadcastState",
    BasicTypeInfo.STRING_TYPE_INFO,
    TypeInformation.of(Rule.class)
);
```

## Constructing the Topology

The topology requires connecting the keyed transaction stream with the broadcast rule stream. We use the `broadcast` method on the rule stream, passing the descriptor we created.
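Before wiring the streams together, it helps to have the data classes in hand. The sketch below is an assumption: only the field names mentioned in the text (`accountId`, `amount`, `timestamp`, `ruleId`, `maxThreshold`, the active flag) come from this section; the constructors and visibility choices are illustrative.

```java
// Minimal data classes assumed by this section. Flink's POJO serializer
// requires a public no-argument constructor and public fields (or
// getter/setter pairs); in a real project each class would be a public
// class in its own file.
class Transaction {
    public String accountId;
    public double amount;
    public long timestamp;

    public Transaction() {}  // required by Flink's POJO serializer

    public Transaction(String accountId, double amount, long timestamp) {
        this.accountId = accountId;
        this.amount = amount;
        this.timestamp = timestamp;
    }

    public String getAccountId() { return accountId; }
    public double getAmount() { return amount; }
}

class Rule {
    public String ruleId;
    public double maxThreshold;
    public boolean active;

    public Rule() {}

    public Rule(String ruleId, double maxThreshold, boolean active) {
        this.ruleId = ruleId;
        this.maxThreshold = maxThreshold;
        this.active = active;
    }

    public String getRuleId() { return ruleId; }
    public double getMaxThreshold() { return maxThreshold; }
    public boolean isActive() { return active; }
}

class Alert {
    public String accountId;
    public String ruleId;
    public String message;

    public Alert() {}

    public Alert(String accountId, String ruleId, String message) {
        this.accountId = accountId;
        this.ruleId = ruleId;
        this.message = message;
    }
}
```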
Calling `broadcast` tells Flink to replicate the stream and prepare the state backend to store the incoming elements under that descriptor. Note that the transaction stream must be keyed before the connect, so we declare it as a `KeyedStream`.

```java
KeyedStream<Transaction, String> transactions = env
    .addSource(new TransactionSource())
    .keyBy(Transaction::getAccountId);

BroadcastStream<Rule> broadcastRules = env
    .addSource(new RuleSource())
    .broadcast(ruleStateDescriptor);

DataStream<Alert> alerts = transactions
    .connect(broadcastRules)
    .process(new DynamicRuleFunction());
```

## Implementing the KeyedBroadcastProcessFunction

The core logic resides in the `DynamicRuleFunction`. This class extends `KeyedBroadcastProcessFunction` and overrides two methods: `processBroadcastElement` for handling rules and `processElement` for handling transactions.

In `processBroadcastElement`, we update the broadcast state. This method is invoked every time a new rule arrives or an existing rule is updated. Since this stream is broadcast, the method executes on every parallel instance of the operator.

In `processElement`, we access the broadcast state in read-only mode.
We iterate through the active rules and check if the incoming transaction violates any of them.

```java
public class DynamicRuleFunction
        extends KeyedBroadcastProcessFunction<String, Transaction, Rule, Alert> {

    // Define the state descriptor again for access inside the function
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(Rule.class));

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)
            throws Exception {
        // Update the state with the new rule
        BroadcastState<String, Rule> broadcastState =
            ctx.getBroadcastState(ruleStateDescriptor);
        broadcastState.put(rule.getRuleId(), rule);
    }

    @Override
    public void processElement(Transaction txn, ReadOnlyContext ctx, Collector<Alert> out)
            throws Exception {
        // Access the rules from the broadcast state
        ReadOnlyBroadcastState<String, Rule> rulesState =
            ctx.getBroadcastState(ruleStateDescriptor);

        // Iterate over all rules in the state
        for (Map.Entry<String, Rule> entry : rulesState.immutableEntries()) {
            Rule rule = entry.getValue();
            // Apply business logic
            if (rule.isActive() && txn.getAmount() > rule.getMaxThreshold()) {
                out.collect(new Alert(
                    txn.getAccountId(),
                    rule.getRuleId(),
                    "Threshold exceeded: " + txn.getAmount()));
            }
        }
    }
}
```

## Managing Consistency and Latency

When a rule is injected into the Kafka topic, it travels through the network to all TaskManagers. There is a non-zero propagation delay, denoted as $\Delta t$. If a transaction arrives at time $t$ and a rule update was sent at $t - \delta$ (where $\delta < \Delta t$), the transaction might still be evaluated against the old rule set.

In distributed systems, this eventual consistency is often acceptable for fraud alerting. However, understanding the relationship between the rule injection rate and processing throughput is critical.
If the rule set $R$ becomes extremely large, the iteration loop in `processElement` can degrade performance. The complexity of processing a single event is linear with respect to the number of active rules:

$$ O(N_{rules}) $$

If you have thousands of rules, consider indexing them within the function (e.g., using a `TreeMap` or an interval tree stored in a transient Java variable) to reduce the search space during evaluation. You would update this local index inside `processBroadcastElement`.

## Visualization of Dynamic Thresholds

To verify the effectiveness of dynamic updates, we can observe the system behavior when a threshold is lowered abruptly. The following chart demonstrates a scenario where the fraud threshold is reduced from $1000 to $500 at minute 10. Consequently, the volume of generated alerts spikes immediately as transactions that were previously considered normal are now flagged.

```json
{"layout": {"title": "Impact of Rule Update on Alert Volume", "xaxis": {"title": "Time (Minutes)"}, "yaxis": {"title": "Value"}, "showlegend": true, "height": 400, "margin": {"l": 50, "r": 20, "t": 40, "b": 40}}, "data": [{"type": "scatter", "name": "Threshold ($)", "x": [0, 10, 10, 20], "y": [1000, 1000, 500, 500], "line": {"color": "#1c7ed6", "width": 3, "dash": "dash"}}, {"type": "scatter", "name": "Alerts/Sec", "x": [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20], "y": [5, 6, 4, 5, 6, 25, 28, 26, 27, 25, 26], "mode": "lines+markers", "line": {"color": "#f03e3e", "shape": "spline"}}]}
```

*The chart correlates the active threshold value with the resulting alert frequency. Note the immediate increase in alert volume at minute 10 without restarting the pipeline.*

## Verification Strategy

To test this pipeline locally, you do not need a full Kafka setup. You can use `SourceFunction` to simulate both streams.
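Even without simulated sources, the core before-and-after behavior can be checked in a plain unit test that drives the evaluation logic directly. The sketch below assumes nothing from Flink: `evaluate` mirrors the loop in `processElement`, and the nested `Rule` record (Java 16+) is an illustrative stand-in for the pipeline's rule class.

```java
import java.util.HashMap;
import java.util.Map;

class RuleEvaluationDryRun {
    // Illustrative stand-in for the Rule POJO used in the pipeline.
    record Rule(String ruleId, double maxThreshold, boolean active) {}

    /** Mirrors the processElement loop: true if any active rule is violated. */
    static boolean evaluate(Map<String, Rule> rules, double amount) {
        for (Rule rule : rules.values()) {
            if (rule.active() && amount > rule.maxThreshold()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Rule> state = new HashMap<>();

        // Initial rule: threshold $1000, so a $600 transaction passes silently.
        state.put("r1", new Rule("r1", 1000.0, true));
        System.out.println(evaluate(state, 600.0)); // false: no alert

        // Simulate the broadcast update: same rule ID, lowered $500 threshold.
        state.put("r1", new Rule("r1", 500.0, true));
        System.out.println(evaluate(state, 600.0)); // true: alert fires
    }
}
```

Running `main` prints `false` and then `true`: the same $600 transaction only triggers an alert once the lowered threshold has replaced the old rule under the same key, which is exactly what the broadcast update does in the live pipeline.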
Create a source that emits transactions continuously and another source that emits a single rule update after a 10-second delay. Then:

1. Start the job.
2. Observe the output logs showing no alerts for transactions of $600.
3. Inject a rule update setting the threshold to $500.
4. Observe the subsequent logs immediately showing alerts for transactions of $600.

This approach validates that the `processBroadcastElement` method correctly updated the state across all subtasks and that `processElement` began using the new logic immediately.
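One loose end from the performance discussion: when the rule set grows into the thousands, the per-event scan can be replaced by a local index rebuilt inside `processBroadcastElement`. The following sketch uses a plain `TreeMap` keyed by threshold; the `RuleIndex` class and its method names are illustrative, not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Illustrative local rule index, kept in a transient field of the function. */
class RuleIndex {
    // Orders rules by threshold so evaluation can skip every rule whose
    // threshold is >= the transaction amount.
    private final TreeMap<Double, List<String>> byThreshold = new TreeMap<>();

    /** Insert or replace a rule; inactive rules are simply dropped. */
    void put(String ruleId, double maxThreshold, boolean active) {
        remove(ruleId); // discard any previous entry for this rule
        if (active) {
            byThreshold.computeIfAbsent(maxThreshold, k -> new ArrayList<>())
                       .add(ruleId);
        }
    }

    void remove(String ruleId) {
        byThreshold.values().forEach(ids -> ids.remove(ruleId));
        byThreshold.values().removeIf(List::isEmpty);
    }

    /** IDs of rules whose threshold is strictly below the amount, i.e. violated. */
    List<String> violated(double amount) {
        List<String> hits = new ArrayList<>();
        // headMap(amount) is exclusive, matching amount > maxThreshold.
        for (List<String> ids : byThreshold.headMap(amount).values()) {
            hits.addAll(ids);
        }
        return hits;
    }
}
```

In `processBroadcastElement` you would call `put` for each incoming rule while still writing the rule to the broadcast state, which remains the checkpointed source of truth; `processElement` then calls `violated(txn.getAmount())` instead of iterating every entry.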