1. 首页
  2. RocketMq源码解析
  3. 图解 RocketMQ NameServer RocketMQ Broker 主从同步与集群模式原理

图解 RocketMQ NameServer RocketMQ Broker 主从同步与集群模式原理

  • 发布于 2024-10-25
  • 12 次阅读

RocketMQ 的 NameServer 是 RocketMQ 架构中的一个重要组件,它负责管理和广播路由信息。NameServer 本身是一个无状态的服务,集群中的多个 NameServer 实例之间不进行通信,每个 Broker 都会定期向所有 NameServer 发送心跳包,报告其状态和路由信息。

RocketMQ NameServer 的主要职责

  1. 路由信息管理:存储和管理 Broker 的路由信息,包括 Topic 和队列的分布情况。

  2. 心跳检测:接收并处理 Broker 的心跳信息,以监控 Broker 的存活状态。

  3. 路由信息广播:将路由信息广播给生产者和消费者,使它们能够找到正确的 Broker 进行消息的发送和消费。

  4. 负载均衡:帮助客户端(生产者和消费者)进行负载均衡,选择合适的 Broker 进行消息的发送和消费。

图解 RocketMQ NameServer

下面是一个简化的图解,展示了 RocketMQ NameServer 的工作流程:

+-----------------+            +-----------------+            +-----------------+
|  Broker A       |            |  Broker B       |            |  Broker C       |
| (Master/Slave)  |            | (Master/Slave)  |            | (Master/Slave)  |
|  Heartbeat      |------------>  Heartbeat      |------------>  Heartbeat      |
|  Route Info     |<-----------  Route Info      |<-----------  Route Info      |
+-----------------+            +-----------------+            +-----------------+
         |                            |                            |
         |                            |                            |
         |                            |                            |
+--------v--------+            +-------v------+            +-------v------+
|  NameServer 1   |<---------->|  NameServer 2  |<--------->|  NameServer 3  |
|  Route Table    |            |  Route Table   |            |  Route Table   |
|  Heartbeat      |            |  Heartbeat     |            |  Heartbeat     |
+-----------------+            +-----------------+            +-----------------+
         |                            |                            |
         |                            |                            |
         |                            |                            |
+--------v--------+            +-------v------+            +-------v------+
|  Producer 1     |<---------->|  Producer 2   |<--------->|  Consumer 1    |
|  Fetch Route    |            |  Fetch Route   |            |  Fetch Route   |
|  Send Message   |------------>  Send Message  |------------>  Consume Msg   |
+-----------------+            +-----------------+            +-----------------+

工作流程说明

  1. Broker 向 NameServer 发送心跳

    • 每个 Broker 定期向所有的 NameServer 发送心跳包,报告自己的状态和路由信息。

    • 心跳包中包含 Broker 的 IP 地址、端口、Topic 列表等信息。

  2. NameServer 维护路由表

    • NameServer 接收到 Broker 的心跳后,更新其内部的路由表。

    • 路由表记录了每个 Topic 在哪些 Broker 上有队列,以及这些队列的具体位置。

  3. 客户端获取路由信息

    • 生产者和消费者在启动时会从 NameServer 获取最新的路由信息。

    • 客户端通过轮询或广播的方式从 NameServer 获取路由信息,并缓存在本地。

  4. 客户端与 Broker 交互

    • 生产者根据获取到的路由信息,选择合适的 Broker 发送消息。

    • 消费者根据获取到的路由信息,选择合适的 Broker 拉取消息。

  5. NameServer 之间的独立性

    • 多个 NameServer 之间是相互独立的,没有主从关系。

    • 每个 NameServer 都维护一份完整的路由表,客户端可以随机选择一个 NameServer 进行通信。

详细步骤

  1. Broker 启动

    • Broker 启动后,定期向所有配置的 NameServer 发送心跳包。

    • 心跳包中包含 Broker 的基本信息和路由信息。

  2. NameServer 更新路由表

    • NameServer 收到心跳包后,更新其内部的路由表。

    • 如果一段时间内没有收到某个 Broker 的心跳,则认为该 Broker 下线,从路由表中移除其信息。

  3. 客户端获取路由信息

    • 客户端(生产者或消费者)启动时,从 NameServer 获取路由信息。

    • 客户端可以配置多个 NameServer 地址,从中随机选择一个进行通信。

    • 客户端缓存路由信息,并定期刷新以保持最新。

  4. 客户端与 Broker 交互

    • 生产者根据路由信息选择合适的 Broker 发送消息。

    • 消费者根据路由信息选择合适的 Broker 拉取消息。

    • 如果某个 Broker 不可用,客户端会自动切换到其他可用的 Broker。

