5月28日 08:30

Kafka 出现消息重复消费怎么解决?

面试官为什么爱问这个问题

Kafka 默认提供的是 at-least-once 语义,消息至少被消费一次,但可能重复。在金融支付、订单处理等场景下,重复消费意味着重复扣款、重复发货,后果严重。面试官问这道题,考察的是你对 Kafka 消费语义的理解深度,以及在实际业务中如何保证 exactly-once。

重复消费是怎么产生的

根本原因只有一个:Consumer 消费了消息,但 Offset 没有成功提交。下次重启或 Rebalance 后,Kafka 认为这条消息没消费过,重新投递。

常见触发场景:

  1. Rebalance 导致 Offset 丢失:Consumer 处理消息耗时超过 max.poll.interval.ms(默认 5 分钟),Kafka 认为该 Consumer 已死,触发 Rebalance,未提交的 Offset 对应的消息会被重新分配给其他 Consumer 消费
  2. Consumer 异常宕机:启用了自动提交(enable.auto.commit=true,默认开启),但提交间隔内宕机,最近一次提交之后消费的消息都会被重复消费
  3. 手动提交失败:关闭自动提交后调用 commitSync()commitAsync(),提交请求因网络问题失败,消息被重复消费
  4. Producer 重试导致重复写入:Producer 发送消息后未收到 ACK,触发重试,实际上第一次已经写入成功,造成 Broker 端消息重复

解决方案:从 Kafka 机制到业务层,逐层防御

第一层:Producer 幂等性——防止重复写入

Kafka 0.11 引入幂等 Producer,原理是为每个 Producer 分配一个 PID(Producer ID),为每条消息分配递增的 SequenceNumber。Broker 端维护每个 <PID, Partition> 的最新 SequenceNumber,收到消息时比对:如果新消息的 SequenceNumber ≤ 已记录的值,判定为重复写入,直接丢弃。

properties
# 开启幂等性(Kafka 3.0 后默认开启) enable.idempotence=true # 等价于同时设置: # acks=all # retries=Integer.MAX_VALUE # max.in.flight.requests.per.connection<=5

注意限制:幂等性只保证单 Partition 内的去重,跨 Partition 或 Producer 重启(PID 变化)后无法去重。

第二层:Kafka 事务——跨分区 Exactly-Once

当需要将消费 Offset 提交和消息发送放在同一个事务中时(典型的 consume-transform-produce 模式),需要 Kafka 事务支持:

java
// 事务型 Producer 配置 props.put("transactional.id", "order-tx-001"); producer.initTransactions(); try { producer.beginTransaction(); // 消费-处理-发送,放在同一个事务中 producer.send(new ProducerRecord<>("topic", key, value)); // 将消费 Offset 也提交到事务中 producer.sendOffsetsToTransaction(offsets, consumerGroupId); producer.commitTransaction(); } catch (Exception e) { producer.abortTransaction(); }

事务保证:要么消费 Offset 提交和消息发送同时成功,要么同时回滚,从源头杜绝重复。

第三层:Consumer 端幂等——业务层最后一道防线

无论 Producer 和 Broker 做了多少保障,Consumer 端的幂等性是必须实现的,因为 Rebalance 场景下 Kafka 无法完全避免重复投递。

方案一:数据库唯一约束

利用数据库主键或唯一索引天然去重,最可靠:

sql
-- 以消息 ID 作为唯一索引 INSERT INTO orders (order_id, user_id, amount, status) VALUES ('msg-123', 1001, 99.9, 'PAID') ON DUPLICATE KEY UPDATE status = status; -- MySQL 写法,重复则跳过

方案二:Redis SET 去重

适合高吞吐场景,利用 Redis Set 的去重特性:

java
// 消费前先判断是否已处理 String dedupeKey = "kafka:processed:" + topic + ":" + partition; Boolean isNew = redisTemplate.opsForSet().add(dedupeKey, messageId); if (Boolean.TRUE.equals(isNew)) { processMessage(message); // 设置过期时间避免 Key 无限膨胀 redisTemplate.expire(dedupeKey, 24, TimeUnit.HOURS); }

方案三:状态机控制

适用于有明确状态流转的业务,如订单从"待支付"到"已支付",重复消费时状态已变更,业务逻辑自然跳过:

java
// 利用业务状态天然幂等 Order order = orderDao.getById(orderId); if (order.getStatus() == OrderStatus.PAID) { log.info("订单已支付,跳过重复消息: {}", orderId); return; // 已处理,直接跳过 } order.setStatus(OrderStatus.PAID); orderDao.update(order);

第四层:Offset 提交策略优化

properties
# 关闭自动提交,掌控提交时机 enable.auto.commit=false

手动提交的选择:

  • commitSync():同步提交,阻塞等待 Broker 确认,可靠但吞吐低
  • commitAsync():异步提交,不阻塞,但可能丢失提交确认
  • 推荐做法:正常消费用 commitAsync() 保证吞吐,Consumer 关闭时用 commitSync() 兜底确保最后一次提交成功
java
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { processMessage(record); } consumer.commitAsync(); // 异步提交,追求吞吐 } } finally { consumer.commitSync(); // 关闭前同步提交,确保不丢 consumer.close(); }

面试追问与加分回答

追问:Kafka 幂等 Producer 的 PID 在重启后会变吗?

会变。PID 是 Broker 在 Producer 启动时分配的,重启后获得新的 PID,之前的 SequenceNumber 也会重置,所以幂等性无法跨会话去重。跨会话去重需要配合 transactional.id 使用事务机制。

追问:为什么不在 Broker 端做全局去重?

Broker 端全局去重需要维护所有消息的索引,存储和计算开销极大,与 Kafka 追求高吞吐的设计目标冲突。Kafka 选择在 Producer 端做有限去重(单 Partition 内),把跨 Partition、跨会话的去重责任交给业务层。

标签:Kafka