RocketMQ 的 NameServer 是 RocketMQ 架构中的一个重要组件,它负责管理和广播路由信息。NameServer 本身是一个无状态的服务,集群中的多个 NameServer 实例之间不进行通信,每个 Broker 都会定期向所有 NameServer 发送心跳包,报告其状态和路由信息。
RocketMQ NameServer 的主要职责
路由信息管理:存储和管理 Broker 的路由信息,包括 Topic 和队列的分布情况。
心跳检测:接收并处理 Broker 的心跳信息,以监控 Broker 的存活状态。
路由信息广播:将路由信息广播给生产者和消费者,使它们能够找到正确的 Broker 进行消息的发送和消费。
负载均衡:帮助客户端(生产者和消费者)进行负载均衡,选择合适的 Broker 进行消息的发送和消费。
图解 RocketMQ NameServer
下面是一个简化的图解,展示了 RocketMQ NameServer 的工作流程:
+-----------------+ +-----------------+ +-----------------+
| Broker A | | Broker B | | Broker C |
| (Master/Slave) | | (Master/Slave) | | (Master/Slave) |
| Heartbeat |------------> Heartbeat |------------> Heartbeat |
| Route Info |<----------- Route Info |<----------- Route Info |
+-----------------+ +-----------------+ +-----------------+
| | |
| | |
| | |
+--------v--------+ +-------v------+ +-------v------+
| NameServer 1 |<---------->| NameServer 2 |<--------->| NameServer 3 |
| Route Table | | Route Table | | Route Table |
| Heartbeat | | Heartbeat | | Heartbeat |
+-----------------+ +-----------------+ +-----------------+
| | |
| | |
| | |
+--------v--------+ +-------v------+ +-------v------+
| Producer 1 |<---------->| Producer 2 |<--------->| Consumer 1 |
| Fetch Route | | Fetch Route | | Fetch Route |
| Send Message |------------> Send Message |------------> Consume Msg |
+-----------------+ +-----------------+ +-----------------+
工作流程说明
Broker 向 NameServer 发送心跳:
每个 Broker 定期向所有的 NameServer 发送心跳包,报告自己的状态和路由信息。
心跳包中包含 Broker 的 IP 地址、端口、Topic 列表等信息。
NameServer 维护路由表:
NameServer 接收到 Broker 的心跳后,更新其内部的路由表。
路由表记录了每个 Topic 在哪些 Broker 上有队列,以及这些队列的具体位置。
客户端获取路由信息:
生产者和消费者在启动时会从 NameServer 获取最新的路由信息。
客户端通过轮询或广播的方式从 NameServer 获取路由信息,并缓存在本地。
客户端与 Broker 交互:
生产者根据获取到的路由信息,选择合适的 Broker 发送消息。
消费者根据获取到的路由信息,选择合适的 Broker 拉取消息。
NameServer 之间的独立性:
多个 NameServer 之间是相互独立的,没有主从关系。
每个 NameServer 都维护一份完整的路由表,客户端可以随机选择一个 NameServer 进行通信。
详细步骤
Broker 启动:
Broker 启动后,定期向所有配置的 NameServer 发送心跳包。
心跳包中包含 Broker 的基本信息和路由信息。
NameServer 更新路由表:
NameServer 收到心跳包后,更新其内部的路由表。
如果一段时间内没有收到某个 Broker 的心跳,则认为该 Broker 下线,从路由表中移除其信息。
客户端获取路由信息:
客户端(生产者或消费者)启动时,从 NameServer 获取路由信息。
客户端可以配置多个 NameServer 地址,从中随机选择一个进行通信。
客户端缓存路由信息,并定期刷新以保持最新。
客户端与 Broker 交互:
生产者根据路由信息选择合适的 Broker 发送消息。
消费者根据路由信息选择合适的 Broker 拉取消息。
如果某个 Broker 不可用,客户端会自动切换到其他可用的 Broker。
总结
RocketMQ 的 NameServer 是一个轻量级的、无状态的服务,负责管理和广播路由信息。它通过心跳机制监控 Broker 的状态,并将最新的路由信息提供给客户端,从而实现高可用性和负载均衡。这种设计使得 RocketMQ 能够在大规模分布式环境中高效运行。
一、整体架构图
(此处可自行想象一个简单的架构图示例,或者之后通过工具生成实际图形来辅助理解,以下用文字描述各元素在图中的呈现及关系)
NameServer 节点:在图中以一个单独的方块表示,它是整个 RocketMQ 架构中的核心元数据管理中心。通常会有多个 NameServer 节点部署在集群环境中,以实现高可用性。这些节点之间相互独立,并不进行数据的同步等复杂交互(与一些其他分布式系统不同),每个 NameServer 都保存着完整的元数据信息。
Broker 节点:用另一种形状的方块(比如圆形)表示,代表消息中转和存储的角色。在图中会有多个 Broker 节点与 NameServer 节点相连接,它们会向 NameServer 上报自身的相关信息,如 Broker 的地址、存储的主题(Topic)信息、每个主题下的队列(Queue)情况等。
Producer(生产者)节点:以三角形表示,生产者负责生产并发送消息到 Broker。在图中,生产者节点会先与 NameServer 建立连接,通过 NameServer 获取到 Broker 的相关信息,比如哪些 Broker 上有自己要发送消息的目标主题等,然后再根据这些信息去连接对应的 Broker 并发送消息。
Consumer(消费者)节点:同样以某种特定形状(比如菱形)表示,消费者用于接收和消费 Broker 中的消息。它也是先与 NameServer 连接,获取到要消费的主题所在的 Broker 信息,之后去连接相应的 Broker 进行消息的获取和消费。
二、启动流程相关部分
NameServer 启动:
在架构图中,从一个 “开始” 节点引出一条线指向 NameServer 节点,表示启动的开始。NameServer 启动时,会进行一系列的初始化操作,在图中可以用一些小的矩形框在 NameServer 节点内部表示这些步骤。
首先是加载配置文件,用一个小矩形框标注 “加载配置文件”,箭头指向它表示启动时先执行这一步。配置文件包含了如监听端口、数据存储路径等相关设置。
接着是初始化网络通信模块,另一个小矩形框标注 “初始化网络通信模块”,它依赖于前面配置文件中关于端口等的设置。这一步使得 NameServer 可以在指定端口上监听来自 Broker、Producer 和 Consumer 的连接请求,在图中用箭头表示数据流向,从配置文件指向网络通信模块初始化步骤,再用箭头从网络通信模块指向外部,表示可以接收外部连接。
Broker 向 NameServer 注册:
当 Broker 启动后,会有一条线从 Broker 节点指向 NameServer 节点,表示 Broker 要向 NameServer 进行注册。在 NameServer 节点内部,对应有一个小矩形框标注 “处理 Broker 注册”。
Broker 会将自身的关键信息,如 Broker 的 ID、地址、存储的主题等,发送给 NameServer。在图中可以用箭头表示数据从 Broker 流向 NameServer,并且在 NameServer 的 “处理 Broker 注册” 小矩形框内,有箭头表示对这些信息的接收和处理,比如将接收到的信息存储到内部的一个数据结构中(可以想象一个类似表格的结构在图中简单示意)。
三、信息维护与更新部分
元数据存储与更新:
在 NameServer 节点内部,有一个较大的矩形框标注 “元数据存储区”,用来表示 NameServer 存储元数据的地方。这里存储着所有 Broker 上报的信息,包括各个 Broker 的详细情况以及它们所承载的主题、队列等信息。
当有 Broker 的信息发生变化时,比如新增了一个主题或者某个 Broker 的地址变更了,在图中用一条线从 Broker 节点(有一个小标注表示信息变化的触发点,如 “新增主题” 或 “地址变更”)指向 NameServer 节点的 “元数据存储区”,并在 “元数据存储区” 内部有相应的箭头表示对新信息的接收和更新操作,即将新的信息更新到原有的元数据存储结构中。
定时任务相关:
NameServer 会执行一些定时任务来维护元数据的准确性。在 NameServer 节点内部,有一个小矩形框标注 “定时任务执行”,用箭头从 “元数据存储区” 指向它,表示定时任务会基于存储的元数据进行操作。
例如,定时检查 Broker 的心跳信息,以确定各个 Broker 是否存活。在图中可以用一条线从 “定时任务执行” 小矩形框引出,连接到各个 Broker 节点,并在连接线上有一个小标注 “检查心跳”,表示定时任务会向 Broker 发送心跳检查请求,若收到 Broker 的回应则确认其存活,若未收到回应则可能会对相关的元数据进行处理(如标记该 Broker 为不可用状态,在图中可以用在 “元数据存储区” 内部的一个小操作示意,比如对相应 Broker 记录添加一个 “不可用” 标注)。
四、Producer 和 Consumer 获取信息部分
Producer 获取 Broker 信息:
生产者启动时,会先与 NameServer 建立连接,在图中用一条线从 Producer 节点指向 NameServer 节点,并在 NameServer 节点内部有一个小矩形框标注 “处理 Producer 连接”。
之后,Producer 会向 NameServer 请求获取要发送消息的目标主题所在的 Broker 信息。在图中用箭头表示数据从 Producer 流向 NameServer 的 “处理 Producer 连接” 小矩形框,然后从这个小矩形框引出一条线指向 “元数据存储区”,表示从存储的元数据中查找相关信息,最后再用一条线从 “元数据存储区” 引出,连接到 Producer 节点,表示将找到的 Broker 信息返回给 Producer。
Consumer 获取 Broker 信息:
类似地,消费者启动时先与 NameServer 建立连接,用一条线从 Consumer 节点指向 NameServer 节点,在 NameServer 节点内部有一个小矩形框标注 “处理 Consumer 连接”。
然后 Consumer 会向 NameServer 请求获取要消费的主题所在的 Broker 信息。同样在图中用箭头表示数据从 Consumer 流向 NameServer 的 “处理 Consumer 连接” 小矩形框,接着从这个小矩形框引出一条线指向 “元数据存储区”,表示从存储的元数据中查找相关信息,最后再用一条线从 “元数据存储区” 引出,连接到 Consumer 节点,表示将找到的 Broker 信息返回给 Consumer。
01 总体概述
「NameServer」是专门为 RocketMQ 设计的「轻量级名字服务」,它主要用来实现「服务发现」的,其具有「简单」、「相互独立」、「可集群横向扩展」、「无状态」,「节点间互不通信」等特点。
它是组成 RocketMQ 的重要组件之一,是除了 Broker 之外另一个需要部署的服务,关于安装部署,可以点击:
不知道你是否会有这样的问题:
RocketMQ 的 Topic 分布在不同的 Broker 节点上,作为消息的「生产者」和「消费者」,如何知道要从哪个 Broker 节点去生产或者消费消息呢?
如果此时连接的 Broker 宕机了,如何在不重启的情况下感知到?
那么「NameServer」就是为了解决这些问题设计的,接下来我们就来深度剖析下其架构设计和原理。
02 为什么要设计 NameServer
目前市面上可以作为「注册中心/服务发现」的组件有很多,比如:「ETCD」、「Consul」、「Zookeeper」、「Nacos」等:
既然上面这些都可以实现「注册中心/服务发现」功能,为什么 RocketMQ 还要自己开发一个 「NameServer」呢?笔者认为大概有以下原因:
根据「CAP」理论,同时最多只能满足两个点,而 Zookeeper 满足的是 CP,也就是说 Zookeeper 并不能保证服务的「可用性」,Zookeeper 在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
首先 RocketMQ 的架构设计决定了需要一个「轻量级元数据服务器」,只需保持「最终一致性」,而不需要像 Zookeeper、ETCD 那样的「强一致性」方案,所以无需再依赖另一个中间件,从而可以减少整体的维护成本。
另外「NameServer」通常也是「集群」的方式部署,彼此之间是「相互独立」,「互不通信」,Broker 会向每个 「NameServer」注册自己的路由信息,所以每个「NameServer」都保存一份完整的路由信息,即使某台「NameServer」挂掉,Broker 仍然可以向其它「NameServer」同步路由信息,所以客户端仍然可以动态感知到 Broker 的路由信息。
03 NameServer 整体架构设计
「NameServer」是⼀个非常简单的 「Topic 路由注册中心」,其角色类似 Kafka、Dubbo 中的 Zookeeper ,支持 Broker 的「动态注册与发现」。
主要包含两个功能:
Broker 管理:NameServer 接收来自 Broker 集群的注册信息并且保存下来作为「路由信息」的基本数据。然后提供「心跳检测机制」用来检查 Broker 是否还存活。
路由信息管理:每个 NameServer 将保存 Broker 集群的整个路由信息和用来客户端查询的队列信息。然后 Producer 端和 Conumser 端通过 NameServer 就知道整个 Broker 集群的路由信息,从而进行消息的生产和消费。
RocketMQ 5.0 版本之后 NameServer 同时也可以作为 Controller 模块的一个容器,Controller 模块内嵌到NameServer 中。
通过上图可以看出,RocketMQ 整个架构上主要分为四部分: 「Broker」、「Producer」、「Consumer」、「NameServer」,其他三个都会与 「NameServer」进行通信,整个流程如下:
2.1 NameServer 启动
启动「NameServer」服务,监听 TCP 端口, 其集群多节点之间互不通信,然后等待 「Broker」、「Producer」、「Consumer」连上来。
2.2 Broker 启动
启动「Broker」服务,此时会每隔 30 秒向所有的 NameServer 发送心跳命令,进行注册「路由信息」,「NameServer」接收到来自「Broker」心跳请求之后,保存「路由信息」到本地内存中,将注册成功结果返回给 Broker 服务。
2.3 Producer 发送消息
当「Producer」启动之后,会随机的选择「NameServer」集群中的其中⼀个「NameServer」建立长连接,并从 「NameServer」中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择⼀个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
2.3.1 Producer 与 NameServer 关系
连接:单个 Producer 和一台 NameServer 保持长连接,如果该 NameServer 挂掉,生产者会自动连接下一个 NameServer,直到有可用连接为止,并能自动重连。
心跳:与 NameServer 没有心跳。
轮询时间:默认情况下,生产者每隔 30 秒从 NameServer 获取所有 Topic 的最新队列情况,这意味着某个 Broker 如果宕机,生产者最多要 30 秒才能感知,在此期间发往该 Broker 的消息发送失败。
2.3.2 Producer 与 Broker 关系
连接:单个生产者和该生产者关联的所有 Broker 保持长连接。
2.4 Consumer 订阅消息
「Consumer」跟 「Producer」类似,也是跟其中⼀台 「NameServer」建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接并准备开始消费消息。
2.4.1 Consumer 与 NameServer 关系
连接:单个 Consumer 和一台 NameServer 保持长连接,如果该 NameServer 挂掉,消费者会自动连接下一个 NameServer,直到有可用连接为止,并能自动重连。
心跳:与 NameServer 没有心跳
轮询时间:默认情况下,消费者每隔 30 秒从 NameServer 获取所有 Topic 的最新队列情况,这意味着某个 Broker 如果宕机,客户端最多要 30 秒才能感知。
连接:单个消费者和该消费者关联的所有 Broker 保持长连接。
2.4.3 Consumer 负载均衡
集群消费模式下,一个消费组集群的多台机器共同消费一个 Topic 的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
04 NameServer 心跳设计
如图所示,可以看到 「Consumer」、「Producer」、「Broker」均隔每 30s 向 「NameServer」发起一次请求,在「NameServer」内部中也会启动定时器用来「定期扫描」和 「更新」内部数据。
4.1 Broker 发起心跳
每隔 30s 向 「NameServer」集群的每台节点都发送心跳包,包含自身 Topic 队列的路由信息。
当有 Topic 改动「创建/更新」,Broker 会立即发送 Topic 增量信息到 「NameServer」,同时触发 「NameServer」的数据版本号发生变更。
心跳包组成部分:「请求头」、「请求体」。
4.2 NameServer 处理请求
将「路由信息」保存在内存中。它只会被其他模块调用,「Broker 注册」、「客户端拉取」,并不会主动调用其他模块。
启动一个「定时任务线程」,每隔「10s」扫描 「brokerLiveTable」中所有的 Broker 上次发送心跳时间,如果超过「120s」没有收到心跳,则从存活 Broker 表中移除该 Broker。
05 NameServer 路由管理设计
前面说过了 Broker 节点在启动的时向所有 「NameServer」进行注册。
消息生产者 Producer 在发送消息之前先从「NameServer」获取 Broker 服务器地址列表然后根据「负载均衡」算法从列表中选择一台服务器进行发送。
「NameServer」与每台「Broker」保持「长连接」,并且每间隔「30s」通过心跳检测「Broker」是否存活,如果「120s」秒内没收到 「Broker」的上报消息,在「NameServer」内部中会每隔「10s」扫描一次Broker 列表,移除不活跃的 Broker),那么就认为检测到 Broker 宕机,则从「路由注册表」中删除。
但是路由变化不会马上通知消息生产者。这样设计的目的是为了降低「NameServer」实现的复杂度,在消息发送端提供「容错机制」保证消息发送的可用性。
「NameServer」本身的高可用是通过部署多台「NameServer」来实现,但彼此之间不通讯,也就是「NameServer」服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这也是「NameServer」设计的一个亮点。总之,RocketMQ 设计追求简单高效,这也是为了高可用放弃强一致性的表现吧。
总结,「NameServer」的主要作用是为消息的生产者和消息消费者提供关于主题 Topic 的路由信息,那么 「NameServer」需要存储路由的基础信息,还要管理 Broker 节点,包括「路由注册」、「路由删除」、「路由发现」。
通过对源码的研读,「NameServer」的路由关系都保存在「RouteInfoManager」中的 4 个 HashMap 中,「路由注册」、「路由删除」、「路由发现」基本都是操作这 4 个 HashMap,这里不多对源码进行剖析,会放到源码系列中
假如说我们搭建了如下「双主双从」的集群,集群名字为「rocketmq-cluster」 。
ip 架构模式
192.168.56.1 Master1
192.168.56.2 Master2
192.168.56.3 Slave1
192.168.56.4 Slave2
相关配置如下:
master1
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字
brokerName=broker-a
# 0 表示 Master, 大于0 表示 Slave
brokerId=0
master2
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字
brokerName=broker-b
# 0 表示 Master, 大于0 表示 Slave
brokerId=0
slave1
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字
brokerName=broker-a
# 0 表示 Master, 大于0 表示 Slave
brokerId=1
slave2
# 所属集群名字
brokerClusterName=rocketmq-cluster
# broker名字
brokerName=broker-b
# 0 表示 Master, 大于0 表示 Slave
brokerId=1
假如说我们在「rocketmq-cluster」集群的「broker-a」和「broker-b」上创建一个 topic,名字为 hello-huazai,「读写队列」都默认为「4」个,消息的分布情况如下图所示:
那么上面 4 个 HashMap 对应的值为:
topicQueueTable
{
"hello-huazai": [
{
"brokerName": "broker-a",
"readQueueNums": 4,
"writeQueueNums": 4,
"perm": 6,
"topicSynFlag": 0
},
{
"brokerName": "broker-b",
"readQueueNums": 4,
"writeQueueNums": 4,
"perm": 6,
"topicSynFlag": 0
}
]
}
brokerAddrTable
{
"broker-a": {
"cluster": "rocketmq-cluster",
"brokerName": "broker-a",
"brokerAddrs": {
"0": "192.168.56.1:10912",
"1": "192.168.56.3:10912"
}
},
"broker-b": {
"cluster": "rocketmq-cluster",
"brokerName": "broker-a",
"brokerAddrs": {
"0": "192.168.56.2:10912",
"1": "192.168.56.4:10912"
}
}
}
clusterAddrTable
{
"rocketmq-cluster": [
"broker-a",
"broker-a"
]
}
brokerLiveTable
{
"192.168.56.1:10912": {
"lastUpdateTimestamp": 1698394578378,
"haServerAddr": ""
},
"192.168.56.2:10912": {
"lastUpdateTimestamp": 1698398886278,
"haServerAddr": ""
},
"192.168.56.3:10912": {
"lastUpdateTimestamp": 1698398872578,
"haServerAddr": ""
},
"192.168.56.4:10912": {
"lastUpdateTimestamp": 1698398864578,
"haServerAddr": ""
}
}
06 5.0 相对 4.0 的 NameServer 改进
「NameServer」在 RocketMQ 5.0 的版本做了很多的优化工作:
broker 注册线程池和客户端路由获取线程池隔离。
当前「NameServer」会用同一个线程池和队列去处理所有的客户端路由请求,服务端注册请求等,并且队列的大小和线程数都是不可配置的,如果其中一个类型的请求打爆线程池,将会影响到所有请求。将线程池进行了隔离,将最重要要的客户端路由请求单独隔离出来,队列的大小和线程数均是可配置的。线程池之间的请求处理相互隔离,不受影响。
Topic 路由缓存的优化
当前「NameServer」当客户端发送路由请求时,会利用 topicQueueTable 和 brokerAddrTable 来构造出最终的路由信息TopicRouteData,这里涉及了在读锁中遍历 broker,有一定的 cpu 耗费。通过构造TopicRoute 的缓存 topicRouteDataMap,直接在客户端请求时返回 TopicRoute,而额外的代价是在 broker请求、下线,删除topic 等行为时同时操作 topicRouteDataMap。
批量注销 Broker
增加 BatchUnRegisterService,异步化批量处理 broker 下线,加速 broker 下线流程。
RIP-29 是「NameServer」的改进增强。
07 优化联想
生产者和消费者连接「NameServer」获取路由信息都是随机的,从给定的「NameServer」列表中打散列表随机选择一个「NameServer」进行连接获取数据。这种情况存在如下弊端:
「NameServer」部署的较好的机器无法发挥机器的所有性能,而性能较差的机器可能连接很多导致服务宕机。
不能够指定「NameServer」进行连接(只配置一个「NameServer」地址除外)。
所以考虑在客户端(生产者和消费者)增加选择连接「NameServer」的策略模式,由开发者自己选择或者实现策略来选择「NameServer」进行连接。可以考虑一下策略模式:
随机策略:随机一个「NameServer」进行连接(当前的模式)。
指定策略:指定一个特定的「NameServer」进行连接。
轮询策略:当前应用中的客户端自行在给定的「NameServer」。
「NameServer」最小客户端连接数策略:获取当前「NameServer」中客户端连接数最小的进行连接。
08总结
这里,我们一起来总结一下这篇文章的重点。
1、带你剖析了「RocketMQ」为什么要设计「NameServer」。
2、接着带你剖析了「NameServer」整体架构设计以及「心跳设计」。
2、最后带你剖析了「NameServer」的路由管理设计。
下篇我们来深度剖析「Broker 主从架构与集群模式管理」,大家期待,我们下期见。
图解 RocketMQ Broker 主从同步与集群模式原理
01 总体概述
与 Kafka 类似,「Broker」是 RocketMQ 的核心,大部分「重量级」的工作都是由 「Broker」来完成的,主要包括「接收 Producer 发来的消息」、「处理 Consumer 消费的消息」、「消息的持久化存储」、「消息的 HA 机制」、「消息查询」以及「服务高可用」等 。
02 Broker 集群部署模式
RocketMQ 的 Broker 有以下三种集群部署方式:
1.单台 Master 部署;
2.多台 Master 部署;
3.多 Master 多 Slave 部署;
2.1 单 Master 模式
单 master 也就是只有一个 master 节点,如果该 master 节点挂掉了,会导致整个服务不可用,线上不建议使用,适合个人学习使用。
2.2 多 Master 模式
多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。
和 kafka 不一样,RocketMQ 中并没有 master「选举功能」,在 RocketMQ 集群中,1 台机器只能要么是Master,要么是 Slave,这个在初始的机器配置就定死了的。
早期 RocketMQ 版本不会像 kafka 那样存在 master 动态选举,所以通过配置多个 master 节点来保证 RocketMQ 的高可用。
我们来看下该模式下的优缺点:
优点:高可用,该模式是所有模式中「性能最高」的。
缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性。
注意:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个 master 节点,而不是只在某个 master 节点上,否则该节点宕机会对订阅该 topic 的应用造成影响。
2.3 多 Master 同步/异步复制模式
多 master 模式根据 Master 和 Slave 之间的数据同步方式可以分为:
多 master 多 slave 异步复制模式
多 master 多 slave 同步复制模式
总体来说,RocketMQ 集群部署模式为四种,下面是第三种方式的部署方式架构图:
当采用多 Master 方式时,Master 与 Master 之间是「不需要知道彼此」的,这样的设计思想直接降低了 Broker实现的复杂性。
看到这里你是否有所疑惑?你可以想象一下,如果 Master 与 Master 之间「需要知道彼此」的存在,此时需要在维护一个 Master 列表,而且必然涉及到「Master 注册发现」和「活跃 Master 数量变更」等很多状态更新问题,所以最简单最可靠的做法就是 Master 只做好自己的事情「与 Slave 进行数据同步」即可。
通过这样的设计,在分布式环境中,当某台 Master 宕机或上线,不会对其他 Master 造成任何影响。
那么我们如何才能知道集群中有多少台 Master 和 Slave 呢?
这就是我们上一篇剖析的结果 图解 RocketMQ NameServer 架构设计,没错就是那个大名鼎鼎的「NameServer」。
这里再回顾下「NameServer」的功能,大概有以下 4 类:
NameServer 承担了「注册中心」的职能。
NameServer 用来保存「活跃 broker 列表」,包括 Master 和 Slave。
NameServer 用来保存所有 topic 和该 topic 所有队列的列表。
NameServer 用来保存所有 broker 的「Filter 列表」。
2.4 多 Master 多 Slave 部署集群工作流程
NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定「相同的 BrokerName」,「不同的 BrokerId」来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才
会参与消息的读负载。
Producer 与 NameServer 集群中的其中一个节点(随机选择)建立「长连接」,定期从 NameServer 获取「Topic 路由信息」,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 「发送心跳」。Producer完全无状态,可集群部署。
Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。
Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读 I/O),以及从服务器是否可读等因素建议下一次是从 Master 还是 Slave 拉取。
2.3.1 多 Master 多 Slave 异步复制模式
在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点「可读可写」,但是 slave 「只能读不可写」,类似于 MYSQL 的主备模式。
其优缺点如下:
优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
缺点:使用「异步复制」的同步方式有可能会有「消息丢失」消息丢失的问题。
RocketMQ 天生对集群的支持非常友好,天然支持「高可用」,它可以支持「多主多从」的部署架构,这也是和
Kafka 区别之一,Kafka 的分区副本可以看成「一主多从」。
其中 Master 的broker id = 0,Slave 的 broker id > 0。这有点类似于 MYSQL 的主从概念,当 master 挂了以后,slave 仍然可以提供「读服务」,但是由于有多主的存在,当一个 master 挂了以后,可以写到其他的 master上。
2.3.2 多 master 多 slave 同步复制模式
同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。
其优缺点如下:
优点:同步双写的同步模式能保证数据不丢失。
缺点:发送单个消息「RT 会略长」,性能相比异步复制「低 10%」左右。
刷盘策略:同步刷盘和异步刷盘,它指的是节点自身数据是同步还是异步存储。
注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。
2.5 BrokerServer 高可用
RocketMQ 是通过 Master 和 Slave 的配合达到 BrokerServer 模块的高可用。一个 Master 可以配置多个 Slave,同时也支持配置多个 Master-Slave 组。
当其中一个 Master 出现问题时:
由于 Slave 只负责读,当 Master 不可用,它对应的 Slave 仍能保证消息被正常消费。
由于配置多组 Master-Slave 组,其他的 Master-Slave 组也会保证消息的正常发送和消费。
老版本的 RocketMQ 不支持把 Slave 自动转成 Master,如果机器资源不足, 需要把 Slave 转成 Master,则
要「手动停止」Slave 角色的 Broker,「更改配置文件」,用新的配置文件启动 Broker。
新版本(4.5)的 RocketMQ,支持 Slave 自动转成 Master。RocketMQ 「dledger」使用了Raft 协议,保证所有节点数据最终一致性,同时保证了集群的可用性。
关于「dledger」我们会在后面单篇剖析,这里就不介绍了,下面我们先介绍原来的主从同步的机制。
03 Broker 主从同步机制
上面说了,RocketMQ 支持「集群部署」来保证高可用。它基于「主从模式」,将节点分为 Master、Slave 两个角色,集群中可以有多个 Master 节点,一个 Master 节点可以有多个 Slave 节点。「Master 节点」负责接收生产者发送的写入请求,将消息写入「CommitLog」文件,「Slave 节点」会与「Master 节点」建立连接,从 「Master 节点」同步消息数据,消费者可以从 Master 节点拉取消息,也可以从 Slave 节点拉取消息。
RocketMQ 主从模式下,是通过 Slave 节点主动向 Master 节点发送请求通知主节点进行数据同步的。
3.1 主从消息同步
3.1.1 建立连接
主节点监听连接事件
「主从节点」传输数据,那么肯定会先「建立连接」,所以「主节点」在启动的时候,会开启一个端口 haListenPort 用于监听「从节点」的连接请求「注册了 ACCEPT 连接事件监听」,默认端口是「10912」,当然也可以通过配置修改 haListenPort 的值使用其他端口。
在端口绑定之后,「主节点」会专门开启一个「线程」,用于「监听到从节点的连接事件」,如果「从节点」发起了连接请求,会与「从节点」建立连接,与「从节点」的连接信息会封装在 HAConnection 类中,主节点和从节点的数据同步逻辑也在 HAConnection 中。
从节点发起连接事件
「从节点」在启动时会向「主节点」发起连接请求,上面说过「主节点」会监听「从节点」的连接请求,所以此时主从节点的连接建立完成,待完成后「从节点」会在连接上注册「READ 可读事件监听」,处理连接上的可读事件。
3.1.2 消息同步
主节点处理从节点请求
这里可以分两种事件:「READ 可读事件即处理从节点发送过来的请求」、「WRITE 可写事件即向从节点发送请求」,整个逻辑如下:
先来看「读事件」,上面说到「从节点」会定时向「主节点」汇报消息同步的进度,「主节点」会开启一个「线程」专门处理监听到的「可读事件」,也就是处理「从节点」发来的请求,处理逻辑在 ReadSocketService 中,可以先自行阅读。
再来看「写事件」,「主节点」也会开启了一个「线程」来处理网络中的写事件,主节点向从节点发送同步消息数据的处理,它也会开启一个循环,只要主节点未停止服务,就不断进行处理。
最后再来看下「从节点收到消息的处理结果」,「从节点」会监听到网络中的可读数据,收到消息后将消息写入自己的「CommitLog」中。
主从同步流程如下:
从节点处理主从同步
从节点处理主从同步的逻辑主要在 HAClient 中,它内部会开启了一个「线程」处理主从同步,只要 Slave 节点未停止,就会不断循环处理,逻辑如下:
「从节点」会定时向「主节点」上报消息同步的偏移量,所以每次循环开始都会判断是否需要向主节点「发送消息同步偏移量」,如果已经有一段时间内没有向主节点上报,此时就会向主节点「发送消息同步偏移量」,告诉主节点现在同步到哪条消息了。
等待与「主节点」建立的连接上产生「READ 可读事件」。
处理「READ 可读事件」,主要是判断「主节点」是否发来了数据,如果「主节点」发送了数据,就要从网络中读取数据,将读取到的消息内容写到「从节点」自己的 「CommitLog」。
3.2 等待主从复制结果
这里分为两种同步方式:「同步复制 SYNC_MASTER」、「异步复制 ASYNC_MASTER」。
先来看下「同步复制」,当消息写入「主节点」之后,需要等待「从节点」也写入完毕才能返回成功。
再来看下「异步复制」,当消息写入「主节点」之后即可返回成功,「主从同步」数据异步进行,不需要等待「从节点」写入完毕即可返回成功。
当「主从同步」开始之后,如果有新的消息写入主节点的「CommitLog」,如果 Master 节点配置的是「同步复制 SYNC_MASTER」,在消息写入主节点之后还需要等待「从节点」同步完毕,「主节点」会开启一个数据同步线程,专门来判断数据是否同步完毕。
首先消息在写入「CommitLog」之后会构建一个消息提交请求 「GroupCommitRequest」,请求中会携带本次消息写入之后的「偏移量」,将其提交到一个集合「requestsRead」中,这个线程可以称为「主线程」,然后主线程会「唤醒」数据同步线程来判断数据是否同步完毕,之后「主线程」进入等待状态。
3.3 主从模式下消息消费流程
在「主从模式」下,消费者向 Broker 发送拉取消息请求后,Broker 对拉取请求进行处理时会设置一个 broker ID,建议消费者下次从 Broker 拉取消息,接下来会看下 Broker 根据什么条件决定返回哪个 Broker ID 的。
Broker 在处理消费者拉取请求时,获取消息后会在返回结果中设置一个是否建议从 Slave 节点拉取值放在isSuggestPullingFromSlave 这个变量中,这个值的判断方式如下:
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
maxOffsetPy:表示当前最大物理偏移量。
maxPhyOffsetPulling: 表示本次消息拉取最大物理偏移量。
diff:当前 Broker 的 CommitLog 最大偏移量减去本次拉取消息的最大物理偏移量,表示剩余未拉取的消息。
memory:消息在 PageCache 中的总大小,计算方式是总物理内存 * 消息存储在内存中的阀值(默认为40)/100,也就是说 MQ 会缓存一部分消息在操作系统的 PageCache 中,加速访问。
如果 diff 大于 memory 的值,表示未拉取的消息过多,已经超出了 PageCache 缓存的数据的大小,还需要从磁盘中获取消息,所以此时会建议下次从 Slave 节点拉取,将 isSuggestPullingFromSlave 的值置为 true,否则为false。
04 总结
这里,我们一起来总结一下这篇文章的重点。
1、带你剖析了「RocketMQ」为什么要设计「Broker」以及 「Broker」的作用和功能。
2、接着带你剖析了「Broker」的几种集群部署方式。
2、最后带你剖析了「Broker」主从同步机制的实现原理和流程。