Standard Flink window definitions such as tumbling or sliding windows rely on preset logic to determine when to process data. While these suffice for periodic reporting, advanced streaming applications often require logic that reacts to the data itself rather than just the passage of time. You may need to emit an early result when a specific value threshold is breached, or purge a window immediately after a specific sequence of events to manage memory.
To achieve this granular control, Flink exposes the Trigger and Evictor interfaces. These components decouple the window assignment (grouping elements into buckets) from the window execution (evaluating the contents).
A Trigger determines when a window is ready to be processed. Every WindowAssigner comes with a default trigger. For example, TumblingEventTimeWindows uses an EventTimeTrigger that fires solely when the watermark passes the window end.
However, many low-latency requirements dictate that we cannot wait for the window end. Consider a fraud detection system aggregating transaction counts over 24 hours. Waiting 24 hours to alert on a spike is unacceptable. You need a trigger that fires speculatively as counts increase, while still retaining the state for the final daily aggregate.
The Trigger interface allows you to implement this via three primary callback methods:
onElement(): Called for every element added to the window.
onEventTime(): Called when a registered event-time timer fires.
onProcessingTime(): Called when a registered processing-time timer fires.
Each method returns a TriggerResult enum, which dictates the action taken by the window operator:
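CONTINUE: Take no action; the window keeps accumulating elements.
FIRE: Trigger the computation (for example, invoking the ProcessWindowFunction), emitting the current result, but retain the window state.
PURGE: Discard the window contents without evaluating the window function.
FIRE_AND_PURGE: Trigger the computation, emit the result, and then discard the window contents.
To implement a trigger that fires early based on a count threshold or at the end of the window (whichever comes first), you must manage state within the trigger itself. Triggers in Flink can persist state through the TriggerContext.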
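The four TriggerResult values (CONTINUE, FIRE, PURGE, FIRE_AND_PURGE) can be read as combinations of two independent flags: whether to evaluate the window and whether to drop its contents. The plain-Java sketch below models that decomposition without any Flink dependency. The enum name `ModelTriggerResult` is hypothetical; only the constant names and the `isFire()`/`isPurge()` accessors mirror Flink's own enum.

```java
// Hypothetical stand-in for Flink's TriggerResult (not the real class).
// Each outcome is a combination of two flags: evaluate the window, drop its contents.
enum ModelTriggerResult {
    CONTINUE(false, false),      // do nothing
    FIRE(true, false),           // evaluate, keep contents
    PURGE(false, true),          // drop contents without evaluating
    FIRE_AND_PURGE(true, true);  // evaluate, then drop contents

    private final boolean fire;
    private final boolean purge;

    ModelTriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() {
        return fire;
    }

    boolean isPurge() {
        return purge;
    }
}
```

Reading the results this way makes it easier to reason about a custom trigger: any path that fires without ever purging leaves the buffered elements in state.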
Figure: The decision logic for a dual-condition trigger evaluates element counts against a threshold while simultaneously monitoring time progress.
The following Java implementation demonstrates a trigger that fires if the element count reaches a specified limit, resetting the count after firing to avoid spamming updates, while still allowing the final window closure to handle the remaining tail of data.
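import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class CountOrTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private final long maxCount;

    // Descriptor for mutable state within the trigger context
    private final ReducingStateDescriptor<Long> countStateDesc =
        new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public CountOrTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // Late element: the watermark has already passed the window end, so emit and purge at once
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Register a timer for the standard window end on every element, so it is
        // set even when this call fires early (re-registering the same timer is a no-op)
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ReducingState<Long> countState = ctx.getPartitionedState(countStateDesc);
        countState.add(1L);
        if (countState.get() >= maxCount) {
            countState.clear(); // Reset count for the next speculative batch
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE_AND_PURGE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // This trigger registers no processing-time timers
    }

    // clear() is abstract and must also be overridden; it is covered later in this section

    // Adds two partial counts; used by the ReducingState above
    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}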
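To see how the count condition and the time condition interact without running a Flink job, the decision logic can be modeled in plain Java. This is only a sketch under simplifying assumptions: `CountOrTimeModel`, `Decision`, and `onWatermark` are hypothetical names, and the watermark is passed in by hand instead of being read from a TriggerContext.

```java
// Plain-Java model of a count-or-time trigger decision; no Flink types involved.
// All names are hypothetical and exist only for this illustration.
class CountOrTimeModel {
    enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final long maxCount;
    private final long windowMaxTimestamp;
    private long count = 0;

    CountOrTimeModel(long maxCount, long windowMaxTimestamp) {
        this.maxCount = maxCount;
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    // Called once per element, with the watermark observed at that moment.
    Decision onElement(long currentWatermark) {
        if (windowMaxTimestamp <= currentWatermark) {
            return Decision.FIRE_AND_PURGE; // window is already complete
        }
        count++;
        if (count >= maxCount) {
            count = 0; // start the next speculative batch
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    // Called when the watermark advances; models the end-of-window timer.
    Decision onWatermark(long watermark) {
        return watermark >= windowMaxTimestamp
            ? Decision.FIRE_AND_PURGE
            : Decision.CONTINUE;
    }
}
```

With a limit of 3, seven elements arriving before the window end produce FIRE on the third and sixth element. The seventh is only emitted once the watermark reaches the window's maximum timestamp, which is the "remaining tail" handled by the final firing.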
While Triggers control when a window is computed, Evictors control what data remains in the window buffer. An evictor is an optional component that can remove elements from the window either before or after the window function executes.
This is particularly useful for logic that requires a finite buffer within an otherwise infinite stream, such as maintaining a "Last N Events" window or removing outliers before calculation.
The Evictor interface provides two methods:
evictBefore(): Executed after the trigger fires but before the window function applies.
evictAfter(): Executed after the window function completes.
It is critical to understand the performance cost of using an Evictor. Standard time windows with a ReduceFunction or AggregateFunction perform incremental aggregation. Flink maintains only a single accumulator value (e.g., a running sum) rather than storing all individual elements.
When you attach an Evictor, Flink must store all raw elements in the window state because the Evictor logic requires access to the individual records to decide which ones to keep or remove. This significantly increases memory pressure and RocksDB state size. Consequently, Evictors should only be used when the logic explicitly requires analyzing the collection of elements, not just their aggregate.
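The difference in state footprint can be illustrated with a small plain-Java model. It is a sketch with hypothetical names (`LastNEvictionModel`, `runningSum`, `keepLastN`) and no Flink dependency: the incremental path needs one accumulator, while a "Last N Events" eviction step needs the buffered elements themselves.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (hypothetical names, no Flink dependency).
class LastNEvictionModel {
    // Incremental aggregation: a single accumulator is enough, elements are not stored.
    static long runningSum(long accumulator, long element) {
        return accumulator + element;
    }

    // Eviction works on individual elements: drop the oldest so that at most n remain.
    // This models what an evictBefore step would do for a "Last N Events" window.
    static <T> List<T> keepLastN(List<T> buffer, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must be non-negative");
        }
        int from = Math.max(0, buffer.size() - n);
        return new ArrayList<>(buffer.subList(from, buffer.size()));
    }
}
```

`keepLastN` cannot be expressed in terms of the accumulator alone, which is why attaching an evictor forces the operator to keep every raw element in state.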
A common use case is Delta Eviction, where elements are kept only if they differ significantly from a reference point (often the last accepted element). This creates a dynamic buffer that smooths out noise in sensor data.
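Let $v_{\text{new}}$ be the value of the new element, $v_{\text{last}}$ the value of the last retained element, and $\delta$ the configured threshold. The eviction condition, written as the test an element must pass to be kept, is:
$$|v_{\text{new}} - v_{\text{last}}| > \delta$$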
If the condition is not met, the element is effectively noise and is evicted before the window function processes the batch.
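The rule can be sketched in plain Java as a filter that keeps a value only when its absolute difference from the last retained value exceeds a threshold. The names `DeltaFilterModel` and `retain` are hypothetical, and the sketch assumes that the first element is always retained because there is no reference point yet. It models the rule as described in this section, not Flink's built-in DeltaEvictor class, whose comparison semantics may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of delta-based retention (hypothetical names, no Flink dependency).
class DeltaFilterModel {
    // Keeps a value only if it differs from the last retained value by more than threshold.
    // Assumption: the first value is always kept, since there is nothing to compare it to.
    static List<Double> retain(List<Double> values, double threshold) {
        List<Double> kept = new ArrayList<>();
        Double lastRetained = null;
        for (double v : values) {
            if (lastRetained == null || Math.abs(v - lastRetained) > threshold) {
                kept.add(v);
                lastRetained = v; // the reference point moves only when a value is kept
            }
        }
        return kept;
    }
}
```

For the readings 10.0, 10.2, 11.5, 11.6, 9.0 and a threshold of 1.0, the values 10.2 and 11.6 are treated as noise and the filter keeps 10.0, 11.5, and 9.0.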
To architect resilient pipelines, one must visualize how elements flow through the window operator when both custom triggers and evictors are present. The interaction defines the consistency of your results.
Figure: The operational flow within a Flink Window Operator defines the sequence of state updates, trigger evaluations, and eviction phases.
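The ordering of these phases can be made concrete with a simplified plain-Java model. It assumes the sequence: add the element to window state, consult the trigger, and, if the trigger fires, run evictBefore, the window function, and evictAfter, followed by a purge when requested. `WindowFlowModel` and its members are hypothetical names; the eviction policy (keep the last `keepLast` elements) and the window function (a sum) are chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of per-element handling in an evicting window operator.
// Hypothetical names; no Flink dependency.
class WindowFlowModel {
    final List<Integer> buffer = new ArrayList<>();   // window state
    final List<Integer> results = new ArrayList<>();  // emitted results
    final List<String> log = new ArrayList<>();       // order of phases
    private final int keepLast;

    WindowFlowModel(int keepLast) {
        this.keepLast = keepLast;
    }

    // fire and purge stand in for the trigger's decision on this element.
    void process(int element, boolean fire, boolean purge) {
        buffer.add(element);
        log.add("add");
        log.add("trigger");
        if (fire) {
            while (buffer.size() > keepLast) {
                buffer.remove(0); // evictBefore: drop the oldest elements
            }
            log.add("evictBefore");
            int sum = 0;
            for (int v : buffer) {
                sum += v; // window function: sum of the surviving elements
            }
            results.add(sum);
            log.add("windowFunction");
            log.add("evictAfter"); // no-op in this model
        }
        if (purge) {
            buffer.clear();
            log.add("purge");
        }
    }
}
```

Note that elements removed in the eviction phase are gone for later firings as well, so the evictor affects not only the current result but every subsequent evaluation of the same window.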
A frequent issue in advanced windowing is the misuse of GlobalWindows with custom triggers. A GlobalWindow assigns all data to a single window that never closes naturally. If you implement a custom trigger that returns FIRE but never PURGE (or FIRE_AND_PURGE), the window contents are never released and the underlying state grows without bound until the job fails, for example with an OutOfMemoryError on a heap-based state backend.
When designing custom triggers for global windows (e.g., implementing a custom session logic based on content rather than time gaps), you must ensure that every code path eventually leads to a PURGE result.
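The effect of a missing purge can be shown with a small counting model. It is a plain-Java sketch with hypothetical names (`GlobalWindowGrowthModel`, `retainedAfter`) that tracks only how many elements remain buffered when a trigger fires every `fireEvery` elements, with or without purging.

```java
// Counts how many elements stay buffered in a never-closing window.
// Hypothetical model; no Flink dependency.
class GlobalWindowGrowthModel {
    // total: number of arriving elements; fireEvery: the trigger fires on every
    // fireEvery-th element; purgeOnFire: whether each firing also purges the contents.
    static int retainedAfter(int total, int fireEvery, boolean purgeOnFire) {
        if (fireEvery <= 0) {
            throw new IllegalArgumentException("fireEvery must be positive");
        }
        int buffered = 0;
        for (int i = 1; i <= total; i++) {
            buffered++;
            boolean fires = (i % fireEvery == 0);
            if (fires && purgeOnFire) {
                buffered = 0; // FIRE_AND_PURGE releases the contents
            }
            // a plain FIRE leaves the buffer untouched
        }
        return buffered;
    }
}
```

With firing every 100 elements, 1,000 arrivals leave 1,000 buffered elements when the trigger only fires and none when it fires and purges.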
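The Trigger interface includes a clear() method. The window operator invokes it when the window is removed, that is, once the window has expired (its end plus any allowed lateness has passed). A PURGE result on its own discards only the window contents and leaves trigger state in place. You must implement this method to clean up any custom state you created through the TriggerContext (like the countState in the previous code example). Failure to clear trigger state results in "zombie" state entries that persist in the state backend (for example, RocksDB) even after the window itself has been removed. Because a GlobalWindow never expires, a trigger used with GlobalWindows should also clear its own state whenever it returns a purging result.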
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
    // Mandatory: Clean up the custom state used for counting
    ctx.getPartitionedState(countStateDesc).clear();
    // Clean up any pending timers to prevent them from firing on a non-existent window
    ctx.deleteEventTimeTimer(window.maxTimestamp());
}
By mastering triggers and evictors, you move past the rigid structures of fixed time intervals. You gain the ability to model complex business logic, such as sessionization based on cart activity, speculative emission of high-priority signals, and noise filtering, directly within the streaming runtime. This reduces the need for complex post-processing and lets your application react to event patterns as they occur rather than at the next window boundary.