总结

RocketMQ 的 NameServer 是一个轻量级的、无状态的服务,负责管理和广播路由信息。它通过心跳机制监控 Broker 的状态,并将最新的路由信息提供给客户端,从而实现高可用性和负载均衡。这种设计使得 RocketMQ 能够在大规模分布式环境中高效运行。

一、整体架构图

(此处可自行想象一个简单的架构图示例,或者之后通过工具生成实际图形来辅助理解,以下用文字描述各元素在图中的呈现及关系)


  • NameServer 节点:在图中以一个单独的方块表示,它是整个 RocketMQ 架构中的核心元数据管理中心。通常会有多个 NameServer 节点部署在集群环境中,以实现高可用性。这些节点之间相互独立,并不进行数据的同步等复杂交互(与一些其他分布式系统不同),每个 NameServer 都保存着完整的元数据信息。

  • Broker 节点:用另一种形状的方块(比如圆形)表示,代表消息中转和存储的角色。在图中会有多个 Broker 节点与 NameServer 节点相连接,它们会向 NameServer 上报自身的相关信息,如 Broker 的地址、存储的主题(Topic)信息、每个主题下的队列(Queue)情况等。

  • Producer(生产者)节点:以三角形表示,生产者负责生产并发送消息到 Broker。在图中,生产者节点会先与 NameServer 建立连接,通过 NameServer 获取到 Broker 的相关信息,比如哪些 Broker 上有自己要发送消息的目标主题等,然后再根据这些信息去连接对应的 Broker 并发送消息。

  • Consumer(消费者)节点:同样以某种特定形状(比如菱形)表示,消费者用于接收和消费 Broker 中的消息。它也是先与 NameServer 连接,获取到要消费的主题所在的 Broker 信息,之后去连接相应的 Broker 进行消息的获取和消费。


二、启动流程相关部分


  • NameServer 启动

    • 在架构图中,从一个 “开始” 节点引出一条线指向 NameServer 节点,表示启动的开始。NameServer 启动时,会进行一系列的初始化操作,在图中可以用一些小的矩形框在 NameServer 节点内部表示这些步骤。

    • 首先是加载配置文件,用一个小矩形框标注 “加载配置文件”,箭头指向它表示启动时先执行这一步。配置文件包含了如监听端口、数据存储路径等相关设置。

    • 接着是初始化网络通信模块,另一个小矩形框标注 “初始化网络通信模块”,它依赖于前面配置文件中关于端口等的设置。这一步使得 NameServer 可以在指定端口上监听来自 Broker、Producer 和 Consumer 的连接请求,在图中用箭头表示数据流向,从配置文件指向网络通信模块初始化步骤,再用箭头从网络通信模块指向外部,表示可以接收外部连接。

  • Broker 向 NameServer 注册

    • 当 Broker 启动后,会有一条线从 Broker 节点指向 NameServer 节点,表示 Broker 要向 NameServer 进行注册。在 NameServer 节点内部,对应有一个小矩形框标注 “处理 Broker 注册”。

    • Broker 会将自身的关键信息,如 Broker 的 ID、地址、存储的主题等,发送给 NameServer。在图中可以用箭头表示数据从 Broker 流向 NameServer,并且在 NameServer 的 “处理 Broker 注册” 小矩形框内,有箭头表示对这些信息的接收和处理,比如将接收到的信息存储到内部的一个数据结构中(可以想象一个类似表格的结构在图中简单示意)。


三、信息维护与更新部分


  • 元数据存储与更新

    • 在 NameServer 节点内部,有一个较大的矩形框标注 “元数据存储区”,用来表示 NameServer 存储元数据的地方。这里存储着所有 Broker 上报的信息,包括各个 Broker 的详细情况以及它们所承载的主题、队列等信息。

    • 当有 Broker 的信息发生变化时,比如新增了一个主题或者某个 Broker 的地址变更了,在图中用一条线从 Broker 节点(有一个小标注表示信息变化的触发点,如 “新增主题” 或 “地址变更”)指向 NameServer 节点的 “元数据存储区”,并在 “元数据存储区” 内部有相应的箭头表示对新信息的接收和更新操作,即将新的信息更新到原有的元数据存储结构中。

  • 定时任务相关

    • NameServer 会执行一些定时任务来维护元数据的准确性。在 NameServer 节点内部,有一个小矩形框标注 “定时任务执行”,用箭头从 “元数据存储区” 指向它,表示定时任务会基于存储的元数据进行操作。

    • 例如,定时检查 Broker 的心跳信息,以确定各个 Broker 是否存活。在图中可以用一条线从 “定时任务执行” 小矩形框引出,连接到各个 Broker 节点,并在连接线上有一个小标注 “检查心跳”,表示定时任务会向 Broker 发送心跳检查请求,若收到 Broker 的回应则确认其存活,若未收到回应则可能会对相关的元数据进行处理(如标记该 Broker 为不可用状态,在图中可以用在 “元数据存储区” 内部的一个小操作示意,比如对相应 Broker 记录添加一个 “不可用” 标注)。


