分布式系统本身存在局部故障的风险,即一个进程在写入一个分区后但在写入另一个分区前崩溃。这种部分完成的状态在需要严格数据完整性的情况下是不可接受的,例如财务账本更新或库存管理。Apache Kafka 通过其事务 API 来处理此问题,该 API 实现了跨多个分区的原子性写入。此功能保证了一组消息要么完全对下游消费者可见,要么完全被有效丢弃。事务协议Kafka 事务依赖于为流处理调整的两阶段提交协议。该协议引入了一个名为事务协调器(Transaction Coordinator)的专用组件。协调器在一个名为 __transaction_state 的内部主题中维护活动事务的状态。当生产者启动事务时,它会向协调器注册。协调器会分配一个生产者 ID (PID) 和一个 epoch。在写入阶段,生产者像往常一样向用户主题发送消息,但这些消息不会立即被视为“已提交”。一旦生产者发出提交命令,协调器就会向事务日志写入一条“准备提交”消息。随后,协调器会向事务中涉及的所有分区写入“提交标记”。下图显示了生产者、事务协调器和主题分区在成功提交序列中的互相作用。digraph G { rankdir=TB; node [shape=box, style=filled, fontname="Helvetica", color="#dee2e6"]; edge [fontname="Helvetica", fontsize=10]; subgraph cluster_0 { label = "客户端"; style=filled; color="#f8f9fa"; Producer [label="事务性生产者", fillcolor="#4dabf7", fontcolor="white"]; } subgraph cluster_1 { label = "Broker 端"; style=filled; color="#f8f9fa"; Coordinator [label="事务\n协调器", fillcolor="#f06595", fontcolor="white"]; TopicA [label="主题 A\n分区 0", fillcolor="#20c997", fontcolor="white"]; TopicB [label="主题 B\n分区 1", fillcolor="#20c997", fontcolor="white"]; } Producer -> Coordinator [label="1. 初始化 / 启动事务", color="#868e96"]; Producer -> TopicA [label="2. 发送数据", color="#868e96"]; Producer -> TopicB [label="2. 发送数据", color="#868e96"]; Producer -> Coordinator [label="3. 提交事务", color="#868e96"]; Coordinator -> TopicA [label="4. 写入提交标记", color="#fcc2d7", style=dashed]; Coordinator -> TopicB [label="4. 写入提交标记", color="#fcc2d7", style=dashed]; }通过事务协调器确保跨不同分区的原子写入操作序列。配置事务性生产者要启用事务,您必须在生产者中配置 transactional.id 属性。此 ID 必须是静态的,并且对每个生产者实例来说是唯一的。它允许事务协调器在生产者重启后识别出同一个生产者。如果生产者崩溃并使用相同的 transactional.id 重启,协调器会使用 epoch 机制隔离旧实例,防止“僵尸”进程损坏数据。启用事务会自动强制执行幂等性 (enable.idempotence=true),从而保证重试不会在单个分区中产生重复数据。Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // transactional.id 是强制要求的 props.put("transactional.id", "order-processor-01"); // 建议对事务性工作负载使用高持久性设置 props.put("enable.idempotence", "true"); props.put("acks", "all"); KafkaProducer<String, String> producer = new KafkaProducer<>(props);执行事务循环事务性工作流的实现遵循一个特定的生命周期:初始化、启动事务、发送记录,最后提交或中止事务。initTransactions() 方法只调用一次,用于向协调器注册生产者并同步 epoch。此逻辑必须包含在完善的错误处理中。具体来说,您必须处理 ProducerFencedException,它表示另一个具有相同 transactional.id 的实例已经启动,以及 AuthorizationException,它表示权限失败。// 1. 初始化事务上下文 producer.initTransactions(); try { // 2. 启动新事务 producer.beginTransaction(); // 3. 向多个分区发送记录 ProducerRecord<String, String> recordA = new ProducerRecord<>("financial-ledger", "Account-123", "Debit: $100"); ProducerRecord<String, String> recordB = new ProducerRecord<>("audit-log", "Account-123", "TransactionID: 998877"); producer.send(recordA); producer.send(recordB); // 4. 提交事务 producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // 致命错误:生产者无法恢复 producer.close(); throw e; } catch (KafkaException e) { // 非致命错误:中止事务,并在必要时重试 producer.abortTransaction(); }消费者隔离级别事务性写入只是问题的一部分。如果消费者使用默认配置读取数据,他们可能会看到属于未完成事务的消息或来自已中止事务的消息。为保证端到端的一致性,消费者必须配置正确的隔离级别。将 isolation.level 设置为 read_committed 可确保消费者只读取已成功提交的消息。消费者会在内部缓存消息,直到遇到控制消息(提交标记)。如果遇到中止标记,缓存的消息将被丢弃。生产者写入状态与消费者可见性之间的关系并不简单。下图详细说明了消息批次可能处于的状态。{ "layout": { "title": "按隔离级别划分的消息可见性", "height": 400, "showlegend": true, "barmode": "group", "xaxis": { "title": "事务状态" }, "yaxis": { "title": "可见性概率", "range": [0, 1.1] }, "colorscale": [ "#4dabf7", "#f06595" ] }, "data": [ { "x": ["Open Transaction", "Aborted Transaction", "Committed Transaction"], "y": [1, 1, 1], "name": "read_uncommitted (默认)", "type": "bar", "marker": { "color": "#adb5bd" } }, { "x": ["Open Transaction", "Aborted Transaction", "Committed Transaction"], "y": [0, 0, 1], "name": "read_committed", "type": "bar", "marker": { "color": "#228be6" } } ] }基于隔离级别配置的消费者消息可用性比较。性能影响事务性写入会引入额外开销。这种延迟源于与事务协调器的互相作用以及将控制标记写入日志。无论事务中包含多少消息,每个事务的开销大致是固定的。因此,提交的频率会显著影响吞吐量。我们可以将有效吞吐量 $T_{eff}$ 定义为相对于原始吞吐量 $T_{raw}$ 和事务开销 $O_{tx}$ 的关系:$$ T_{eff} \approx \frac{N_{msgs} \times S_{msg}}{ (N_{msgs} \times T_{write}) + O_{tx} } $$其中 $N_{msgs}$ 是每个事务中的消息数量。如果 $N_{msgs}$ 很小(例如,每个事务只有 1 条消息),$O_{tx}$ 将占据主导地位,性能会严重下降。增加 beginTransaction / commitTransaction 块内的批处理大小可以分摊协调成本。然而,增加事务大小也会增加 read_committed 消费者的延迟,因为他们在收到提交标记之前无法处理数据。优化此问题需要平衡大事务的吞吐量效率与下游消费者的延迟要求。僵尸隔离与 Epochs分布式写入中一个主要的风险是“僵尸生产者”情况。这发生在生产者因网络分区或长时间垃圾回收 (GC) 暂停而停止时。系统可能会认为该生产者已死亡并启动一个替代者。如果原始生产者醒来并尝试写入,可能会发生数据损坏。Kafka 通过 epoch 来处理此问题。当调用 initTransactions() 时,协调器会增加该 transactional.id 的 epoch。$$ E_{current} = E_{previous} + 1 $$发送给 broker 的所有写入请求都包含此 epoch。如果 broker 收到一个 epoch 低于事务元数据中当前 epoch 的请求,它会以 ProducerFencedException 拒绝该请求。这种机制保证了在任何给定时间,具有特定 ID 的生产者只有一个有效实例可以写入日志。