Kafka 消息积压如何处理?
Kafka 消息积压如何处理?
Kafka 消息积压是生产环境最常见的故障之一,也是面试高频考点。核心表现为 Consumer 消费速度跟不上 Producer 生产速度,消息在 Broker 端持续堆积。处理思路是:先定位原因,再分层治理——短期应急止血,长期架构优化。
积压原因定位
消息积压不是单一问题,通常由以下几类原因叠加导致:
消费端瓶颈
- 消费逻辑耗时过长,单条消息处理耗时超过生产间隔
- 单线程消费,未能充分利用分区并行度
- 外部依赖(数据库、RPC)响应慢,拖慢整体消费速率
生产端突增
- 业务高峰(大促、秒杀)导致消息量激增
- Producer 批量发送配置不当,瞬时流量过大
- 上游系统异常重试导致消息重复涌入
数据倾斜
- 消息 Key 分布不均,部分分区积压严重,其他分区空闲
- 典型场景:用 userId 做 Key 时,大客户的消息集中在少数分区
系统故障
- Consumer 宕机或频繁 Rebalance
- 依赖服务宕机导致消费持续失败重试
- 网络抖动引起消费超时
监控与快速诊断
定位积压的第一步是看 Consumer Lag:
bash# 查看 Consumer Group 的 Lag 情况 kafka-consumer-groups --bootstrap-server localhost:9092 \ --describe --group my-group # 输出关键字段:CURRENT-OFFSET、LOG-END-OFFSET、LAG # LAG = LOG-END-OFFSET - CURRENT-OFFSET
关键监控指标:
| 指标 | 含义 | 告警建议 |
|---|---|---|
| Consumer Lag | 积压消息数 | > 10万触发 P2,> 100万触发 P1 |
| 消费速率 (msg/s) | 每秒消费消息数 | 持续低于生产速率触发告警 |
| Rebalance 频率 | 消费者组重平衡次数 | 5分钟内超过3次需排查 |
| 分区 Lag 分布 | 各分区积压差异 | 最大分区 Lag > 平均值3倍需关注数据倾斜 |
短期应急方案
1. 紧急扩容:临时 Topic 分流
这是处理百万级以上积压最有效的短期方案,核心思路是将积压数据快速分散到更多分区并行消费:
操作步骤:
bash# 第一步:创建临时 Topic,分区数为原来的 N 倍 kafka-topics --bootstrap-server localhost:9092 \ --create --topic my-topic-temp --partitions 50 \ --replication-factor 3
java// 第二步:写一个分发 Consumer,消费积压消息并轮询写入临时 Topic // 关键:不做业务处理,只做数据搬运 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { // 轮询写入临时 Topic 的各分区,保证均匀分布 ProducerRecord<String, String> pr = new ProducerRecord<>( "my-topic-temp", partitionCounter % 50, record.key(), record.value()); producer.send(pr); partitionCounter++; } consumer.commitSync(); }
bash# 第三步:部署 N 倍的 Consumer 消费临时 Topic # 第四步:积压消费完毕后,恢复原架构,下线临时 Consumer
注意:此方案会打破消息分区内的顺序性。如果业务要求顺序消费,需要将相同 Key 的消息路由到同一临时分区。
2. 增加消费者实例
最直接的方式,但受限于分区数:
bash# 先查看当前分区数 kafka-topics --bootstrap-server localhost:9092 \ --describe --topic my-topic # Consumer 数量不能超过 Partition 数量 # 如果 Consumer 已满,需要先增加分区 kafka-topics --bootstrap-server localhost:9092 \ --alter --topic my-topic --partitions 20
关键限制:一个分区同一时刻只能被同一个 Consumer Group 中的一个 Consumer 消费。Consumer 数量 = 分区数时并行度最大,超过则空闲。
3. 消费端快速优化
批量处理替代逐条处理:
javaConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); List<Record> batch = new ArrayList<>(); for (ConsumerRecord<String, String> record : records) { batch.add(mapRecord(record)); if (batch.size() >= 500) { // 批量写入数据库,而非逐条插入 db.batchInsert(batch); batch.clear(); } } if (!batch.isEmpty()) { db.batchInsert(batch); }
多线程消费(单 Consumer 多 Worker):
javaExecutorService executor = Executors.newFixedThreadPool(8); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { executor.submit(() -> processMessage(record)); } // 注意:提交 Offset 必须在所有 Worker 处理完成后
调整消费配置:
properties# 增大单次拉取量 max.poll.records=1000 # 延长拉取间隔,给处理留更多时间 max.poll.interval.ms=300000 # 适当缩短会话超时,加快故障感知 session.timeout.ms=25000 heartbeat.interval.ms=8000
处理数据倾斜
数据倾斜是积压的隐蔽原因,表现为少数分区 Lag 远高于其他分区:
bash# 查看各分区 Lag 分布 kafka-consumer-groups --bootstrap-server localhost:9092 \ --describe --group my-group # 如果 PARTITION-0 Lag=500万,其他分区 Lag=10万,就是数据倾斜
解决方案:
- 修改分区策略:将热点 Key 加随机后缀打散,如
userId_123→userId_123_0~userId_123_9 - 自定义 Partitioner:在 Producer 端实现更均匀的分区分配逻辑
- 临时方案:对热点分区单独部署 Consumer 消费
消息过期的特殊处理
如果消息设置了 retention.ms,积压期间消息可能被 Broker 清理,导致数据丢失:
bash# 紧急延长消息保留时间 kafka-configs --bootstrap-server localhost:9092 \ --entity-type topics --entity-name my-topic \ --alter --add-config retention.ms=604800000 # 7天
如果消息已经被清理,需要从数据源重新回放:
java// 从业务数据库或备份系统重新生成消息 // 写入一个新的补偿 Topic,单独消费处理
丢弃非关键消息
仅适用于日志采集、指标上报等可容忍丢失的场景:
java// 方式一:跳到最新 Offset,丢弃积压消息 consumer.seekToEnd(partitions); // 方式二:按时间跳转,只消费最近 N 小时的消息 long timestamp = System.currentTimeMillis() - Duration.ofHours(2).toMillis(); Map<TopicPartition, Long> timestamps = partitions.stream() .collect(Collectors.toMap(tp -> tp, tp -> timestamp)); Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps); offsets.forEach((tp, offsetAndTimestamp) -> { if (offsetAndTimestamp != null) { consumer.seek(tp, offsetAndTimestamp.offset()); } });
严禁在订单、支付等业务场景使用,必须先备份再处理。
保证消费顺序性
扩容和并行消费会打破分区内的消息顺序。如果业务要求局部有序:
java// 方案:按 Key 分队列,每个队列单线程处理 Map<String, BlockingQueue<Record>> keyQueues = new ConcurrentHashMap<>(); ExecutorService[] singleThreadExecutors = new ExecutorService[16]; for (ConsumerRecord<String, String> record : records) { String key = record.key(); int queueIndex = Math.abs(key.hashCode()) % 16; singleThreadExecutors[queueIndex].submit(() -> processMessage(record)); }
长期预防机制
容量规划:
- 根据业务峰值 QPS 评估所需 Consumer 数量,预留 30% 冗余
- 分区数按预估峰值设定,宁多勿少(增加分区不影响已有数据,但减少分区不支持)
监控告警体系:
- Consumer Lag 分级告警:P2(10万)、P1(100万)、P0(500万)
- 消费速率持续低于生产速率超5分钟触发预警
- Rebalance 频率异常告警
限流与降级:
java// Producer 端限流,防止突发流量冲垮消费端 RateLimiter rateLimiter = RateLimiter.create(5000); // 5000 msg/s rateLimiter.acquire(); producer.send(record);
- 高峰期降级非核心功能的消费逻辑
- 关闭非必要的数据同步消费任务
应急预案:
- 提前准备临时扩容脚本,5分钟内可完成 Topic 创建和 Consumer 部署
- 定期演练积压场景,验证扩容方案的有效性
- 建立消息备份机制,支持从数据源回放
面试回答模板
30秒核心回答:
"Kafka 消息积压处理分三步:第一步定位原因——看 Consumer Lag 和分区分布,区分是消费慢、流量突增还是数据倾斜;第二步短期应急——如果积压量大,创建临时 Topic 扩大分区数并行消费,同时优化消费逻辑做批量处理和异步化;第三步长期治理——做好容量规划、监控告警和限流降级,从根本上预防积压。"
追问方向:
- 数据倾斜怎么发现、怎么处理?—— 看各分区 Lag 差异,热点 Key 加随机后缀打散
- 消息过期被清理了怎么办?—— 延长 retention 时间,从数据源回放补偿
- 如何保证消费顺序性?—— 按 Key 分队列单线程处理,或使用 Coordinated Rebalance 减少顺序中断
- Consumer 数量能无限增加吗?—— 不能超过分区数,一个分区只能被一个 Consumer 消费