四、Producer 和 Consumer 获取信息部分


  • Producer 获取 Broker 信息

    • 生产者启动时,会先与 NameServer 建立连接,在图中用一条线从 Producer 节点指向 NameServer 节点,并在 NameServer 节点内部有一个小矩形框标注 “处理 Producer 连接”。

    • 之后,Producer 会向 NameServer 请求获取要发送消息的目标主题所在的 Broker 信息。在图中用箭头表示数据从 Producer 流向 NameServer 的 “处理 Producer 连接” 小矩形框,然后从这个小矩形框引出一条线指向 “元数据存储区”,表示从存储的元数据中查找相关信息,最后再用一条线从 “元数据存储区” 引出,连接到 Producer 节点,表示将找到的 Broker 信息返回给 Producer。

  • Consumer 获取 Broker 信息

    • 类似地,消费者启动时先与 NameServer 建立连接,用一条线从 Consumer 节点指向 NameServer 节点,在 NameServer 节点内部有一个小矩形框标注 “处理 Consumer 连接”。

    • 然后 Consumer 会向 NameServer 请求获取要消费的主题所在的 Broker 信息。同样在图中用箭头表示数据从 Consumer 流向 NameServer 的 “处理 Consumer 连接” 小矩形框,接着从这个小矩形框引出一条线指向 “元数据存储区”,表示从存储的元数据中查找相关信息,最后再用一条线从 “元数据存储区” 引出,连接到 Consumer 节点,表示将找到的 Broker 信息返回给 Consumer。




01 总体概述

NameServer是专门为 RocketMQ 设计的轻量级名字服务,它主要用来实现服务发现」的,其具有简单相互独立」、「可集群横向扩展无状态节点间互不通信等特点。

它是组成 RocketMQ 的重要组件之一,是除了 Broker 之外另一个需要部署的服务,关于安装部署,可以点击:

【入门实战系列第三篇】RocketMQ 安装入门实战

不知道你是否会有这样的问题:

  1. RocketMQ 的 Topic 分布在不同的 Broker 节点上,作为消息的生产者消费者,如何知道要从哪个 Broker 节点去生产或者消费消息呢?

  2. 如果此时连接的 Broker 宕机了,如何在不重启的情况下感知到?

那么NameServer就是为了解决这些问题设计的,接下来我们就来深度剖析下其架构设计和原理。

02 为什么要设计 NameServer

目前市面上可以作为注册中心/服务发现」的组件有很多,比如:ETCDConsulZookeeperNacos等:

既然上面这些都可以实现注册中心/服务发现」功能,为什么 RocketMQ 还要自己开发一个 NameServer」呢?笔者认为大概有以下原因:

  1. 根据CAP理论,同时最多只能满足两个点,而 Zookeeper 满足的是 CP,也就是说 Zookeeper 并不能保证服务的可用性,Zookeeper 在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。

  2. 首先 RocketMQ 的架构设计决定了需要一个轻量级元数据服务器,只需保持最终一致性,而不需要像 Zookeeper、ETCD 那样的强一致性方案,所以无需再依赖另一个中间件,从而可以减少整体的维护成本。

  3. 另外NameServer通常也是集群的方式部署,彼此之间是「相互独立互不通信,Broker 会向每个 NameServer注册自己的路由信息,所以每个NameServer都保存一份完整的路由信息,即使某台NameServer挂掉,Broker 仍然可以向其它NameServer同步路由信息,所以客户端仍然可以动态感知到 Broker 的路由信息。


03 NameServer 整体架构设计

NameServer是⼀个非常简单的 Topic 路由注册中心,其角色类似 Kafka、Dubbo 中的 Zookeeper ,支持 Broker 的动态注册与发现」。

