一、架构

最开始叫 MetaQ(Meta-morphosis [ˌmetəˈmɔːfəsɪs]变形)

淘宝原来有一 个自研的 MQ 叫 Notify,Kafka 开源以后,就参考 Kafka 用 Java 语言写了 MetaQ, 所以在涉及思想上有很多跟 Kafka 相似的地方。后来改名字叫 RocketMQ,2012 年开源

1.整体架构

image-20230415151544795

首先分析一下RocketMQ的整体架构图,它的构成也和其他MQ类似,生产者,Broker和消费者

与Kafka不同的是,RocketMQ不需要ZK的配合使用,它自身便有NameServer来负责类似ZK的功能

2.Broker

RocketMQ 的服务,或者说一个进程,叫做 Broker,Broker 的作用是存储和转发消息RocketMQ 单机大约能承受 10 万 QPS 的请求

image-20230415151907234

为了提升 Broker 的可用性(防止单点故障),以及提升服务器的性能(实现负载), 通常会做集群的部署
跟 Kafka 或者 Redis Cluster 一样,RocketMQ 集群的每个 Broker 节点保存总数据的一部分,因此可以实现横向扩展

而为了提升数据的可靠性,每个Broker还会有自己的副本slave

image-20230415152108748

默认情况下,读写都发生在 Master 上

在 slaveReadEnable=true 的情况下, Slave 也可以参与读负载

但是默认只有 brokerId=1 的 Slave 才会参与读负载,而且是在 Master 消费慢的情况下,由whichBrokerWhenConsumeSlowly这个参数决定

注意,在RocketMQ里面,Master指的是副本的主,Slave则指的是副本

3.Topic

Topic 用于将消息按主题做划分,比如订单消息、物流消息

注意,跟 Kafka 不 同的是,在 RocketMQ 中,Topic 是一个逻辑概念,消息不是按 Topic 划分存储的

image-20230415152314114

producer 将消息发往指定的 Topic,Consumer 订阅这个 Topic 就可以收到相应的消息
跟 Kafka 一样,如果 Topic 不存在,会自动创建,BrokerConfig:

1
private boolean autoCreateTopicEnable = true;

Topic 跟生产者和消费者都是多对多的关系,一个生产者可以发送消息到多个Topic,一个消费者也可以订阅多个Topic

4.NameServer

NameServer可以看作是RocketMQ的一个分布式注册中心,它为整个RocketMQ集群提供了元数据信息的注册和查找服务

所有的Broker节点和Client节点都需要与NameServer交互来获取元数据信息和路由信息,从而实现消息的发送和消费,如Topic信息、Producer信息、Consumer信息等,以及消息队列在Broker上的映射关系

4.1 基础结构

当不同的消息存储在不同的 Broker 上,生产者和消费者对于 Broker 的选取,或者说路由选择是一个非常关键的问题

  • 路由:生产者发一条消息,应该发到哪个 Broker?消费者接收消息,从哪个 Broker 获取消息?
  • 服务端增减:如果 Broker 增加或者减少了,客户端怎么知道??
  • 客户端增加:一个新的生产者或者消费者加入,怎么知道有哪些 Broker

所以,跟分布式的服务调用的场景需要一个注册中心一样,在 RocketMQ 中需要有一个角色来管理 Broker 的信息

在Kafka中,使用ZK管理的路由选择功能

在Rocket中,并没有使用ZK做一些路由发现,存活状态监听等工作,而是把这份工作交给了自己创建的组件NameServer来实现

可以把 NameServer 理解为是 RocketMQ 的路由中心,每一个 NameServer 节点都保存着全量的路由信息,为了保证高可用,NameServer 自身也可以做集群的部署

它的作用有点像 Eureka 或者 Redis Sentinel

也就是说,Broker 会在 NameServer 上注册自己,Porducer 和 Consumer 用 NameServer 来发现 Broker

image-20230415153919043

那么,NameServer的工作机制到底是怎样的呢?

每个 Broker 节点在启动时,都会根据配置遍历 NameServer 列表,如下配置

文件:rocketmq/conf/broker.conf

1
namesrvAddr=localhost:9876

