趋近智
分布式系统本身存在局部故障的风险,即一个进程在写入一个分区后但在写入另一个分区前崩溃。这种部分完成的状态在需要严格数据完整性的情况下是不可接受的,例如财务账本更新或库存管理。Apache Kafka 通过其事务 API 来处理此问题,该 API 实现了跨多个分区的原子性写入。此功能保证了一组消息要么完全对下游消费者可见,要么完全被有效丢弃。
Kafka 事务依赖于为流处理调整的两阶段提交协议。该协议引入了一个名为事务协调器(Transaction Coordinator)的专用组件。协调器在一个名为 __transaction_state 的内部主题中维护活动事务的状态。
当生产者启动事务时,它会向协调器注册。协调器会分配一个生产者 ID (PID) 和一个 epoch。在写入阶段,生产者像往常一样向用户主题发送消息,但这些消息不会立即被视为“已提交”。一旦生产者发出提交命令,协调器就会向事务日志写入一条“准备提交”消息。随后,协调器会向事务中涉及的所有分区写入“提交标记 (token)”。
下图显示了生产者、事务协调器和主题分区在成功提交序列中的互相作用。
通过事务协调器确保跨不同分区的原子写入操作序列。
要启用事务,您必须在生产者中配置 transactional.id 属性。此 ID 必须是静态的,并且对每个生产者实例来说是唯一的。它允许事务协调器在生产者重启后识别出同一个生产者。如果生产者崩溃并使用相同的 transactional.id 重启,协调器会使用 epoch 机制隔离旧实例,防止“僵尸”进程损坏数据。
启用事务会自动强制执行幂等性 (enable.idempotence=true),从而保证重试不会在单个分区中产生重复数据。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// transactional.id 是强制要求的
props.put("transactional.id", "order-processor-01");
// 建议对事务性工作负载使用高持久性设置
props.put("enable.idempotence", "true");
props.put("acks", "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
事务性工作流的实现遵循一个特定的生命周期:初始化、启动事务、发送记录,最后提交或中止事务。initTransactions() 方法只调用一次,用于向协调器注册生产者并同步 epoch。
此逻辑必须包含在完善的错误处理中。具体来说,您必须处理 ProducerFencedException,它表示另一个具有相同 transactional.id 的实例已经启动,以及 AuthorizationException,它表示权限失败。
// 1. 初始化事务上下文
producer.initTransactions();
try {
// 2. 启动新事务
producer.beginTransaction();
// 3. 向多个分区发送记录
ProducerRecord<String, String> recordA =
new ProducerRecord<>("financial-ledger", "Account-123", "Debit: $100");
ProducerRecord<String, String> recordB =
new ProducerRecord<>("audit-log", "Account-123", "TransactionID: 998877");
producer.send(recordA);
producer.send(recordB);
// 4. 提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// 致命错误:生产者无法恢复
producer.close();
throw e;
} catch (KafkaException e) {
// 非致命错误:中止事务,并在必要时重试
producer.abortTransaction();
}
事务性写入只是问题的一部分。如果消费者使用默认配置读取数据,他们可能会看到属于未完成事务的消息或来自已中止事务的消息。为保证端到端的一致性,消费者必须配置正确的隔离级别。
将 isolation.level 设置为 read_committed 可确保消费者只读取已成功提交的消息。消费者会在内部缓存消息,直到遇到控制消息(提交标记 (token))。如果遇到中止标记,缓存的消息将被丢弃。
生产者写入状态与消费者可见性之间的关系并不简单。下图详细说明了消息批次可能处于的状态。
基于隔离级别配置的消费者消息可用性比较。
事务性写入会引入额外开销。这种延迟源于与事务协调器的互相作用以及将控制标记 (token)写入日志。无论事务中包含多少消息,每个事务的开销大致是固定的。因此,提交的频率会显著影响吞吐量 (throughput)。
我们可以将有效吞吐量 定义为相对于原始吞吐量 和事务开销 的关系:
其中 是每个事务中的消息数量。如果 很小(例如,每个事务只有 1 条消息), 将占据主导地位,性能会严重下降。增加 beginTransaction / commitTransaction 块内的批处理大小可以分摊协调成本。
然而,增加事务大小也会增加 read_committed 消费者的延迟,因为他们在收到提交标记之前无法处理数据。优化此问题需要平衡大事务的吞吐量效率与下游消费者的延迟要求。
分布式写入中一个主要的风险是“僵尸生产者”情况。这发生在生产者因网络分区或长时间垃圾回收 (GC) 暂停而停止时。系统可能会认为该生产者已死亡并启动一个替代者。如果原始生产者醒来并尝试写入,可能会发生数据损坏。
Kafka 通过 epoch 来处理此问题。当调用 initTransactions() 时,协调器会增加该 transactional.id 的 epoch。
发送给 broker 的所有写入请求都包含此 epoch。如果 broker 收到一个 epoch 低于事务元数据中当前 epoch 的请求,它会以 ProducerFencedException 拒绝该请求。这种机制保证了在任何给定时间,具有特定 ID 的生产者只有一个有效实例可以写入日志。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造