主要包含两个功能:

  1. Broker 管理:NameServer 接收来自 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制」用来检查 Broker 是否还存活。

  2. 路由信息管理:每个 NameServer 将保存 Broker 集群的整个路由信息和用来客户端查询的队列信息。然后 Producer 端和 Conumser 端通过 NameServer 就知道整个 Broker 集群的路由信息,从而进行消息的生产和消费。

RocketMQ 5.0 版本之后 NameServer 同时也可以作为 Controller 模块的一个容器,Controller 模块内嵌到NameServer 中。

通过上图可以看出,RocketMQ 整个架构上主要分为四部分: BrokerProducerConsumerNameServer,其他三个都会与 NameServer进行通信,整个流程如下:

2.1 NameServer 启动

启动「NameServer」服务,监听 TCP 端口, 其集群多节点之间互不通信,然后等待 「BrokerProducerConsumer」连上来。

2.2 Broker 启动

启动「Broker」服务,此时会每隔 30 秒向所有的 NameServer 发送心跳命令,进行注册路由信息」,「NameServer」接收到来自「Broker」心跳请求之后,保存「路由信息」到本地内存中,将注册成功结果返回给 Broker 服务。



2.3 Producer 发送消息

当「Producer」启动之后,会随机的选择「NameServer」集群中的其中⼀个「NameServer」建立长连接,并从 「NameServer」中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择⼀个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。

2.3.1 Producer 与 NameServer 关系

  1. 连接:单个 Producer 和一台 NameServer 保持长连接,如果该 NameServer 挂掉,生产者会自动连接下一个 NameServer,直到有可用连接为止,并能自动重连。

  2. 心跳:与 NameServer 没有心跳。

  3. 轮询时间:默认情况下,生产者每隔 30 秒从 NameServer 获取所有 Topic 的最新队列情况,这意味着某个 Broker 如果宕机,生产者最多要 30 秒才能感知,在此期间发往该 Broker 的消息发送失败。

2.3.2 Producer 与 Broker 关系

  1. 连接:单个生产者和该生产者关联的所有 Broker 保持长连接。

2.4 Consumer 订阅消息

Consumer」跟 「Producer」类似,也是跟其中⼀台 「NameServer」建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接并准备开始消费消息。

2.4.1 Consumer 与 NameServer 关系

  1. 连接:单个 Consumer 和一台 NameServer 保持长连接,如果该 NameServer 挂掉,消费者会自动连接下一个 NameServer,直到有可用连接为止,并能自动重连。

  2. 心跳:与 NameServer 没有心跳

  3. 轮询时间:默认情况下,消费者每隔 30 秒从 NameServer 获取所有 Topic 的最新队列情况,这意味着某个 Broker 如果宕机,客户端最多要 30 秒才能感知。


  1. 连接
    :单个消费者和该消费者关联的所有 Broker 保持长连接。

2.4.3 Consumer 负载均衡

集群消费模式下,一个消费组集群的多台机器共同消费一个 Topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

04 NameServer 心跳设计


如图所示,可以看到 Consumer」、「Producer」、「Broker均隔每 30s 向 NameServer发起一次请求,在NameServer」内部中也会启动定时器用来定期扫描更新」内部数据。

4.1 Broker 发起心跳

  1. 每隔 30s 向 「NameServer」集群的每台节点都发送心跳包,包含自身 Topic 队列的路由信息。

  2. 当有 Topic 改动「创建/更新」,Broker 会立即发送 Topic 增量信息到 「NameServer」,同时触发 「NameServer」的数据版本号发生变更。

  3. 心跳包组成部分:「请求头」、「请求体」。

4.2 NameServer 处理请求

  1. 将「路由信息」保存在内存中。它只会被其他模块调用,「Broker 注册」、「客户端拉取」,并不会主动调用其他模块。

  2. 启动一个「定时任务线程」,每隔「10s」扫描 「brokerLiveTable」中所有的 Broker 上次发送心跳时间,如果超过「120s」没有收到心跳,则从存活 Broker 表中移除该 Broker。

05 NameServer 路由管理设计

前面说过了 Broker 节点在启动的时向所有 「NameServer」进行注册。

消息生产者 Producer 在发送消息之前先从「NameServer」获取 Broker 服务器地址列表然后根据「负载均衡」算法从列表中选择一台服务器进行发送。