存活检测

与每个 NameServer 建立 TCP 长连接,注册自己的信息,之后每隔 30s 发送心跳信息(服务主动注册)

那么如果 Broker 挂掉了,不发送心跳了,NameServer 怎么发现呢?

因此NameServer也会主动的去探测Broker是否存活

定时探活

每个 NameServer 每隔 10s 检查一下各个Broker 的最近一次心跳时间,如果发现某个 Broker 超过 120s 都没发送心跳,就认为 这个 Broker 已经挂掉了,会将其从路由信息里移除

image-20230415154300081

4.2 为什么不用ZK?

实际上不是不用,在 RocketMQ 的早期版本,即 MetaQ 1.x 和 2.x 阶段,服务管理也是用 Zookeeper 实现的,跟 Kafka 一样,但MetaQ 3.x(即 RocketMQ)却去 掉了 ZooKeeper 依赖,转而采用自己的 NameServer

RocketMQ 的架构设计决定了只需要一个轻量级的元数据服务器就足够了,只需要保持最终一致,而不需要 Zookeeper 这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少整体维护成本

根据著名的 CAP 理论:一致性(Consistency)、可用性(Availability)、分区容错性 (Partiton Tolerance)。Zookeeper 实现了 CP,NameServer 选择了 AP,放弃了实时一致性

4.3 一致性问题

首先声明一个很重要的点,那就是NameServer之间是不互相通信的

上面也提到了NameServer放弃了实时一致性,那它是在何种措施下保证,即使它们之间互相不通信也能保持一致性的呢?

主要依赖三点,服务注册、服务剔除与路由发现

  • 服务注册:这一步是由Broker发起的,如果新增了Broker,那么它就会每隔30s就向所有NameServer发送心跳信息
  • 服务剔除:分两种情况,分别是正常关闭和异常关闭
    • 正常关闭:连接断开,NameServer能够监测到,从而正常剔除Broker信息
    • 异常关闭:NameServer每10s扫描Broker列表,如果某个Broker超过120s未同步心跳,就会剔除掉
  • 路由发现:客户端怎么获取最新的Broker列表呢?
    • 生产者:第一次发送消息时,根据Topic从NameServer获取路由信息
    • 消费者:订阅固定的Topic,启动时获取Broker信息
    • Broker信息如果更新,NameServer可以同步到,那生产者和消费者如何更新呢?这里采用的是定时更新策略,默认30s获取一次

更新这里可以发现一个问题,那就是如果Broker刚挂,客户端 30s以后才更新路由信息,那是不是会出现最多 30 秒钟的数据延迟?比如说一个 Broker 刚挂了,客户端缓存的还是旧的路由信息,发消息和接收消息都会失败

这个问题有几个解决思路:

1)重试:拉取的时间间隔由 DefaultMQPushConsumer 的 pollNameServerInterval 参数 决定,默认是 30 秒

2)把无法连接的 Broker 隔离掉,不再连接

3)或者优先选择延迟小的节点,就能避免连接到容易挂的 Broker

5.Producer 生产者

生产者,用于生产消息,会定时从NameServer拉取路由信息(不用配置 RocketMQ 的服务地址),然后根据路由信息与指定的 Broker 建立 TCP 长连接,从 而将消息发送到 Broker 中

发送逻辑一致的 Producer 可以组成一个 Group,这个Group里的Pro

RocketMQ 的生产者同样支持批量发送,不过 List 要自己传进去

6.Consumer 消费者

消息的消费者,通过 NameServer 集群获得 Topic 的路由信息,连接到对应的 Broker 上消费消息

消费逻辑一致的 Consumer 可以组成一个 Group,这时候消息会在 Consumer 之间负载

建立连接:由于Master和Slave 都可以读取消息,因此 Consumer 会与 Master 和 Slave 都建立连接

注意:同一个 Consumer Group 内的消费者应该订阅同一个 Topic;或者反过来,消费不同 Topic 的消费者不应该采用相同的 Consumer Group 名字。如果不一样,后面的消费者的订阅,会覆盖前面的订阅

