趋近智
使用外部系统存储的数据丰富流事件是许多生产流水线的基本要求。对于变化缓慢、基数较低的参考数据,例如广播状态模式,当丰富数据超出内存限制或更新过于频繁时,这些方法将变得不切实际。例子包括从 NoSQL 数据库获取用户交易历史,或从特征存储中检索预计算的嵌入 (embedding)数据。
使用 MapFunction 或 ProcessFunction 等标准转换操作符实现这些查找会引入显著的性能瓶颈。这些操作符是同步执行的。当一个操作符向外部数据库发出请求时,处理线程会阻塞,直到响应返回。在高吞吐量 (throughput)流处理系统中,即使是单位毫秒范围内的网络延迟,也会显著降低流水线的整体处理能力。
为了理解异步操作的必要性,请考虑同步系统中延迟与吞吐量之间的数学关系。如果一次外部服务调用需要 毫秒才能完成,那么单个处理线程在数学上被限制在特定的吞吐量 。
对于延迟为 10 毫秒 () 的数据库调用,单个线程每秒最多可以处理 100 个事件。要达到每秒 100,000 个事件的吞吐量,Flink 作业将需要 1,000 个并行任务。这会导致资源利用效率低下、高的上下文 (context)切换开销,并增加检查点机制的负载。
异步 I/O 将事件的处理与 I/O 请求的等待时间解耦。Flink 操作符不阻塞线程,而是注册一个回调并立即处理流中的下一个事件。这种并发性使得单个并行单元能够同时处理许多进行中的请求。
阻塞式同步执行与非阻塞式异步执行的比较,其中多个请求在时间上重叠。
Flink 通过 AsyncDataStream 工具类和 RichAsyncFunction 接口提供了此功能。与立即返回值的标准函数不同,RichAsyncFunction 返回 void 并接受一个 ResultFuture。实现遵循以下模式:
open() 方法中初始化。此客户端必须支持异步非阻塞操作(例如 Vert.x、不常见的 Java NIO 客户端或标准驱动的异步包装器)。asyncInvoke(input, resultFuture) 方法中,应用程序发出查询。ResultFuture。如果外部系统支持批处理,可以通过在客户端内缓冲请求并按组发送来进一步优化实现,但这会增加超时逻辑的复杂程度。
当有多个请求正在处理时,响应可能乱序到达。一个在 时发送的请求可能在 时发送的请求之后返回。Flink 提供了两种处理此行为的模式,通过 AsyncDataStream 静态方法进行配置。
AsyncDataStream.orderedWait 确保输出流的顺序严格匹配输入流的顺序。Flink 维护一个内部缓冲区。如果事件 的结果在事件 之前到达,Flink 会将 保留在缓冲区中,直到 完成并发出。
此模式会引入额外的延迟,该延迟等于当前请求窗口中最慢和最快响应之间的差值。当其下游逻辑依赖精确的事件顺序时,例如在有状态模式匹配中,此模式是必需的。
AsyncDataStream.unorderedWait 一旦结果可用就立即发出。这最小化了延迟和开销,但改变了流的顺序。然而,在事件时间模式下,这种乱序并非绝对。Flink 确保水位线不会超过记录。
在无序模式下,水位线作为同步屏障。水位线 和 之间的事件可能会相互重新排序,但在 通过后,不会发出 之前的任何事件。这保持了下游窗口聚合的完整性。
异步 I/O 操作符维护一个容量参数 (parameter),用于限制并发进行中请求的数量。这可以防止操作符使外部系统过载或用待处理的 Future 耗尽 Flink 堆内存。
当进行中请求的数量达到此容量时,操作符会触发反压。它会停止从输入流消费,直到至少一个待处理请求完成。此机制向上游传播,自然地减慢源的速度以匹配外部系统的吞吐量能力。
对数比较显示了异步 I/O 如何在延迟增加的情况下保持高吞吐量,而同步处理则迅速下降。
外部系统容易出现超时和瞬时故障。asyncInvoke 方法必须可靠地处理这些情况,以防止数据丢失或无限期停滞。
AsyncDataStream 配置接受一个超时参数 (parameter)。如果结果 Future 未在此持续时间内完成,Flink 会调用 AsyncFunction 上的 timeout() 方法。默认实现会抛出异常并重启作业,这对于轻微的网络瞬断通常是不理想的。
为了实现弹性,请重写 timeout() 方法。常见策略有:
检查点通过等待所有进行中的请求完成,才最终确定快照,从而与异步 I/O 交互。这保证了精确一次语义,但意味着一个挂起的请求会延迟检查点。因此,在数据库客户端本身上配置更严格的超时是更好的选择,而不是仅仅依赖 Flink 的超时机制。
ForkJoinPool 或同步执行器。回调逻辑通常在客户端的 I/O 线程上运行。保持回调内部的逻辑尽可能少(例如,仅完成 Future),以避免阻塞 I/O 选择器线程。RichAsyncFunction 内部的内存 Guava 缓存或 Caffeine 缓存结合使用。首先同步检查本地缓存;如果未命中,则继续进行异步调用。这部分内容有帮助吗?
© 2026 ApX Machine LearningAI伦理与透明度•