NameServer」与每台「Broker」保持「长连接」,并且每间隔「30s」通过心跳检测「Broker」是否存活,如果「120s」秒内没收到 「Broker」的上报消息,在「NameServer」内部中会每隔「10s」扫描一次Broker 列表,移除不活跃的 Broker),那么就认为检测到 Broker 宕机,则从「路由注册表」中删除。

但是路由变化不会马上通知消息生产者。这样设计的目的是为了降低「NameServer」实现的复杂度,在消息发送端提供「容错机制」保证消息发送的可用性。

NameServer」本身的高可用是通过部署多台「NameServer」来实现,但彼此之间不通讯,也就是「NameServer」服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这也是「NameServer」设计的一个亮点。总之,RocketMQ 设计追求简单高效,这也是为了高可用放弃强一致性的表现吧

总结,「NameServer」的主要作用是为消息的生产者和消息消费者提供关于主题 Topic 的路由信息,那么 NameServer」需要存储路由的基础信息,还要管理 Broker 节点,包括「路由注册」、「路由删除」、「路由发现」。

通过对源码的研读,「NameServer」的路由关系都保存在「RouteInfoManager」中的 4 个 HashMap 中,「路由注册」、「路由删除」、「路由发现」基本都是操作这 4 个 HashMap,这里不多对源码进行剖析,会放到源码系列中

假如说我们搭建了如下「双主双从」的集群,集群名字为「rocketmq-cluster」 。

     ip                  架构模式
192.168.56.1             Master1
192.168.56.2             Master2
192.168.56.3             Slave1
192.168.56.4             Slave2

相关配置如下:

master1

# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字
brokerName=broker-a
# 0 表示 Master, 大于0 表示 Slave
brokerId=0

master2

# 所属集群名字

brokerClusterName=rocketmq-cluster

# broker名字

brokerName=broker-b

# 0 表示 Master, 大于0 表示 Slave

brokerId=0

slave1

# 所属集群名字

brokerClusterName=rocketmq-cluster

# broker名字

brokerName=broker-a

# 0 表示 Master, 大于0 表示 Slave

brokerId=1

slave2

# 所属集群名字

brokerClusterName=rocketmq-cluster

# broker名字

brokerName=broker-b

# 0 表示 Master, 大于0 表示 Slave

brokerId=1

假如说我们在「rocketmq-cluster」集群的「broker-a」和「broker-b」上创建一个 topic,名字为 hello-huazai,「读写队列」都默认为「4」个,消息的分布情况如下图所示:


那么上面 4 个 HashMap 对应的值为:

topicQueueTable

{

"hello-huazai": [

 {

"brokerName": "broker-a",

"readQueueNums": 4,

"writeQueueNums": 4,

"perm": 6,

"topicSynFlag": 0

 },

 {

"brokerName": "broker-b",

"readQueueNums": 4,

"writeQueueNums": 4,

"perm": 6,

"topicSynFlag": 0

 }

 ]

}

brokerAddrTable

{

"broker-a": {

"cluster": "rocketmq-cluster",

"brokerName": "broker-a",

"brokerAddrs": {

"0": "192.168.56.1:10912",

"1": "192.168.56.3:10912"

 }

 },

"broker-b": {

"cluster": "rocketmq-cluster",

"brokerName": "broker-a",

"brokerAddrs": {

"0": "192.168.56.2:10912",

"1": "192.168.56.4:10912"

 }

 }

}

clusterAddrTable



{

"rocketmq-cluster": [

"broker-a",

"broker-a"

 ]

}

brokerLiveTable

{

"192.168.56.1:10912": {

"lastUpdateTimestamp": 1698394578378,

"haServerAddr": ""

 },

"192.168.56.2:10912": {

"lastUpdateTimestamp": 1698398886278,

"haServerAddr": ""

 },

"192.168.56.3:10912": {

"lastUpdateTimestamp": 1698398872578,

"haServerAddr": ""

 },

"192.168.56.4:10912": {

"lastUpdateTimestamp": 1698398864578,

"haServerAddr": ""

 }

}

06 5.0 相对 4.0 的 NameServer 改进