在RocketMQ中,消费者有两种消费方式,分别是集群消费广播消费

  • 集群消费:多个消费者消费同一个Topic的消息,每个消费一部分
  • 广播消费:每个消费者都会消费同一个Topic下的所有消息,每个都消费全部消息

消费者的消费模式在创建消费者时进行指定,并且创建之后无法更改

按照消费模型来说,RocketMQ也有两种模式,分别是PullPush

  • Pull:轮询从Broker拉取消息,使用长轮询实现,所谓长轮询就是如果轮询不到相关数据,就会hold住请求,等到有数据或者等一定时间后再返回,返回后客户端立即发起下一次长轮询
  • push:Broker推送给Consumer,但是在RocketMQ里面,这个模式实际上是依赖Pull实现的,在Pull模式基础上封装了一层,所以它不是真正的“推模式”

push的本质是用监听器,监听到获取了消息后,唤醒Consumer进行pull消费

7.Message Queue 消息队列

RocketMQ 支持多 Master 的架构

思考一个这样的问题:当有多个 Master 的时候,发往一个 Topic 的多条消息会在多个 Master 的 Broker 上存储,那么,发往某一个 Topic 的多条消息,是不是在所有的 Broker 上存储完全相同的内容?

这样不符合分片的思想,因此这些消息也肯定是分片存储在不同的Broker上的

在 Kafka 里面设计了一个 Partition,一个 Topic 可以拆分成多个 Partition,这些Partition 可以分布在不同的 Broker 上,这样就实现了数据的分片,也决定了 Kafka 可以实现横向扩展

那么Rocket如何分片呢?

在一个 Broker 上,RocketMQ 只有一个存储文件,并没有像 Kafka 一样按照不 同的 Topic 分开存储

数据目录:/usr/local/rocketmq/store/broker-a/commitlog
也就说,如果有 3 个 Broker,也就是只有 3 个用来存储不同数据的 commitlog

那问题就来了,如果不按照分区去分布,数据应该根据什么分布呢?

RocketMQ 里面设计了一个叫做 Message Queue 的逻辑概念,作用跟 Partition 类似

首先,创建 Topic 的时候会指定队列的数量,一个叫 writeQueueNums(写队列数量),一个 readQueueNums(读队列数量)

写队列的数量决定了有几个 Message Queue,读队列的数量决定了有几个线程来消费这些 Message Queue(只是用来负载的)

那可以不指定MQ吗?可以

如果不指定的话,应该创建几个默认队列呢?参数指定

下面看看它的正式定义和相关概念

在 RocketMQ 中,Message Queue 是指一个 Topic 下的一个子主题,也可以理解为一个逻辑的消息队列;消息发送到一个 Topic 中,但是实际上这些消息是存储在不同的 Message Queue 中,每个 Message Queue 存储了一部分消息;在 RocketMQ 中,Message Queue 被设计为可以动态添加和删除,以实现消息的负载均衡和容错能力

一个 Message Queue 被描述为一个四元组:Topic、Broker Name、队列 ID、Broker 地址

  • Topic:表示该 Message Queue 所属的 Topic
  • Broker Name:表示该 Message Queue 所在的 Broker 名称
  • 队列 ID:表示该 Message Queue 的 ID
  • Broker地址:表示该 Message Queue 所在的 Broker 的网络地址

二、原理解析

1.生产者

上面提到了,Message Queue提供了横向扩展的能力,生产者可以利用Message Queue实现消息的负载和平均分布,但是这里产生了一个问题,那就是什么时候消息会发送到哪个队列呢?

1.1 消息发送规则

追踪源码,可以知道消息发送有三个实现策略对应的实现类

  • SelectMessageQueueByHash:它是一种不断自增、轮询的方式
  • SelectMessageQueueByRandom:随机选择一个队列
  • SelectMessageQueueByMachineRoom:返回空,没有实现

此外,还可以自定义策略

1.2 顺序消息

在MQ中,消息的顺序性向来是一项绕不过去的问题,那么在RocketMQ里面如何保证的消息顺序呢?

