服务端5月31日 16:34
Kafka 为什么曾经依赖 ZooKeeper?它到底负责什么?在早期和大量存量 Kafka 集群里,ZooKeeper 负责保存和协调集群元数据:Broker 注册、Controller 选举、Topic 与分区信息、部分配置和权限数据。它不是 Kafka 的消息存储层,真正的消息仍然写在 Broker 本地日志里。ZooKeeper 更像一个一致性协调中心,帮助 Kafka 判断“谁还活着、谁来当 Controller、元数据现在是什么状态”。
## ZooKeeper 管哪些事
Broker 启动后会连接 ZooKeeper,并在 `/brokers/ids` 下创建临时节点。临时节点和会话绑定,如果 Broker 宕机或网络断开超过会话超时时间,节点会消失,其他组件就能感知 Broker 不可用。这个机制让 Kafka 能快速发现集群成员变化。
Controller 选举也依赖 ZooKeeper。多个 Broker 会竞争创建 `/controller` 节点,创建成功者成为 Controller。Controller 负责分区 leader 选举、副本状态变化、Topic 创建删除等集群级动作。ZooKeeper 在这里提供的是互斥和通知能力,避免多个 Broker 同时认为自己是控制者。
```text
/brokers/ids/1 # Broker 注册信息
/controller # 当前 Controller
/brokers/topics/orders # Topic 分区与副本元数据
/config/topics/orders # Topic 级配置
/kafka-acl/Topic/orders # ACL 信息
```
这些路径有助于理解 ZooKeeper 的定位:它保存的是元数据和状态,不负责传输业务消息。
## Consumer offset 也在 ZooKeeper 吗
这要看 Kafka 版本和客户端。早期 Consumer 会把 offset 写到 ZooKeeper 的 `/consumers` 路径下,但新版本 Kafka 默认把 offset 存到内部 topic `__consumer_offsets`。所以面试或排查时不能简单说“ZooKeeper 管 Consumer offset”,更准确的说法是:旧消费者协议依赖 ZooKeeper,新消费者组协调和 offset 存储主要由 Kafka 自己完成。
这个细节很容易踩坑。运维老集群时,如果还有旧客户端,ZooKeeper 里可能仍能看到 consumer 路径;新集群则更多关注 Group Coordinator 和内部 topic。判断时看客户端版本和 `kafka-consumer-groups.sh` 查询结果,比背路径更可靠。
## ZooKeeper 配置要注意什么
Kafka 侧至少要配置连接串和超时。生产环境不要只放一个 ZooKeeper 节点,通常使用 3 或 5 个节点组成 quorum。节点数用奇数,是为了在故障时仍能形成多数派。
```properties
# server.properties
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka-prod
zookeeper.connection.timeout.ms=6000
zookeeper.session.timeout.ms=18000
```
```properties
# zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
```
`zookeeper.session.timeout.ms` 不是越短越好。太短会把短暂网络抖动误判成 Broker 下线,引发 Controller 处理分区变化;太长又会让真实故障发现变慢。生产里要结合网络质量、GC 暂停和业务可用性要求调整。
## KRaft 出现后 ZooKeeper 还重要吗
Kafka 新版本已经引入 KRaft,用 Kafka 自己的 Raft 元数据 quorum 替代 ZooKeeper。新建集群可以优先评估 KRaft,部署和运维链路更简单。但很多公司仍有 ZooKeeper 模式的存量集群,迁移涉及版本、工具链、监控、备份和回滚方案,不是改一行配置就结束。
因此 ZooKeeper 仍然是高价值知识点。理解它能帮助你排查 Controller 抖动、Broker 频繁上下线、Topic 元数据异常和老集群权限问题。
## 追问
### ZooKeeper 会保存 Kafka 的消息数据吗?
不会,Kafka 消息写在 Broker 的日志目录里,ZooKeeper 保存的是元数据和协调状态。把 ZooKeeper 当消息存储是常见误解,也会导致错误的备份方案。备份 ZooKeeper 只能保护部分元数据,不能恢复业务消息。真正的数据可靠性要看 topic 副本数、ISR、acks 和 Broker 磁盘。
### ZooKeeper 挂了 Kafka 会立刻不可用吗?
如果 ZooKeeper 集群只是少数节点故障,多数派还在,Kafka 通常可以继续工作。如果 ZooKeeper quorum 整体不可用,已有 leader 分区可能还能短时间处理读写,但创建 Topic、Controller 选举、Broker 变更等元数据操作会受影响。边界在于“已有数据面”和“控制面变更”不是一回事。生产上不能因为消息还能写就忽视 ZooKeeper 故障。
### 为什么 ZooKeeper 要部署奇数个节点?
ZooKeeper 依赖多数派确认,3 个节点允许坏 1 个,5 个节点允许坏 2 个。偶数节点不会提升多数派容错能力,反而增加成本和通信开销。比如 4 个节点仍然需要 3 个形成多数,容错能力和 3 节点一样。常见取舍是中小集群用 3 个,跨机房或更高可用要求再评估 5 个。
### Kafka 迁到 KRaft 后就不用理解 ZooKeeper 了吗?
新建 KRaft 集群确实不再依赖 ZooKeeper,但存量系统、面试和故障复盘里仍经常遇到 ZooKeeper 模式。迁移还要考虑 Kafka 版本、元数据迁移、监控指标变化和回滚路径。只知道 KRaft 的配置,不理解旧模式,很难解释老集群 Controller 抖动或 Broker 注册异常。更稳的做法是同时理解两套元数据管理方式。
### ZooKeeper 相关故障通常怎么排查?
先看 Kafka Broker 日志里是否有 session expired、controller moved、zookeeper disconnected 等关键字。再检查 ZooKeeper 四字命令或监控指标,例如连接数、延迟、leader/follower 状态和磁盘 fsync 时间。很多问题不是 ZooKeeper 进程挂了,而是网络抖动、磁盘慢或 Broker 长 GC 导致会话过期。排查时要把 Kafka 日志、ZooKeeper 日志和节点资源放在同一时间线看。标签
Kafka
Apache Kafka 是一个开源的流处理平台,由 LinkedIn 开发,并于 2011 年贡献给 Apache 软件基金会。它主要用于构建实时的数据管道和流应用程序。Kafka 能够以高吞吐量、可扩展性和容错性的方式处理数据流。