NameServer在 RocketMQ 5.0 的版本做了很多的优化工作:

  1. broker 注册线程池和客户端路由获取线程池隔离。

  2. 当前NameServer会用同一个线程池和队列去处理所有的客户端路由请求,服务端注册请求等,并且队列的大小和线程数都是不可配置的,如果其中一个类型的请求打爆线程池,将会影响到所有请求。将线程池进行了隔离,将最重要要的客户端路由请求单独隔离出来,队列的大小和线程数均是可配置的。线程池之间的请求处理相互隔离,不受影响。

  3. Topic 路由缓存的优化

  4. 当前NameServer当客户端发送路由请求时,会利用 topicQueueTable 和 brokerAddrTable 来构造出最终的路由信息TopicRouteData,这里涉及了在读锁中遍历 broker,有一定的 cpu 耗费。通过构造TopicRoute 的缓存 topicRouteDataMap,直接在客户端请求时返回 TopicRoute,而额外的代价是在 broker请求、下线,删除topic 等行为时同时操作 topicRouteDataMap。

  5. 批量注销 Broker

  6. 增加 BatchUnRegisterService,异步化批量处理 broker 下线,加速 broker 下线流程。

RIP-29NameServer的改进增强。


07 优化联想

生产者和消费者连接NameServer获取路由信息都是随机的,从给定的NameServer列表中打散列表随机选择一个NameServer进行连接获取数据。这种情况存在如下弊端:

  1. NameServer部署的较好的机器无法发挥机器的所有性能,而性能较差的机器可能连接很多导致服务宕机。

  2. 不能够指定NameServer进行连接(只配置一个NameServer地址除外)。

所以考虑在客户端(生产者和消费者)增加选择连接NameServer的策略模式,由开发者自己选择或者实现策略来选择NameServer进行连接。可以考虑一下策略模式:

  1. 随机策略:随机一个NameServer进行连接(当前的模式)。

  2. 指定策略:指定一个特定的NameServer进行连接。

  3. 轮询策略:当前应用中的客户端自行在给定的NameServer

  4. NameServer最小客户端连接数策略:获取当前NameServer中客户端连接数最小的进行连接。


08总结

这里,我们一起来总结一下这篇文章的重点。

1、带你剖析了RocketMQ」为什么要设计「NameServer」。

2、接着带你剖析了NameServer」整体架构设计以及「心跳设计」。

2、最后带你剖析了「NameServer」的路由管理设计。

下篇我们来深度剖析「Broker 主从架构与集群模式管理」,大家期待,我们下期见。

图解 RocketMQ Broker 主从同步与集群模式原理

01 总体概述

与 Kafka 类似,「Broker」是 RocketMQ 的核心,大部分「重量级」的工作都是由 「Broker」来完成的,主要包括接收 Producer 发来的消息」、「处理 Consumer 消费的消息」、「消息的持久化存储」、「消息的 HA 机制消息查询以及服务高可用」等

02 Broker 集群部署模式


RocketMQ 的 Broker 有以下三种集群部署方式:

1.单台 Master 部署;

2.多台 Master 部署;

3.多 Master 多 Slave 部署;

2.1 单 Master 模式

单 master 也就是只有一个 master 节点,如果该 master 节点挂掉了,会导致整个服务不可用,线上不建议使用,适合个人学习使用。

2.2 多 Master 模式

多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。

和 kafka 不一样,RocketMQ 中并没有 master选举功能,在 RocketMQ 集群中,1 台机器只能要么是Master,要么是 Slave,这个在初始的机器配置就定死了的。

早期 RocketMQ 版本不会像 kafka 那样存在 master 动态选举,所以通过配置多个 master 节点来保证 RocketMQ 的高可用。

我们来看下该模式下的优缺点:

  1. 优点:高可用,该模式是所有模式中性能最高」的

  2. 缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性。

注意:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个 master 节点,而不是只在某个 master 节点上,否则该节点宕机会对订阅该 topic 的应用造成影响。

2.3 多 Master 同步/异步复制模式

多 master 模式根据 Master 和 Slave 之间的数据同步方式可以分为:

  1. 多 master 多 slave 异步复制模式

  2. 多 master 多 slave 同步复制模式

总体来说,RocketMQ 集群部署模式为四种,下面是第三种方式的部署方式架构图:

当采用多 Master 方式时,Master 与 Master 之间是不需要知道彼此的,这样的设计思想直接降低了 Broker实现的复杂性。

看到这里你是否有所疑惑?你可以想象一下,如果 Master 与 Master 之间需要知道彼此的存在,此时需要在维护一个 Master 列表,而且必然涉及到Master 注册发现活跃 Master 数量变更等很多状态更新问题,所以最简单最可靠的做法就是 Master 只做好自己的事情与 Slave 进行数据同步即可。

通过这样的设计,在分布式环境中,当某台 Master 宕机或上线,不会对其他 Master 造成任何影响。

