水印提供了衡量流处理数据完整性的标准,但它们是一种启发式方法而非保证。在具有可变网络延迟的分布式系统中严格执行水印边界通常会导致数据丢失。一个发生在 $t=100$ 的事件可能在水印已推进到 $W(105)$ 时才到达操作器。如果系统仅仅根据水印严格关闭窗口,这个迟到的事件就会被丢弃,损害了计算聚合结果的准确性。对于生产级管道,特别是那些驱动金融仪表板或欺诈检测系统的,丢弃数据几乎不是一个选择。Flink 通过提供分层处理延迟数据的方法来解决此问题:允许延迟事件在宽限期内更新现有窗口,并将超出此宽限期的数据路由到称为侧输出的辅助通道。延迟数据的生命周期要了解 Flink 如何处理延迟数据,需要分析窗口操作器的生命周期。默认情况下,当属于该时间段的第一个元素到达时,就会创建一个窗口。窗口保持活动状态,直到水印通过窗口的结束时间戳。在这个确切的时刻,配置的触发器会触发,聚合结果会发出,并且窗口状态会被清除以释放内存。当我们引入“允许的延迟”时,我们会改变窗口的终止条件。当水印通过窗口结束时,窗口状态不会立即清除。相反,Flink 会在指定的时间内保留状态(在 RocksDB 或堆中)。生命周期转变为三个不同阶段:累积: 事件在水印通过窗口结束前到达。窗口正常累积状态。延迟 (宽限期): 水印已通过窗口结束,但尚未超出允许的延迟阈值。在此阶段到达的事件会触发窗口的重新计算。清除: 水印超出窗口结束时间加上允许的延迟。状态被销毁。任何后续事件都被视为“迟到”,并被丢弃或导向侧输出。我们可以通过下图来查看传入事件相对于当前水印和窗口配置的决策逻辑。digraph G { rankdir=TB; node [shape=box, style=filled, fontname="Helvetica", fontsize=10, color="#dee2e6"]; edge [fontname="Helvetica", fontsize=9, color="#868e96"]; Start [label="传入事件 E(t)", fillcolor="#bac8ff"]; CheckWindow [label="W 是否大于窗口结束?", shape=diamond, fillcolor="#e9ecef"]; NormalProcess [label="正常处理\n(累积状态)", fillcolor="#b2f2bb"]; CheckLateness [label="W 是否大于\n窗口结束 + 允许的延迟?", shape=diamond, fillcolor="#e9ecef"]; UpdateWindow [label="更新窗口状态\n再次触发", fillcolor="#ffec99"]; SideOutputCheck [label="是否配置侧输出?", shape=diamond, fillcolor="#e9ecef"]; SideOutput [label="发出到侧输出", fillcolor="#ffc9c9"]; Drop [label="丢弃事件", fillcolor="#fa5252"]; Start -> CheckWindow; CheckWindow -> NormalProcess [label="否 (及时)"]; CheckWindow -> CheckLateness [label="是 (延迟)"]; CheckLateness -> UpdateWindow [label="否 (在宽限期内)"]; CheckLateness -> SideOutputCheck [label="是 (太迟)"]; SideOutputCheck -> SideOutput [label="是"]; SideOutputCheck -> Drop [label="否"]; }根据水印推进和延迟配置处理事件的决策树。管理允许延迟的状态更新在 DataStream API 中配置允许的延迟很简单,但对下游消费者的影响很大。当延迟数据在宽限期内到达时,Flink 必须为之前被认为是“已关闭”的窗口发出更新后的结果。OutputTag<Event> lateTag = new OutputTag<Event>("late-data"){}; DataStream<Result> stream = input .keyBy(Event::getId) .window(TumblingEventTimeWindows.of(Time.minutes(10))) .allowedLateness(Time.minutes(5)) // 窗口结束后保留状态5分钟 .sideOutputLateData(lateTag) // 将极度延迟的数据路由到此处 .process(new AggregateProcessFunction());在此配置中,如果一个窗口覆盖 $[12:00, 12:10)$ (不包括 $12:10$),当水印通过 $12:10$ 时,会发出主要结果。然而,状态仍保留在后端。如果时间戳为 $12:08$ 的事件在水印为 $12:12$ 时到达,Flink 会检索窗口状态,更新聚合结果,并再次触发。这导致流中包含同一窗口的多个结果。第一个结果是近似值;后续结果是细化。这种行为从根本上改变了 sink 操作器的要求。下游数据库或消费者必须是幂等的,或者能够处理“upsert”(更新/插入)操作。如果下游系统期望一个仅追加流(例如写入标准 Kafka topic 或 HDFS 文件),此配置将导致重复或冲突的数据。您必须设计管道,使下游系统将窗口开始时间视为主键,允许新值覆盖旧值。用于处理极端延迟的侧输出允许的延迟会带来成本:内存。为已关闭的窗口保留状态会增加您的状态后端的占用空间。在使用 RocksDB 的高吞吐量场景中,将窗口保持开启数小时或数天通常是不切实际的。对于在允许的延迟期限过后到达的事件,通常由长期网络分区或移动设备在数天后重新上线引起,侧输出提供了一种在不维护窗口状态的情况下捕获数据的机制。侧输出捕获被丢弃数据的原始事件流。这些数据有效地绕过了主要的窗口逻辑。一种常见的架构模式是将此侧输出路由到“冷存储”路径,例如 S3 存储桶或单独的 Kafka topic。批处理过程可以稍后将这些延迟数据与实时聚合结果合并,以实现最终一致性。访问侧输出流需要使用窗口操作结果上的 getSideOutput 方法:SingleOutputStreamOperator<Result> mainStream = ...; // 窗口操作 DataStream<Event> lateStream = mainStream.getSideOutput(lateTag); lateStream.addSink(new FlinkKafkaProducer<>( "late-events-topic", new EventSchema(), properties ));使用 OutputTag 是必要的,因为 Java 中的泛型类型在运行时会被擦除。OutputTag 声明中的匿名内部类 {} 确保类型信息得以保留,从而使 Flink 能够正确序列化和路由侧输出事件。对水印推进的影响了解处理延迟数据不会停止或回溯水印这一点非常重要。水印是一个单调递增的值。在水印为 $W(200)$ 时处理时间戳为 $t=100$ 的延迟事件不会将水印移回 100。这种分离确保了管道即使在处理迟到事件的同时,也能继续以低延迟处理实时数据。权衡仅在于状态大小和下游处理的复杂程度。实现策略:累积式与撤回式当触发器因延迟数据而多次触发时,ProcessWindowFunction 或 WindowFunction 严格决定发出什么。然而,如果您使用 Flink SQL 或 Table API,系统会通过内部变更日志来管理这一点。在 DataStream API 中,开发者控制发出策略。通常有两种选择:累积式: 窗口状态存储运行总计。每次触发都会发出新的总计。(例如,首次发出:10,延迟更新发出:11). 这要求 sink 执行覆盖操作。增量/撤回式: 操作器发出差值。这在 DataStream API 中手动实现很复杂,但在 Flink SQL 中是标准做法。(例如,首次发出:+10. 延迟更新:-10, +11)。对于大多数涉及 KeyedWindows 的 DataStream 应用,累积式策略与支持 upsert 的 sink(如 Redis、Cassandra 或支持 upsert 的 JDBC)结合使用,是处理延迟数据的标准模式。下表比较了标准窗口和带有允许延迟的窗口的事件时间线。{ "layout": { "title": "带延迟数据的窗口发出时间线", "xaxis": { "title": "系统时间 (处理时间)", "showgrid": true, "zeroline": false }, "yaxis": { "title": "水印与事件时间", "showgrid": true, "zeroline": false }, "margin": {"l": 50, "r": 20, "t": 40, "b": 40}, "showlegend": true, "legend": {"orientation": "h", "y": -0.2}, "plot_bgcolor": "#f8f9fa", "paper_bgcolor": "#f8f9fa" }, "data": [ { "x": [1, 2, 3, 4, 5, 6, 7, 8], "y": [10, 20, 30, 40, 50, 60, 70, 80], "mode": "lines", "name": "水印推进", "line": {"color": "#4dabf7", "width": 2} }, { "x": [3], "y": [30], "mode": "markers", "name": "窗口结束 (t=30)", "marker": {"color": "#fa5252", "size": 12, "symbol": "diamond"} }, { "x": [5], "y": [25], "mode": "markers", "name": "延迟事件 (t=25)", "marker": {"color": "#fcc419", "size": 10} }, { "x": [7], "y": [20], "mode": "markers", "name": "延迟事件 (t=20)", "marker": {"color": "#fcc419", "size": 10} }, { "x": [8], "y": [15], "mode": "markers", "name": "丢弃/侧输出 (t=15)", "marker": {"color": "#868e96", "size": 10, "symbol": "x"} } ] }水印相对于窗口关闭和延迟事件到达的推进。allowedLateness 和 sideOutputLateData 的配置提供了一个安全保障。它使管道能够优先处理大多数数据的低延迟,同时保留一种机制来最终处理延迟信息的一致性。通过将最远的异常值导向侧输出,我们保护实时状态免受无限增长,同时确保没有数据真正丢失。