Kafka 事务消息的核心机制是什么?
Kafka 事务消息的核心机制是什么?
Kafka 事务消息从 0.11 版本引入,解决的是跨分区、跨会话的原子写入问题。它不是 RocketMQ 那种"半消息+本地事务回查"的模式,而是围绕 Transaction Coordinator + 两阶段提交 实现的 Exactly-Once 语义。
核心三要素:幂等 Producer(PID + Sequence Number 去重)、Transaction Coordinator(事务协调器)、__transaction_state 内部 Topic(持久化事务日志)。
事务消息的完整流程是怎样的?
1. 查找 Coordinator
Producer 发送 FindCoordinatorRequest,根据 transactional.id 哈希定位到某个 Broker 作为该事务的 Coordinator。
2. 初始化 PID
Producer 向 Coordinator 发送 InitPidRequest。Coordinator 分配 PID 和 epoch,并将 transactional.id → PID 映射写入 __transaction_state。这一步保证新 Producer 启动后,旧的同 ID Producer 被 fence 掉(Zombie Fencing)。
3. 开始事务
beginTransaction() 是 Producer 本地操作,仅更新内部状态,不与 Broker 交互。
4. 注册分区
Producer 第一次向某个 TopicPartition 发消息前,先发 AddPartitionsToTxnRequest 给 Coordinator,注册该分区到当前事务。Coordinator 将分区信息写入事务日志。
5. 发送消息
Producer 正常发送 ProduceRequest,消息携带 PID、epoch、sequence number。Broker 写入日志但消息对 read_committed 消费者不可见。
6. 提交消费偏移(Consume-Transform-Produce 模式)
Producer 发 AddOffsetsToTxnRequest 将消费偏移纳入事务,再发 TxnOffsetCommitRequest 写入 __consumer_offsets。保证消费位移和生产消息的原子性。
7. 提交或中止事务
- Commit:Producer 发
EndTxnRequest(commit),Coordinator 先写入 PREPARE_COMMIT 到事务日志,再向各分区 Leader 发WriteTxnMarker写入 Control Batch(commit marker),最后写入 COMPLETE_COMMIT。 - Abort:流程对称,写 abort marker,消费者丢弃对应消息。
Consumer 如何处理事务消息?
isolation.level=read_committed:Broker 端过滤,只返回已 commit 事务的消息。Consumer 还会收到 abort 标记,丢弃回滚消息。isolation.level=read_uncommitted:所有消息立即可见,包括未提交的。性能更好但可能读到脏数据。
关键配置有哪些?
Producer 必须配置:
propertiesenable.idempotence=true transactional.id=my-txn-id acks=all
Consumer 读事务消息需配置:
propertiesisolation.level=read_committed
Broker 端关注 transaction.state.log.replication.factor 和 transactional.id.expiration.ms(默认 7 天过期)。
事务消息的典型代码怎么写?
javaProperties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "order-txn-1"); props.put("enable.idempotence", "true"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); producer.initTransactions(); try { producer.beginTransaction(); producer.send(new ProducerRecord<>("orders", "key1", "value1")); producer.send(new ProducerRecord<>("inventory", "key2", "value2")); producer.commitTransaction(); } catch (ProducerFencedException e) { producer.close(); // 被 fence,不可恢复 } catch (KafkaException e) { producer.abortTransaction(); }
面试追问:事务消息有什么局限?
- Consumer 侧保证弱:
read_committed只能过滤未提交消息,无法阻止 Consumer 重复处理已提交消息,消费端幂等需自行保证。 - 事务超时问题:
transaction.timeout.ms默认 60s,长事务易超时被 Coordinator 主动 abort。 - 跨系统不原子:Kafka 事务只覆盖 Kafka 内部的生产和消费位移提交,与外部数据库的联动需自行实现补偿机制,无法做到真正的跨系统两阶段提交。