在介绍消费顺序之前,需要先区分一个概念,那就是全局有序和局部有序

  • 全局有序:就是不管有几个生产 者,在服务端怎么写入,有几个消费者,消费的顺序跟生产的顺序都是一致的
  • 局部有序:只需要保证同一个订单相关的消息,消费的时候有序即可,如下图所示
image-20230415193817802

而要保证消息有序,需要拆分成以下几个环节来进行

  • 生产者发送消息的时候,到达 Broker 应该是有序的
    • 所以对于生产者,不能使用多线程异步发送,而是顺序发送
  • 写入 Broker 的时候,应该是顺序写入的
    • 也就是相同主题的消息应该集中写入,选择同一个 Queue,而不是分散写入
  • 消费者消费的时候只能有一个线程
    • 否则由于消费的速率不同,有可能出现 记录到数据库的时候无序

1.3 事务消息

即要确保发送出去的消息能够成功被Broker接收/或者都不接收

举个例子,比如先发送MQ消息,再操作本地数据库,如何保证数据一致性呢?

MQ发送失败或者发送成功之外,还有一种情况要考虑,那就是第一步成功,第二步失败

这时候消息已经交给Broker了,而Mysql存储失败,怎么办?

因此不能先发给MQ再发给Mysql

那如果顺序调换呢?先发送给Mysql再发给MQ?

同样有第一步成功,第二步失败的问题

所以在Rocket里面,参考了2PC的设计思想,把发送消息分成了两步,然后把操作本地数据库的步骤放进了这个流程里面,具体实现步骤如下

  • 在最开始,生产者先发送给Broker一条消息试探一下,收到这条消息的Broker把这条消息标记未“未确认”
  • Broker通知生产者接收消息成功,然后可以去执行本地事务
  • 生产者执行本地事务
  • 本地事务执行成功,那么就再给Broker发送消息,把消息标记为”已确认“
  • 本地事务执行失败,那么就再给Broker发送消息,把消息标记为”丢弃“
  • 如果Broker迟迟收不到生产者发送的确认消息,那么就去本地检查是否成功,再改状态

在这个过程中,出现了两个新的概念

  • 半消息 half message:即暂时不能投递给消费者的消息,等待生产者二次确认
  • 消息回查 Message Status Check:即上面的最后一步,Broker主动去查询消息状态
image-20230415195343967

具体流程描述:

1.生产者向 MQ 服务端发送消息

2.MQ 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功, 此时消息为半消息

3.发送方开始执行本地数据库事务逻辑

4.发送方根据本地数据库事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息

5.在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查

6.发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果

7.发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍 按照步骤 4 对半消息进行操作(Commit/Rollback)。

1.4 延迟消息

在 RabbitMQ 里面需要通过死信队列或者插件来实现

RocketMQ 可以直接支持 延迟消息,但是开源版本功能被阉割了,只能支持特定等级的消息,商业版本可以任意指定时间

2.Broker

具体参考github上RocketMQ设计文档

img

2.1 消息存储

RocketMQ 的消息存储与 Kafka 有所不同,既没有分区的概念,也没有按分区存储消息

先看看RocketMQ对Kafka的存储设计看法:

  • 每个分区存储整个消息数据。尽管每个分区都是有序写入磁盘的,但随着并发 写入分区的数量增加,从操作系统的角度来看,写入变成了随机的
  • 由于数据文件分散,很难使用 Linux IO Group Commit 机制(指的是一次把多个数据文件刷入磁盘。例如:批量发送 3 条数据,分散在 3 个文件里面,做不到一 次刷盘)

因此,RocketMQ设计了一种新的文件存储方式,就是把同一个Topic的所有消息全部写到同一个文件中(这种存储方式叫集中型存储或者混合型存储),从而保证顺序写

优势:

1)队列轻量化,单个队列数据量非常少

2)对磁盘的访问串行化,完全顺序写,避免磁盘竞争,不会因为队列增加导致 IOWAIT 增高

缺点就是消费复杂了

在 Kafka中是一个Topic下面的Partition有独立的文件,只要在一个Topic里面找消息就OK了,Kafka把这个Consumer Group跟Topic的Offset的关系保存在一个特殊的Topic中

现在变成了:要到一个统一的巨大的 commitLog 种去找消息,需要遍历全部的消息,效率太低了,那怎么办呢?

