Watermarks serve as the pacing mechanism for event time processing. In a distributed streaming environment, they solve the problem of out-of-order data by providing a metric of progress. A watermark carrying a timestamp $t$ declares that the stream is complete up to time $t$. This assertion allows window operators to close buckets and trigger calculations, assuming that no further events with a timestamp $t' \le t$ will arrive.

This mechanism is heuristic by definition. Because network partitions and variable latency make perfect knowledge of the stream impossible, a watermark strategy represents a tradeoff between latency and correctness. An aggressive strategy that generates watermarks close to the current processing time yields low latency results but risks dropping valid data as "late". A conservative strategy waits longer for straggling events, ensuring higher accuracy at the cost of increased latency.

## The Watermark Strategy Interface

In Flink, the `WatermarkStrategy` defines how watermarks are generated and how timestamps are assigned to events. This interface decouples the logic of timestamp extraction from the logic of watermark emission. While simple applications might rely on predefined strategies, production pipelines often require tuning these parameters to match the specific skew characteristics of the upstream data source, such as Kafka partitions.

There are two primary styles of watermark generation: periodic and punctuated.

### Periodic Watermarks

Periodic generation is the standard approach for high-throughput streams. The system emits a watermark at regular processing time intervals, defined by the auto-watermark interval. This interval is decoupled from the event arrival rate. Even if thousands of events arrive per second, the watermark might only update every 200 milliseconds to reduce overhead.

The framework calls the `onPeriodicEmit` method of the `WatermarkGenerator` at these intervals.
The generator inspects the maximum timestamp seen so far and emits a watermark based on a lag tolerance.

### Punctuated Watermarks

Punctuated generation relies on the data itself. The generator observes every incoming event and may choose to emit a watermark immediately. This is necessary in scenarios with sparse data or when specific marker records indicate the completion of a logical block, such as an "end-of-batch" signal in a hybrid stream. This approach incurs higher overhead because the `onEvent` method must evaluate the watermark logic for every record.

## Bounded Out-of-Orderness

The most frequent pattern in distributed logs like Kafka is bounded out-of-orderness. This strategy assumes that while elements move out of sequence, they do so within a finite time window. If the maximum delay in the system is known to be 5 seconds, the watermark at any point is the maximum timestamp observed minus 5 seconds.

$$W = \max(T_{event}) - T_{tolerance}$$

Events arriving within this tolerance buffer are processed correctly. Events arriving after the watermark passes the event's timestamp are considered late.

The following chart illustrates the relationship between event time, processing time, and the watermark line.
Points below the watermark line represent late events that would be dropped or sent to a side output.

```json
{"layout": {"title": "Bounded Out-of-Orderness Strategy",
            "xaxis": {"title": "Processing Time (seconds)", "showgrid": true, "gridcolor": "#dee2e6"},
            "yaxis": {"title": "Event Time (seconds)", "showgrid": true, "gridcolor": "#dee2e6"},
            "plot_bgcolor": "white", "width": 700, "height": 450, "showlegend": true},
 "data": [
   {"mode": "lines", "name": "Ideal (No Lag)", "x": [0, 10], "y": [0, 10], "line": {"color": "#adb5bd", "dash": "dot"}},
   {"mode": "lines", "name": "Watermark (Lag = 2s)", "x": [2, 12], "y": [0, 10], "line": {"color": "#228be6", "width": 3}},
   {"mode": "markers", "name": "On-Time Events", "x": [3, 4, 5, 7, 8], "y": [2, 3.5, 4, 6, 7.5], "marker": {"color": "#40c057", "size": 10}},
   {"mode": "markers", "name": "Late Events", "x": [6, 9], "y": [3, 5], "marker": {"color": "#fa5252", "symbol": "x", "size": 10}}
 ]}
```

The blue line represents the watermark horizon. Green points are processed within the window. Red points arrive after the watermark has passed their timestamp, classifying them as late.

## Implementing a Custom Strategy

While Flink provides `WatermarkStrategy.forBoundedOutOfOrderness`, complex AI pipelines often require custom logic.
For example, you might need a strategy that advances time based on the average of the last $N$ timestamps rather than the strict maximum to filter out outliers, or a strategy that handles distinct logical streams interleaved in a single partition.

To implement a custom strategy, you define a `WatermarkGenerator`.

```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class SmoothLagGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxLag;
    private long currentMaxTimestamp;

    public SmoothLagGenerator(long maxLag) {
        this.maxLag = maxLag;
        // Start high enough that subtracting maxLag cannot underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxLag + 1;
    }

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // Track the maximum timestamp seen; no watermark is emitted per event
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Called at the auto-watermark interval: emit the maximum minus the lag
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxLag));
    }
}
```

This generator is then attached to the stream source:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
    .forGenerator(ctx -> new SmoothLagGenerator(Duration.ofSeconds(5).toMillis()))
    .withTimestampAssigner((event, timestamp) -> event.getCreationTime());

DataStream<MyEvent> stream = env.fromSource(kafkaSource, strategy, "Kafka Source");
```

## Watermark Propagation and Synchronization

In a parallel streaming topology, watermarks must be synchronized across partitions. A stream operator, such as a Window ProcessFunction, usually receives input from multiple parallel instances of an upstream operator. Each upstream instance emits its own watermark based on the data it observes.

The downstream operator maintains a partition-specific watermark for each input channel. The operator's logical time is defined as the minimum of all input channel watermarks.
This ensures that the operator does not advance its time past the "slowest" upstream partition.

$$W_{operator} = \min(W_{partition_0}, W_{partition_1}, \dots, W_{partition_n})$$

This mechanism creates a dependency on the slowest partition. If one Kafka partition lags significantly due to network issues or data skew, the entire windowing operation for that operator stalls.

```dot
digraph G {
  rankdir=TB;
  node [shape=box, style="filled,rounded", fontname="Helvetica", penwidth=0];
  edge [fontname="Helvetica", color="#868e96"];
  bgcolor="transparent";
  subgraph cluster_upstream {
    label = "Upstream Parallel Instances";
    fontname = "Helvetica";
    color = "#dee2e6";
    style = "dashed";
    p1 [label="Partition 1\nW: 12:05", fillcolor="#e7f5ff", fontcolor="#1c7ed6"];
    p2 [label="Partition 2\nW: 12:02", fillcolor="#fff5f5", fontcolor="#fa5252"];
    p3 [label="Partition 3\nW: 12:06", fillcolor="#e7f5ff", fontcolor="#1c7ed6"];
  }
  op [label="Window Operator\nCurrent Watermark = min(12:05, 12:02, 12:06)\n= 12:02", fillcolor="#f3f0ff", fontcolor="#5f3dc4", width=3];
  p1 -> op;
  p2 -> op [color="#fa5252", penwidth=2];
  p3 -> op;
}
```

The window operator determines its global watermark by taking the minimum of all incoming channel watermarks. Here, Partition 2 holds back the global progress to 12:02.

## Handling Idle Sources

A critical edge case in watermark propagation is the idle source problem. If a specific Kafka partition contains no data for an extended period, its watermark stops advancing. Because the downstream operator calculates the minimum across all partitions, the stalled watermark of the idle partition prevents the minimum from increasing. The entire application's time effectively freezes, and windows do not close.

This often occurs in topic partitions reserved for low-frequency categories or during periods of low traffic.
To resolve this, Flink provides an idleness detection mechanism.

```java
// The explicit <MyEvent> type witness is needed: without it the chained call
// infers WatermarkStrategy<Object>, which does not match the declared type.
WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withIdleness(Duration.ofMinutes(1));
```

When `withIdleness` is configured, the system marks a partition as "idle" if no events arrive within the specified duration. Once marked idle, the partition is excluded from the downstream watermark calculation. This allows the global watermark to advance based solely on the active partitions. When data resumes flowing from the idle partition, it resumes active status and once again contributes to the synchronization logic.

Careful configuration of the idleness timeout is necessary. If set too short, the system might flip-flop between idle and active states frequently. If set too long, the pipeline latency increases significantly when a partition actually goes silent. This configuration is a mandatory check for production readiness in any environment where data rates vary across partitions.
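
The interaction between the minimum rule and idleness described above can be sketched in plain Java, without any Flink dependency. This is a simplified model for illustration only, not Flink's internal implementation: the class name `MinWatermarkTracker` and its methods are hypothetical, and the timestamps 1205, 1202, and 1206 simply stand in for the 12:05, 12:02, and 12:06 values used in the diagram.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of an operator combining per-channel watermarks. */
final class MinWatermarkTracker {
    private final long[] watermarks;        // last watermark seen on each input channel
    private final boolean[] idle;           // true while a channel is marked idle
    private long current = Long.MIN_VALUE;  // operator watermark; never moves backwards

    MinWatermarkTracker(int channels) {
        watermarks = new long[channels];
        idle = new boolean[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from a channel, which also marks that channel active again. */
    long onWatermark(int channel, long watermark) {
        idle[channel] = false;
        watermarks[channel] = Math.max(watermarks[channel], watermark);
        return recompute();
    }

    /** Marks a channel idle so that it no longer holds back the minimum. */
    long markIdle(int channel) {
        idle[channel] = true;
        return recompute();
    }

    /** Takes the minimum over active channels only and keeps the result monotonic. */
    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive) {
            current = Math.max(current, min);
        }
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkTracker op = new MinWatermarkTracker(3);
        op.onWatermark(0, 1205L);
        op.onWatermark(2, 1206L);
        System.out.println(op.onWatermark(1, 1202L)); // prints 1202: channel 1 is the slowest
        System.out.println(op.markIdle(1));           // prints 1205: channel 1 no longer counts
    }
}
```

In this model a channel that resumes with an older watermark rejoins the minimum, but the operator watermark does not regress, because `current` only ever increases.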