那么我们如何才能知道集群中有多少台 Master 和 Slave 呢?

这就是我们上一篇剖析的结果 图解 RocketMQ NameServer 架构设计,没错就是那个大名鼎鼎的「NameServer」。

这里再回顾下NameServer」的功能,大概有以下 4 类:


  1. NameServer 承担了注册中心的职能。

  2. NameServer 用来保存活跃 broker 列表,包括 Master 和 Slave。

  3. NameServer 用来保存所有 topic 和该 topic 所有队列的列表。

  4. NameServer 用来保存所有 broker 的Filter 列表

2.4 多 Master 多 Slave 部署集群工作流程


  1. NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  2. Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName不同的 BrokerId来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。

注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才

会参与消息的读负载。

  1. Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取「Topic 路由信息」,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 「发送心跳」。Producer完全无状态,可集群部署。

  2. Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。

  3. Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读 I/O),以及从服务器是否可读等因素建议下一次是从 Master 还是 Slave 拉取。

2.3.1 多 Master 多 Slave 异步复制模式

在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不可写,类似于 MYSQL 的主备模式。

其优缺点如下:

  1. 优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。

  2. 缺点:使用异步复制的同步方式有可能会有消息丢失消息丢失的问题。

RocketMQ 天生对集群的支持非常友好,天然支持高可用,它可以支持多主多从的部署架构,这也是和

Kafka 区别之一,Kafka 的分区副本可以看成一主多从

其中 Master 的broker id = 0,Slave 的 broker id > 0。这有点类似于 MYSQL 的主从概念,当 master 挂了以后,slave 仍然可以提供读服务,但是由于有多主的存在,当一个 master 挂了以后,可以写到其他的 master上。

2.3.2 多 master 多 slave 同步复制模式

同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。

其优缺点如下:

  1. 优点:同步双写的同步模式能保证数据不丢失。

  2. 缺点:发送单个消息RT 会略长,性能相比异步复制低 10%左右。

刷盘策略:同步刷盘和异步刷盘,它指的是节点自身数据是同步还是异步存储。

注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。


2.5 BrokerServer 高可用

RocketMQ 是通过 Master 和 Slave 的配合达到 BrokerServer 模块的高可用。一个 Master 可以配置多个 Slave,同时也支持配置多个 Master-Slave 组。

当其中一个 Master 出现问题时:

  1. 由于 Slave 只负责读,当 Master 不可用,它对应的 Slave 仍能保证消息被正常消费。

  2. 由于配置多组 Master-Slave 组,其他的 Master-Slave 组也会保证消息的正常发送和消费。

老版本的 RocketMQ 不支持把 Slave 自动转成 Master,如果机器资源不足, 需要把 Slave 转成 Master,则

手动停止Slave 角色的 Broker,更改配置文件,用新的配置文件启动 Broker。

新版本(4.5)的 RocketMQ,支持 Slave 自动转成 Master。RocketMQ dledger使用了Raft 协议,保证所有节点数据最终一致性,同时保证了集群的可用性。

关于dledger」我们会在后面单篇剖析,这里就不介绍了,下面我们先介绍原来的主从同步的机制。

03 Broker 主从同步机制

上面说了,RocketMQ 支持集群部署来保证高可用。它基于主从模式,将节点分为 Master、Slave 两个角色,集群中可以有多个 Master 节点,一个 Master 节点可以有多个 Slave 节点。Master 节点负责接收生产者发送的写入请求,将消息写入CommitLog文件,Slave 节点会与Master 节点建立连接,从 Master 节点同步消息数据,消费者可以从 Master 节点拉取消息,也可以从 Slave 节点拉取消息。


RocketMQ 主从模式下,是通过 Slave 节点主动向 Master 节点发送请求通知主节点进行数据同步的。

3.1 主从消息同步

3.1.1 建立连接

主节点监听连接事件

主从节点传输数据,那么肯定会先建立连接,所以主节点在启动的时候,会开启一个端口 haListenPort 用于监听从节点的连接请求注册了 ACCEPT 连接事件监听,默认端口是10912,当然也可以通过配置修改 haListenPort 的值使用其他端口。

在端口绑定之后,主节点会专门开启一个线程,用于监听到从节点的连接事件,如果从节点发起了连接请求,会与从节点建立连接,与从节点的连接信息会封装在 HAConnection 类中,主节点和从节点的数据同步逻辑也在 HAConnection 中。

从节点发起连接事件