如果想要每个 Consumer Group 只查找自己的 Topic 的 Offset 信息,可以为每 一个Consumer Group 把他们消费的 Topic 的最后消费到的 Offset 单独存储在一个地方,这个存储消息的偏移量的对象就叫做Consumer Queue

image-20230415200723674

简单点说就是,Broker为每个消费者组都保存了一个在commitLog中的偏移量

也就是说,消息在 Broker 存储的时候,不仅写入 commitlog,同时也把在commit log中的最新的 Offset(异步)写入对应的 Consume Queue

消费者在消费消息的时候,先从 Consume Queue 读取持久化消息的起始物理位置偏移量Offset、大小 size 和消息 Tag 的 HashCode 值,随后再从 commit log 中 进行读取待拉取消费消息的真正实体内容部分

Consume Queue 可以理解为消息的索引,它里面没有存消息

总结:

  • 写是顺序写,但是读却变成了完全的随机读(对于 commit log)
  • 读一条消息,会先读 Consume Queue,再读 commit log,增加了开销

更底层的存储文件不再介绍

2.2 存储关键技术(持久化/刷盘)

RocketMQ消息存储在磁盘上,按理说不会很快,那么它又是如何做到这么低的延迟和这么高的吞吐呢?

CPU如果要读取或者操作磁盘上的数据,必须要把磁盘的数据加载到内存,这个是由硬件结构和访问速度的差异决定的

这个加载的大小有一个固定的单位,叫做 Page。x86 的 Linux 中一个标准页面大小是 4KB。如果要提升磁盘访问速度,或者说尽量减少磁盘 I/O,可以把访问过的 Page 在内存中缓存起来。这个内存的区域就叫做 Page Cache

下次处理 I/O 请求的时候,先到 Page Cache 查找,找到了就直接操作,没找到就到磁盘查找

Page Cache 本身也会对数据文件进行预读取,对于每个文件的第一个读请求操作, 系统在读入所请求页面的同时会读入紧随其后的少数几个页面

十分类似MySql的Buffer Pool,传送门

但这里还是有一个问题,虚拟内存分为内核空间和用户空间,而Page Cache 属于内核空间,用户空间访问不了,因此读取数据还需要从内核空间拷贝到用户空间缓冲区

image-20230415201643338

可以看到数据需要从 Page Cache 再经过一次拷贝程序才能访问得到,这个 Copy 的过程会降低数据访问的速度。有什么办法可以避免从内核空间到用户空间的 Copy 呢?

没错,就是零拷贝!这个技术在kafka当中也有使用,但是两者原理不同,传送门

image-20230415201901909

在这里是干脆把 Page Cache 的数据在用户空间中做一个地址映射,这样用户进程就可以通过指针操作直接读写 Page Cache,不再需要系统调用(例如 read())和内存拷贝

RocketMQ 中具体的实现是使用 MMAP(Memory Map,内存映射),不论是 CommitLog 还是 CosumerQueue 都采用了 MMAP

Kafka 用的是 sendfile

2.3 文件清理策略

跟 Kafka 一样,RocketMQ 中被消费过的消息是不会删除的,所以保证了文件的顺序写入

如果不清理文件的话,文件数量不断地增加,最终会导致磁盘可用空间越来越少

清除哪些数据呢?主要清除 CommitLog、ConsumeQueue 的过期文件

什么情况下这些文件变成过期文件?默认是超过 72 个小时的文件

过期的文件什么时候删除呢?

  • 通过定时任务,每天凌晨 4 点,删除这些过期的文件
  • 磁盘使用空间超过了 75%,开始删除过期文件;如果问题更严重,磁盘空间使用率超过 85%,会开始批量清理文件,不管有没有过期,直到空间充足;如果磁盘空间使用率超过 90%,会拒绝消息写入

3.消费者

在集群消费模式下(广播模式忽略),如果要提高消费者的负载能力, 必然要增加消费者的数量

消费者的数量增加了,怎么做到尽量平均的消费消息呢? 队列怎么分配给相应的消费者?

