《深入理解Kafka 核心设计与实践原理》
Kafka是一款高性能的消息中间件,包括Producer,Consumer,Broker,以及Zookeeper,Zookeeper用来负责集群元数据管理,控制器的选举等操作,Producer将消息发送到Broker,由Broker负责将收到的消息存储到磁盘中,Consumer负责从Broker订阅并消费消息。Kafka中的消息是以主题为单位,主题可以分布在不同的分区,分区可以分布于不同的Broker,分区有Leader 与副本follower,follower负责从leader同步数据,leader负责读写请求
从4.0.0开始彻底去掉了Zookeeper,转为使用 KRaft 模式
| 特性 | Zookeeper 模式 | KRaft 模式 |
|---|---|---|
| 元数据存储 | 存储在 Zookeeper 中(内存 + 磁盘) | 存储在 Controller 节点的本地磁盘(Raft 日志) |
| 控制器选举 | 由 Zookeeper 协调选举 | 基于 Raft 协议自主选举(Leader 节点即控制器) |
| 元数据更新效率 | 需通过网络请求 Zookeeper,延迟较高 | 本地元数据直接更新,延迟低(毫秒级) |
| 集群扩展性 | Zookeeper 集群扩容复杂,易成瓶颈 | 支持动态添加 Controller 节点,扩展性更好 |
| 启动速度 | 需等待 Zookeeper 和 Kafka 双重初始化 | 仅需启动 Kafka 节点,启动更快 |
| 数据一致性保障 | 依赖 Zookeeper 的 Paxos 协议 | 基于 Raft 协议,强一致性更易理解和维护 |
| 安全性 | 需分别配置 Kafka 和 Zookeeper 的安全策略 | 统一的安全配置(如 SSL、SASL) |
主要是防止消息重复消费,通过业务方去保证消息的幂等性,每条消息设置一个唯一Id,数据库的话通过唯一索引。
kafka可以有多个Borker,一个topic会将数据存储在不同的partiton上, 并且有多个副本来同步数据,但只会有一个leader,数据以Log的形式存储在硬盘中,并且记录了消费的offset。如果leader挂掉,会从ISR集合中的副本选出一个做为leader。
在启动Kafka集群之前,我们需要配置好log.dirs参数,其值是kafka数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。
也可以配置log.dir参数,含义一样,只需要设置其中一个即可。如果log.dirs参数只配置了一个目录,那么分配到各个Broker上的分区肯定只能在这个目录下创建文件夹用于存储数据。
但是如果log.dirs参数配置了多个目录,Kafka会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为Topic名 + 分区ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录
ack 有三个值 0 1 -1
0 : 生产者不会等待borker返回,延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据
1 : 等待leader确认消息返回,但如果Leader挂掉后不保证是否复制完成
-1: 等待所有的副本确认消息返回
kafka保证消息可靠性,可以通过如下几个配置:
- 生产者配置 acks = all (-1) ISR中的所有副本都写入成功,才代表该消息发送成功
- min.insync.replicas默认为1,指定了ISR集合中最小的副本数,不满足条件就会抛出NotEnoughReplicasException 或 NotEnoughReplicasAfterAppendException,也就是必须保证有一个副本同步数据跟得上leader
- unclean.leader.election.enable 默认为false, 当leader下线的时候可以从非ISR集合中选举新的leader,这样能提高可用性,但会丢数据
- 配置log.flush.interval.messages 和 log.flush.interval.ms 同步刷盘策略
- 消费端手动提交位移,enable.auto.commit 设置为false,自动提交会带来重复消费和消息丢失的问题,客户端如果消息消费失败应该放入死信队列,以便后期排除故障
- 回溯消费功能,消息貌似已经成功消费,但实际消息失败了,不知道是什么错误造成的,就可以通过回溯消费补偿,或者复现 “丢失”,seek()方法
如果消息积压了太多,一直消费不了,需要检查是不是consumer有问题,或者服务端磁盘是否快满了。
- consumer有问题就修复问题。但由于积压的数据太多,用原程序消费还是太慢。就需要扩容,新建临时topic,将分区改为原来的10倍,写程序将原来积压的消息发送到新建的topic中,启动10倍的机器来消费这些数据
- 服务端磁盘满,就只能扩容服务端磁盘,再采用第一种办法来修复问题
消息要顺序消费的场景,比如发送了一个用户新增的消息,随后用户修改了发送了一个修改的消息,最后又删除了发送了一个删除的消息,由于Kafka的多分区,多消费者,消费端势必会变成无序消费,但消费端业务需要顺序处理,如果先消费了删除消息,根本没数据,随后又消费了新增消息,最后消息没有删除,变成了脏数据。
解决方法是:
-
生产者发送消息的时候,根据用户id指定分区key,指定后kafka会将消息发送到指定的分区中,这样保证了分区中消息的顺序。消费端,可以使用单线程从指定分区中消费,如果要保证性能,消费端定义多个内存队列,将相同用户id的消息发送到同一个内存队列中,然后开启多线程从来消费多个内存队列,一个线程处理一个内存队列
-
让消费者只消费一个指定的分区,速度会变慢
消费者客户端参数partition.assignment.strategy 来配置消费分区策略
- RangeAssignor 默认分配策略 通过 分区数/消费者总数 来获得一个跨度进行分配
- RoundRobinAssignor 轮询分配策略
- StickyAssignor 能够使分区的分配尽可能与上一次保持一致,避免过度重分配
- 自定义分配,实现PartitionAssignor接口
- 安装zk集群,修改各个节点的kafka配置文件server.properties(broker.id、listeners、zookeeper.connect)
- 启动zk、启动kafka
k8s 上创建:K8s - 安装部署Kafka、Zookeeper集群教程(支持从K8s外部访问) - 蜂蜜log - 博客园 (cnblogs.com)
- AR(Assigned Repllicas)一个partition的所有副本(即使replica,不区分leader或follower)
- ISR(In-Sync Replicas)能够和leader保持同步的follower+leader本身组成的集合
- OSR(Out-Sync Relipcas)不能和leader 保持同步的follower集合
NameServer: 轻量级服务发现中心,管理Broker的地址路由信息;无状态,支持快速扩容
Broker: 消息存储和转发节点,负责接收生产者消息、持久化存储、投递给消费者;主从架构,支持同步/异步复制
Producer: 消息生产者,通过NameServer找到目标Broker发送消息,支持同步、异步、单向发送模式
Consumer: 消息消费者,从Broker拉取消息,支持集群消费和广播消费
- 点对点(Queue模型):
消息通过队列存储,同一消费者组内竞争消费(每条消息仅被一个消费者处理)
-
发布/订阅(Pub-Sub):
-
延迟消息
-
顺序消息
通过MessageQueueSelector 保证同一业务键(如订单ID)的消息发送到同一队列,消费者按队列顺序消费
-
生产者端:同步发送+重试机制;事务消息
-
Broker端:消息持久化,同步刷盘或异步刷盘;同步复制,保证Slave写入成功后才返回ACK
-
消费者端:手动提交消费偏移量;消费失败重试
生产者:通过MessageQueueSelector 将同一业务键(如订单ID)的消息发送到同一队列
Broker:单个队列内的消息天然有序
消费者:单线程消费队列(或锁保证并发安全),并且关闭异步提交消费偏移量
- 扩容消费者:增加消费者实例数(不超过队列数),提升并行消费能力
- 调整消费逻辑:优化消费代码(如批量处理,异步)
- 跳过非关键消息:在业务允许时,重置消费偏移量到最新位置
RokectMQ支持在分布式场景下保障消息生产和本地事务的最终一致性。
- 第一阶段(发送半消息):
- 生产者发送“半消息”(对消费者不可见)到 Broker。
- Broker 返回 ACK 确认半消息持久化成功。
- 第二阶段(执行本地事务):
- 生产者执行本地事务(如数据库操作),生成事务状态(提交/回滚)。
- Broker 回调检查:
- 若生产者未响应,Broker 定期回调查询本地事务状态。
- 最终提交/回滚:
- 根据事务状态提交(投递消息)或回滚(丢弃消息)。
RabbitMQ 是一个开源的消息代理软件,核心有Producer(生产者)、Consumer(消费者)、Queue(队列)、Exchange(交换机)、Binding(绑定)、Message(消息),交换机类型有:Fanout(广播到所有绑定的队列)、Direct(精确匹配路由键)、Topic(基于通配符的路由)、Headers(通过消息头属性匹配),另外有死信队列(DLX)可以处理失败或超时的消息,用来实现延时消息