从节点在启动时会向主节点发起连接请求,上面说过主节点会监听从节点的连接请求,所以此时主从节点的连接建立完成,待完成后从节点会在连接上注册READ 可读事件监听,处理连接上的可读事件。

3.1.2 消息同步

主节点处理从节点请求

这里可以分两种事件:READ 可读事件即处理从节点发送过来的请求」、「WRITE 可写事件即向从节点发送请求」,整个逻辑如下:


  1. 先来看「读事件」,上面说到从节点会定时向主节点汇报消息同步的进度,主节点会开启一个线程专门处理监听到的可读事件,也就是处理从节点发来的请求,处理逻辑在 ReadSocketService 中,可以先自行阅读。

  2. 再来看「写事件」,「主节点」也会开启了一个线程来处理网络中的写事件,主节点向从节点发送同步消息数据的处理,它也会开启一个循环,只要主节点未停止服务,就不断进行处理。

  3. 最后再来看下从节点收到消息的处理结果」,「从节点会监听到网络中的可读数据,收到消息后将消息写入自己的CommitLog中。

主从同步流程如下:

从节点处理主从同步

从节点处理主从同步的逻辑主要在 HAClient 中,它内部会开启了一个线程处理主从同步,只要 Slave 节点未停止,就会不断循环处理,逻辑如下:

  1. 从节点会定时向主节点」上报消息同步的偏移量,所以每次循环开始都会判断是否需要向主节点发送消息同步偏移量,如果已经有一段时间内没有向主节点上报,此时就会向主节点发送消息同步偏移量,告诉主节点现在同步到哪条消息了。

  2. 等待与主节点建立的连接上产生READ 可读事件」。

  3. 处理READ 可读事件,主要是判断主节点是否发来了数据,如果主节点发送了数据,就要从网络中读取数据,将读取到的消息内容写到从节点自己的 CommitLog

3.2 等待主从复制结果

这里分为两种同步方式:同步复制 SYNC_MASTER」、「异步复制 ASYNC_MASTER」。

  1. 先来看下同步复制,当消息写入主节点之后,需要等待从节点也写入完毕才能返回成功。

  2. 再来看下异步复制」,当消息写入主节点之后即可返回成功,主从同步数据异步进行,不需要等待从节点写入完毕即可返回成功。

主从同步开始之后,如果有新的消息写入主节点的CommitLog,如果 Master 节点配置的是同步复制 SYNC_MASTER,在消息写入主节点之后还需要等待从节点同步完毕,主节点会开启一个数据同步线程,专门来判断数据是否同步完毕。

首先消息在写入CommitLog之后会构建一个消息提交请求 GroupCommitRequest,请求中会携带本次消息写入之后的偏移量,将其提交到一个集合requestsRead中,这个线程可以称为主线程,然后主线程会唤醒数据同步线程来判断数据是否同步完毕,之后主线程进入等待状态。

3.3 主从模式下消息消费流程

主从模式下,消费者向 Broker 发送拉取消息请求后,Broker 对拉取请求进行处理时会设置一个 broker ID,建议消费者下次从 Broker 拉取消息,接下来会看下 Broker 根据什么条件决定返回哪个 Broker ID 的。

Broker 在处理消费者拉取请求时,获取消息后会在返回结果中设置一个是否建议从 Slave 节点拉取值放在isSuggestPullingFromSlave 这个变量中,这个值的判断方式如下:

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
         * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
  1. maxOffsetPy:表示当前最大物理偏移量。

  2. maxPhyOffsetPulling: 表示本次消息拉取最大物理偏移量。

  3. diff:当前 Broker 的 CommitLog 最大偏移量减去本次拉取消息的最大物理偏移量,表示剩余未拉取的消息。

  4. memory:消息在 PageCache 中的总大小,计算方式是总物理内存 * 消息存储在内存中的阀值(默认为40)/100,也就是说 MQ 会缓存一部分消息在操作系统的 PageCache 中,加速访问。

如果 diff 大于 memory 的值,表示未拉取的消息过多,已经超出了 PageCache 缓存的数据的大小,还需要从磁盘中获取消息,所以此时会建议下次从 Slave 节点拉取,将 isSuggestPullingFromSlave 的值置为 true,否则为false。

04 总结

这里,我们一起来总结一下这篇文章的重点。

1、带你剖析了RocketMQ」为什么要设计「Broker」以及 「Broker」的作用和功能。

2、接着带你剖析了Broker」的几种集群部署方式。

2、最后带你剖析了「Broker」主从同步机制的实现原理和流程。