首先,队列的数量是固定的。比如有 4 个队列,假设有 3 个消费者,或者 5 个消费者,这个时候队列应该怎么分配?

如果消费者挂了?消费者增加了?队列又怎么分配?

3.1 消费端的负载均衡与Rebalance

消费者启动的时候,或者有消费者挂掉的时候,默认最多 20 秒,就会做一次 ReBalance,让所有的消费者可以尽量均匀地消费队列的消息

那么有哪些ReBalance策略呢?

  • AllocateMessageQueueAveragely:连续分配(默认)
  • AllocateMessageQueueAveragelyByCircle:每人轮流一个
  • AllocateMessageQueueByConfig:通过配置
  • AllocateMessageQueueConsistentHash:一致性哈希
  • AllocateMessageQueueByMachineRoom:指定一个 broker 的 topic 中的 queue 消费
  • AllocateMachineRoomNearby:按 Broker 的机房就近分配

队列的数量尽量要大于消费者的数量

3.2 消费端重试与死信队列

什么叫消费端重试?

就是如果消费者消费成功,是不是要返回给Broker一个ack,改一下Offset的位置?

那如果消费失败了呢?返回东西呢?返回啥?

消费失败时,比如数据库不可用,网络问题啥的,消费者就会返回给Broker一个RECONSUME_LATER,意思是稍候重试

然后过一阵儿,Broker会把这个重试消息再发给消费者让他消费,如果还是异常,就再次重试,这个过程中等待时间是不断衰减增加的,从最初的10s会直到2h,最多经历16次重试

那16次之后呢?那就是真消费不了,直接扔进死信队列

Broker 会创建一个死信队列,死信队列的名字是%DLQ% + ConsumerGroupName

4.特性

现在市面上有这么多流行的消息中间件,RocketMQ 又有什么不同之处?

一般我们会从使用、功能、性能、可用性和可靠性四个方面来衡量

其中有一些是基础特性,这里重点说一下 RocketMQ 比较突出的:

1.单机可以支撑上万个队列的管理——可以满足众多项目创建大量队列的需求

2.上亿级消息堆积能力——在存储海量消息的情况下,不影响收发性能;

3.具有多副本容错机制——消息可靠性高,数据安全

4.可以快速扩容缩容,有状态管理服务器——那就意味着具备了横向扩展的能力

5.可严格保证消息的有序性——满足特定业务场景需求

6.Consumer 支持 Push 和 Pull 两种消费模式——更灵活(主要是 Pull)

7.支持集群消费广播消息——适合不同业务场景

8.低延迟:客户端消息延迟控制在毫秒级别(从双十一的复盘情况来看延迟在 1ms 以内的消息比例 99.6%;延迟在 10ms 以内的消息占据 99.996%)——效率高

三、高可用

在 RocketMQ 的高可用架构中,本文主要关注两块:主从同步和故障转移

image-20230415203901051

1.主从同步的意义

  • 数据备份:保证了两/多台机器上的数据冗余,特别是在主从同步复制的情况下,一定程度上保证了Master出现不可恢复的故障以后,数据不丢失
  • 高可用性:即使 Master 掉线, Consumer 会自动重连到对应的 Slave 机器,不会出现消费停滞的情况
  • 提高性能:主要表现为可分担 Master读的压力,当从 Master 拉取消息,拉取消息的最大物理偏移与本地存储的最大物理偏移的差值超过一定值,会转向 Slave(默认brokerId=1)进行读取,减轻了 Master 压力
  • 消费实时:Master 节点挂掉之后,依然可以从 Slave 节点读取消息,而且会选择一个副本作为新的 Master,保证正常消费

2.数据同步

2.1 主从联系

主从服务器怎么联系在一起?

比如 A 机器上的 broker-a-master 和 B 机器上的 broker-a-slave

依靠一下几点

  • 集群的名字相同,brokerClusterName=tom-cluster
  • 连接到相同的 NameServer
  • 在配置文件中:brokerId = 0 代表是 Master,brokerId = 1 代表是 Slave

2.2 主从同步和刷盘类型

这些都在部署节点时,在配置文件中进行了设置,即设置了Broker角色和刷盘方式