服务端5月28日 08:30
Kafka 出现消息重复消费怎么解决?## 面试官为什么爱问这个问题
Kafka 默认提供的是 at-least-once 语义,消息至少被消费一次,但可能重复。在金融支付、订单处理等场景下,重复消费意味着重复扣款、重复发货,后果严重。面试官问这道题,考察的是你对 Kafka 消费语义的理解深度,以及在实际业务中如何保证 exactly-once。
## 重复消费是怎么产生的
根本原因只有一个:**Consumer 消费了消息,但 Offset 没有成功提交**。下次重启或 Rebalance 后,Kafka 认为这条消息没消费过,重新投递。
常见触发场景:
1. **Rebalance 导致 Offset 丢失**:Consumer 处理消息耗时超过 `max.poll.interval.ms`(默认 5 分钟),Kafka 认为该 Consumer 已死,触发 Rebalance,未提交的 Offset 对应的消息会被重新分配给其他 Consumer 消费
2. **Consumer 异常宕机**:启用了自动提交(`enable.auto.commit=true`,默认开启),但提交间隔内宕机,最近一次提交之后消费的消息都会被重复消费
3. **手动提交失败**:关闭自动提交后调用 `commitSync()` 或 `commitAsync()`,提交请求因网络问题失败,消息被重复消费
4. **Producer 重试导致重复写入**:Producer 发送消息后未收到 ACK,触发重试,实际上第一次已经写入成功,造成 Broker 端消息重复
## 解决方案:从 Kafka 机制到业务层,逐层防御
### 第一层:Producer 幂等性——防止重复写入
Kafka 0.11 引入幂等 Producer,原理是为每个 Producer 分配一个 PID(Producer ID),为每条消息分配递增的 SequenceNumber。Broker 端维护每个 `<PID, Partition>` 的最新 SequenceNumber,收到消息时比对:如果新消息的 SequenceNumber ≤ 已记录的值,判定为重复写入,直接丢弃。
```properties
# 开启幂等性(Kafka 3.0 后默认开启)
enable.idempotence=true
# 等价于同时设置:
# acks=all
# retries=Integer.MAX_VALUE
# max.in.flight.requests.per.connection<=5
```
**注意限制**:幂等性只保证单 Partition 内的去重,跨 Partition 或 Producer 重启(PID 变化)后无法去重。
### 第二层:Kafka 事务——跨分区 Exactly-Once
当需要将消费 Offset 提交和消息发送放在同一个事务中时(典型的 consume-transform-produce 模式),需要 Kafka 事务支持:
```java
// 事务型 Producer 配置
props.put("transactional.id", "order-tx-001");
producer.initTransactions();
try {
producer.beginTransaction();
// 消费-处理-发送,放在同一个事务中
producer.send(new ProducerRecord<>("topic", key, value));
// 将消费 Offset 也提交到事务中
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
```
事务保证:要么消费 Offset 提交和消息发送同时成功,要么同时回滚,从源头杜绝重复。
### 第三层:Consumer 端幂等——业务层最后一道防线
无论 Producer 和 Broker 做了多少保障,Consumer 端的幂等性是必须实现的,因为 Rebalance 场景下 Kafka 无法完全避免重复投递。
**方案一:数据库唯一约束**
利用数据库主键或唯一索引天然去重,最可靠:
```sql
-- 以消息 ID 作为唯一索引
INSERT INTO orders (order_id, user_id, amount, status)
VALUES ('msg-123', 1001, 99.9, 'PAID')
ON DUPLICATE KEY UPDATE status = status; -- MySQL 写法,重复则跳过
```
**方案二:Redis SET 去重**
适合高吞吐场景,利用 Redis Set 的去重特性:
```java
// 消费前先判断是否已处理
String dedupeKey = "kafka:processed:" + topic + ":" + partition;
Boolean isNew = redisTemplate.opsForSet().add(dedupeKey, messageId);
if (Boolean.TRUE.equals(isNew)) {
processMessage(message);
// 设置过期时间避免 Key 无限膨胀
redisTemplate.expire(dedupeKey, 24, TimeUnit.HOURS);
}
```
**方案三:状态机控制**
适用于有明确状态流转的业务,如订单从"待支付"到"已支付",重复消费时状态已变更,业务逻辑自然跳过:
```java
// 利用业务状态天然幂等
Order order = orderDao.getById(orderId);
if (order.getStatus() == OrderStatus.PAID) {
log.info("订单已支付,跳过重复消息: {}", orderId);
return; // 已处理,直接跳过
}
order.setStatus(OrderStatus.PAID);
orderDao.update(order);
```
### 第四层:Offset 提交策略优化
```properties
# 关闭自动提交,掌控提交时机
enable.auto.commit=false
```
手动提交的选择:
- `commitSync()`:同步提交,阻塞等待 Broker 确认,可靠但吞吐低
- `commitAsync()`:异步提交,不阻塞,但可能丢失提交确认
- **推荐做法**:正常消费用 `commitAsync()` 保证吞吐,Consumer 关闭时用 `commitSync()` 兜底确保最后一次提交成功
```java
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record);
}
consumer.commitAsync(); // 异步提交,追求吞吐
}
} finally {
consumer.commitSync(); // 关闭前同步提交,确保不丢
consumer.close();
}
```
## 面试追问与加分回答
**追问:Kafka 幂等 Producer 的 PID 在重启后会变吗?**
会变。PID 是 Broker 在 Producer 启动时分配的,重启后获得新的 PID,之前的 SequenceNumber 也会重置,所以幂等性无法跨会话去重。跨会话去重需要配合 `transactional.id` 使用事务机制。
**追问:为什么不在 Broker 端做全局去重?**
Broker 端全局去重需要维护所有消息的索引,存储和计算开销极大,与 Kafka 追求高吞吐的设计目标冲突。Kafka 选择在 Producer 端做有限去重(单 Partition 内),把跨 Partition、跨会话的去重责任交给业务层。服务端5月28日 08:28
Kafka 消息积压如何处理?## Kafka 消息积压如何处理?
Kafka 消息积压是生产环境最常见的故障之一,也是面试高频考点。核心表现为 Consumer 消费速度跟不上 Producer 生产速度,消息在 Broker 端持续堆积。处理思路是:**先定位原因,再分层治理——短期应急止血,长期架构优化**。
### 积压原因定位
消息积压不是单一问题,通常由以下几类原因叠加导致:
**消费端瓶颈**
- 消费逻辑耗时过长,单条消息处理耗时超过生产间隔
- 单线程消费,未能充分利用分区并行度
- 外部依赖(数据库、RPC)响应慢,拖慢整体消费速率
**生产端突增**
- 业务高峰(大促、秒杀)导致消息量激增
- Producer 批量发送配置不当,瞬时流量过大
- 上游系统异常重试导致消息重复涌入
**数据倾斜**
- 消息 Key 分布不均,部分分区积压严重,其他分区空闲
- 典型场景:用 userId 做 Key 时,大客户的消息集中在少数分区
**系统故障**
- Consumer 宕机或频繁 Rebalance
- 依赖服务宕机导致消费持续失败重试
- 网络抖动引起消费超时
### 监控与快速诊断
定位积压的第一步是看 Consumer Lag:
```bash
# 查看 Consumer Group 的 Lag 情况
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group my-group
# 输出关键字段:CURRENT-OFFSET、LOG-END-OFFSET、LAG
# LAG = LOG-END-OFFSET - CURRENT-OFFSET
```
关键监控指标:
| 指标 | 含义 | 告警建议 |
|------|------|----------|
| Consumer Lag | 积压消息数 | > 10万触发 P2,> 100万触发 P1 |
| 消费速率 (msg/s) | 每秒消费消息数 | 持续低于生产速率触发告警 |
| Rebalance 频率 | 消费者组重平衡次数 | 5分钟内超过3次需排查 |
| 分区 Lag 分布 | 各分区积压差异 | 最大分区 Lag > 平均值3倍需关注数据倾斜 |
### 短期应急方案
#### 1. 紧急扩容:临时 Topic 分流
这是处理百万级以上积压最有效的短期方案,核心思路是将积压数据快速分散到更多分区并行消费:
**操作步骤:**
```bash
# 第一步:创建临时 Topic,分区数为原来的 N 倍
kafka-topics --bootstrap-server localhost:9092 \
--create --topic my-topic-temp --partitions 50 \
--replication-factor 3
```
```java
// 第二步:写一个分发 Consumer,消费积压消息并轮询写入临时 Topic
// 关键:不做业务处理,只做数据搬运
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
// 轮询写入临时 Topic 的各分区,保证均匀分布
ProducerRecord<String, String> pr = new ProducerRecord<>(
"my-topic-temp", partitionCounter % 50, record.key(), record.value());
producer.send(pr);
partitionCounter++;
}
consumer.commitSync();
}
```
```bash
# 第三步:部署 N 倍的 Consumer 消费临时 Topic
# 第四步:积压消费完毕后,恢复原架构,下线临时 Consumer
```
**注意**:此方案会打破消息分区内的顺序性。如果业务要求顺序消费,需要将相同 Key 的消息路由到同一临时分区。
#### 2. 增加消费者实例
最直接的方式,但受限于分区数:
```bash
# 先查看当前分区数
kafka-topics --bootstrap-server localhost:9092 \
--describe --topic my-topic
# Consumer 数量不能超过 Partition 数量
# 如果 Consumer 已满,需要先增加分区
kafka-topics --bootstrap-server localhost:9092 \
--alter --topic my-topic --partitions 20
```
**关键限制**:一个分区同一时刻只能被同一个 Consumer Group 中的一个 Consumer 消费。Consumer 数量 = 分区数时并行度最大,超过则空闲。
#### 3. 消费端快速优化
**批量处理替代逐条处理:**
```java
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
List<Record> batch = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
batch.add(mapRecord(record));
if (batch.size() >= 500) {
// 批量写入数据库,而非逐条插入
db.batchInsert(batch);
batch.clear();
}
}
if (!batch.isEmpty()) {
db.batchInsert(batch);
}
```
**多线程消费(单 Consumer 多 Worker):**
```java
ExecutorService executor = Executors.newFixedThreadPool(8);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> processMessage(record));
}
// 注意:提交 Offset 必须在所有 Worker 处理完成后
```
**调整消费配置:**
```properties
# 增大单次拉取量
max.poll.records=1000
# 延长拉取间隔,给处理留更多时间
max.poll.interval.ms=300000
# 适当缩短会话超时,加快故障感知
session.timeout.ms=25000
heartbeat.interval.ms=8000
```
### 处理数据倾斜
数据倾斜是积压的隐蔽原因,表现为少数分区 Lag 远高于其他分区:
```bash
# 查看各分区 Lag 分布
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group my-group
# 如果 PARTITION-0 Lag=500万,其他分区 Lag=10万,就是数据倾斜
```
**解决方案:**
- **修改分区策略**:将热点 Key 加随机后缀打散,如 `userId_123` → `userId_123_0` ~ `userId_123_9`
- **自定义 Partitioner**:在 Producer 端实现更均匀的分区分配逻辑
- **临时方案**:对热点分区单独部署 Consumer 消费
### 消息过期的特殊处理
如果消息设置了 `retention.ms`,积压期间消息可能被 Broker 清理,导致数据丢失:
```bash
# 紧急延长消息保留时间
kafka-configs --bootstrap-server localhost:9092 \
--entity-type topics --entity-name my-topic \
--alter --add-config retention.ms=604800000 # 7天
```
如果消息已经被清理,需要从数据源重新回放:
```java
// 从业务数据库或备份系统重新生成消息
// 写入一个新的补偿 Topic,单独消费处理
```
### 丢弃非关键消息
仅适用于日志采集、指标上报等可容忍丢失的场景:
```java
// 方式一:跳到最新 Offset,丢弃积压消息
consumer.seekToEnd(partitions);
// 方式二:按时间跳转,只消费最近 N 小时的消息
long timestamp = System.currentTimeMillis() - Duration.ofHours(2).toMillis();
Map<TopicPartition, Long> timestamps = partitions.stream()
.collect(Collectors.toMap(tp -> tp, tp -> timestamp));
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTimestamp) -> {
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
});
```
**严禁在订单、支付等业务场景使用,必须先备份再处理。**
### 保证消费顺序性
扩容和并行消费会打破分区内的消息顺序。如果业务要求局部有序:
```java
// 方案:按 Key 分队列,每个队列单线程处理
Map<String, BlockingQueue<Record>> keyQueues = new ConcurrentHashMap<>();
ExecutorService[] singleThreadExecutors = new ExecutorService[16];
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
int queueIndex = Math.abs(key.hashCode()) % 16;
singleThreadExecutors[queueIndex].submit(() -> processMessage(record));
}
```
### 长期预防机制
**容量规划:**
- 根据业务峰值 QPS 评估所需 Consumer 数量,预留 30% 冗余
- 分区数按预估峰值设定,宁多勿少(增加分区不影响已有数据,但减少分区不支持)
**监控告警体系:**
- Consumer Lag 分级告警:P2(10万)、P1(100万)、P0(500万)
- 消费速率持续低于生产速率超5分钟触发预警
- Rebalance 频率异常告警
**限流与降级:**
```java
// Producer 端限流,防止突发流量冲垮消费端
RateLimiter rateLimiter = RateLimiter.create(5000); // 5000 msg/s
rateLimiter.acquire();
producer.send(record);
```
- 高峰期降级非核心功能的消费逻辑
- 关闭非必要的数据同步消费任务
**应急预案:**
- 提前准备临时扩容脚本,5分钟内可完成 Topic 创建和 Consumer 部署
- 定期演练积压场景,验证扩容方案的有效性
- 建立消息备份机制,支持从数据源回放
### 面试回答模板
**30秒核心回答:**
"Kafka 消息积压处理分三步:第一步定位原因——看 Consumer Lag 和分区分布,区分是消费慢、流量突增还是数据倾斜;第二步短期应急——如果积压量大,创建临时 Topic 扩大分区数并行消费,同时优化消费逻辑做批量处理和异步化;第三步长期治理——做好容量规划、监控告警和限流降级,从根本上预防积压。"
**追问方向:**
- 数据倾斜怎么发现、怎么处理?—— 看各分区 Lag 差异,热点 Key 加随机后缀打散
- 消息过期被清理了怎么办?—— 延长 retention 时间,从数据源回放补偿
- 如何保证消费顺序性?—— 按 Key 分队列单线程处理,或使用 Coordinated Rebalance 减少顺序中断
- Consumer 数量能无限增加吗?—— 不能超过分区数,一个分区只能被一个 Consumer 消费服务端5月28日 08:26
Kafka 的副本机制是如何工作的?## 副本机制的核心作用
Kafka 的副本机制解决的是分布式环境下的两个根本问题:**数据可靠性和服务可用性**。每个 Partition 可以配置多个副本(Replica),分布在不同 Broker 上。当某个 Broker 宕机,其他副本可以继续提供服务,保证消息不丢失、服务不中断。
副本因子(`replication.factor`)决定了每个 Partition 有多少个副本。生产环境通常设置为 3,意味着每个 Partition 有 3 份相同数据,允许最多 2 个节点故障而不丢数据。
## Leader 与 Follower 的分工
Kafka 的副本采用 Leader-Follower 模型:
- **Leader 副本**:每个 Partition 只有 1 个 Leader,负责处理该 Partition 的所有读写请求。Producer 写消息、Consumer 读消息都直接与 Leader 交互。
- **Follower 副本**:被动地从 Leader 拉取数据并写入本地日志,不对外提供读写服务。Follower 的唯一职责是保持与 Leader 的数据同步,以便在 Leader 故障时接管。
这种设计的优势在于读写都走 Leader,避免了多副本写入的一致性问题,同时也简化了 Consumer 的消费逻辑——不需要关心从哪个副本读取。
## ISR 机制:同步副本的动态管理
ISR(In-Sync Replicas)是 Kafka 副本机制中最关键的概念之一。它不是静态的副本列表,而是一个由 Leader 动态维护的同步副本集合。
### ISR 的判定标准
Follower 是否留在 ISR 中,取决于它是否在规定时间内与 Leader 保持同步。判定依据是时间而非消息条数——早期 Kafka 用 `replica.lag.max.messages` 判定(已在 0.9.0 版本移除),现在只用 `replica.lag.time.max.ms`(默认 10 秒)。如果一个 Follower 超过这个时间没有发送拉取请求或虽然发送了但还没追上 Leader 的 LEO,就会被移出 ISR,进入 OSR(Out-of-Sync Replicas)。
### AR、ISR、OSR 的关系
```
AR(Assigned Replicas)= ISR + OSR
```
AR 是 Partition 分配的所有副本集合,ISR 是与 Leader 同步的副本,OSR 是落后于 Leader 的副本。理想状态下 ISR = AR,即所有副本都在同步。当 ISR 缩小时,说明集群出现了同步延迟。
### ISR 与消息可靠性
`min.insync.replicas` 配合 Producer 的 `acks=all` 使用时,能提供强可靠性保证。当 ISR 中的副本数小于 `min.insync.replicas` 时,Broker 会拒绝写入,抛出 `NotEnoughReplicasException`。这是一种宁可不可用也不丢数据的策略。
典型配置:`replication.factor=3` + `min.insync.replicas=2` + `acks=all`,允许 1 个节点故障仍能正常写入。
## HW 与 LEO:副本同步的位置标记
理解副本同步,必须搞清楚两个核心位移标记:
- **LEO(Log End Offset)**:每个副本(包括 Leader 和 Follower)各自维护的日志末端位移,表示下一条待写入消息的位置。每个副本的 LEO 可能不同。
- **HW(High Watermark)**:所有 ISR 副本中最小的 LEO。Consumer 只能消费到 HW 之前的消息,HW 之后的消息对 Consumer 不可见。
### 同步过程中 HW 和 LEO 的变化
1. Producer 向 Leader 写入消息,Leader 的 LEO 递增
2. Follower 从 Leader 拉取消息,Follower 的 LEO 递增
3. Leader 收到所有 ISR 副本的 LEO 更新后,推进 HW(取所有 ISR 副本 LEO 的最小值)
4. Follower 在下一次拉取时获取 Leader 的 HW,更新自己的 HW
这个过程确保了:**HW 之前的消息已经被所有 ISR 副本确认,是安全可消费的**。
## Leader 选举:故障恢复的核心流程
当 Leader 所在 Broker 宕机,Controller 会从 ISR 中选出一个新的 Leader。选举过程不是"投票",而是 Controller 直接指定。
### 选举策略
Kafka 的 Leader 选举策略根据触发场景不同分为四种:
| 策略 | 触发场景 | 选举逻辑 |
|------|---------|---------|
| OfflinePartition | Leader Broker 宕机 | 优先从 ISR 中选第一个存活的副本 |
| ReassignPartition | 分区副本重分配 | 从新 AR 中选第一个在线且在 ISR 中的副本 |
| PreferredReplica | 自动均衡 | 选 AR 中的第一个副本(如果在线且在 ISR 中) |
| ControlledShutdown | Broker 优雅关闭 | 选 ISR 中不在关闭 Broker 上的第一个副本 |
### Unclean 选举:可用性与一致性的权衡
当 ISR 为空(所有副本都不同步)时,是否允许从 OSR 中选举 Leader?这由 `unclean.leader.election.enable` 控制:
- **开启(默认 false)**:允许从 OSR 选 Leader,保证可用性,但可能丢数据——因为 OSR 副本的消息落后于原 Leader
- **关闭**:分区不可用直到原 Leader 恢复,保证数据一致性
金融场景通常关闭此选项,宁可短暂不可用也不冒数据丢失的风险。
### Leader Epoch 解决的问题
早期 Kafka 依赖 HW 截断日志来保证副本一致性,但这会导致数据不一致问题。Kafka 0.11 引入了 Leader Epoch 机制:每个 Partition 维护一个单调递增的 Epoch 编号,新 Leader 产生时 Epoch 递增。Follower 用 Leader Epoch 而非 HW 来判断截断位置,避免了 HW 截断导致的数据丢失和分歧。
典型场景:两个 Follower 先后重启,旧 HW 截断可能导致先重启的 Follower 把已提交的消息截掉,而后重启的 Follower 又从前者拉取到不完整数据。Leader Epoch 通过记录每个 Epoch 对应的起始位移,让 Follower 精确知道从哪里截断。
## 副本同步的完整流程
1. **Producer 发送消息到 Leader**:消息写入 Leader 的本地日志,Leader LEO 递增
2. **Follower 拉取消息**:Follower 主动向 Leader 发送 `FetchRequest`,携带自己的 LEO
3. **Leader 返回消息**:Leader 根据 Follower 的 LEO 返回对应数据,同时返回 Leader 当前的 HW
4. **Follower 写入并更新**:Follower 将消息写入本地日志,更新 LEO,然后更新 HW(取 min(LEO, Leader HW))
5. **Leader 推进 HW**:Leader 在收到 Follower 的下一次拉取请求时,根据所有 ISR 副本的 LEO 更新 HW
注意:Follower 是主动拉取而非 Leader 推送,这是 Kafka 副本同步与很多其他系统(如 MySQL 主从)的区别。拉取模式让 Follower 自己控制同步节奏,避免被 Leader 压垮。
## 副本分配与机架感知
创建 Topic 时,Kafka 自动分配副本到不同 Broker。分配算法考虑两个原则:
- 同一 Partition 的副本分布在不同 Broker 上
- 开启机架感知(`broker.rack` 配置)后,副本尽量分布在不同机架
机架感知的意义在于:如果整个机架故障(如电源故障),其他机架上的副本仍可用。不配置机架信息时,Kafka 只保证 Broker 级别分布,无法防御机架级故障。
```properties
# Broker 机架配置
broker.rack=rack1
# 副本因子
default.replication.factor=3
# 最小同步副本数
min.insync.replicas=2
```
## 关键配置参数一览
| 参数 | 默认值 | 说明 |
|------|-------|------|
| `replication.factor` | 1 | 副本数,生产环境建议 ≥ 3 |
| `min.insync.replicas` | 1 | 最小同步副本数,配合 acks=all 使用 |
| `acks` | 1 | Producer 确认级别:0/1/all |
| `replica.lag.time.max.ms` | 10000 | Follower 落后超时时间 |
| `unclean.leader.election.enable` | false | 是否允许非 ISR 副本当选 Leader |
| `auto.leader.rebalance.enable` | true | 是否自动均衡 Leader 到 Preferred 副本 |
## 监控核心指标
排查副本相关问题时,重点关注以下 JMX 指标:
- **`UnderReplicatedPartitions`**:ISR 副本数 < AR 副本数的 Partition 数量,大于 0 说明有副本同步滞后
- **`OfflinePartitionsCount`**:没有 Leader 的 Partition 数量,大于 0 说明有分区不可用
- **`IsrShrinksPerSec` / `IsrExpandsPerSec`**:ISR 缩减和扩张速率,频繁变动说明集群不稳定
- **`ActiveControllerCount`**:应该始终为 1,大于 1 说明有脑裂风险
## 生产环境实践建议
**副本数不是越多越好**。3 副本能满足大多数场景的可靠性要求,增加到 5 或 7 会显著降低写入吞吐(更多副本需要同步)并增加存储成本。只在极少数场景(如金融核心链路)才需要更高副本数。
**务必开启 `min.insync.replicas`**。只配 `acks=all` 不够——如果 ISR 缩减到只剩 Leader,`acks=all` 等于 `acks=1`,此时 Leader 宕机仍会丢数据。`min.insync.replicas=2` 确保至少 2 个副本确认才算写入成功。
**关注 ISR 抖动**。ISR 频繁缩扩通常不是正常波动,往往暗示网络延迟、磁盘 IO 瓶颈或 GC 问题。收到 ISR 缩减告警时,先排查 Follower 所在 Broker 的负载和延迟。
**优雅下线优于故障下线**。使用 `kafka-reassign-partitions` 先迁移 Leader 和副本,再下线 Broker,可以避免不必要的 Leader 选举和数据恢复开销。服务端5月28日 08:24
Kafka 为什么能够实现高吞吐量?## Kafka 为什么能够实现高吞吐量?
Kafka 是目前业界吞吐量最高的消息队列之一,单机每秒可处理数十万条消息。这并非依赖某种银弹技术,而是多个设计决策协同作用的结果。理解这些原理,不仅能帮你在面试中给出有层次的回答,更能指导实际场景中的性能调优。
## 顺序写:磁盘也能很快
很多人对磁盘的性能认知停留在"慢",但这只对随机读写成立。顺序写磁盘的速度可以达到 600MB/s 以上,甚至超过随机写内存的效率。
Kafka 的做法很直接:所有消息以追加(append)的方式写入日志文件,永远不修改已有数据。Consumer 也按偏移量顺序读取,整个读写路径上几乎没有随机 I/O。
这个设计还带来一个额外好处——操作系统对顺序写有天然优化。数据先进入 Page Cache,由 OS 异步刷盘,Kafka 本身不需要调用 fsync(除非配置了强制刷盘),相当于写内存的速度。
## 零拷贝:省掉两次不必要的数据搬运
传统的网络数据发送要经历四次拷贝和四次上下文切换:
```
磁盘 → 内核缓冲区 → 用户缓冲区 → Socket缓冲区 → 网卡
```
其中"内核缓冲区 → 用户缓冲区 → Socket缓冲区"这两步是完全可以避免的。Kafka 使用 Linux 的 `sendfile` 系统调用,数据直接从内核缓冲区传输到网卡:
```
磁盘 → 内核缓冲区 → 网卡
```
拷贝次数从 4 次降到 2 次,CPU 上下文切换也从 4 次降到 2 次。在高吞吐场景下,这个差距会被放大到非常显著的程度。
**补充一点**:Kafka 还用了 `mmap`(内存映射文件)来处理 Consumer 的偏移量索引文件,让索引查找避免一次用户态拷贝。`sendfile` 处理数据流,`mmap` 处理索引,两者配合覆盖了 Kafka 主要的数据路径。
## Page Cache:让 Kafka 不用自己管缓存
很多中间件选择在 JVM 堆内维护缓存,但 JVM 的 GC 是吞吐量杀手——堆越大,GC 暂停越长,对延迟敏感的场景尤其致命。
Kafka 反其道而行:不维护堆内缓存,直接依赖操作系统的 Page Cache。写入时数据进入 Page Cache 就算成功,读取时如果命中缓存就直接返回,都没经过 JVM 堆。
这样做的好处:
- **避免 GC 问题**:Kafka 进程的堆可以设得很小(通常 6GB 足够),GC 暂停极短
- **缓存不随进程重启丢失**:进程挂了,Page Cache 还在,重启后数据依然热
- **利用 OS 的 LRU 策略**:操作系统比应用层更清楚哪些页面该淘汰
## 批量处理:把零散请求打包
网络请求的固定开销很高——一次 TCP 往返的延迟,加上协议解析、线程调度等开销。如果每条消息都单独发送,吞吐量会被网络开销吃掉。
Kafka 在 Producer 端做了两层批量:
```properties
# 一批消息的最大字节数
batch.size=16384
# 等待多久再发送(即使没凑满一批)
linger.ms=5
```
`batch.size` 控制批量上限,`linger.ms` 控制等待时间。两者配合,Producer 会攒够一批再发,或者等 5ms 没有新消息也发出去。这种微小的延迟换取的是网络请求次数的大幅减少。
Consumer 端同理,`fetch.min.bytes` 和 `fetch.max.wait.ms` 也是同样的思路——宁可多等一会,也要一次多拉一些数据。
## 分区并行:水平扩展的基础
单个分区只能被一个 Consumer 消费,这就是单分区的吞吐量上限。Kafka 通过分区实现并行:
- Producer 可以同时向不同分区写入,Broker 端不同分区的写入由不同线程处理
- Consumer Group 中,每个分区分配给一个 Consumer 实例,多个实例并行消费
- 分区分布在不同 Broker 上,网络 I/O 和磁盘 I/O 都被分散
分区数决定了并行度的上限。但分区数也不是越多越好——每个分区在 Broker 上有对应的目录和索引文件,分区过多会增加文件句柄、增大 Leader 选举时间、加重 ZooKeeper/KRaft 负担。一般建议单 Broker 分区数不超过 1000-2000。
## 数据压缩:端到端减少传输量
Kafka 支持在 Producer 端压缩、Broker 端保持原样、Consumer 端解压,即端到端压缩。这意味着压缩的收益不仅体现在网络传输,还体现在磁盘存储上。
常用压缩算法对比:
| 算法 | 压缩比 | 压缩速度 | 适用场景 |
|------|--------|----------|----------|
| Snappy | 中等 | 快 | 通用场景,延迟敏感 |
| LZ4 | 中等 | 最快 | 极致低延迟 |
| Gzip | 高 | 慢 | 带宽受限,对延迟不敏感 |
| Zstd | 较高 | 较快 | Kafka 2.1+ 推荐 |
选择压缩算法本质是 CPU 与带宽的权衡。CPU 有余量、带宽紧张就选高压缩比;延迟敏感就选快压缩。
## 面试追问:如何进一步提升 Kafka 吞吐量?
在理解原理的基础上,实际调优时可以从几个方向入手:
**Producer 侧**:增大 `batch.size` 和 `buffer.memory`,适当调高 `linger.ms`,开启压缩,使用异步发送(`acks=0` 或 `acks=1`,牺牲部分可靠性换吞吐)。
**Broker 侧**:增加分区数提升并行度,将日志目录挂载到不同磁盘实现 I/O 分散,调整 `num.io.threads` 匹配磁盘数量。
**Consumer 侧**:增加 Consumer 实例数(不超过分区数),调大 `fetch.min.bytes` 和 `max.poll.records`,开启自动提交减少偏移量提交开销。
**硬件层面**:SSD 替换 HDD 对顺序写提升有限(因为顺序写 HDD 也不慢),但对随机读和副本同步有明显帮助;增加内存扩大 Page Cache 命中率;万兆网卡消除网络瓶颈。
需要强调的是,高吞吐和强可靠性是矛盾的。`acks=all` + `min.insync.replicas=2` 能保证数据不丢,但吞吐量会比 `acks=0` 低一个量级。生产环境中,金融、订单等关键业务必须优先可靠性,日志采集等场景可以优先吞吐量。服务端5月28日 08:24
Kafka 如何保证消息的顺序性?## Kafka 在 Partition 级别保证消息顺序,不保证 Topic 级别的全局顺序
Kafka 的顺序性保证是面试高频考点。核心结论:**Kafka 只在同一个 Partition 内保证消息的写入和消费顺序一致,跨 Partition 没有顺序保证。** 如果业务需要严格顺序,必须把相关消息路由到同一个 Partition。
---
### 一、Partition 内为什么有序
每个 Partition 本质上是一个追加写入的日志(append-only log),每条消息分配一个单调递增的 offset。Consumer 按 offset 顺序拉取,因此分区内天然有序。
```java
// Producer 发送时指定 Key,相同 Key 的消息进入同一 Partition
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-topic",
orderId, // Key — 决定分区路由
orderEvent // Value
);
producer.send(record);
```
Kafka 默认分区策略:有 Key 则 `hash(key) % numPartitions`,无 Key 则轮询(round-robin)或粘性分区。
---
### 二、跨 Partition 为什么无序
一个 Topic 通常有多个 Partition,不同 Partition 的消息并行写入和消费,无法保证先后关系。
示例:订单创建消息进入 Partition-0,支付消息进入 Partition-1,Consumer 可能先读到支付消息。
```
Partition-0: [订单创建] [发货通知]
Partition-1: [支付成功] [签收确认]
↓ 并行消费,顺序不可控
```
---
### 三、如何保证业务上的顺序性
#### 方法 1:用相同的 Key 路由到同一 Partition(最常用)
```java
// 同一订单的所有事件使用 orderId 作为 Key
producer.send(new ProducerRecord<>("order-topic", orderId, event));
```
优点:简单、无需额外代码;缺点:同一 Key 的消息无法并行处理,可能成为热点。
#### 方法 2:自定义分区器
```java
public class OrderPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 按业务规则路由:如按用户ID所在区域分区
String userId = (String) key;
int regionCode = getUserIdRegion(userId);
return regionCode % cluster.partitionCountForTopic(topic);
}
}
```
配置方式:
```properties
partitioner.class=com.example.OrderPartitioner
```
#### 方法 3:单分区 Topic
```bash
# 创建只有一个 Partition 的 Topic
kafka-topics.sh --create --topic strict-order-topic \
--partitions 1 --replication-factor 3
```
全局有序但吞吐极低,仅适用于顺序性要求极严且流量小的场景。
---
### 四、容易踩的坑
#### 1. Producer 端重试导致乱序
Producer 开启重试(`retries > 0`)时,如果 `max.in.flight.requests.per.connection > 1`,第一批消息失败重试后可能排在第二批后面,造成乱序。
```properties
# 严格顺序场景下必须这样配置
retries=2147483647
max.in.flight.requests.per.connection=1
enable.idempotence=true
```
开启幂等(`enable.idempotence=true`)后,Kafka 2.0+ 可以在 `max.in.flight.requests.per.connection <= 5` 的情况下保证分区内顺序,因为 Broker 端会按序列号去重排序。
#### 2. Consumer Rebalance 导致重复消费
Rebalance 发生时,Consumer 可能重复消费已处理的消息。如果业务处理非幂等,就会出现数据不一致。解决方案:**消费端做好幂等**(数据库唯一键、Redis 去重表等)。
#### 3. 多线程消费打乱顺序
```java
// 错误:多线程处理同一 Partition 的消息会乱序
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> process(record)); // 顺序无法保证
}
}
```
正确做法:按 Key hash 到同一线程处理,保证同 Key 消息的顺序。
```java
// 按 Key 分配到固定线程
int threadIndex = Math.abs(record.key().hashCode()) % threadCount;
executors[threadIndex].submit(() -> process(record));
```
---
### 五、性能与顺序的取舍
| 方案 | 顺序性 | 吞吐量 | 适用场景 |
|------|--------|--------|----------|
| 单 Partition | 全局有序 | 低 | 账户流水、状态机 |
| Key 路由同 Partition | Key 维度有序 | 中 | 订单状态、用户事件 |
| 多 Partition 无 Key | 无序 | 高 | 日志采集、指标上报 |
大多数业务只需要**同一业务实体维度有序**(如同订单、同用户),通过合理设置 Key 即可兼顾顺序与性能。
---
### 追问
**Q: Kafka 能否做到全局有序?为什么不用?**
技术上可以——单分区 + 单 Producer + 单 Consumer。但吞吐量受限于单机,无法水平扩展,只适合流量极低的场景(如金融账户变更日志)。
**Q: enable.idempotence 具体怎么保证顺序的?**
Producer 为每个 `<PID, Partition>` 维护递增的 Sequence Number,Broker 端按 SN 排序写入。即使请求乱序到达,Broker 也会按 SN 重排后再落盘,保证日志中消息有序。
**Q: 消费端如何保证 Exactly-Once 语义?**
Kafka 提供 Consumer 端的 exactly-once 需要配合事务:将消费和写入放在同一个 Kafka 事务中,或者使用幂等 + 手动提交 offset + 业务去重表的组合方案。服务端5月27日 22:28
Kafka Consumer Group Rebalance 什么时候触发?四种分配协议有什么区别?## 核心答案
Consumer Group Rebalance 是 Kafka 在组成员或订阅变化时重新分配 Partition 的过程。面试抓住三点:什么触发、怎么分配、如何减影响。
## 什么情况会触发 Rebalance?
三个维度:
- **成员变化**:Consumer 加入/退出/宕机/心跳超时(超过 `session.timeout.ms`)
- **订阅变化**:Consumer 订阅的 Topic 发生变动
- **分区变化**:Topic 分区数增减
追问:Consumer 处理慢会触发吗?会。两次 `poll()` 间隔超过 `max.poll.interval.ms`,Coordinator 认为 Consumer 已死,踢出并触发 Rebalance。
## Rebalance 分几步?
1. **JoinGroup**:所有 Consumer 向 Group Coordinator 发请求,Coordinator 选出 Leader Consumer
2. **SyncGroup**:Leader 制定分区分配方案,发给 Coordinator,再下发给所有 Consumer
3. **开始消费**:Consumer 按新方案消费
只有 Leader Consumer 负责分配,其他 Consumer 被动接收方案。
## 四种分配策略有什么区别?
| 策略 | 分配方式 | 优点 | 缺点 |
|------|----------|------|------|
| Range(默认) | 按 Partition 序号范围连续分配 | 简单 | 不整除时不均匀 |
| RoundRobin | 轮询分配所有 Partition | 多 Topic 时更均匀 | 订阅不一致时可能不均 |
| Sticky | 均匀分配 + 尽量保持上次分配 | 减少 Partition 迁移 | — |
| CooperativeSticky | 增量式,只重分配受影响 Partition | 不停消费,减少 STW | Kafka 2.4+ 支持 |
关键区分:Range 和 RoundRobin 属 Eager 协议——Rebalance 时所有 Consumer 先停消费再重新分配。CooperativeSticky 属 Cooperative 协议,只迁移需要变动的 Partition,其余不受影响。
## 怎么减少 Rebalance 的负面影响?
**1. 调参数避免误判**
```properties
max.poll.interval.ms=600000
session.timeout.ms=30000
heartbeat.interval.ms=10000
```
**2. 用 CooperativeSticky 策略**
避免全量 STW,只重分配受影响分区。
**3. 手动管理 Offset**
```java
enable.auto.commit=false
consumer.commitSync();
```
自动提交在 Rebalance 期间可能丢数据或重复消费,手动提交更可控。
**4. 静态成员(Static Membership)**
设置 `group.instance.id`,Consumer 重启后 Coordinator 仍认为同一成员,不触发 Rebalance。
## 追问方向
- Eager 协议下 Rebalance 期间能消费吗?不能。Cooperative 协议下未受影响分区可以
- 怎么监控?关注 `RebalanceRatePerSec` 和 `RebalanceTotal` 指标
- Kafka 4.0 消费者组协议变化?原生支持增量 Rebalance,不再需要 Eager 回退服务端5月27日 22:23
Kafka 和 RabbitMQ、RocketMQ 怎么选?## 核心区别一图看懂
| 维度 | Kafka | RabbitMQ | RocketMQ |
|------|-------|----------|----------|
| 定位 | 分布式流处理平台 | 传统消息代理 | 分布式消息中间件 |
| 吞吐量 | 百万级 TPS | 万级 TPS | 十万级 TPS |
| 延迟 | 毫秒级 | 微秒级 | 毫秒级 |
| 消息留存 | 按时间/容量保留,消费后不删 | 消费确认后删除 | 可配置保留策略 |
| 消费模型 | Pull 拉取 | Push 推送为主 | Push + Pull 均支持 |
| 路由能力 | 基于 Topic 和分区 | Exchange 多种路由模式 | Topic + Tag 两级过滤 |
| 顺序保证 | 分区内有序 | 队列内有序 | 全局顺序消息 |
| 事务消息 | 支持(0.11+) | 不支持 | 原生支持,最完善 |
| 适用场景 | 日志流、事件流、大数据管道 | 任务队列、微服务通信、复杂路由 | 电商订单、金融交易、事务消息 |
## 为什么 Kafka 吞吐量远超另外两个?
Kafka 的核心设计围绕"顺序写磁盘 + 零拷贝 + 分区并行"三个点。磁盘顺序写速度可达 600MB/s,远超随机写的 100KB/s。零拷贝(sendfile 系统调用)让数据直接从页缓存到网卡,跳过用户态拷贝。分区机制则将负载分散到多个 Broker 并行处理。
RabbitMQ 的优势不在于吞吐,而在于路由灵活性——四种 Exchange 类型(direct、topic、fanout、headers)能实现复杂的消息分发逻辑,延迟也在微秒级别,适合对实时性要求高但吞吐不大的场景。
RocketMQ 在事务消息上做得最完善:半消息 + 本地事务 + 回查机制,保证分布式事务的最终一致性,这是 Kafka 和 RabbitMQ都不具备的。
## 选型怎么决策?
三个问题就能定:
1. **消息量大吗?** 日均亿级以上选 Kafka,百万级以下 RabbitMQ 够用
2. **需要消息回溯吗?** Kafka 天然支持,RabbitMQ 消费完就删
3. **涉及钱吗?** 金融、订单场景选 RocketMQ,事务消息是刚需
很多团队的做法是 Kafka 做事件流 + RabbitMQ 做任务队列,各取所长。
## 面试追问方向
- **Kafka 为什么用 Pull 不用 Push?** Push 模式下消费者处理能力不一,慢消费者会拖垮 Broker;Pull 让消费者按自己节奏消费,还方便回溯和批量拉取
- **RocketMQ 的 NameServer 和 Kafka 的 ZooKeeper 有什么区别?** NameServer 无状态、无主从,部署简单但功能弱于 ZK;Kafka 新版 KRaft 模式已去 ZK 依赖
- **消息积压怎么处理?** Kafka 扩分区 + 增 Consumer;RabbitMQ 临时加消费者队列;RocketMQ 调大消费线程池服务端5月27日 22:10
Kafka 事务消息的核心机制是什么?## Kafka 事务消息的核心机制是什么?
Kafka 事务消息从 0.11 版本引入,解决的是跨分区、跨会话的原子写入问题。它不是 RocketMQ 那种"半消息+本地事务回查"的模式,而是围绕 **Transaction Coordinator + 两阶段提交** 实现的 Exactly-Once 语义。
核心三要素:幂等 Producer(PID + Sequence Number 去重)、Transaction Coordinator(事务协调器)、`__transaction_state` 内部 Topic(持久化事务日志)。
## 事务消息的完整流程是怎样的?
**1. 查找 Coordinator**
Producer 发送 `FindCoordinatorRequest`,根据 `transactional.id` 哈希定位到某个 Broker 作为该事务的 Coordinator。
**2. 初始化 PID**
Producer 向 Coordinator 发送 `InitPidRequest`。Coordinator 分配 PID 和 epoch,并将 `transactional.id → PID` 映射写入 `__transaction_state`。这一步保证新 Producer 启动后,旧的同 ID Producer 被 fence 掉(Zombie Fencing)。
**3. 开始事务**
`beginTransaction()` 是 Producer 本地操作,仅更新内部状态,不与 Broker 交互。
**4. 注册分区**
Producer 第一次向某个 TopicPartition 发消息前,先发 `AddPartitionsToTxnRequest` 给 Coordinator,注册该分区到当前事务。Coordinator 将分区信息写入事务日志。
**5. 发送消息**
Producer 正常发送 `ProduceRequest`,消息携带 PID、epoch、sequence number。Broker 写入日志但消息对 `read_committed` 消费者不可见。
**6. 提交消费偏移(Consume-Transform-Produce 模式)**
Producer 发 `AddOffsetsToTxnRequest` 将消费偏移纳入事务,再发 `TxnOffsetCommitRequest` 写入 `__consumer_offsets`。保证消费位移和生产消息的原子性。
**7. 提交或中止事务**
- **Commit**:Producer 发 `EndTxnRequest(commit)`,Coordinator 先写入 PREPARE_COMMIT 到事务日志,再向各分区 Leader 发 `WriteTxnMarker` 写入 Control Batch(commit marker),最后写入 COMPLETE_COMMIT。
- **Abort**:流程对称,写 abort marker,消费者丢弃对应消息。
## Consumer 如何处理事务消息?
- **`isolation.level=read_committed`**:Broker 端过滤,只返回已 commit 事务的消息。Consumer 还会收到 abort 标记,丢弃回滚消息。
- **`isolation.level=read_uncommitted`**:所有消息立即可见,包括未提交的。性能更好但可能读到脏数据。
## 关键配置有哪些?
Producer 必须配置:
```properties
enable.idempotence=true
transactional.id=my-txn-id
acks=all
```
Consumer 读事务消息需配置:
```properties
isolation.level=read_committed
```
Broker 端关注 `transaction.state.log.replication.factor` 和 `transactional.id.expiration.ms`(默认 7 天过期)。
## 事务消息的典型代码怎么写?
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "order-txn-1");
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("orders", "key1", "value1"));
producer.send(new ProducerRecord<>("inventory", "key2", "value2"));
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close(); // 被 fence,不可恢复
} catch (KafkaException e) {
producer.abortTransaction();
}
```
## 面试追问:事务消息有什么局限?
- Consumer 侧保证弱:`read_committed` 只能过滤未提交消息,无法阻止 Consumer 重复处理已提交消息,消费端幂等需自行保证。
- 事务超时问题:`transaction.timeout.ms` 默认 60s,长事务易超时被 Coordinator 主动 abort。
- 跨系统不原子:Kafka 事务只覆盖 Kafka 内部的生产和消费位移提交,与外部数据库的联动需自行实现补偿机制,无法做到真正的跨系统两阶段提交。服务端5月27日 22:10
Kafka 消息丢失的原因有哪些?怎么解决?## 答案概览
Kafka 消息丢失发生在三个环节:**Producer 发送端**、**Broker 存储端**、**Consumer 消费端**。核心对策:Producer 配 `acks=all` + 重试,Broker 配多副本 + 禁脏选举,Consumer 关自动提交改手动确认。
## Producer 端:消息发出去就丢了?
**丢失原因:**
- `acks=0` 或 `acks=1`,Leader 写入成功就返回,Follower 还没同步 Leader 就挂了
- 异步发送不带回调,发送失败无感知
- `retries` 未配置,网络抖动直接丢消息
**解决方案:**
```properties
acks=all
retries=3
enable.idempotence=true
max.in.flight.requests.per.connection=5
```
`acks=all` 要求所有 ISR 副本确认写入才算成功。`enable.idempotence=true` 开启幂等生产者,防止重试导致消息重复。注意:开启幂等性时 `max.in.flight.requests.per.connection` 需小于等于 5,否则幂等性失效。
用 `producer.send(record, callback)` 代替 `producer.send(record)`,在回调里处理失败逻辑。
## Broker 端:写进去了但读不到了?
**丢失原因:**
- 副本数只有 1,Broker 宕机直接丢数据
- Leader 崩溃后,未同步完的 Follower 被选为新 Leader(脏选举),未同步消息丢失
- 异步刷盘,数据还在 PageCache 没落盘就宕机
**解决方案:**
```properties
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
```
`replication.factor=3` 保证三副本冗余。`min.insync.replicas=2` 要求至少 2 个副本在 ISR 中,否则 Producer 写入报错——宁可不可用也不丢数据。`unclean.leader.election.enable=false` 禁止落后副本参与选举,这是防丢的关键开关。
## Consumer 端:消费了但白消费了?
**丢失原因:**
- `enable.auto.commit=true`,消息拉取后自动提交 offset,但业务还没处理完就挂了,重启后这条消息不会再投递
- 多线程消费时,处理慢的线程还没完成,offset 已被其他线程推进
**解决方案:**
```properties
enable.auto.commit=false
```
关闭自动提交,业务处理完成后手动调用 `consumer.commitSync()`。多线程场景下,每个线程维护自己的 offset,处理完再提交。
消费者还需实现幂等性:同一条消息可能被重复投递(rebalance 后),用唯一标识去重。
## 追问:acks=all 就一定不丢消息吗?
不一定。如果 ISR 中只剩 Leader 自己,`acks=all` 退化为 `acks=1`。所以 `min.insync.replicas=2` 必须配合使用——当 ISR 不足时拒绝写入,用可用性换可靠性。
## 配置速查
| 环节 | 关键配置 | 推荐值 |
|------|---------|--------|
| Producer | acks | all |
| Producer | retries | ≥3 |
| Producer | enable.idempotence | true |
| Broker | replication.factor | 3 |
| Broker | min.insync.replicas | 2 |
| Broker | unclean.leader.election.enable | false |
| Consumer | enable.auto.commit | false |服务端5月27日 22:10
Kafka 支持哪些压缩算法?生产环境怎么选?## Kafka 支持哪些压缩算法
Kafka 支持 Gzip、Snappy、LZ4、Zstd 四种压缩算法,以及不压缩(none)。压缩在生产者端以 batch 为单位执行,Broker 原样存储和转发,Consumer 端解压。理解各算法的取舍是选型的关键。
## 四种算法核心差异
**Gzip**:压缩率最高(文本数据可达 70-80%),但 CPU 开销大、速度慢。适合带宽极度受限、对延迟不敏感的场景。
**Snappy**:Google 出品,速度与压缩率较均衡,是 Kafka 早期版本的默认推荐。适合追求稳定、不想过度调优的常规业务。
**LZ4**:压缩和解压速度最快,CPU 消耗极低,但压缩率一般。适合高吞吐、低延迟的实时流处理场景。
**Zstd**:Facebook 开源,压缩率接近 Gzip,速度接近 Snappy,Kafka 2.1.0 起支持。是当前综合表现最优的选择。
快速对比:
| 算法 | 压缩率 | 压缩速度 | CPU 消耗 | Kafka 最低版本 |
|------|--------|----------|----------|---------------|
| Gzip | 最高 | 慢 | 高 | 0.8.0 |
| Snappy | 中等 | 快 | 低 | 0.8.0 |
| LZ4 | 较低 | 最快 | 极低 | 0.8.2 |
| Zstd | 高 | 较快 | 中等 | 2.1.0 |
## 生产环境怎么选
**首选 Zstd**:如果你的 Kafka 版本 >= 2.1.0,Zstd 几乎是最优解——压缩率比 Snappy 高 20-30%,速度远快于 Gzip,CPU 开销可控。
**高吞吐场景选 LZ4**:实时计算、日志采集等对延迟敏感的场景,LZ4 的极低 CPU 开销和最快解压速度更有优势。
**老旧集群选 Snappy**:无法升级 Kafka 版本时,Snappy 仍然是可靠的兜底方案。
**Gzip 适合归档**:只有"带宽贵过 CPU、数据量极大、延迟无所谓"的场景才考虑 Gzip,比如离线数据同步到冷存储。
## 配置要点
```properties
# Producer 端配置
compression.type=zstd
batch.size=32768
linger.ms=10
```
`batch.size` 和 `linger.ms` 直接影响压缩效果——batch 越大,同一批消息的重复模式越多,压缩率越高。但 batch 过大也会增加延迟,需要权衡。
注意 `compression.type=producer` 是 Broker 端的默认值,表示"由 Producer 决定压缩方式",Broker 不会主动解压或重新压缩。
## 常见追问
**压缩对消息顺序有影响吗?** 没有。压缩以 batch 为单位,batch 内消息顺序不变,batch 之间也保持顺序。
**Broker 端会解压吗?** 一般不会。Broker 收到压缩 batch 后直接落盘和转发。只有当 Broker 端配置了不同的 `compression.type` 时,才会解压再重压缩——这会浪费 CPU,应避免。
**Consumer 端需要配置压缩算法吗?** 不需要。Kafka 在消息头中记录了压缩算法,Consumer 自动识别并解压。
选压缩算法本质上是在 CPU、带宽、存储三个资源之间做权衡。先明确瓶颈在哪,再对号入座,而不是盲目追求压缩率。服务端5月27日 21:50
Kafka 的核心概念和主要特性是什么?## 答案
Kafka 是一个分布式流处理平台,核心概念包括:
- **Producer/Consumer**:消息的生产者和消费者,采用 Pull 模式消费
- **Broker**:Kafka 服务节点,负责存储和转发消息
- **Topic/Partition**:Topic 是消息分类单位,Partition 是 Topic 的物理分区,分布在不同 Broker 上实现并行处理
- **Consumer Group**:消费者组,同组内各消费者分摊 Partition 消费,实现负载均衡
- **Replica**:副本,分为 Leader 和 Follower,保证数据可靠性
主要特性:高吞吐(百万级 TPS)、低延迟(毫秒级)、可扩展(水平扩容 Broker)、持久化(磁盘顺序写 + 页缓存)、容错(副本机制 + ISR 同步)。
## 追问一:Kafka 如何保证消息不丢失?
三层保障:
1. **Producer 端**:通过 `acks` 参数控制——`acks=0` 不等确认,`acks=1` 仅 Leader 确认,`acks=-1(all)` 等 ISR 全部确认才返回成功
2. **Broker 端**:副本机制 + ISR(In-Sync Replicas),只有 ISR 中的副本全部写入后才认为消息提交成功
3. **Consumer 端**:手动提交 offset,处理完业务逻辑后再 commit,避免消费失败导致消息丢失
## 追问二:什么是 ISR?和 AR、LEO、HW 的关系?
- **AR(Assigned Replicas)**:Topic 创建时分配的所有副本集合
- **ISR(In-Sync Replicas)**:与 Leader 保持同步的副本子集,由 `replica.lag.time.max.ms` 控制剔除
- **LEO(Log End Offset)**:每个副本的日志末端位移(下一条消息的 offset)
- **HW(High Watermark)**:所有 ISR 副本 LEO 的最小值,Consumer 只能消费到 HW 之前的消息
Leader 处理写入:先更新自身 LEO,等 ISR 全部同步后推进 HW,Follower 持续从 Leader 拉取数据追赶 LEO。
## 追问三:Kafka 如何保证顺序消费?
Partition 级别保证有序——消息在同一 Partition 内按写入顺序追加,Consumer 按 offset 顺序消费。跨 Partition 不保证全局有序。需要全局有序时,只使用一个 Partition(牺牲吞吐)。
## 追问四:零拷贝原理?
Kafka 利用 Linux 的 `sendfile` 系统调用实现零拷贝:数据从磁盘读取到页缓存后,直接通过 DMA 传输到网卡缓冲区,跳过用户态拷贝,大幅降低 CPU 开销和延迟。
## 追问五:消费者 Rebalance 什么时候触发?
- Consumer Group 中新增或移除消费者
- 订阅的 Topic 分区数变化(如扩容 Partition)
- Consumer 心跳超时(`session.timeout.ms`)被判定离线
- Consumer 主动取消订阅
Rebalance 期间所有消费者暂停消费(Stop The World),频繁 Rebalance 是常见性能问题。