消息中间件之Kafka
一、引言
1.起源
Kafka是如何诞生的呢?
Kafka 是由 LinkedIn 公司开发的一种分布式、可扩展、高吞吐量的消息系统。它的设计灵感来自于 LinkedIn 在处理大规模实时数据处理和数据流的挑战中的经验和教训
在过去,LinkedIn 使用了一些传统的消息系统来处理数据流和实时数据处理,但是这些系统的性能和可扩展性无法满足 LinkedIn 处理大规模实时数据流的需求。为了解决这个问题,LinkedIn 在 2008 年开始设计和开发 Kafka
Kafka 的设计目标是支持高吞吐量、低延迟的数据流处理,同时能够处理海量的数据。Kafka 借鉴了消息队列的概念,但与传统的消息系统不同的是,Kafka 的设计重点放在了可扩展性和高吞吐量上。它使用了一些优秀的设计和算法来实现高效的消息传输和存储,比如消息分区和批量传输等
在 Kafka 的发展历程中,LinkedIn 逐步将它开源,并加入了更多的特性和功能,使得 Kafka 成为了一种非常受欢迎的分布式消息系统。现在 Kafka 已经成为了 Apache 基金会的顶级项目,被广泛应用于实时数据处理、流处理、日志处理等场景
2.应用
Kafka发展到如今,已经应用于诸多系统与领域中,那么大体上分类,它有哪些应用场景呢?
主要可以分为3类
- 数据集成
- 大数据领域
- 流计算生成
但是最基本的,是它可以作为消息队列使用
2.1 消息传递 Messaging
消息传递就是发送数据,作为 TCP HTTP 或者 RPC 的替代方案,可以实现异步、 解耦、削峰
RabbitMQ 和 RocketMQ 能做的事情,它也能做
因为 Kafka 的吞吐量更高,在大规模消息系统中更有优势
第二个是大数据领域的使用,比如网站行为分析
2.2 网站行为分析
2.2.1 Website Activity Tracking 网站活动跟踪
把用户活动发布到数据管道中,可以用来做监控、实时处理、报表等等,比如社交网站的行为跟踪,购物网站的行为跟踪,这样可以实现更加精准的内容推荐
例:外卖、物流、电力系统的实时信息
2.2.2 Log Aggregation 日志聚合
又比如用 Kafka 来实现日志聚合。这样就不用把日志记录到本地磁盘或者数据库,实现分布式的日志聚合
2.2.3 Metrics 应用指标监控
还可以用来记录运营监控数据
例,对于贷款公司,需要监控贷款的业务数据:今天放出去多少笔贷款,放出去的总金额,用户的年龄分布、地区分布、性别分布等等
或者对于运维数据的监控,CPU、内存、磁盘、网络连接的使用情况,可以实现告警
2.3 数据集成+流计算
首先是数据集成,指的是把 Kafka 的数据导入 Hadoop、HBase 等离线数据仓库,实现数据分析
其次是流计算,什么是流(Stream)?
它不是静态的数据,而是没有边界的、 源源不断的产生的数据,就像水流一样。流计算指的就是 Stream 对做实时的计算。 Kafka 在 0.10 版本后,内置了流处理框架 API——Kafka Streams
所以,它跟 RabbitMQ 的定位差别还是比较大的,不仅仅是一个简单的消息中间件,而且是一个流处理平台
在 Kafka 里面,消息被称为日志,日志就是消息的数据文件
3.Kafka和ZK的关系
在安装 Kafka 的时候,必须要依赖 ZK 的服务,在生产环境通常是 ZK 的集群,而且 Kafka 还自带了一个 ZK 服务,那么ZK 做了什么事情呢?
总结概括就是,利用 ZK 的有序节点、临时节点和监听机制,ZK 帮 Kafka 做了以下这些事情
配置中心(管理 Broker、Topic、Partition、Consumer 的信息,包括元数据的 变动)、负载均衡、命名服务、分布式通知、集群管理和选举、分布式锁
关于ZK,可以参考之前的文章Zookeeper
二、基础架构
首先来一张架构图,看懂这张图就看懂了Kafka的架构
1.Broker
Broker:Kafka 作为一个中间件,是帮我们存储和转发消息的,它做的事情就像 中介,所以 Kafka 的服务也叫做 Broker,默认是 9092 的端口。生产者和消费者都需 要跟这个 Broker 建立一个连接,才可以实现消息的收发
2.消息
客户端之间传输的数据叫做消息,或者叫做记录(Record 名词 [ˈrekɔːd ])
在客户端的代码中,Record 可以是一个 KV 键值对。 生产者对应的封装类是 ProducerRecord,消费者对应的封装类是 ConsumerRecord
消息在传输的过程中需要序列化,所以代码里面要指定序列化工具
RabbitMQ是将消息序列化成二机制
消息在服务端的存储格式(RecordBatch 和 Record)
3.生产者
发送消息的一方叫做生产者,接收消息的一方叫做消费者
为了提升消息发送速率,生产者不是逐条发送消息给 Broker,而是批量发送的
多少条发送一次由一个参数决定
1 | pros.put("batch.size",16384); |
4.消费者
一般来说消费者获取消息有两种模式,一种是 Pull 模式,一种是 Push 模式。
Pull 模式就是消费放在 Broker,消费者自己决定什么时候去获取。Push 模式是 消息放在 Consumer,只要有消息到达 Broker,都直接推给消费者。
RabbitMQ Consumer 及支持 Push 又支持 Pull,一般用的是 Push
Kafka 只有 Pull 模式
在 Push 模式下,如果消息产生速度远远大于消费者消费消息的速率,那消费者就会不堪重负(你已经吃不下了,但是还要不断地往你嘴里塞),直到挂掉
而且消费者可以自己控制一次到底获取多少条消息
1 | # 默认 500 在 poll 方法里面可以指定 |
5.Topic
生产者跟消费者是怎么关联起来的呢?或者说,生产者发送的消息,怎么才能到达某个特定的消费者?
他们要通过队列关联起来,也就是说
生产者发送消息,要指定发给哪个队列
消费者接收消息,要指定从哪个队列接收
在 Kafka 里面,这个队列叫做 Topic,是一个逻辑的概念,可以理解为一组消息的集合(不同业务用途的消息)
生产者和 Topic 以及 Topic 和消费者的关系都是多对多,一个生产者可以发送消息到多个 Topic,一个消费者也可以从多个 Topic 获取消息(但是不建议这么做)
注意,生产者发送消息时,如果 Topic 不存在,会自动创建
由一个参数auto.create.topics.enable控制
默认为true,但是如果要彻底删掉一个 Topic,这个参数必须改成 false,否则只要有代码使用这个 Topic,它就会自动创建
6.Partition 与 Cluster
如果说一个 Topic 中的消息太多,会带来两个问题:
第一个是不方便横向扩展,比如我想要在集群中把数据分布在不同的机器上实现扩展,而不是通过升级硬件做到,如果一个 Topic 的消息无法在物理上拆分到多台机器的时候,这个是做不到的。
第二个是并发或者负载的问题,所有的客户端操作的都是同一个 Topic,在高并发 的场景下性能会大大下降。
那么,怎么解决这个问题呢?我们想到的就是把一个 Topic 进行拆分—经典分片思想
Kafka 引入了一个分区(Partition)的概念,一个Topic可以划分成多个分区
分区在创建 Topic 的时候指定,每个 Topic 至少有一个分区
Partition 思想上有点类似于分库分表,实现的也是横向扩展和负载的目的
举个例子,Topic 有 3 个分区,生产者依次发送 9 条消息,对消息进行编号
第一个分区存 1 4 7,第二个分区存 2 5 8 ,第三个分区存 3 6 9,这个就实现了负载
每个 partition 都有一个物理目录
在配置的数据目录下(日志就是数据)::/tmp/kafka-logs/
跟 RabbitMQ 不一样的地方是,Partition 里面的消息被读取之后不会被删除,所以同一批消息在一个 Partition 里面顺序、追加写入的,这个也是 Kafka 吞吐量大的一 个很重要的原因
7.Partition 副本 Replica 机制
如果 Partition 的数据只存储一份,在发生网络或者硬件故障的时候,该分区的数据就无法访问或者无法恢复了
因此,Kafka 在 0.8 的版本之后增加了副本机制
每个 Partition 可以有若干个副本(Replica),副本必须在不同的 Broker 上面。,一般我们说的副本包括其中的主节点
举例:部署了 3 个 Broker,该 Topic 有 3 个分区,每个分区一共 3 个副本
红色就是Leader,是生产者发送消息和消费者读取消息的对象
Follower的数据都是从leader同步的
需要注意的是,只有Leader具有写入和消费能力,Follower不具有单独写入和消费能力
8.Segment
Kafka 的数据是放在后缀.log 的文件里面的,如果一个 Partition 只有一个 log 文件,消息不断地追加,这个 log 文件也会变得越来越大,这个时候要检索数据效率就很低了
所以干脆把 Partition 再做一个切分,切分出来的单位就叫做段(Segment)
实际上Kafka 的存储文件是划分成段来存储的, 默认存储路径:/tmp/kafka-logs/
每个 Segment 都有至少有 1 个数据文件和 2 个索引文件,这 3 个文件是成套出现的
默认一个Segment大小是1G
9.Consumer Group 消费者组
如果生产者生产消息的速率过快,会造成消息在 Broker 的堆积,影响 Broker 的性能
怎么提升消息的消费速率呢?增加消费者的数量
但是这么多消费者,怎么知道大家是不是消费的同一个 Topic 呢?
所以引入了一个 Consumer Group 消费组的概念,在代码中通过 group id 来 配置
消费同一个 Topic 的消费者不一定是同一个组,只有 group id 相同的消费者才是同一个消费者组
注意:同一个 Group 中的消费者,不能消费相同的Partition——Partition 要在消费者之间分配
此时就会出现一个问题,即消费者比Partition多怎么办?比它少又怎么办?
- 消费者比 Partition 少,一个消费者可能消费多个 Partition
- 消费者比 Partition 多,肯定有消费者没有 Partition 可以消费,不会出现一个 Group 里面的消费者消费同一个 Partition 的情况
因此,如果想要同时消费同一个 Partition 的消息,那么需要其他的组来消费
10.Consumer Offset
上面提到了,Partition里面的消息都是顺序写入,且读取之后不会被删除,那么消费者如何确定自己每次消费时的位置呢?
在Kafka当中,由于消息是有序存储,因此可以对消息进行编号,用来标识一条唯一的消息
而这个编号,就可以确定每个Partition当中消息的位置,称为偏移量(Offset)
Offset 记录着下一条将要发送给 Consumer 的消息的序号
这个消费者跟 Partition 之间的偏移量没有保存在 ZK,而是直接保存在服务端
后面会详细介绍offset更新策略,此处点到为止
此处,再回顾一下架构图就很清晰了
首先,这里有 3 台 Broker
有两个 Topic:Topic0 和 Topic1
Topic0 有 2 个分区:Partition0 和 Partition1,每个分区一共 3 个副本
Topic1 只有 1 个分区:Partition0,每个分区一共 3 个副本
图中红色字体的副本代表是 Leader,黑色字体的副本代表是 Follower
绿色的线代表是数据同步
蓝色的线是写消息,橙色的线是读消息,都是针对 Leader 节点
有两个消费者组,第一个消费者组,消费了 Topic0 的两个分区
第二个消费者组
- Consumer0:既消费消费 Topic0的 Partition0,还消费 Topic1 的 Partition0
- Consumer1:消费 Partition0 的 Partition1
- Consumer2:没有 Partition 可以消费
为什么Consumer2没有Partition可以消费呢?
因为总共就三个Partition,已经被前两个相同group id的消费者消费掉了,它就只能闲着了
三、对比
1.Kafka特性
因为 kafka 是用来解决数据流的传输的问题的,所以它有这些特性:
- 高吞吐、低延迟:Kakfa 最大的特点就是收发消息非常快,Kafka 每秒可以处 理几十万条消息,它的最低延迟只有几毫秒
- 高伸缩性:如果可以通过增加分区 Partition 来实现扩容。不同的分区可以在不同的 Broker 中。通过 ZK 来管理 Broker 实现扩展,ZK 管理 Consumer 可以实现负载
- 持久性、可靠性:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘, 并支持数据备份防止数据丢失
- 容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
- 高并发:支持数千个客户端同时读写
2.Kafka与RabbitMQ对比
Kafka 和 RabbitMQ 的主要区别
- 产品侧重:Kafka:流式消息处理、消息引擎;RabbitMQ:消息代理
- 性能:Kafka 有更高的吞吐量。RabbitMQ 主要是 Push,Kafka 只有 Pull
- 消息顺序:分区里面的消息是有序的,同一个 Consumer Group 里面的一个消费者只能消费一个 Partition,能保证消息的顺序性
- 消息的路由和分发:RabbitMQ 更加灵活
- 延迟消息、死信队列:RabbitMQ 支持
- 消息的留存:Kafka 消费完之后消息会留存,RabbitMQ 消费完就会删除。 Kafka可以设置 retention,清理消息
优先选择 RabbitMQ 的情况:
高级灵活的路由规则;
- 消息时序控制(控制消息过期或者消息延迟);
- 高级的容错处理能力,在消费者更有可能处理消息不成功的情景中(瞬时或者持久)
- 更简单的消费者实现
优先选择 Kafka 的情况
- 严格的消息顺序
- 延长消息留存时间,包括过去消息重放的可能
- 传统解决方案无法满足的高伸缩能力
四、生产者详解
1.幂等性
在RabbitMQ当中,消息的幂等性是依赖全局唯一ID来实现的,需要在消费端实现
在Kafka当中,把这个任务交给了Broker,不再交给消费者解决
但是无论交给谁解决,肯定都会有一个全局唯一ID来标识消息,在Kafka中,这个ID可以通过以下配置产生
1 | props.put("enable.idempotence", true); |
enable.idempotence 设置成 true 后,Producer 自动升级成幂等性 Producer,Kafka 会自动去重,如何实现的?
主要依赖两个重要机制:
PID(Producer ID):幂等性的生产者每个客户端都有一个唯一的编号
sequence number:幂等性的生产者发送的每条消息都会带相应的 sequence
number
Server端就是根据这个sequence number值来判断数据是否重复。如果说发现 sequence number 比服务端已经记录的值要小,那肯定是出现消息重复了
但是,这个 sequence number 并不是全局有序的,它不能保证所有时间上的幂等
所以,它的作用范围是有限的
- 只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一 个分区上不出现重复消息
- 只能实现单会话上的幂等性,这里的会话指的是 Producer 进程的一次运行,当重启了 Producer 进程之后,幂等性不保证
这个也很容易理解,也就说不允许生产者在一次会话中向同一个 patition 发送相同的消息
2.生产者事务
生产者事务是 Kafka 2017 年 0.11.0.0 引入的新特性,通过事务,Kafka 可以保证跨生产者会话的消息幂等发送
为什么需要引入事务的特性呢?什么时候才需要开启事务?
有几种情况:
- 假设只有 1 个 Broker,1 个 Topic 的分区只有 1 个副本,如果要发送多条消息,想要让这些消息全部成功或者全部失败,怎么办?
- 更加复杂的情况,如果生产者发送消息到多个 Topic 或者多个 Partition, 它们有可能分布在不同的服务器上,需要它们全部发送成功或者全部发送失败,应该怎么办?
- 还有一种情况,就是消费者和生产者在同一块代码中(consume-process-produce),从上游接收消息,经过处理后发给下游,这个时候 要保证接收消息和发送消息同时成功
在SpringBoot当中,可以使用事务注解@Transaction开启(声明式事务)
那么它的分布式事务如何实现呢?下面是实现核心思想
- 因为生产者的消息可能会跨分区,所以这里的事务是属于分布式事务。分布式事务的实现方式有很多,Kafka 选择了最常见的两阶段提交(2PC):如果大家都可以 commit,那么就 commit,否则 abort
- 既然是 2PC,必须要有一个协调者的角色,叫做 Transaction Coordinator
- 事务管理必须要有事务日志,来记录事务的状态,以便 Coordinator 在意外挂掉之后继续处理原来的事务。跟消费者 Offset 的存储一样,Kafka 使用一个特殊的topic__transaction_state 来记录事务状态
- 如果生产者挂了,事务要在重启后可以继续处理,接着之前未处理完的事务, 或者在其他机器上处理,必须要有一个唯一的 ID,这个就是 transaction.id,这里可以使用 UUID。配置了 transaction.id,则此时 enable.idempotence 会被设置为 true (事务实现的前提是幂等性)。事务 ID 相同的生产者,可以接着处理原来的事务
看下图事务代码实现流程
步骤描述:
- A:生产者通过 initTransactions API 向 Coordinator 注册事务 ID
- B:Coordinator 记录事务日志
- C:生产者把消息写入目标分区
- D:分区和 Coordinator 的交互,当事务完成以后,消息的状态应该是已提交,这样消费者才可以消费到
3.生产者原理
3.1 生产者消费发送流程
消息发送的整体流程,生产端主要由两个线程协调运行
这两条线程分别为 main 线程和 Sender 线程(发送线程)
KafkaProducer就是代码中实现的生产者,它创建了一个Sender对象,启动了一个IO线程用于发送
3.2 拦截器
拦截器是执行在KafkaProducer之后的代码,它又有什么作用呢?
拦截器的作用是实现消息的定制化
类似于:Spring Interceptor 、MyBatis 的 插件、Quartz 的监听器
可以在生产者的属性中指定多个拦截器,形成拦截器链
举个例子,假设发送消息的时候要扣钱,发一条消息 1 分钱(我把这个功能叫做 按量付费),就可以用拦截器实现
3.3 序列化
消息发送都是需要序列化的,因此在调用 send 方法以后,第二步是利用指定的工具对 key 和 value 进行序列化
Kafka自带许多常见数据类型的序列化工具,此外也可以使用如 Avro、JSON、Thrift、Protobuf 等, 或者使用自定义类型的序列化器来实现,实现 Serializer 接口即可
3.4 路由指定
即一个消息要发送到哪个partition呢?在代码中是这样实现的
1 | int partition = partition(record, serializedKey, serializedValue, cluster); |
它返回的是一个分区编号,从0开始
首先我们分一下,有四种情况:
- 指定了 partition
- 没有指定 partition,自定义了分区器
- 没有指定 partition,没有自定义分区器,但是 key 不为空
- 没有指定 partition,没有自定义分区器,但是 key 是空的
下面分别讨论这四种情况
1)指定了 partition
指定 partition 的情况下,直接将指定的值直接作为partiton值
2)没有指定 partition,自定义了分区器
自定义分区器,将使用自定义的分区器算法选择分区,比如 SimplePartitioner, 用 ProducerAutoPartition 指定,发送消息
3)没有指定 partition,没有自定义分区器,但是 key 不为空
根据key,使用默认分区器
没有指定 partition 值但有 key 的情况下,使用默认分区器 DefaultPartitioner,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
4)没有指定 partition,没有自定义分区器,但是 key 是空的
差不多就是随机了
既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法
3.5 消息累加器
事实上,在选择分区之后并没有直接发送消息,而是将消息放入了消息累加器
本质上是一个ConcurrentMap
一个partition一个batch,batch满了之后就会唤醒Sender线程发送消息,减少IO
4.可靠性保证
4.1 服务器响应策略
在RabbitMQ当中,生产者发送完消息,需要依靠交换机回传的ack来确定消息投递成功,那么在kafka当中,是如何保证消息发送成功的呢?
首先要确定,服务端什么时候才算接收成功呢?
因为消息是存储在不同的Partition里面的,所以是写入到Partition之后响应生产者
但是kafka的Partition是有副本存在的,所以是要写入一定量副本才可以响应
那么写入多少呢?这里主要有两种策略
第一种是需要有半数以上的 Follower 节点完成同步,这样的话客户端等待的时间就短一些,延迟低(为什么通常来说我们部署节点都是奇数?)
第二种需要所有的 Follower 全部完成同步,才发送 ACK 给客户端,延迟相对来说高一些,但是节点挂掉的影响相对来说小一些,因为所有的节点数据都是完整的
那么kafka选择的是哪一种呢?
kafka选择的是第二种,因为安全,而且网络延迟对kafka影响不大
4.2 ISR
如果直接采用第二种思路,不考虑网络延迟,有没有别的问题呢?
假设 Leader 收到数据,所有 Follower 都开始同步数据,但是有一个 Follower 出了问题,没有办法从 Leader 同步数据。按照这个规则,Leader 就要一直等待,无法发送 ACK,可以说成为了害群之马
所以我们的规则就不能那么粗暴了,把规则改一下,不是所有的 follower 都有权利让我等待,而是只有那些正常工作的 follower 同步数据的时候我才会等待
我们应该把那些正常和 leader 保持同步的 replica 维护起来,放到一个动态 set 里面,这个就叫做 in-sync replica set(ISR)。现在只要 ISR 里面的 follower 同步完数据之后,我就给客户端发送 ACK
如果一个 Follower 长时间不同步数据,就要从 ISR 剔除,默认30s
而如果Leader长时间没有同步,那么就表示挂掉了,需要从ISR里重新选举Leader
4.3 ACK应答机制
上面的方案很安全,但是有时候,我不想要很安全的数据传输,我就要快,那怎么实现呢?
Kafka 为客户端提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择相应的配置,配置方式如下
1 | pros.put("acks","1"); |
下面介绍三个参数含义,假设topic 的 partition0 有三个副本
acks=0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据
acks=1(默认):producer 等待 broker 的 ack,partition 的leader落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据
acks=-1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才返回 ack
acks=-1(all) 这种方案是完美的吗?有没有可能出问题?
如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,没有给
生产者发送 ACK,那么会造成数据重复
在这种情况下, 把 reties 设置成 0(不重发),才不会重复
三种机制,性能依次递减 (producer 吞吐量降低),数据健壮性则依次递增。我们可以根据业务场景使用不同的参数
五、Broker详解
1.存储原理
1.1 文件的存储结构
配置文件:config/server.properties
logs.dir 配置
默认/tmp/kafka-logs
1.2 Partition 分区
为了实现横向扩展,把不同的数据存放在不同的 Broker 上,同时降低单台服务器的访问压力,我们把一个 Topic 中的数据分隔成多个 Partition
一个 Partition 中的消息是有序的,顺序写入,但是全局不一定有序
在服务器上,每个 Partition 都有一个物理目录,Topic 名字后面的数字标号即代表分区
1.3 Replica 副本
为了提高分区的可靠性,Kafka 又设计了副本机制
创建 Topic 的时候,通过指定 replication-factor 确定 Topic 的副本数
注意:副本数必须小于等于节点数,而不能大于 Broker 的数量,否则会报错
为什么可以等于节点数?因为Leader也称为副本
这样就可以保证,绝对不会有一个分区的两个副本分布在同一个节点上,不然副本机制也失去了备份的意义了
这些所有的副本分为两种角色
- Leader:对外提供读写服务
- Follower:唯一的任务就是从 Leader 异步拉取数据
思考:为什么不能像 MySQL 一样实现读写分离,写操作都在 Leader 上,读操作
都在 Follower 上?
这个是设计思想的不同。读写都发生在 Leader 节点,就不存在读写分离带来的一
致性问题了,这个就叫做单调读一致性
1.4 Leader
如果有多个副本,如何确定Leader呢?
在 Kafka 中,每个分区的初始 Leader 会在创建分区时进行选举。具体来说,当创建新的分区时,Kafka 会根据主题的分区副本分配策略,为该分区的每个副本节点分配一个角色,其中一个节点被选举为 Leader,其余节点则成为 Follower
在分配角色时,Kafka 会优先将 Leader 副本分配给与之前的 Leader 所在节点相同的节点,如果该节点不可用,则按照指定的分配策略从可用节点中选择一个节点作为 Leader
可知,Leader的选举与副本有很深的联系,因此这里先继续看副本
1.5 副本在Broker的分布
副本在 Broker 的分布有什么规则吗?
事实上,副本的分布是由AdminUtils.scala 的 assignReplicasToBrokers 函数决定的
该函数规则如下:
- firt of all,副本因子不能大于 Broker 的个数
- 第一个分区(编号为 0 的分区)的第一个副本放置位置是随机从 brokerList 选择的
- 其他分区的第一个副本放置位置相对于第 0 个分区依次往后移
- 也就是说:如果我们有 5 个 Broker,5 个分区
- 假设第 1 个分区的第 1 个副本放在第四个 Broker 上
- 那么第 2 个分区的第 1 个副本将会放在第五个 Broker 上
- 第三个分区的第 1 个副本将会放在第一个 Broker 上
- 第四个分区的第 1 个副本将会放 在第二个 Broker 上,依次类推(蛇形走位)
- 每个分区剩余的副本相对于第 1 个副本放置位置其实是由 nextReplicaShift决定的,而这个数也是随机产生的。
用箭头解释如下图
但是要注意的是,这里说的都是每个分区第一个副本的放置位置
而这里就对应了上一个小结里说过的,leader怎么选出来的?
初始化时,leader就是第一个分区副本
假如leader挂了,那就由zk进行选举
这样设计可以提高容灾能力,怎么讲?
在每个分区的第一个副本错开之后,一般第一个分区的第一个副本(按 Broker 编 号排序)都是 Leader,Leader 是错开的,不至于一挂影响太大
bin 目录下的 kafka-reassign-partitions.sh 可以根据 Broker 数量变化情况重新分配分区
这里再提另一个问题,一个分区是不是只有一个 文件呢?也就是说,消息日志文件是不是会无限地变大?
1.6 Segment
为了防止 Log 不断追加导致文件过大,导致检索消息效率变低,一个 Partition 又被划分成多个 Segment 来组织数据(MySQL 也有 Segment 的逻辑概念,叶子节点就是数据段,非叶子节点就是索引段)
在磁盘上,每个 Segment 由一个 log 文件和 2 个 index 文件组成,且这三个文件成套出现
- log文件,是命名规则为00000000000000000000.log的文件
- index文件两个
- 00000000000000000000.index
- 00000000000000000000.timeindex
1)日志文件
在一个 Segment 文件里面,日志是追加写入的。如果满足一定条件,就会切分日志文件,产生一个新的 Segment
什么时候会触发 Segment 的切分呢?
第一种是根据日志文件大小。当一个 Segment 写满以后,会创建一个新的 Segment,用最新的 Offset 作为名称
Segment 的默认大小是 1073741824 bytes(1G),由参数log.segment.bytes控制
第二种是根据消息的最大时间戳,和当前系统时间戳的差值
有一个默认的参数,168 个小时(一周):
1 | log.roll.hours=168 |
意味着:如果服务器上次写入消息是一周之前,旧的 Segment 就不写了,现在要创建一个新的 Segment
还可以从更加精细的时间单位进行控制,如果配置了毫秒级别的日志切分间隔, 会优先使用这个单位,否则就用小时的
第三种是根据索引文件/timeindex文件大小,默认是10M
这种方法的含义就是说,如果索引写满了,那数据文件也要跟着一起拆分,不然会对不上
2)索引文件
由于一个 Segment 的文件里面可能存放很多消息,如果要根据 Offset 获取消息, 必须要有一种快速检索消息的机制——这个就是索引
在 Kafka 中设计了两种索引
- 偏移量索引文件记录的是 Offset 和消息物理地址(在 Log 文件中的位置)的映射关系
- 时间戳索引文件记录的是时间戳和 Offset 的关系
当然,内容是二进制的文件,不能以纯文本形式查看
offset索引
bin 目录下有 dumplog 工 具,通过查看最后 10 条 Offset 索引(160 服务器)如下
为什么索引里面记录的 Offset 不是连续的呢?不应该是一条消息一条索引记录吗?
注意 Kafka 的索引并不是每一条消息都会建立索引
而是一种稀疏索引 Sparse Index(DB2 和 MongDB 中都有稀疏索引)
抽象图如下:
所以问题就来了,这个稀疏索引到底有多稀疏?也就是说,隔几条消息才产生一 个索引记录?或者隔多久?或者隔多少大小的消息?
实际上是用消息的大小来控制的,默认是 4KB:
1 | log.index.interval.bytes=4096 |
只要写入的消息超过了 4KB,偏移量索引文件.index 和时间戳索引文 件.timeindex 就会增加一条索引记录(索引项)。
这个值设置越小,索引越密集,值设置越大,索引越稀疏
相对来说,越稠密的索引检索数据更快,但是会消耗更多的存储空间
越稀疏的索引占用存储空间越小,插入和删除时所需的维护开销也小,但是检索慢
Kafka 索引的时间复杂度为 O(log2n)+O(m)
n 是索引文件里索引的个数,m 为稀疏程度
时间戳索引
为什么会有时间戳索引文件呢?光有 Offset 索引还不够吗?会根据时间戳来查找消息?
首先,消息是必须要记录时间戳的
客户端封装的 ProducerRecord 和ConsumerRecord 都有一个 long timestamp 属性
为什么要记录时间戳呢?
- 如果要基于时间切分日志文件,必须要记录时间戳
- 如果要基于时间清理消息,必须要记录时间戳
好吧,既然都已经记录时间戳了,干脆设计一个时间戳索引,可以根据时间戳查询
注意时间戳有两种
一种是消息创建的时间戳
一种是消费在 Broker 追加写入的时间
到底用哪个时间呢?由一个参数来控制
1 | # CreateTime / LogAppendTime |
3)索引查询
既然已经有索引了,那么如何利用索引来进行消息的检索呢?
比如要检索偏移量是 10000666 的消息
- 消费的时候是能够确定分区的,所以第一步是找到在哪个 Segment 中。 Segment 文件是用 Base Offset 命名的,所以可以用二分法很快确定(找到名字不小 于 10000666 的 Segment)
- 这个 Segment 有对应的索引文件,它们是成套出现的。所以现在要在索引文 件中根据 Offset 找 Position
- 得到 Position 之后,到对应的 Log 文件开始查找 Offset,和消息的 Offset 进行比较,直到找到消息
思考一个比较刁钻的面试问题:为什么 Kafka 不用 B+Tree?
Kafka 是写多,查少,如果 Kafka 用 B+Tree,首先会出现大量的 B+Tree,大量插入数据带来的 B+Tree 的调整会非常消耗性能
1.7 总结
总体topic结构如下:
到此为止topic结构已经完全梳理清楚了,但是有个问题还是要提一下,那就是kafka消息消费之后并不会删除,但是也不可能把几年前的日志还保留着,那么会用什么策略保留消息呢?
思考MySQL、redis的内存淘汰机制
2.消息保留和清理机制
这里回忆一下MySQL和Redis的内存淘汰机制
MySQL:使用LRU List,划分young区和old区,冷热分离
Redis:多达七种淘汰策略
Kafka里面提供了两种方式:一种是直接删除 Delete,一种是对日志进行压缩Compact
默认是直接删除
2.1 删除策略
删除是如何删除的呢?
第一种策略:日志数据较均匀时
默认是5min执行一次删除,删除范围是168小时之前的老数据,也就是定时删老数据
定时时间和删多久之前的数据,都可以配置
第二种策略:日志数据分布不均时(可能某一周特别大,另一周特别小)
根据日志大小删除,先删旧的消息,删到不超过这个大小为止
2.2 压缩策略
问题:如果同一个 Key 重复写入多次,会存储多次还是会更新?
比如用来存储位移的这个特殊的 topic:__consumer_offsets
存储的是消费者 id 和 Partition 的 Offset 关系,消费者不断地消费消息 commit 的时候,是直接更新原来的 Offset,还是不断地写入新的 Offset?
肯定是存储多次,不然怎么能实现顺序写
当有了这些 Key 相同的 Value 不同的消息的时候,存储空间就被浪费了
压缩就是把相同的 Key 合并为最后一个 Value——前面的都被覆盖,这也和更新offset差不多
3.高可用及选举策略
3.1 Controller 选举
当Leader宕机后,怎么选出来新的Leader呢?
早期的kafka是使用zk来进行投票选举,但是这样存在一个弊端,如果分区和副本数量过多,所有的副本都直接进行选举的话,一旦某个出现节点的增减,就会造成大量的 Watch 事件被触发,ZK 的就会负载过重,不堪重负
后来kafka更改了策略,不是所有的 Repalica 都参与 Leader 选举,而是由其中的一个 Broker 统一来指挥, 这个 Broker 的角色就叫做 Controller(控制器)
就像 Redis Sentinel 的架构,执行故障转移的时候,必须要先从所有哨兵中选一个负责做故障转移的节点一样,Kafka 也要先从所有 Broker 中选出唯一的一个 Controller
所有的 Broker 会尝试在 Zookeeper 中创建临时节点/controller,只有一个能创建成功(先到先得)
如果 Controller 挂掉了或者网络出现了问题,ZK 上的临时节点会消失,其他的 Broker 通过 Watch 监听到 Controller 下线的消息后,开始竞选新的 Controller,方法跟之前还是一样的,谁先在 ZK 里面写入一个/controller 节点,谁就成为新的Controller,这个 Controller 就相当于选举委员会的主席
那么这时候,这个节点就肩负了更多的责任,具体如下
- 监听 Broker 变化
- 监听 Topic 变化
- 监听 Partition 变化
- 获取和管理 Broker、Topic、Partition 的信息
- 管理 Partiontion 的主从信息
3.2 分区副本Leader选举
Controller 确定以后,就可以开始做分区选主的事情了(我们叫它选举委员会主席)
下面就是找候选人了。显然,每个 Replica 都想推荐自己,但是所有的 replica 都有竞选资格吗?并不是
这里要介绍几个概念
一个分区所有的副本,叫做 Assigned-Replicas(AR)——所有的皇太子
这些所有的副本中,跟 Leader 数据保持一定程度同步的,叫做 In-Sync Replicas (ISR)——天天过来参加早会的,有希望继位的皇太子
跟 Leader 同步滞后过多的副本,叫做 Out-Sync-Replicas(OSR)——天天睡懒觉,不参加早会,没被皇帝放在眼里的皇太子
AR=ISR+OSR
正常情况下 OSR 是空的,大家都正常同步,AR=ISR
谁能够参加选举呢?肯定不是 AR,也不是 OSR,而是 ISR,而且这个 ISR 不是固定不变的,还是一个动态的列表
前面说过,如果同步延迟超过 30 秒,就踢出 ISR,进入 OSR;如果赶上来了,就加入ISR
默认情况下,当 Leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 Leader
如果 ISR 为空呢?——皇帝突然驾崩,太子们都还小,但是群龙不能无首
在这种情 况下,可以让 ISR 之外的副本参与选举。允许 ISR 之外的副本参与选举,叫做 Unclean Leader Election。
1 | unclean.leader.election.enable=false |
把这个参数改成 true(一般情况不建议开启,会造成数据丢失)
那么这时候,候选人有了,举荐人也有了,选择谁呢?根据什么规则呢?
分布式系统中常见的选举协议有哪些?
ZAB(ZK)、Raft(Redis Sentinel)(他们都是 Paxos 算法的变种),它们的 思想归纳起来都是:先到先得、少数服从多数
是 Kafka 没有用这些方法,而是用了一种自己实现的算法
为什么呢?比如 ZAB 这种协议,可能会出现脑裂(节点不能互通的时候,出现多个Leader)、惊群效应(大量 Watch 事件被触发)
那么kafka使用的是什么算法呢?
在这篇文章中: https://kafka.apache.org/documentation/#design_replicatedlog
提到 Kafka 的选举实现,最相近的是微软的 PacificA 算法
在这种算法中,默认是让 ISR 中第一个 Replica 变成 Leader
比如 ISR 是 1、5、9,优先让1成为 Leader——这个跟中国古代皇帝传位是一样的,优先传给皇长子
4.数据同步原理及故障处理
4.1 主从同步
在拥有一个确定的Leader之后,客户端的读写只能操作Leader节点,Follower需要向Leader同步数据
那么,由于不同的 Raplica 的 Offset 是不一样的,同步到底怎么同步呢?
先说下几个概念:
LEO(Log End Offset):下一条等待写入的消息的 Offset(最新的 Offset + 1)
图中分别是 9、8、6
HW(High Watermark):ISR 中最小的 LEO,Leader会管理所有 ISR 中最小的LEO作为HW,目前是 6
实际上,Consumer 最多只能消费到 HW 之前的位置(消费到 Offset 5 的消息)
也就是说,其他的副本没有同步过去的消息,是不能被消费的
为什么要这样设计呢?
如果在同步成功之前就被消费了,Consumer Group 的 Offset 会偏大,如果 Leader 崩溃,中间会缺失消息
所以这是怎么消费的,那么到底怎么同步呢?先看一下过程
Follower1 同步了 1 条消息,Follower2 同步了 2 条消息,此时 HW 推进了 2, 变成 8
Follower1 同步了 0 条消息,Follower2 同步了 1 条消息。此时 HW 推进了 1, 变成 9
LEO 和 HW 重叠,所有的消息都可以消费了
这里,我们关注一下,从节点怎么跟主节点保持同步?
- Follower 节点会向 Leader 发送一个 fetch 请求,Leader 向 Follower 发送数据后,既需要更新 Follower 的 LEO
- Follower 接收到数据响应后,依次写入消息并且更新 LEO
- Leader 更新 HW(ISR 最小的 LEO)
Kafka 设计了独特的 ISR 复制,可以在保障数据一致性情况下又可提供高吞吐量
4.2 Replica 故障处理
Follower故障
首先 Follower 发生故障,会被先踢出 ISR——无法同步了
Follower 恢复之后,从哪里开始同步数据呢?
假设第 1 个 Replica 宕机(中间这个)
恢复以后,首先根据之前记录的 HW(6),把高于 HW 的消息截掉(6、7)
然后向 Leader 同步消息,追上 leader 之后(30 秒),重新加入 ISR
Leader故障
假设图中 Leader 发生故障。
首先选一个 Leader,因为 Replica 1(中间这个)优先,它成为 Leader
为了保证数据一致,其他的 Follower 需要把高于 HW 的消息截取掉(这里没有消息需要截取)
然后 Replica2 同步数据
注意:这种机制只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
六、Consumer详解
1.消费者Offet维护原理
1.1 继续消费的Offet
已知,在 Partition 中,消息是不会删除的,所以才可以追加写入,写入的消息连续有序的
这种特性决定了 Kafka 可以消费历史消息,而且按照消息的顺序消费指定消息, 而不是只能消费队头的消息
正常情况下,我们希望消费没有被消费过的数据,而且是从最先发送(序号小的)的开始消费(这样才是有序和公平的)
首先要明确,一个partition的消费是按照消费者组为单位来消费的,一个消费者组里面的消费者不能重复消费同一个partition
对于一个 Partition,消费者组怎么才能做到接着上次消费的位置(Offset)继续消费呢?肯定要把这个对应关系保存起来,下次消费的时候查找一下
那么它应该存在哪里呢?必然不可能存在消费者本地,因为所有消费者都可以使用这个 Consumer Group id,放到本地做不到同一维护的,因此要放在服务端
早期的kafka,是把消费者组和 Partition 的 Offset 直接维护在 ZK 中,但是因为读写的性能消耗太大了
后来就放在一个特殊的 Topic 中,名字叫__consumer_offsets, 默认有 50 个分区(offsets.topic.num.partitions 默认是 50),每个分区默认一个 Replication
那么这个特殊的Topoc是怎么存储消费者组和分区之间的偏移量(offset)的呢?
在这个特殊的Topic里面,主要存储两种对象
- GroupMetadata:保存了消费者组中各个消费者的信息(每个消费者有编号)
- OffsetAndMetadata:保存了消费者组和各个 partition 的 offset 位移信息元数据
即一个保存消费者组和组里消费者的信息,一个保存消费者组和分片之间位移
这样当有新的消费时,可以直接根据这个特殊的Topic找到Offset,从而继续消费
1.2 初始消费的Offet
那么如果有一个新消费组的,那此时这个特殊的Topic里面没有保存Offset,如何开始消费呢?
消费者的代码中有一个参数,用来控制如果找不到偏移量的时候从哪里开始消费
1 | auto.offset.reset |
- 默认值是 latest,也就是从最新的消息(最后发送的)开始消费的,此时历史消费是不能消费的
- earliest 代表从最早的(最先发送的)消息开始消费,可以消费到历史消息
- none,如果 Consumer Group 在服务端找不到 offset 会报错
1.3 更新offset
何时更新offset呢?
offset的更新是由消费者发出一个commit动作之后,才会在broker里面更新
那么这个commit动作什么时候发生呢?
同样,这个动作可以手动提交,也可以自动提交,默认是自动提交true
1 | # 提交方式 |
如果想在消息消费完毕之后再更新,就要改为手动提交,两种提交方式
- consumer.commitSync()的手动同步提交
- consumer.commitAsync()手动异步提交
如果不提交或者提交失败,Broker 的 Offset 不会更新,消费者组下次消费的时候会消费到重复的消息
2.消费者与分区的关系
所谓消费者与分区之间的关系,就是消费者消费哪个分区的问题
这里主要有三种情况
- 消费者组里面的消费者数量等于Topic里面的分区partition数量
- 消费者组里面的消费者数量大于Topic里面的分区partition数量
- 消费者组里面的消费者数量小于Topic里面的分区partition数量
但是要注意的是,消费者组和分区也可存在多对多的关系,唯一的限制就是同一个消费者组里面的消费者不能消费同一个Topic里面的分区两次
2.1 消费策略
这种情况下,必然有一些消费者是消费不到的,那么这些消费者是如何瓜分partition的呢?
默认策略是:RangeAssignor 范围分配
如下图
在这种模式中,会将分区按照范围划分,最后一个消费者可能分到的分区会少一点
还有另外两种策略,分别是RoundRobinAssignor(轮询)和StickyAssignor(粘滞)
RoundRobinAssignor(轮询):即消费者轮流获取分区,按照上图的结构
C1对应分区:P0、P2、P4
C2对应分区:P1、P3
StickyAssignor(粘滞):这种策略复杂一点,但是相对来说均匀一点,每次结果可能不一样
原则
- 分区的分配尽可能的均匀
- 分区的分配尽可能和上次分配保持相同
具体来说,StickyAssignor 会先将每个 Partition 分配给一个消费者,然后将剩余的 Partition 逐个分配给消费者组中还未分配到该 Partition 的消费者,以确保每个消费者都能消费到一定数量的 Partition
2.2 ReBalance 分区再均衡
进入话题之前,先提一个问题,什么叫ReBalance?
ReBalance本质上是一种协议,规定了一个消费者组下的所有消费者如何达成一致来分配订阅Topic的每个分区
比如某Group有20个消费者,订阅了有100个分区的Topic,那这时候Kafka就会为每个消费者分配5个分区,这个过程就是ReBalance
那么什么时候ReBalance呢?
两种情况
- 消费者组的消费者数量发生变化,比如新增了消费者,消费者关闭连接 —— 学生数量变多了
- Topic 的分区数发生变更,新增或者减少 —— 座位数量发生了变化
为了让分区分配尽量地均衡,这个时候会触发 ReBalance 机制
那么谁来执行 ReBalance 和 Consumer Group 管理呢?
Kafka 提供了一个角色:Coordinator来执行对于Consumer Group的管理
Kafka早期版本中的 Coordinator 是依赖 Zookeeper 来实现的
最新的版本中,Kafka 对Coordinator进行了改进,每个Consumer Group都会被分配一个 Coordinator 用于组管理和 Offser 管理
责任:这个Group内的Coordinator比原来承担了更多的责任,比如组成员管理、Offset 提交保护机制等
选取:Consumer Group中的第一个Consumer启动的时候,它会去和 Kafka 服务确定谁是它们组的 Coordinator
协调:然后Group内的所有成员都会和这个Coordinator进行协调通信
很显然,有了Coordinator这个设计,就不再需要Zookeeper了,性能上可以得到很大的提升
那么 Consumer Group 又是如何确定自己的 Coordinator 是谁的呢??
其实非常简单,就是找到分区的Leader所在的Broker,就会被选定为Coordinator
七、Kafka为什么那么快?
MQ 的消息存储有几种选择,一种是内存,比如 ZeroMQ,速度很快但是不可靠
一种是第三方的数据库,会产生额外的网络消耗,而且数据库出问题会影响存储
所以最常见的是把数据放在磁盘上存储
但是我们也都知道,磁盘的 I/O 是比较慢的,选择磁盘作为存储怎么实现高吞吐、低延迟、高性能呢?
总结起来,主要是 4 点:磁盘顺序 I/O、索引机制、批量操作和压紧、零拷贝
1.顺序索引
随机 I/O 就是:读写的多条数据在磁盘上是分散的,寻址会很耗时
顺序 I/O 读写的数据在磁盘上是集中的,不需要重复寻址的过程
Kafka 的 Message 是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使 得 Kafka 写入吞吐量得到了显著提升
内存 I/O 是不是一定比磁盘 I/O 快呢? https://queue.acm.org/detail.cfm?id=1563874
2.索引
参照上面第五节的1.6里面索引介绍
3.批量读写和文件压缩
批量读写和文件压缩 它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络 IO 损耗
http://kafka.apache.org/documentation/#recordbatch
4.零拷贝
首先介绍两个名词
第一个是操作系统虚拟内存的内核空间和用户空间
操作系统的虚拟内存分成了两块,一部分是内核空间,一部分是用户空间
这样就可以避免用户进程直接操作内核,保证内核安全
进程在内核空间可以执行任意命令,调用系统的一切资源;在用户空间必须要通过一些系统接口才能向内核发出指令
因此,如果用户要从磁盘读取数据(比如 Kafka 消费消息),必须先把数据从磁盘拷贝到内核缓冲区,然后在从内核缓冲区到用户缓冲区,最后才能返回给用户
第二个是 DMA 拷贝
没有 DMA 技术的时候,拷贝数据的事情需要 CPU 亲自去做,这个时候它没法干其他的事情,如果传输的数据量大那就有问题了
DMA 技术叫做直接内存访问(Direct Memory Access),其实可以理解为 CPU给自己找了一个小弟帮它做数据搬运的事情
在进行 I/O 设备和内存的数据传输的时候,数据搬运的工作全部交给 DMA 控制器,解放了 CPU 的双手(反正就是找了个 小弟)
理解了这两个东西之后,我们来看下传统的 I/O 模型
比如 Kafka 要消费消息,比如要先把数据从磁盘拷贝到内核缓冲区,然后拷贝到用户缓冲区,再拷贝到 Socket 缓冲区,再拷贝到网卡设备
这里面发生了 4 次用户态 和内核态的切换和 4 次数据拷贝,2 次系统函数的调用(read、write),这个过程是非常耗费时间的。怎么优化呢?
在 Linux 操作系统里面提供了一个 sendfile 函数,可以实现“零拷贝”。这个时候就不需要经过用户缓冲区了,直接把数据拷贝到网卡(这里画的是支持 SG-DMA 拷 贝的情况)
因为这个只有 DMA 拷贝,没有 CPU 拷贝,所以叫做“零拷贝”
零拷贝至少可以提高一倍的性能
八、总结—Kafka消息不丢失的配置
1、Producer 端使用 producer.send(msg, callback)带有回调的 send 方法,而不是 producer.send(msg)方法。根据回调,一旦出现消息提交失败的情况,就可以有针对性地进行处理
2、设置 acks = all。acks 是 Producer 的一个参数,代表“已提交”消息的定义。 如果设置成 all,则表明所有 Broker 都要接收到消息,该消息才算是“已提交”
3、设置 retries 为一个较大的值。同样是 Producer 的参数。当出现网络抖动时, 消息发送可能会失败,此时配置了 retries 的 Producer 能够自动重试发送消息,尽量 避免消息丢失
4、设置 unclean.leader.election.enable = false。这是 Broker 端的参数,在 Kafka 版本迭代中社区也多次反复修改过他的默认值,之前比较具有争议。它控制哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,将会导致消息丢失。故一般都要将该参数设置成 false
5、设置 replication.factor >= 3。需要三个以上的副本
6、设置 min.insync.replicas > 1。Broker 端参数,控制消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在生产环境中不要使用默认值 1。确保 replication.factor > min.insync.replicas。如果两者相等,那么 只要有一个副本离线,整个分区就无法正常工作了。推荐设置成 replication.factor = min.insync.replicas + 1
7、确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最 好设置成 false,并自己来处理 Offset 的提交更新