属性 含义
brokerRole //Broker 的角色 ASYNC_MASTER 主从异步复制 Master写成功,返回客户端成功。拥有较低的延迟和较高的吞吐量,但是当Master出现故障后,有可能造成数据丢失
SYNC_MASTER 主从同步双写(推荐) Master 和 Slave 均写成功,才返回客户端成功。Maste 挂了以后可以保证数据不丢失,但是同步复制会增加数 据写入延迟,降低吞吐量
flushDiskType //刷盘方式 ASYNC_FLUSH 异步刷盘(默认) 生产者发送的每一条消息并不是立即保存到磁盘,而是 暂时缓存起来,然后就返回生产者成功。随后再异步的 将缓存数据保存到磁盘,有两种情况
1 是定期将缓存中更新的数据进行刷盘,
2 是当缓存中更新的数据条数达到某一设定值后进行刷 盘。 这种方式会存在消息丢失(在还未来得及同步到磁盘的 时候宕机),但是性能很好。默认是这种模式。
SYNC_FLUSH 同步刷盘 生产者发送的每一条消息都在保存到磁盘成功后才返回 告诉生产者成功。这种方式不会存在消息丢失的问题, 但是有很大的磁盘 IO 开销,性能有一定影响

img

通常情况下,会把 Master 和 Slave 的 Broker 均配置成 ASYNC_FLUSH 异步刷盘方式,主从之间配置成 SYNC_MASTER 同步复制方式,即:异步刷盘+同步复制

2.3 主从同步流程

主从同步流程:

1、从服务器主动建立 TCP 连接主服务器,然后每隔 5s 向主服务器发送 commitLog 文件最大偏移量拉取还未同步的消息;

2、主服务器开启监听端口,监听从服务器发送过来的信息,主服务器收到从服务 器发过来的偏移量进行解析,并返回查找出未同步的消息给从服务器;

3、客户端收到主服务器的消息后,将这批消息写入 commitLog 文件中,然后更新commitLog 拉取偏移量,接着继续向主服务拉取未同步的消息

3.HA故障转移

在之前的版本中,RocketMQ 只有 Master/Slave 一种部署方式,一组 Broker 中有一个 Master,有零到多个 Slave,Slave 通过同步复制或异步复制方式去同步 Master 的数据

Master/Slave 部署模式,提供了一定的高可用性

但这样的部署模式有一定缺陷,比如故障转移方面,如果主节点挂了还需要人为手动的进行重启或者切换,无法自动将一个从节点转换为主节点

如果要实现自动故障转移,根本上要解决的问题是自动选主的问题,即还是选Leader

比如 Kafka 用 Zookeeper 选 controller,用类 PacificA 算法选 Leader、Redis 哨兵用 Raft 协议选 Leader

RocketMQ 2019 年 3 月发布的 4.5.0 版本中,利用 Dledger 技术解决了自动选主的问题

DLedger 就是一个基于 Raft 协议的 commitlog 存储库,也是 RocketMQ 实现新的高可用多副本架构的关键。它的优点是不需要引入外部组件,自动选主逻辑集成到各个节点的进程中,节点之间通过通信就可以完成选主

image-20230415205559155

这种情况下,commitlog 是 Dledger 管理的,具有选主的功能,具体选举流程如下

  1. 当主节点失效后,集群中的其他节点会检测到主节点失效,并开始进行新的 leader 选举。
  2. 集群中的每个节点都会将自己的状态报告给其他节点,并且收集其他节点的状态报告。
  3. 节点根据收到的状态报告,计算出每个节点的可用性,并选择可用性最高的节点作为新的 leader 节点。
  4. 选出新的 leader 节点后,节点将新的 leader 节点信息广播给整个集群,以通知其他节点。
  5. 集群中的每个节点更新自己的 leader 节点信息,然后继续进行数据的读写操作。

需要注意的是,leader 选举过程中可能会出现网络分区(network partition)等问题,导致集群出现多个主节点。为了避免这种情况,DLedger 采用了类似于 Paxos 算法中的多数派原则,要求集群中超过半数的节点认同新的 leader 节点才能生效,从而保证集群只有一个有效的主节点