Kafka 出现消息重复消费怎么解决?
面试官为什么爱问这个问题
Kafka 默认提供的是 at-least-once 语义,消息至少被消费一次,但可能重复。在金融支付、订单处理等场景下,重复消费意味着重复扣款、重复发货,后果严重。面试官问这道题,考察的是你对 Kafka 消费语义的理解深度,以及在实际业务中如何保证 exactly-once。
重复消费是怎么产生的
根本原因只有一个:Consumer 消费了消息,但 Offset 没有成功提交。下次重启或 Rebalance 后,Kafka 认为这条消息没消费过,重新投递。
常见触发场景:
- Rebalance 导致 Offset 丢失:Consumer 处理消息耗时超过
max.poll.interval.ms(默认 5 分钟),Kafka 认为该 Consumer 已死,触发 Rebalance,未提交的 Offset 对应的消息会被重新分配给其他 Consumer 消费 - Consumer 异常宕机:启用了自动提交(
enable.auto.commit=true,默认开启),但提交间隔内宕机,最近一次提交之后消费的消息都会被重复消费 - 手动提交失败:关闭自动提交后调用
commitSync()或commitAsync(),提交请求因网络问题失败,消息被重复消费 - Producer 重试导致重复写入:Producer 发送消息后未收到 ACK,触发重试,实际上第一次已经写入成功,造成 Broker 端消息重复
解决方案:从 Kafka 机制到业务层,逐层防御
第一层:Producer 幂等性——防止重复写入
Kafka 0.11 引入幂等 Producer,原理是为每个 Producer 分配一个 PID(Producer ID),为每条消息分配递增的 SequenceNumber。Broker 端维护每个 <PID, Partition> 的最新 SequenceNumber,收到消息时比对:如果新消息的 SequenceNumber ≤ 已记录的值,判定为重复写入,直接丢弃。
properties# 开启幂等性(Kafka 3.0 后默认开启) enable.idempotence=true # 等价于同时设置: # acks=all # retries=Integer.MAX_VALUE # max.in.flight.requests.per.connection<=5
注意限制:幂等性只保证单 Partition 内的去重,跨 Partition 或 Producer 重启(PID 变化)后无法去重。
第二层:Kafka 事务——跨分区 Exactly-Once
当需要将消费 Offset 提交和消息发送放在同一个事务中时(典型的 consume-transform-produce 模式),需要 Kafka 事务支持:
java// 事务型 Producer 配置 props.put("transactional.id", "order-tx-001"); producer.initTransactions(); try { producer.beginTransaction(); // 消费-处理-发送,放在同一个事务中 producer.send(new ProducerRecord<>("topic", key, value)); // 将消费 Offset 也提交到事务中 producer.sendOffsetsToTransaction(offsets, consumerGroupId); producer.commitTransaction(); } catch (Exception e) { producer.abortTransaction(); }
事务保证:要么消费 Offset 提交和消息发送同时成功,要么同时回滚,从源头杜绝重复。
第三层:Consumer 端幂等——业务层最后一道防线
无论 Producer 和 Broker 做了多少保障,Consumer 端的幂等性是必须实现的,因为 Rebalance 场景下 Kafka 无法完全避免重复投递。
方案一:数据库唯一约束
利用数据库主键或唯一索引天然去重,最可靠:
sql-- 以消息 ID 作为唯一索引 INSERT INTO orders (order_id, user_id, amount, status) VALUES ('msg-123', 1001, 99.9, 'PAID') ON DUPLICATE KEY UPDATE status = status; -- MySQL 写法,重复则跳过
方案二:Redis SET 去重
适合高吞吐场景,利用 Redis Set 的去重特性:
java// 消费前先判断是否已处理 String dedupeKey = "kafka:processed:" + topic + ":" + partition; Boolean isNew = redisTemplate.opsForSet().add(dedupeKey, messageId); if (Boolean.TRUE.equals(isNew)) { processMessage(message); // 设置过期时间避免 Key 无限膨胀 redisTemplate.expire(dedupeKey, 24, TimeUnit.HOURS); }
方案三:状态机控制
适用于有明确状态流转的业务,如订单从"待支付"到"已支付",重复消费时状态已变更,业务逻辑自然跳过:
java// 利用业务状态天然幂等 Order order = orderDao.getById(orderId); if (order.getStatus() == OrderStatus.PAID) { log.info("订单已支付,跳过重复消息: {}", orderId); return; // 已处理,直接跳过 } order.setStatus(OrderStatus.PAID); orderDao.update(order);
第四层:Offset 提交策略优化
properties# 关闭自动提交,掌控提交时机 enable.auto.commit=false
手动提交的选择:
commitSync():同步提交,阻塞等待 Broker 确认,可靠但吞吐低commitAsync():异步提交,不阻塞,但可能丢失提交确认- 推荐做法:正常消费用
commitAsync()保证吞吐,Consumer 关闭时用commitSync()兜底确保最后一次提交成功
javatry { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { processMessage(record); } consumer.commitAsync(); // 异步提交,追求吞吐 } } finally { consumer.commitSync(); // 关闭前同步提交,确保不丢 consumer.close(); }
面试追问与加分回答
追问:Kafka 幂等 Producer 的 PID 在重启后会变吗?
会变。PID 是 Broker 在 Producer 启动时分配的,重启后获得新的 PID,之前的 SequenceNumber 也会重置,所以幂等性无法跨会话去重。跨会话去重需要配合 transactional.id 使用事务机制。
追问:为什么不在 Broker 端做全局去重?
Broker 端全局去重需要维护所有消息的索引,存储和计算开销极大,与 Kafka 追求高吞吐的设计目标冲突。Kafka 选择在 Producer 端做有限去重(单 Partition 内),把跨 Partition、跨会话的去重责任交给业务层。