标签

Kafka

Apache Kafka 是一个开源的流处理平台,由 LinkedIn 开发,并于 2011 年贡献给 Apache 软件基金会。它主要用于构建实时的数据管道和流应用程序。Kafka 能够以高吞吐量、可扩展性和容错性的方式处理数据流。

Kafka
服务端5月31日 16:34
Kafka 为什么曾经依赖 ZooKeeper?它到底负责什么?在早期和大量存量 Kafka 集群里,ZooKeeper 负责保存和协调集群元数据:Broker 注册、Controller 选举、Topic 与分区信息、部分配置和权限数据。它不是 Kafka 的消息存储层,真正的消息仍然写在 Broker 本地日志里。ZooKeeper 更像一个一致性协调中心,帮助 Kafka 判断“谁还活着、谁来当 Controller、元数据现在是什么状态”。 ## ZooKeeper 管哪些事 Broker 启动后会连接 ZooKeeper,并在 `/brokers/ids` 下创建临时节点。临时节点和会话绑定,如果 Broker 宕机或网络断开超过会话超时时间,节点会消失,其他组件就能感知 Broker 不可用。这个机制让 Kafka 能快速发现集群成员变化。 Controller 选举也依赖 ZooKeeper。多个 Broker 会竞争创建 `/controller` 节点,创建成功者成为 Controller。Controller 负责分区 leader 选举、副本状态变化、Topic 创建删除等集群级动作。ZooKeeper 在这里提供的是互斥和通知能力,避免多个 Broker 同时认为自己是控制者。 ```text /brokers/ids/1 # Broker 注册信息 /controller # 当前 Controller /brokers/topics/orders # Topic 分区与副本元数据 /config/topics/orders # Topic 级配置 /kafka-acl/Topic/orders # ACL 信息 ``` 这些路径有助于理解 ZooKeeper 的定位:它保存的是元数据和状态,不负责传输业务消息。 ## Consumer offset 也在 ZooKeeper 吗 这要看 Kafka 版本和客户端。早期 Consumer 会把 offset 写到 ZooKeeper 的 `/consumers` 路径下,但新版本 Kafka 默认把 offset 存到内部 topic `__consumer_offsets`。所以面试或排查时不能简单说“ZooKeeper 管 Consumer offset”,更准确的说法是:旧消费者协议依赖 ZooKeeper,新消费者组协调和 offset 存储主要由 Kafka 自己完成。 这个细节很容易踩坑。运维老集群时,如果还有旧客户端,ZooKeeper 里可能仍能看到 consumer 路径;新集群则更多关注 Group Coordinator 和内部 topic。判断时看客户端版本和 `kafka-consumer-groups.sh` 查询结果,比背路径更可靠。 ## ZooKeeper 配置要注意什么 Kafka 侧至少要配置连接串和超时。生产环境不要只放一个 ZooKeeper 节点,通常使用 3 或 5 个节点组成 quorum。节点数用奇数,是为了在故障时仍能形成多数派。 ```properties # server.properties zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka-prod zookeeper.connection.timeout.ms=6000 zookeeper.session.timeout.ms=18000 ``` ```properties # zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/var/lib/zookeeper clientPort=2181 server.1=zk1:2888:3888 server.2=zk2:2888:3888 server.3=zk3:2888:3888 ``` `zookeeper.session.timeout.ms` 不是越短越好。太短会把短暂网络抖动误判成 Broker 下线,引发 Controller 处理分区变化;太长又会让真实故障发现变慢。生产里要结合网络质量、GC 暂停和业务可用性要求调整。 ## KRaft 出现后 ZooKeeper 还重要吗 Kafka 新版本已经引入 KRaft,用 Kafka 自己的 Raft 元数据 quorum 替代 ZooKeeper。新建集群可以优先评估 KRaft,部署和运维链路更简单。但很多公司仍有 ZooKeeper 模式的存量集群,迁移涉及版本、工具链、监控、备份和回滚方案,不是改一行配置就结束。 因此 ZooKeeper 仍然是高价值知识点。理解它能帮助你排查 Controller 抖动、Broker 频繁上下线、Topic 元数据异常和老集群权限问题。 ## 追问 ### ZooKeeper 会保存 Kafka 的消息数据吗? 不会,Kafka 消息写在 Broker 的日志目录里,ZooKeeper 保存的是元数据和协调状态。把 ZooKeeper 当消息存储是常见误解,也会导致错误的备份方案。备份 ZooKeeper 只能保护部分元数据,不能恢复业务消息。真正的数据可靠性要看 topic 副本数、ISR、acks 和 Broker 磁盘。 ### ZooKeeper 挂了 Kafka 会立刻不可用吗? 如果 ZooKeeper 集群只是少数节点故障,多数派还在,Kafka 通常可以继续工作。如果 ZooKeeper quorum 整体不可用,已有 leader 分区可能还能短时间处理读写,但创建 Topic、Controller 选举、Broker 变更等元数据操作会受影响。边界在于“已有数据面”和“控制面变更”不是一回事。生产上不能因为消息还能写就忽视 ZooKeeper 故障。 ### 为什么 ZooKeeper 要部署奇数个节点? ZooKeeper 依赖多数派确认,3 个节点允许坏 1 个,5 个节点允许坏 2 个。偶数节点不会提升多数派容错能力,反而增加成本和通信开销。比如 4 个节点仍然需要 3 个形成多数,容错能力和 3 节点一样。常见取舍是中小集群用 3 个,跨机房或更高可用要求再评估 5 个。 ### Kafka 迁到 KRaft 后就不用理解 ZooKeeper 了吗? 新建 KRaft 集群确实不再依赖 ZooKeeper,但存量系统、面试和故障复盘里仍经常遇到 ZooKeeper 模式。迁移还要考虑 Kafka 版本、元数据迁移、监控指标变化和回滚路径。只知道 KRaft 的配置,不理解旧模式,很难解释老集群 Controller 抖动或 Broker 注册异常。更稳的做法是同时理解两套元数据管理方式。 ### ZooKeeper 相关故障通常怎么排查? 先看 Kafka Broker 日志里是否有 session expired、controller moved、zookeeper disconnected 等关键字。再检查 ZooKeeper 四字命令或监控指标,例如连接数、延迟、leader/follower 状态和磁盘 fsync 时间。很多问题不是 ZooKeeper 进程挂了,而是网络抖动、磁盘慢或 Broker 长 GC 导致会话过期。排查时要把 Kafka 日志、ZooKeeper 日志和节点资源放在同一时间线看。
服务端5月28日 08:30
Kafka 出现消息重复消费怎么解决?## 面试官为什么爱问这个问题 Kafka 默认提供的是 at-least-once 语义,消息至少被消费一次,但可能重复。在金融支付、订单处理等场景下,重复消费意味着重复扣款、重复发货,后果严重。面试官问这道题,考察的是你对 Kafka 消费语义的理解深度,以及在实际业务中如何保证 exactly-once。 ## 重复消费是怎么产生的 根本原因只有一个:**Consumer 消费了消息,但 Offset 没有成功提交**。下次重启或 Rebalance 后,Kafka 认为这条消息没消费过,重新投递。 常见触发场景: 1. **Rebalance 导致 Offset 丢失**:Consumer 处理消息耗时超过 `max.poll.interval.ms`(默认 5 分钟),Kafka 认为该 Consumer 已死,触发 Rebalance,未提交的 Offset 对应的消息会被重新分配给其他 Consumer 消费 2. **Consumer 异常宕机**:启用了自动提交(`enable.auto.commit=true`,默认开启),但提交间隔内宕机,最近一次提交之后消费的消息都会被重复消费 3. **手动提交失败**:关闭自动提交后调用 `commitSync()` 或 `commitAsync()`,提交请求因网络问题失败,消息被重复消费 4. **Producer 重试导致重复写入**:Producer 发送消息后未收到 ACK,触发重试,实际上第一次已经写入成功,造成 Broker 端消息重复 ## 解决方案:从 Kafka 机制到业务层,逐层防御 ### 第一层:Producer 幂等性——防止重复写入 Kafka 0.11 引入幂等 Producer,原理是为每个 Producer 分配一个 PID(Producer ID),为每条消息分配递增的 SequenceNumber。Broker 端维护每个 `<PID, Partition>` 的最新 SequenceNumber,收到消息时比对:如果新消息的 SequenceNumber ≤ 已记录的值,判定为重复写入,直接丢弃。 ```properties # 开启幂等性(Kafka 3.0 后默认开启) enable.idempotence=true # 等价于同时设置: # acks=all # retries=Integer.MAX_VALUE # max.in.flight.requests.per.connection<=5 ``` **注意限制**:幂等性只保证单 Partition 内的去重,跨 Partition 或 Producer 重启(PID 变化)后无法去重。 ### 第二层:Kafka 事务——跨分区 Exactly-Once 当需要将消费 Offset 提交和消息发送放在同一个事务中时(典型的 consume-transform-produce 模式),需要 Kafka 事务支持: ```java // 事务型 Producer 配置 props.put("transactional.id", "order-tx-001"); producer.initTransactions(); try { producer.beginTransaction(); // 消费-处理-发送,放在同一个事务中 producer.send(new ProducerRecord<>("topic", key, value)); // 将消费 Offset 也提交到事务中 producer.sendOffsetsToTransaction(offsets, consumerGroupId); producer.commitTransaction(); } catch (Exception e) { producer.abortTransaction(); } ``` 事务保证:要么消费 Offset 提交和消息发送同时成功,要么同时回滚,从源头杜绝重复。 ### 第三层:Consumer 端幂等——业务层最后一道防线 无论 Producer 和 Broker 做了多少保障,Consumer 端的幂等性是必须实现的,因为 Rebalance 场景下 Kafka 无法完全避免重复投递。 **方案一:数据库唯一约束** 利用数据库主键或唯一索引天然去重,最可靠: ```sql -- 以消息 ID 作为唯一索引 INSERT INTO orders (order_id, user_id, amount, status) VALUES ('msg-123', 1001, 99.9, 'PAID') ON DUPLICATE KEY UPDATE status = status; -- MySQL 写法,重复则跳过 ``` **方案二:Redis SET 去重** 适合高吞吐场景,利用 Redis Set 的去重特性: ```java // 消费前先判断是否已处理 String dedupeKey = "kafka:processed:" + topic + ":" + partition; Boolean isNew = redisTemplate.opsForSet().add(dedupeKey, messageId); if (Boolean.TRUE.equals(isNew)) { processMessage(message); // 设置过期时间避免 Key 无限膨胀 redisTemplate.expire(dedupeKey, 24, TimeUnit.HOURS); } ``` **方案三:状态机控制** 适用于有明确状态流转的业务,如订单从"待支付"到"已支付",重复消费时状态已变更,业务逻辑自然跳过: ```java // 利用业务状态天然幂等 Order order = orderDao.getById(orderId); if (order.getStatus() == OrderStatus.PAID) { log.info("订单已支付,跳过重复消息: {}", orderId); return; // 已处理,直接跳过 } order.setStatus(OrderStatus.PAID); orderDao.update(order); ``` ### 第四层:Offset 提交策略优化 ```properties # 关闭自动提交,掌控提交时机 enable.auto.commit=false ``` 手动提交的选择: - `commitSync()`:同步提交,阻塞等待 Broker 确认,可靠但吞吐低 - `commitAsync()`:异步提交,不阻塞,但可能丢失提交确认 - **推荐做法**:正常消费用 `commitAsync()` 保证吞吐,Consumer 关闭时用 `commitSync()` 兜底确保最后一次提交成功 ```java try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { processMessage(record); } consumer.commitAsync(); // 异步提交,追求吞吐 } } finally { consumer.commitSync(); // 关闭前同步提交,确保不丢 consumer.close(); } ``` ## 面试追问与加分回答 **追问:Kafka 幂等 Producer 的 PID 在重启后会变吗?** 会变。PID 是 Broker 在 Producer 启动时分配的,重启后获得新的 PID,之前的 SequenceNumber 也会重置,所以幂等性无法跨会话去重。跨会话去重需要配合 `transactional.id` 使用事务机制。 **追问:为什么不在 Broker 端做全局去重?** Broker 端全局去重需要维护所有消息的索引,存储和计算开销极大,与 Kafka 追求高吞吐的设计目标冲突。Kafka 选择在 Producer 端做有限去重(单 Partition 内),把跨 Partition、跨会话的去重责任交给业务层。
服务端5月28日 08:28
Kafka 消息积压如何处理?## Kafka 消息积压如何处理? Kafka 消息积压是生产环境最常见的故障之一,也是面试高频考点。核心表现为 Consumer 消费速度跟不上 Producer 生产速度,消息在 Broker 端持续堆积。处理思路是:**先定位原因,再分层治理——短期应急止血,长期架构优化**。 ### 积压原因定位 消息积压不是单一问题,通常由以下几类原因叠加导致: **消费端瓶颈** - 消费逻辑耗时过长,单条消息处理耗时超过生产间隔 - 单线程消费,未能充分利用分区并行度 - 外部依赖(数据库、RPC)响应慢,拖慢整体消费速率 **生产端突增** - 业务高峰(大促、秒杀)导致消息量激增 - Producer 批量发送配置不当,瞬时流量过大 - 上游系统异常重试导致消息重复涌入 **数据倾斜** - 消息 Key 分布不均,部分分区积压严重,其他分区空闲 - 典型场景:用 userId 做 Key 时,大客户的消息集中在少数分区 **系统故障** - Consumer 宕机或频繁 Rebalance - 依赖服务宕机导致消费持续失败重试 - 网络抖动引起消费超时 ### 监控与快速诊断 定位积压的第一步是看 Consumer Lag: ```bash # 查看 Consumer Group 的 Lag 情况 kafka-consumer-groups --bootstrap-server localhost:9092 \ --describe --group my-group # 输出关键字段:CURRENT-OFFSET、LOG-END-OFFSET、LAG # LAG = LOG-END-OFFSET - CURRENT-OFFSET ``` 关键监控指标: | 指标 | 含义 | 告警建议 | |------|------|----------| | Consumer Lag | 积压消息数 | > 10万触发 P2,> 100万触发 P1 | | 消费速率 (msg/s) | 每秒消费消息数 | 持续低于生产速率触发告警 | | Rebalance 频率 | 消费者组重平衡次数 | 5分钟内超过3次需排查 | | 分区 Lag 分布 | 各分区积压差异 | 最大分区 Lag > 平均值3倍需关注数据倾斜 | ### 短期应急方案 #### 1. 紧急扩容:临时 Topic 分流 这是处理百万级以上积压最有效的短期方案,核心思路是将积压数据快速分散到更多分区并行消费: **操作步骤:** ```bash # 第一步:创建临时 Topic,分区数为原来的 N 倍 kafka-topics --bootstrap-server localhost:9092 \ --create --topic my-topic-temp --partitions 50 \ --replication-factor 3 ``` ```java // 第二步:写一个分发 Consumer,消费积压消息并轮询写入临时 Topic // 关键:不做业务处理,只做数据搬运 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { // 轮询写入临时 Topic 的各分区,保证均匀分布 ProducerRecord<String, String> pr = new ProducerRecord<>( "my-topic-temp", partitionCounter % 50, record.key(), record.value()); producer.send(pr); partitionCounter++; } consumer.commitSync(); } ``` ```bash # 第三步:部署 N 倍的 Consumer 消费临时 Topic # 第四步:积压消费完毕后,恢复原架构,下线临时 Consumer ``` **注意**:此方案会打破消息分区内的顺序性。如果业务要求顺序消费,需要将相同 Key 的消息路由到同一临时分区。 #### 2. 增加消费者实例 最直接的方式,但受限于分区数: ```bash # 先查看当前分区数 kafka-topics --bootstrap-server localhost:9092 \ --describe --topic my-topic # Consumer 数量不能超过 Partition 数量 # 如果 Consumer 已满,需要先增加分区 kafka-topics --bootstrap-server localhost:9092 \ --alter --topic my-topic --partitions 20 ``` **关键限制**:一个分区同一时刻只能被同一个 Consumer Group 中的一个 Consumer 消费。Consumer 数量 = 分区数时并行度最大,超过则空闲。 #### 3. 消费端快速优化 **批量处理替代逐条处理:** ```java ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); List<Record> batch = new ArrayList<>(); for (ConsumerRecord<String, String> record : records) { batch.add(mapRecord(record)); if (batch.size() >= 500) { // 批量写入数据库,而非逐条插入 db.batchInsert(batch); batch.clear(); } } if (!batch.isEmpty()) { db.batchInsert(batch); } ``` **多线程消费(单 Consumer 多 Worker):** ```java ExecutorService executor = Executors.newFixedThreadPool(8); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { executor.submit(() -> processMessage(record)); } // 注意:提交 Offset 必须在所有 Worker 处理完成后 ``` **调整消费配置:** ```properties # 增大单次拉取量 max.poll.records=1000 # 延长拉取间隔,给处理留更多时间 max.poll.interval.ms=300000 # 适当缩短会话超时,加快故障感知 session.timeout.ms=25000 heartbeat.interval.ms=8000 ``` ### 处理数据倾斜 数据倾斜是积压的隐蔽原因,表现为少数分区 Lag 远高于其他分区: ```bash # 查看各分区 Lag 分布 kafka-consumer-groups --bootstrap-server localhost:9092 \ --describe --group my-group # 如果 PARTITION-0 Lag=500万,其他分区 Lag=10万,就是数据倾斜 ``` **解决方案:** - **修改分区策略**:将热点 Key 加随机后缀打散,如 `userId_123` → `userId_123_0` ~ `userId_123_9` - **自定义 Partitioner**:在 Producer 端实现更均匀的分区分配逻辑 - **临时方案**:对热点分区单独部署 Consumer 消费 ### 消息过期的特殊处理 如果消息设置了 `retention.ms`,积压期间消息可能被 Broker 清理,导致数据丢失: ```bash # 紧急延长消息保留时间 kafka-configs --bootstrap-server localhost:9092 \ --entity-type topics --entity-name my-topic \ --alter --add-config retention.ms=604800000 # 7天 ``` 如果消息已经被清理,需要从数据源重新回放: ```java // 从业务数据库或备份系统重新生成消息 // 写入一个新的补偿 Topic,单独消费处理 ``` ### 丢弃非关键消息 仅适用于日志采集、指标上报等可容忍丢失的场景: ```java // 方式一:跳到最新 Offset,丢弃积压消息 consumer.seekToEnd(partitions); // 方式二:按时间跳转,只消费最近 N 小时的消息 long timestamp = System.currentTimeMillis() - Duration.ofHours(2).toMillis(); Map<TopicPartition, Long> timestamps = partitions.stream() .collect(Collectors.toMap(tp -> tp, tp -> timestamp)); Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps); offsets.forEach((tp, offsetAndTimestamp) -> { if (offsetAndTimestamp != null) { consumer.seek(tp, offsetAndTimestamp.offset()); } }); ``` **严禁在订单、支付等业务场景使用,必须先备份再处理。** ### 保证消费顺序性 扩容和并行消费会打破分区内的消息顺序。如果业务要求局部有序: ```java // 方案:按 Key 分队列,每个队列单线程处理 Map<String, BlockingQueue<Record>> keyQueues = new ConcurrentHashMap<>(); ExecutorService[] singleThreadExecutors = new ExecutorService[16]; for (ConsumerRecord<String, String> record : records) { String key = record.key(); int queueIndex = Math.abs(key.hashCode()) % 16; singleThreadExecutors[queueIndex].submit(() -> processMessage(record)); } ``` ### 长期预防机制 **容量规划:** - 根据业务峰值 QPS 评估所需 Consumer 数量,预留 30% 冗余 - 分区数按预估峰值设定,宁多勿少(增加分区不影响已有数据,但减少分区不支持) **监控告警体系:** - Consumer Lag 分级告警:P2(10万)、P1(100万)、P0(500万) - 消费速率持续低于生产速率超5分钟触发预警 - Rebalance 频率异常告警 **限流与降级:** ```java // Producer 端限流,防止突发流量冲垮消费端 RateLimiter rateLimiter = RateLimiter.create(5000); // 5000 msg/s rateLimiter.acquire(); producer.send(record); ``` - 高峰期降级非核心功能的消费逻辑 - 关闭非必要的数据同步消费任务 **应急预案:** - 提前准备临时扩容脚本,5分钟内可完成 Topic 创建和 Consumer 部署 - 定期演练积压场景,验证扩容方案的有效性 - 建立消息备份机制,支持从数据源回放 ### 面试回答模板 **30秒核心回答:** "Kafka 消息积压处理分三步:第一步定位原因——看 Consumer Lag 和分区分布,区分是消费慢、流量突增还是数据倾斜;第二步短期应急——如果积压量大,创建临时 Topic 扩大分区数并行消费,同时优化消费逻辑做批量处理和异步化;第三步长期治理——做好容量规划、监控告警和限流降级,从根本上预防积压。" **追问方向:** - 数据倾斜怎么发现、怎么处理?—— 看各分区 Lag 差异,热点 Key 加随机后缀打散 - 消息过期被清理了怎么办?—— 延长 retention 时间,从数据源回放补偿 - 如何保证消费顺序性?—— 按 Key 分队列单线程处理,或使用 Coordinated Rebalance 减少顺序中断 - Consumer 数量能无限增加吗?—— 不能超过分区数,一个分区只能被一个 Consumer 消费
服务端5月28日 08:26
Kafka 的副本机制是如何工作的?## 副本机制的核心作用 Kafka 的副本机制解决的是分布式环境下的两个根本问题:**数据可靠性和服务可用性**。每个 Partition 可以配置多个副本(Replica),分布在不同 Broker 上。当某个 Broker 宕机,其他副本可以继续提供服务,保证消息不丢失、服务不中断。 副本因子(`replication.factor`)决定了每个 Partition 有多少个副本。生产环境通常设置为 3,意味着每个 Partition 有 3 份相同数据,允许最多 2 个节点故障而不丢数据。 ## Leader 与 Follower 的分工 Kafka 的副本采用 Leader-Follower 模型: - **Leader 副本**:每个 Partition 只有 1 个 Leader,负责处理该 Partition 的所有读写请求。Producer 写消息、Consumer 读消息都直接与 Leader 交互。 - **Follower 副本**:被动地从 Leader 拉取数据并写入本地日志,不对外提供读写服务。Follower 的唯一职责是保持与 Leader 的数据同步,以便在 Leader 故障时接管。 这种设计的优势在于读写都走 Leader,避免了多副本写入的一致性问题,同时也简化了 Consumer 的消费逻辑——不需要关心从哪个副本读取。 ## ISR 机制:同步副本的动态管理 ISR(In-Sync Replicas)是 Kafka 副本机制中最关键的概念之一。它不是静态的副本列表,而是一个由 Leader 动态维护的同步副本集合。 ### ISR 的判定标准 Follower 是否留在 ISR 中,取决于它是否在规定时间内与 Leader 保持同步。判定依据是时间而非消息条数——早期 Kafka 用 `replica.lag.max.messages` 判定(已在 0.9.0 版本移除),现在只用 `replica.lag.time.max.ms`(默认 10 秒)。如果一个 Follower 超过这个时间没有发送拉取请求或虽然发送了但还没追上 Leader 的 LEO,就会被移出 ISR,进入 OSR(Out-of-Sync Replicas)。 ### AR、ISR、OSR 的关系 ``` AR(Assigned Replicas)= ISR + OSR ``` AR 是 Partition 分配的所有副本集合,ISR 是与 Leader 同步的副本,OSR 是落后于 Leader 的副本。理想状态下 ISR = AR,即所有副本都在同步。当 ISR 缩小时,说明集群出现了同步延迟。 ### ISR 与消息可靠性 `min.insync.replicas` 配合 Producer 的 `acks=all` 使用时,能提供强可靠性保证。当 ISR 中的副本数小于 `min.insync.replicas` 时,Broker 会拒绝写入,抛出 `NotEnoughReplicasException`。这是一种宁可不可用也不丢数据的策略。 典型配置:`replication.factor=3` + `min.insync.replicas=2` + `acks=all`,允许 1 个节点故障仍能正常写入。 ## HW 与 LEO:副本同步的位置标记 理解副本同步,必须搞清楚两个核心位移标记: - **LEO(Log End Offset)**:每个副本(包括 Leader 和 Follower)各自维护的日志末端位移,表示下一条待写入消息的位置。每个副本的 LEO 可能不同。 - **HW(High Watermark)**:所有 ISR 副本中最小的 LEO。Consumer 只能消费到 HW 之前的消息,HW 之后的消息对 Consumer 不可见。 ### 同步过程中 HW 和 LEO 的变化 1. Producer 向 Leader 写入消息,Leader 的 LEO 递增 2. Follower 从 Leader 拉取消息,Follower 的 LEO 递增 3. Leader 收到所有 ISR 副本的 LEO 更新后,推进 HW(取所有 ISR 副本 LEO 的最小值) 4. Follower 在下一次拉取时获取 Leader 的 HW,更新自己的 HW 这个过程确保了:**HW 之前的消息已经被所有 ISR 副本确认,是安全可消费的**。 ## Leader 选举:故障恢复的核心流程 当 Leader 所在 Broker 宕机,Controller 会从 ISR 中选出一个新的 Leader。选举过程不是"投票",而是 Controller 直接指定。 ### 选举策略 Kafka 的 Leader 选举策略根据触发场景不同分为四种: | 策略 | 触发场景 | 选举逻辑 | |------|---------|---------| | OfflinePartition | Leader Broker 宕机 | 优先从 ISR 中选第一个存活的副本 | | ReassignPartition | 分区副本重分配 | 从新 AR 中选第一个在线且在 ISR 中的副本 | | PreferredReplica | 自动均衡 | 选 AR 中的第一个副本(如果在线且在 ISR 中) | | ControlledShutdown | Broker 优雅关闭 | 选 ISR 中不在关闭 Broker 上的第一个副本 | ### Unclean 选举:可用性与一致性的权衡 当 ISR 为空(所有副本都不同步)时,是否允许从 OSR 中选举 Leader?这由 `unclean.leader.election.enable` 控制: - **开启(默认 false)**:允许从 OSR 选 Leader,保证可用性,但可能丢数据——因为 OSR 副本的消息落后于原 Leader - **关闭**:分区不可用直到原 Leader 恢复,保证数据一致性 金融场景通常关闭此选项,宁可短暂不可用也不冒数据丢失的风险。 ### Leader Epoch 解决的问题 早期 Kafka 依赖 HW 截断日志来保证副本一致性,但这会导致数据不一致问题。Kafka 0.11 引入了 Leader Epoch 机制:每个 Partition 维护一个单调递增的 Epoch 编号,新 Leader 产生时 Epoch 递增。Follower 用 Leader Epoch 而非 HW 来判断截断位置,避免了 HW 截断导致的数据丢失和分歧。 典型场景:两个 Follower 先后重启,旧 HW 截断可能导致先重启的 Follower 把已提交的消息截掉,而后重启的 Follower 又从前者拉取到不完整数据。Leader Epoch 通过记录每个 Epoch 对应的起始位移,让 Follower 精确知道从哪里截断。 ## 副本同步的完整流程 1. **Producer 发送消息到 Leader**:消息写入 Leader 的本地日志,Leader LEO 递增 2. **Follower 拉取消息**:Follower 主动向 Leader 发送 `FetchRequest`,携带自己的 LEO 3. **Leader 返回消息**:Leader 根据 Follower 的 LEO 返回对应数据,同时返回 Leader 当前的 HW 4. **Follower 写入并更新**:Follower 将消息写入本地日志,更新 LEO,然后更新 HW(取 min(LEO, Leader HW)) 5. **Leader 推进 HW**:Leader 在收到 Follower 的下一次拉取请求时,根据所有 ISR 副本的 LEO 更新 HW 注意:Follower 是主动拉取而非 Leader 推送,这是 Kafka 副本同步与很多其他系统(如 MySQL 主从)的区别。拉取模式让 Follower 自己控制同步节奏,避免被 Leader 压垮。 ## 副本分配与机架感知 创建 Topic 时,Kafka 自动分配副本到不同 Broker。分配算法考虑两个原则: - 同一 Partition 的副本分布在不同 Broker 上 - 开启机架感知(`broker.rack` 配置)后,副本尽量分布在不同机架 机架感知的意义在于:如果整个机架故障(如电源故障),其他机架上的副本仍可用。不配置机架信息时,Kafka 只保证 Broker 级别分布,无法防御机架级故障。 ```properties # Broker 机架配置 broker.rack=rack1 # 副本因子 default.replication.factor=3 # 最小同步副本数 min.insync.replicas=2 ``` ## 关键配置参数一览 | 参数 | 默认值 | 说明 | |------|-------|------| | `replication.factor` | 1 | 副本数,生产环境建议 ≥ 3 | | `min.insync.replicas` | 1 | 最小同步副本数,配合 acks=all 使用 | | `acks` | 1 | Producer 确认级别:0/1/all | | `replica.lag.time.max.ms` | 10000 | Follower 落后超时时间 | | `unclean.leader.election.enable` | false | 是否允许非 ISR 副本当选 Leader | | `auto.leader.rebalance.enable` | true | 是否自动均衡 Leader 到 Preferred 副本 | ## 监控核心指标 排查副本相关问题时,重点关注以下 JMX 指标: - **`UnderReplicatedPartitions`**:ISR 副本数 < AR 副本数的 Partition 数量,大于 0 说明有副本同步滞后 - **`OfflinePartitionsCount`**:没有 Leader 的 Partition 数量,大于 0 说明有分区不可用 - **`IsrShrinksPerSec` / `IsrExpandsPerSec`**:ISR 缩减和扩张速率,频繁变动说明集群不稳定 - **`ActiveControllerCount`**:应该始终为 1,大于 1 说明有脑裂风险 ## 生产环境实践建议 **副本数不是越多越好**。3 副本能满足大多数场景的可靠性要求,增加到 5 或 7 会显著降低写入吞吐(更多副本需要同步)并增加存储成本。只在极少数场景(如金融核心链路)才需要更高副本数。 **务必开启 `min.insync.replicas`**。只配 `acks=all` 不够——如果 ISR 缩减到只剩 Leader,`acks=all` 等于 `acks=1`,此时 Leader 宕机仍会丢数据。`min.insync.replicas=2` 确保至少 2 个副本确认才算写入成功。 **关注 ISR 抖动**。ISR 频繁缩扩通常不是正常波动,往往暗示网络延迟、磁盘 IO 瓶颈或 GC 问题。收到 ISR 缩减告警时,先排查 Follower 所在 Broker 的负载和延迟。 **优雅下线优于故障下线**。使用 `kafka-reassign-partitions` 先迁移 Leader 和副本,再下线 Broker,可以避免不必要的 Leader 选举和数据恢复开销。
服务端5月28日 08:24
Kafka 为什么能够实现高吞吐量?## Kafka 为什么能够实现高吞吐量? Kafka 是目前业界吞吐量最高的消息队列之一,单机每秒可处理数十万条消息。这并非依赖某种银弹技术,而是多个设计决策协同作用的结果。理解这些原理,不仅能帮你在面试中给出有层次的回答,更能指导实际场景中的性能调优。 ## 顺序写:磁盘也能很快 很多人对磁盘的性能认知停留在"慢",但这只对随机读写成立。顺序写磁盘的速度可以达到 600MB/s 以上,甚至超过随机写内存的效率。 Kafka 的做法很直接:所有消息以追加(append)的方式写入日志文件,永远不修改已有数据。Consumer 也按偏移量顺序读取,整个读写路径上几乎没有随机 I/O。 这个设计还带来一个额外好处——操作系统对顺序写有天然优化。数据先进入 Page Cache,由 OS 异步刷盘,Kafka 本身不需要调用 fsync(除非配置了强制刷盘),相当于写内存的速度。 ## 零拷贝:省掉两次不必要的数据搬运 传统的网络数据发送要经历四次拷贝和四次上下文切换: ``` 磁盘 → 内核缓冲区 → 用户缓冲区 → Socket缓冲区 → 网卡 ``` 其中"内核缓冲区 → 用户缓冲区 → Socket缓冲区"这两步是完全可以避免的。Kafka 使用 Linux 的 `sendfile` 系统调用,数据直接从内核缓冲区传输到网卡: ``` 磁盘 → 内核缓冲区 → 网卡 ``` 拷贝次数从 4 次降到 2 次,CPU 上下文切换也从 4 次降到 2 次。在高吞吐场景下,这个差距会被放大到非常显著的程度。 **补充一点**:Kafka 还用了 `mmap`(内存映射文件)来处理 Consumer 的偏移量索引文件,让索引查找避免一次用户态拷贝。`sendfile` 处理数据流,`mmap` 处理索引,两者配合覆盖了 Kafka 主要的数据路径。 ## Page Cache:让 Kafka 不用自己管缓存 很多中间件选择在 JVM 堆内维护缓存,但 JVM 的 GC 是吞吐量杀手——堆越大,GC 暂停越长,对延迟敏感的场景尤其致命。 Kafka 反其道而行:不维护堆内缓存,直接依赖操作系统的 Page Cache。写入时数据进入 Page Cache 就算成功,读取时如果命中缓存就直接返回,都没经过 JVM 堆。 这样做的好处: - **避免 GC 问题**:Kafka 进程的堆可以设得很小(通常 6GB 足够),GC 暂停极短 - **缓存不随进程重启丢失**:进程挂了,Page Cache 还在,重启后数据依然热 - **利用 OS 的 LRU 策略**:操作系统比应用层更清楚哪些页面该淘汰 ## 批量处理:把零散请求打包 网络请求的固定开销很高——一次 TCP 往返的延迟,加上协议解析、线程调度等开销。如果每条消息都单独发送,吞吐量会被网络开销吃掉。 Kafka 在 Producer 端做了两层批量: ```properties # 一批消息的最大字节数 batch.size=16384 # 等待多久再发送(即使没凑满一批) linger.ms=5 ``` `batch.size` 控制批量上限,`linger.ms` 控制等待时间。两者配合,Producer 会攒够一批再发,或者等 5ms 没有新消息也发出去。这种微小的延迟换取的是网络请求次数的大幅减少。 Consumer 端同理,`fetch.min.bytes` 和 `fetch.max.wait.ms` 也是同样的思路——宁可多等一会,也要一次多拉一些数据。 ## 分区并行:水平扩展的基础 单个分区只能被一个 Consumer 消费,这就是单分区的吞吐量上限。Kafka 通过分区实现并行: - Producer 可以同时向不同分区写入,Broker 端不同分区的写入由不同线程处理 - Consumer Group 中,每个分区分配给一个 Consumer 实例,多个实例并行消费 - 分区分布在不同 Broker 上,网络 I/O 和磁盘 I/O 都被分散 分区数决定了并行度的上限。但分区数也不是越多越好——每个分区在 Broker 上有对应的目录和索引文件,分区过多会增加文件句柄、增大 Leader 选举时间、加重 ZooKeeper/KRaft 负担。一般建议单 Broker 分区数不超过 1000-2000。 ## 数据压缩:端到端减少传输量 Kafka 支持在 Producer 端压缩、Broker 端保持原样、Consumer 端解压,即端到端压缩。这意味着压缩的收益不仅体现在网络传输,还体现在磁盘存储上。 常用压缩算法对比: | 算法 | 压缩比 | 压缩速度 | 适用场景 | |------|--------|----------|----------| | Snappy | 中等 | 快 | 通用场景,延迟敏感 | | LZ4 | 中等 | 最快 | 极致低延迟 | | Gzip | 高 | 慢 | 带宽受限,对延迟不敏感 | | Zstd | 较高 | 较快 | Kafka 2.1+ 推荐 | 选择压缩算法本质是 CPU 与带宽的权衡。CPU 有余量、带宽紧张就选高压缩比;延迟敏感就选快压缩。 ## 面试追问:如何进一步提升 Kafka 吞吐量? 在理解原理的基础上,实际调优时可以从几个方向入手: **Producer 侧**:增大 `batch.size` 和 `buffer.memory`,适当调高 `linger.ms`,开启压缩,使用异步发送(`acks=0` 或 `acks=1`,牺牲部分可靠性换吞吐)。 **Broker 侧**:增加分区数提升并行度,将日志目录挂载到不同磁盘实现 I/O 分散,调整 `num.io.threads` 匹配磁盘数量。 **Consumer 侧**:增加 Consumer 实例数(不超过分区数),调大 `fetch.min.bytes` 和 `max.poll.records`,开启自动提交减少偏移量提交开销。 **硬件层面**:SSD 替换 HDD 对顺序写提升有限(因为顺序写 HDD 也不慢),但对随机读和副本同步有明显帮助;增加内存扩大 Page Cache 命中率;万兆网卡消除网络瓶颈。 需要强调的是,高吞吐和强可靠性是矛盾的。`acks=all` + `min.insync.replicas=2` 能保证数据不丢,但吞吐量会比 `acks=0` 低一个量级。生产环境中,金融、订单等关键业务必须优先可靠性,日志采集等场景可以优先吞吐量。
服务端5月28日 08:24
Kafka 如何保证消息的顺序性?## Kafka 在 Partition 级别保证消息顺序,不保证 Topic 级别的全局顺序 Kafka 的顺序性保证是面试高频考点。核心结论:**Kafka 只在同一个 Partition 内保证消息的写入和消费顺序一致,跨 Partition 没有顺序保证。** 如果业务需要严格顺序,必须把相关消息路由到同一个 Partition。 --- ### 一、Partition 内为什么有序 每个 Partition 本质上是一个追加写入的日志(append-only log),每条消息分配一个单调递增的 offset。Consumer 按 offset 顺序拉取,因此分区内天然有序。 ```java // Producer 发送时指定 Key,相同 Key 的消息进入同一 Partition ProducerRecord<String, String> record = new ProducerRecord<>( "order-topic", orderId, // Key — 决定分区路由 orderEvent // Value ); producer.send(record); ``` Kafka 默认分区策略:有 Key 则 `hash(key) % numPartitions`,无 Key 则轮询(round-robin)或粘性分区。 --- ### 二、跨 Partition 为什么无序 一个 Topic 通常有多个 Partition,不同 Partition 的消息并行写入和消费,无法保证先后关系。 示例:订单创建消息进入 Partition-0,支付消息进入 Partition-1,Consumer 可能先读到支付消息。 ``` Partition-0: [订单创建] [发货通知] Partition-1: [支付成功] [签收确认] ↓ 并行消费,顺序不可控 ``` --- ### 三、如何保证业务上的顺序性 #### 方法 1:用相同的 Key 路由到同一 Partition(最常用) ```java // 同一订单的所有事件使用 orderId 作为 Key producer.send(new ProducerRecord<>("order-topic", orderId, event)); ``` 优点:简单、无需额外代码;缺点:同一 Key 的消息无法并行处理,可能成为热点。 #### 方法 2:自定义分区器 ```java public class OrderPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 按业务规则路由:如按用户ID所在区域分区 String userId = (String) key; int regionCode = getUserIdRegion(userId); return regionCode % cluster.partitionCountForTopic(topic); } } ``` 配置方式: ```properties partitioner.class=com.example.OrderPartitioner ``` #### 方法 3:单分区 Topic ```bash # 创建只有一个 Partition 的 Topic kafka-topics.sh --create --topic strict-order-topic \ --partitions 1 --replication-factor 3 ``` 全局有序但吞吐极低,仅适用于顺序性要求极严且流量小的场景。 --- ### 四、容易踩的坑 #### 1. Producer 端重试导致乱序 Producer 开启重试(`retries > 0`)时,如果 `max.in.flight.requests.per.connection > 1`,第一批消息失败重试后可能排在第二批后面,造成乱序。 ```properties # 严格顺序场景下必须这样配置 retries=2147483647 max.in.flight.requests.per.connection=1 enable.idempotence=true ``` 开启幂等(`enable.idempotence=true`)后,Kafka 2.0+ 可以在 `max.in.flight.requests.per.connection <= 5` 的情况下保证分区内顺序,因为 Broker 端会按序列号去重排序。 #### 2. Consumer Rebalance 导致重复消费 Rebalance 发生时,Consumer 可能重复消费已处理的消息。如果业务处理非幂等,就会出现数据不一致。解决方案:**消费端做好幂等**(数据库唯一键、Redis 去重表等)。 #### 3. 多线程消费打乱顺序 ```java // 错误:多线程处理同一 Partition 的消息会乱序 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { executor.submit(() -> process(record)); // 顺序无法保证 } } ``` 正确做法:按 Key hash 到同一线程处理,保证同 Key 消息的顺序。 ```java // 按 Key 分配到固定线程 int threadIndex = Math.abs(record.key().hashCode()) % threadCount; executors[threadIndex].submit(() -> process(record)); ``` --- ### 五、性能与顺序的取舍 | 方案 | 顺序性 | 吞吐量 | 适用场景 | |------|--------|--------|----------| | 单 Partition | 全局有序 | 低 | 账户流水、状态机 | | Key 路由同 Partition | Key 维度有序 | 中 | 订单状态、用户事件 | | 多 Partition 无 Key | 无序 | 高 | 日志采集、指标上报 | 大多数业务只需要**同一业务实体维度有序**(如同订单、同用户),通过合理设置 Key 即可兼顾顺序与性能。 --- ### 追问 **Q: Kafka 能否做到全局有序?为什么不用?** 技术上可以——单分区 + 单 Producer + 单 Consumer。但吞吐量受限于单机,无法水平扩展,只适合流量极低的场景(如金融账户变更日志)。 **Q: enable.idempotence 具体怎么保证顺序的?** Producer 为每个 `<PID, Partition>` 维护递增的 Sequence Number,Broker 端按 SN 排序写入。即使请求乱序到达,Broker 也会按 SN 重排后再落盘,保证日志中消息有序。 **Q: 消费端如何保证 Exactly-Once 语义?** Kafka 提供 Consumer 端的 exactly-once 需要配合事务:将消费和写入放在同一个 Kafka 事务中,或者使用幂等 + 手动提交 offset + 业务去重表的组合方案。
服务端5月27日 22:28
Kafka Consumer Group Rebalance 什么时候触发?四种分配协议有什么区别?## 核心答案 Consumer Group Rebalance 是 Kafka 在组成员或订阅变化时重新分配 Partition 的过程。面试抓住三点:什么触发、怎么分配、如何减影响。 ## 什么情况会触发 Rebalance? 三个维度: - **成员变化**:Consumer 加入/退出/宕机/心跳超时(超过 `session.timeout.ms`) - **订阅变化**:Consumer 订阅的 Topic 发生变动 - **分区变化**:Topic 分区数增减 追问:Consumer 处理慢会触发吗?会。两次 `poll()` 间隔超过 `max.poll.interval.ms`,Coordinator 认为 Consumer 已死,踢出并触发 Rebalance。 ## Rebalance 分几步? 1. **JoinGroup**:所有 Consumer 向 Group Coordinator 发请求,Coordinator 选出 Leader Consumer 2. **SyncGroup**:Leader 制定分区分配方案,发给 Coordinator,再下发给所有 Consumer 3. **开始消费**:Consumer 按新方案消费 只有 Leader Consumer 负责分配,其他 Consumer 被动接收方案。 ## 四种分配策略有什么区别? | 策略 | 分配方式 | 优点 | 缺点 | |------|----------|------|------| | Range(默认) | 按 Partition 序号范围连续分配 | 简单 | 不整除时不均匀 | | RoundRobin | 轮询分配所有 Partition | 多 Topic 时更均匀 | 订阅不一致时可能不均 | | Sticky | 均匀分配 + 尽量保持上次分配 | 减少 Partition 迁移 | — | | CooperativeSticky | 增量式,只重分配受影响 Partition | 不停消费,减少 STW | Kafka 2.4+ 支持 | 关键区分:Range 和 RoundRobin 属 Eager 协议——Rebalance 时所有 Consumer 先停消费再重新分配。CooperativeSticky 属 Cooperative 协议,只迁移需要变动的 Partition,其余不受影响。 ## 怎么减少 Rebalance 的负面影响? **1. 调参数避免误判** ```properties max.poll.interval.ms=600000 session.timeout.ms=30000 heartbeat.interval.ms=10000 ``` **2. 用 CooperativeSticky 策略** 避免全量 STW,只重分配受影响分区。 **3. 手动管理 Offset** ```java enable.auto.commit=false consumer.commitSync(); ``` 自动提交在 Rebalance 期间可能丢数据或重复消费,手动提交更可控。 **4. 静态成员(Static Membership)** 设置 `group.instance.id`,Consumer 重启后 Coordinator 仍认为同一成员,不触发 Rebalance。 ## 追问方向 - Eager 协议下 Rebalance 期间能消费吗?不能。Cooperative 协议下未受影响分区可以 - 怎么监控?关注 `RebalanceRatePerSec` 和 `RebalanceTotal` 指标 - Kafka 4.0 消费者组协议变化?原生支持增量 Rebalance,不再需要 Eager 回退
服务端5月27日 22:23
Kafka 和 RabbitMQ、RocketMQ 怎么选?## 核心区别一图看懂 | 维度 | Kafka | RabbitMQ | RocketMQ | |------|-------|----------|----------| | 定位 | 分布式流处理平台 | 传统消息代理 | 分布式消息中间件 | | 吞吐量 | 百万级 TPS | 万级 TPS | 十万级 TPS | | 延迟 | 毫秒级 | 微秒级 | 毫秒级 | | 消息留存 | 按时间/容量保留,消费后不删 | 消费确认后删除 | 可配置保留策略 | | 消费模型 | Pull 拉取 | Push 推送为主 | Push + Pull 均支持 | | 路由能力 | 基于 Topic 和分区 | Exchange 多种路由模式 | Topic + Tag 两级过滤 | | 顺序保证 | 分区内有序 | 队列内有序 | 全局顺序消息 | | 事务消息 | 支持(0.11+) | 不支持 | 原生支持,最完善 | | 适用场景 | 日志流、事件流、大数据管道 | 任务队列、微服务通信、复杂路由 | 电商订单、金融交易、事务消息 | ## 为什么 Kafka 吞吐量远超另外两个? Kafka 的核心设计围绕"顺序写磁盘 + 零拷贝 + 分区并行"三个点。磁盘顺序写速度可达 600MB/s,远超随机写的 100KB/s。零拷贝(sendfile 系统调用)让数据直接从页缓存到网卡,跳过用户态拷贝。分区机制则将负载分散到多个 Broker 并行处理。 RabbitMQ 的优势不在于吞吐,而在于路由灵活性——四种 Exchange 类型(direct、topic、fanout、headers)能实现复杂的消息分发逻辑,延迟也在微秒级别,适合对实时性要求高但吞吐不大的场景。 RocketMQ 在事务消息上做得最完善:半消息 + 本地事务 + 回查机制,保证分布式事务的最终一致性,这是 Kafka 和 RabbitMQ都不具备的。 ## 选型怎么决策? 三个问题就能定: 1. **消息量大吗?** 日均亿级以上选 Kafka,百万级以下 RabbitMQ 够用 2. **需要消息回溯吗?** Kafka 天然支持,RabbitMQ 消费完就删 3. **涉及钱吗?** 金融、订单场景选 RocketMQ,事务消息是刚需 很多团队的做法是 Kafka 做事件流 + RabbitMQ 做任务队列,各取所长。 ## 面试追问方向 - **Kafka 为什么用 Pull 不用 Push?** Push 模式下消费者处理能力不一,慢消费者会拖垮 Broker;Pull 让消费者按自己节奏消费,还方便回溯和批量拉取 - **RocketMQ 的 NameServer 和 Kafka 的 ZooKeeper 有什么区别?** NameServer 无状态、无主从,部署简单但功能弱于 ZK;Kafka 新版 KRaft 模式已去 ZK 依赖 - **消息积压怎么处理?** Kafka 扩分区 + 增 Consumer;RabbitMQ 临时加消费者队列;RocketMQ 调大消费线程池
服务端5月27日 22:10
Kafka 事务消息的核心机制是什么?## Kafka 事务消息的核心机制是什么? Kafka 事务消息从 0.11 版本引入,解决的是跨分区、跨会话的原子写入问题。它不是 RocketMQ 那种"半消息+本地事务回查"的模式,而是围绕 **Transaction Coordinator + 两阶段提交** 实现的 Exactly-Once 语义。 核心三要素:幂等 Producer(PID + Sequence Number 去重)、Transaction Coordinator(事务协调器)、`__transaction_state` 内部 Topic(持久化事务日志)。 ## 事务消息的完整流程是怎样的? **1. 查找 Coordinator** Producer 发送 `FindCoordinatorRequest`,根据 `transactional.id` 哈希定位到某个 Broker 作为该事务的 Coordinator。 **2. 初始化 PID** Producer 向 Coordinator 发送 `InitPidRequest`。Coordinator 分配 PID 和 epoch,并将 `transactional.id → PID` 映射写入 `__transaction_state`。这一步保证新 Producer 启动后,旧的同 ID Producer 被 fence 掉(Zombie Fencing)。 **3. 开始事务** `beginTransaction()` 是 Producer 本地操作,仅更新内部状态,不与 Broker 交互。 **4. 注册分区** Producer 第一次向某个 TopicPartition 发消息前,先发 `AddPartitionsToTxnRequest` 给 Coordinator,注册该分区到当前事务。Coordinator 将分区信息写入事务日志。 **5. 发送消息** Producer 正常发送 `ProduceRequest`,消息携带 PID、epoch、sequence number。Broker 写入日志但消息对 `read_committed` 消费者不可见。 **6. 提交消费偏移(Consume-Transform-Produce 模式)** Producer 发 `AddOffsetsToTxnRequest` 将消费偏移纳入事务,再发 `TxnOffsetCommitRequest` 写入 `__consumer_offsets`。保证消费位移和生产消息的原子性。 **7. 提交或中止事务** - **Commit**:Producer 发 `EndTxnRequest(commit)`,Coordinator 先写入 PREPARE_COMMIT 到事务日志,再向各分区 Leader 发 `WriteTxnMarker` 写入 Control Batch(commit marker),最后写入 COMPLETE_COMMIT。 - **Abort**:流程对称,写 abort marker,消费者丢弃对应消息。 ## Consumer 如何处理事务消息? - **`isolation.level=read_committed`**:Broker 端过滤,只返回已 commit 事务的消息。Consumer 还会收到 abort 标记,丢弃回滚消息。 - **`isolation.level=read_uncommitted`**:所有消息立即可见,包括未提交的。性能更好但可能读到脏数据。 ## 关键配置有哪些? Producer 必须配置: ```properties enable.idempotence=true transactional.id=my-txn-id acks=all ``` Consumer 读事务消息需配置: ```properties isolation.level=read_committed ``` Broker 端关注 `transaction.state.log.replication.factor` 和 `transactional.id.expiration.ms`(默认 7 天过期)。 ## 事务消息的典型代码怎么写? ```java Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "order-txn-1"); props.put("enable.idempotence", "true"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); producer.initTransactions(); try { producer.beginTransaction(); producer.send(new ProducerRecord<>("orders", "key1", "value1")); producer.send(new ProducerRecord<>("inventory", "key2", "value2")); producer.commitTransaction(); } catch (ProducerFencedException e) { producer.close(); // 被 fence,不可恢复 } catch (KafkaException e) { producer.abortTransaction(); } ``` ## 面试追问:事务消息有什么局限? - Consumer 侧保证弱:`read_committed` 只能过滤未提交消息,无法阻止 Consumer 重复处理已提交消息,消费端幂等需自行保证。 - 事务超时问题:`transaction.timeout.ms` 默认 60s,长事务易超时被 Coordinator 主动 abort。 - 跨系统不原子:Kafka 事务只覆盖 Kafka 内部的生产和消费位移提交,与外部数据库的联动需自行实现补偿机制,无法做到真正的跨系统两阶段提交。
服务端5月27日 22:10
Kafka 消息丢失的原因有哪些?怎么解决?## 答案概览 Kafka 消息丢失发生在三个环节:**Producer 发送端**、**Broker 存储端**、**Consumer 消费端**。核心对策:Producer 配 `acks=all` + 重试,Broker 配多副本 + 禁脏选举,Consumer 关自动提交改手动确认。 ## Producer 端:消息发出去就丢了? **丢失原因:** - `acks=0` 或 `acks=1`,Leader 写入成功就返回,Follower 还没同步 Leader 就挂了 - 异步发送不带回调,发送失败无感知 - `retries` 未配置,网络抖动直接丢消息 **解决方案:** ```properties acks=all retries=3 enable.idempotence=true max.in.flight.requests.per.connection=5 ``` `acks=all` 要求所有 ISR 副本确认写入才算成功。`enable.idempotence=true` 开启幂等生产者,防止重试导致消息重复。注意:开启幂等性时 `max.in.flight.requests.per.connection` 需小于等于 5,否则幂等性失效。 用 `producer.send(record, callback)` 代替 `producer.send(record)`,在回调里处理失败逻辑。 ## Broker 端:写进去了但读不到了? **丢失原因:** - 副本数只有 1,Broker 宕机直接丢数据 - Leader 崩溃后,未同步完的 Follower 被选为新 Leader(脏选举),未同步消息丢失 - 异步刷盘,数据还在 PageCache 没落盘就宕机 **解决方案:** ```properties default.replication.factor=3 min.insync.replicas=2 unclean.leader.election.enable=false ``` `replication.factor=3` 保证三副本冗余。`min.insync.replicas=2` 要求至少 2 个副本在 ISR 中,否则 Producer 写入报错——宁可不可用也不丢数据。`unclean.leader.election.enable=false` 禁止落后副本参与选举,这是防丢的关键开关。 ## Consumer 端:消费了但白消费了? **丢失原因:** - `enable.auto.commit=true`,消息拉取后自动提交 offset,但业务还没处理完就挂了,重启后这条消息不会再投递 - 多线程消费时,处理慢的线程还没完成,offset 已被其他线程推进 **解决方案:** ```properties enable.auto.commit=false ``` 关闭自动提交,业务处理完成后手动调用 `consumer.commitSync()`。多线程场景下,每个线程维护自己的 offset,处理完再提交。 消费者还需实现幂等性:同一条消息可能被重复投递(rebalance 后),用唯一标识去重。 ## 追问:acks=all 就一定不丢消息吗? 不一定。如果 ISR 中只剩 Leader 自己,`acks=all` 退化为 `acks=1`。所以 `min.insync.replicas=2` 必须配合使用——当 ISR 不足时拒绝写入,用可用性换可靠性。 ## 配置速查 | 环节 | 关键配置 | 推荐值 | |------|---------|--------| | Producer | acks | all | | Producer | retries | ≥3 | | Producer | enable.idempotence | true | | Broker | replication.factor | 3 | | Broker | min.insync.replicas | 2 | | Broker | unclean.leader.election.enable | false | | Consumer | enable.auto.commit | false |
服务端5月27日 22:10
Kafka 支持哪些压缩算法?生产环境怎么选?## Kafka 支持哪些压缩算法 Kafka 支持 Gzip、Snappy、LZ4、Zstd 四种压缩算法,以及不压缩(none)。压缩在生产者端以 batch 为单位执行,Broker 原样存储和转发,Consumer 端解压。理解各算法的取舍是选型的关键。 ## 四种算法核心差异 **Gzip**:压缩率最高(文本数据可达 70-80%),但 CPU 开销大、速度慢。适合带宽极度受限、对延迟不敏感的场景。 **Snappy**:Google 出品,速度与压缩率较均衡,是 Kafka 早期版本的默认推荐。适合追求稳定、不想过度调优的常规业务。 **LZ4**:压缩和解压速度最快,CPU 消耗极低,但压缩率一般。适合高吞吐、低延迟的实时流处理场景。 **Zstd**:Facebook 开源,压缩率接近 Gzip,速度接近 Snappy,Kafka 2.1.0 起支持。是当前综合表现最优的选择。 快速对比: | 算法 | 压缩率 | 压缩速度 | CPU 消耗 | Kafka 最低版本 | |------|--------|----------|----------|---------------| | Gzip | 最高 | 慢 | 高 | 0.8.0 | | Snappy | 中等 | 快 | 低 | 0.8.0 | | LZ4 | 较低 | 最快 | 极低 | 0.8.2 | | Zstd | 高 | 较快 | 中等 | 2.1.0 | ## 生产环境怎么选 **首选 Zstd**:如果你的 Kafka 版本 >= 2.1.0,Zstd 几乎是最优解——压缩率比 Snappy 高 20-30%,速度远快于 Gzip,CPU 开销可控。 **高吞吐场景选 LZ4**:实时计算、日志采集等对延迟敏感的场景,LZ4 的极低 CPU 开销和最快解压速度更有优势。 **老旧集群选 Snappy**:无法升级 Kafka 版本时,Snappy 仍然是可靠的兜底方案。 **Gzip 适合归档**:只有"带宽贵过 CPU、数据量极大、延迟无所谓"的场景才考虑 Gzip,比如离线数据同步到冷存储。 ## 配置要点 ```properties # Producer 端配置 compression.type=zstd batch.size=32768 linger.ms=10 ``` `batch.size` 和 `linger.ms` 直接影响压缩效果——batch 越大,同一批消息的重复模式越多,压缩率越高。但 batch 过大也会增加延迟,需要权衡。 注意 `compression.type=producer` 是 Broker 端的默认值,表示"由 Producer 决定压缩方式",Broker 不会主动解压或重新压缩。 ## 常见追问 **压缩对消息顺序有影响吗?** 没有。压缩以 batch 为单位,batch 内消息顺序不变,batch 之间也保持顺序。 **Broker 端会解压吗?** 一般不会。Broker 收到压缩 batch 后直接落盘和转发。只有当 Broker 端配置了不同的 `compression.type` 时,才会解压再重压缩——这会浪费 CPU,应避免。 **Consumer 端需要配置压缩算法吗?** 不需要。Kafka 在消息头中记录了压缩算法,Consumer 自动识别并解压。 选压缩算法本质上是在 CPU、带宽、存储三个资源之间做权衡。先明确瓶颈在哪,再对号入座,而不是盲目追求压缩率。
服务端5月27日 21:50
Kafka 的核心概念和主要特性是什么?## 答案 Kafka 是一个分布式流处理平台,核心概念包括: - **Producer/Consumer**:消息的生产者和消费者,采用 Pull 模式消费 - **Broker**:Kafka 服务节点,负责存储和转发消息 - **Topic/Partition**:Topic 是消息分类单位,Partition 是 Topic 的物理分区,分布在不同 Broker 上实现并行处理 - **Consumer Group**:消费者组,同组内各消费者分摊 Partition 消费,实现负载均衡 - **Replica**:副本,分为 Leader 和 Follower,保证数据可靠性 主要特性:高吞吐(百万级 TPS)、低延迟(毫秒级)、可扩展(水平扩容 Broker)、持久化(磁盘顺序写 + 页缓存)、容错(副本机制 + ISR 同步)。 ## 追问一:Kafka 如何保证消息不丢失? 三层保障: 1. **Producer 端**:通过 `acks` 参数控制——`acks=0` 不等确认,`acks=1` 仅 Leader 确认,`acks=-1(all)` 等 ISR 全部确认才返回成功 2. **Broker 端**:副本机制 + ISR(In-Sync Replicas),只有 ISR 中的副本全部写入后才认为消息提交成功 3. **Consumer 端**:手动提交 offset,处理完业务逻辑后再 commit,避免消费失败导致消息丢失 ## 追问二:什么是 ISR?和 AR、LEO、HW 的关系? - **AR(Assigned Replicas)**:Topic 创建时分配的所有副本集合 - **ISR(In-Sync Replicas)**:与 Leader 保持同步的副本子集,由 `replica.lag.time.max.ms` 控制剔除 - **LEO(Log End Offset)**:每个副本的日志末端位移(下一条消息的 offset) - **HW(High Watermark)**:所有 ISR 副本 LEO 的最小值,Consumer 只能消费到 HW 之前的消息 Leader 处理写入:先更新自身 LEO,等 ISR 全部同步后推进 HW,Follower 持续从 Leader 拉取数据追赶 LEO。 ## 追问三:Kafka 如何保证顺序消费? Partition 级别保证有序——消息在同一 Partition 内按写入顺序追加,Consumer 按 offset 顺序消费。跨 Partition 不保证全局有序。需要全局有序时,只使用一个 Partition(牺牲吞吐)。 ## 追问四:零拷贝原理? Kafka 利用 Linux 的 `sendfile` 系统调用实现零拷贝:数据从磁盘读取到页缓存后,直接通过 DMA 传输到网卡缓冲区,跳过用户态拷贝,大幅降低 CPU 开销和延迟。 ## 追问五:消费者 Rebalance 什么时候触发? - Consumer Group 中新增或移除消费者 - 订阅的 Topic 分区数变化(如扩容 Partition) - Consumer 心跳超时(`session.timeout.ms`)被判定离线 - Consumer 主动取消订阅 Rebalance 期间所有消费者暂停消费(Stop The World),频繁 Rebalance 是常见性能问题。