RocketMQ 是一个分布式消息中间件,它具有低延迟、高性能的特点,并且支持发布/订阅和点对点模型。Broker 是 RocketMQ 中的一个核心组件,负责接收来自生产者的消息并存储起来,同时响应消费者的拉取消息请求。下面是 RocketMQ Broker 启动流程的图解及分析:
1. 启动入口
启动命令:通常通过运行
mqbroker
命令来启动 Broker。主类:启动时会加载
org.apache.rocketmq.broker.BrokerStartup
类。
2. 初始化配置
读取配置:从命令行参数或配置文件中读取必要的配置信息,比如 Broker 的名称、存储路径等。
创建配置对象:使用读取到的配置信息创建
BrokerConfig
对象。
3. 创建 BrokerController
BrokerController 实例化:创建
BrokerController
实例,这是控制 Broker 行为的核心控制器。初始化服务端口监听器:设置用于通信的服务端口监听器,包括 RemotingServer(Netty)。
4. 注册处理器
注册各种处理器:在
RemotingServer
上注册处理不同协议消息的处理器,例如心跳检测处理器、发送消息处理器等。
5. 初始化持久化层
创建 CommitLog 存储:初始化 CommitLog 文件,这是 RocketMQ 消息的主要存储形式。
创建 ConsumerQueue 和 IndexFile:基于 CommitLog 创建索引文件,以便快速查找消息。
6. 启动网络服务
启动 Netty 服务器:启动
NettyServer
来监听客户端连接,准备接受来自 Producer 或 Consumer 的请求。
7. 加载插件系统
加载插件:如果配置了插件,则在此步骤加载这些插件以增强或定制 Broker 功能。
8. 完成启动
通知状态变化:向 Name Server 发送注册信息,告知当前 Broker 已经准备好提供服务。
打印日志:输出启动完成的日志信息,表明 Broker 成功启动。
为了更好地理解 RocketMQ Broker 的启动流程,我们可以使用一个简化的流程图来表示主要步骤。下面是一个文字描述的流程图,以及每个步骤的说明。在实际情况下,你可能需要使用绘图工具(如 Microsoft Visio、Draw.io 或 Lucidchart)来创建一个可视化的流程图。
RocketMQ Broker 启动流程图解
+-----------------+ +--------------------+ +---------------------+
| 读取配置文件 | ----> | 创建 BrokerController| ----> | 初始化 Netty 服务端 |
| (mqbroker 命令) | | | | |
+-----------------+ +--------------------+ +---------------------+
| | |
| v v
+-----------------+ +--------------------+ +---------------------+
| 加载 Broker 配置| <----+ | 初始化存储路径 | ----> | 注册消息处理器 |
| (BrokerConfig) | | | (CommitLog, etc.) | | (SendMessage, etc.)|
+-----------------+ | +--------------------+ +---------------------+
| | | |
| | v v
+-----------------+ +--------------------+ +---------------------+ +-------------------+
| 创建 CommitLog | <---+ 初始化 ConsumerQueue | <---+ 启动 Netty 服务器 | | 加载插件系统 |
| 和其他索引文件 | | (ConsumerOffset) | | (NettyRemotingServer)|
+-----------------+ +--------------------+ +---------------------+ +-------------------+
| | | |
| v v v
+-----------------+ +--------------------+ +---------------------+ +-------------------+
| 注册到 NameServer| <---+ 启动定时任务线程池 | <---+ 监听客户端请求 | | 完成启动并打印日志 |
| 发送心跳包 | | (ScheduledExecutor) | | (等待生产者/消费者) | | (Broker 启动成功) |
+-----------------+ +--------------------+ +---------------------+ +-------------------+
流程说明
读取配置文件:通过
mqbroker
命令行参数或配置文件读取 Broker 运行所需的配置信息。创建 BrokerController:根据配置信息创建
BrokerController
实例,这是 Broker 的核心控制器。加载 Broker 配置:将读取到的配置信息应用到
BrokerController
中。初始化存储路径:设置用于存储消息数据的物理路径,并初始化相应的文件结构,比如 CommitLog。
创建 CommitLog 和其他索引文件:初始化 CommitLog 文件及相关的 ConsumerQueue 和 IndexFile。
注册消息处理器:为 Netty 服务端注册处理不同协议消息的处理器,如发送消息处理器。
初始化 Netty 服务端:准备网络层,设置监听端口等。
启动 Netty 服务器:正式启动 Netty 服务端,开始监听指定端口上的连接。
启动定时任务线程池:启动定时任务线程池,用于执行诸如清理过期文件等定期任务。
注册到 NameServer:向 NameServer 注册当前 Broker 的信息,并定期发送心跳保持联系。
监听客户端请求:开始接受来自生产者和消费者的请求。
加载插件系统:如果有自定义插件,则在此处加载以增强功能。
完成启动并打印日志:输出启动成功的日志信息。
这个流程图和说明可以帮助你从宏观上把握 RocketMQ Broker 的启动过程。如果你需要更详细的源码分析,可以深入研究 RocketMQ 的源代码,特别是 BrokerStartup
类及其相关类的方法实现。
整体架构概述
RocketMQ 主要由 Producer(生产者)、Consumer(消费者)和 Broker(消息中转角色)组成。Broker 在整个消息传递过程中起到了关键作用,它负责接收生产者发送的消息,存储消息,并将消息推送给消费者。
在启动时,Broker 要完成一系列的初始化工作,包括配置加载、模块初始化等。
配置加载阶段
加载配置文件
Broker 启动时,首先会读取配置文件。配置文件包含了诸如存储路径、监听端口、集群相关配置等众多参数。例如,
broker.conf
文件中的storePathRootDir
参数指定了消息存储的根目录路径。它会使用
Properties
类(Java 语言下)或者类似的配置解析工具将配置文件中的键值对读取到内存中。这个过程就像是给 Broker 的运行设定了基本规则,告诉它应该在哪里存储消息,如何与外界通信等。
解析命令行参数
除了配置文件,Broker 也会解析命令行参数。这些参数可以在启动脚本中指定,用于覆盖配置文件中的某些设置。比如,可以通过命令行参数指定特定的日志级别,这样在调试或者运行特定环境时可以灵活调整。
存储模块初始化
消息存储路径创建与检查
根据配置文件中指定的存储路径,Broker 会检查路径是否存在。如果不存在,会创建相应的目录结构。例如,对于消息存储目录,会创建像
commitlog
(存储消息主体)、consumequeue
(存储消息索引)等子目录。这一步就好比为仓库(存储模块)建造合适的房间(目录)来存放不同类型的货物(消息和索引)。
存储组件初始化
初始化消息存储相关的组件,如
CommitLog
对象。CommitLog
是 RocketMQ 存储消息的核心组件,它负责将生产者发送的消息顺序写入磁盘文件。同时会初始化
ConsumeQueue
组件,它的主要作用是为消费者快速定位消息提供索引。例如,一个消费者要获取特定主题的消息,ConsumeQueue
可以帮助它快速找到消息在CommitLog
中的位置。
网络通信模块初始化
监听端口设置
根据配置,Broker 会在指定的端口上开启监听。例如,默认情况下会在
10911
端口监听生产者的连接请求,在10912
端口监听消费者的连接请求。这个过程类似于在港口(Broker)设置码头(监听端口),等待船只(生产者和消费者)的到来。
网络处理器初始化
初始化网络处理器,用于处理生产者和消费者的连接、请求和响应。这些网络处理器会按照 RocketMQ 定义的协议来解析和处理网络请求。
比如,当生产者发送一条消息时,网络处理器会接收消息字节流,按照协议解析出消息的主题、标签等信息,然后将消息交给存储模块进行存储。
消息过滤服务初始化
加载过滤规则
如果有消息过滤的配置,Broker 会加载相应的过滤规则。这些规则可以是基于消息主题、标签等的过滤条件。例如,可以配置只接收主题为
order_topic
且标签为new_order
的消息。这就像在仓库(Broker)的入口设置了一个筛选器,只有符合特定条件的货物(消息)才能进入某些区域(消费者接收队列)。
初始化过滤组件
初始化用于执行消息过滤的组件,当消费者请求消息时,这些组件会根据过滤规则筛选出符合条件的消息发送给消费者。
集群相关模块初始化(如果是集群模式)
发现集群中的其他 Broker
在集群模式下,Broker 会通过配置的集群发现机制(如 Namesrv 地址列表)来查找其他 Broker。例如,它会向 Namesrv 发送请求,获取集群中其他 Broker 的信息,包括它们的地址和状态。
这类似于一个新成员加入团队时,先去了解其他团队成员的位置和职责。
初始化集群通信组件
为了与其他 Broker 进行通信,会初始化集群通信组件。这些组件用于同步消息状态、处理主从复制(如果有主从架构)等操作。例如,在主从架构中,主 Broker 会通过集群通信组件将消息复制到从 Broker,以保证数据的冗余和高可用性。
启动完成后的检查与等待
检查各模块状态
Broker 会检查各个初始化后的模块状态是否正常。例如,检查存储模块是否能够正常读写消息,网络通信模块是否能够正常接收和发送请求等。
进入消息处理循环
当所有检查都通过后,Broker 就进入消息处理循环,等待生产者发送消息和消费者请求消息,开始正常的工作流程。这个过程就像是仓库(Broker)正式开门营业,等待货物(消息)的进出和处理。
01 总体概述
Broker 是 RocketMQ 中最为复杂的一个模块,绝大部分消息队列的特性都在这个模块中实现,它主要负责消息的「存储」、「投递」和「查询」以及「服务高可用」保证。
单机可以支撑上万队列规模,具有上亿级消息堆积能力,可以严格保证消息的有序性。
Broker 相关代码集中在 rocketmq-broker 模块下,如下:
模块如下:
client:生产者管理器、消费者管理器、Broker-Client 调用器等。
Controller:ReplicasManager 相关,controller 模式。
dledger:基于 DLedger 技术的 Broker 高可用。
filter:ConsumerFilter 相关,用于消费者消费消息过滤
longpolling:PullRequest 相关,消费者读取消息。
offset:ComsumerOffset 相关,消费偏移量管理。
processor:Broker 的 Netty 处理器。
schedule:定时任务相关。
salve:主从同步组件。
subscription:订阅组管理。
topic:Topic 管理。
transaction:事务管理。
重要模块组成如下:
远程连接模块:指整个 Broker 实体,负责处理来自客户端请求。
客户端管理模块:负责管理客户端「Producer」、「Consumer」和维护 Consumer 上的 Topic 订阅信息。
存储服务模块:提供了方便简单的 API 接口来处理消息,存储到物理硬盘以及查询相关功能。
HA 服务模块:提供高可用服务,Master Broker 和 Slave Broker 之间的数据同步功能。
索引服务:创建消息的索引,以提供快速查询功能。
02 Broker 启动流程
在剖析「NameServer」时, 我们知道它的启动类和控制器分别为「NameSrvStartup」 、「NameSrvController」,对应的 Broker 的启动类和控制器分别为「BrokerStartup」、「BrokerController」,这就是 Broker 源码的入口。
与 「NameSrvStartup」类似,「BrokerStartup」也是会先构建控制器「BrokerController」,再调用其 start() 方法进行启动 Broker。
换句话说就是 Broker 启动的大部分逻辑都在「BrokerController」中。
2.1 Broker 启动入口
// 创建 Broker 控制器并启动 Broker 控制器
public static void main(String[] args) {
start(createBrokerController(args));
}
public static BrokerController start(BrokerController controller) {
try {
// 启动 Broker 控制器
controller.start();
String tip = String.format("The broker[%s, %s] boot success. serializeType=%s",
controller.getBrokerConfig().getBrokerName(), controller.getBrokerAddr(),
RemotingCommand.getSerializeTypeConfigInThisServer());
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}
// 打印日志与输出
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
知道了入口之后,接下来就来剖析下它是如何启动的,启动的过程中都做了什么?
从上面的部分代码中,不难看出,它首先就是创建「BrokerController」对象,然后调用 start() 方法启动,那就一步一步走进去看看细节吧。
2.2 BrokerController 对象是如何创建的
总共 5 个操作步骤:
构建 Broker 控制器。
初始化 Broker 控制器。
如果初始化失败,则退出表示 Broker 启动失败。
添加 JVM 钩子函数在 JVM 停止时关闭 BrokerController。
返回控制器对象。
我们来分别看下每一步的操作。
public static BrokerController buildBrokerController(String[] args) throws Exception {
// 利用 System.setProperty() 设置 RocketMQ 的版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
// 1、创建 broker 的相关配置类对象
// Broker核心配置类 : 用来封装其绝大多数基本配置信息,比如ROCKETMQ_HOME
final BrokerConfig brokerConfig = new BrokerConfig();
// Netty服务器配置类:Broker 作为服务端,比如接受来自客户端(Producer/Consumer)的消息的时候
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// Netty 客户端配置类:Broker 作为客户端,比如连接 NameServer 的时候
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
// Broker 存储对象配置类:例如各种文件大小等
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
// 2、设置 netty 的服务端监听的端口号 10911,对外提供消息读写服务的端口,Broker 存储对象配置端口 10912
nettyServerConfig.setListenPort(10911);
messageStoreConfig.setHaListenPort(0);
// 3、构建命令行操作
Options options = ServerUtil.buildCommandlineOptions(new Options());
// 解析命令行为 'mgbroker’的参数
CommandLine commandLine = ServerUtil.parseCmdLine(
"mqbroker", args, buildCommandlineOptions(options), new DefaultParser());
if (null == commandLine) {
// 如果为空,直接退出,退出状态码为-1
System.exit(-1);
}
Properties properties = null;
// 解析外部配置文件,读取 broker.conf 配置文件,-c 参数指定 Broker 配置文件
// 判断命令行中是否包含字符'-c’(即为否包含通过命令行指定配置文件的命令)
// 例如 broker 启动的时候添加的 -c /home/wangjianghua/src/rocketmq/rocketmq-all-4.9.4-bin-release/conf/broker.conf命令
if (commandLine.hasOption('c')) {
// 获取该命令指定的配置文件
String file = commandLine.getOptionValue('c');
if (file != null) {
CONFIG_FILE_HELPER.setFile(file);
BrokerPathConfigHelper.setBrokerConfigPath(file);
// 加载配置文件到 properties,基于缓冲输入流
properties = CONFIG_FILE_HELPER.loadConfig();
}
}
// 4、加载配置文件中的配置
if (properties != null) { // 得到的 properties 配置不为空
// 将配置文件内容利用反射设置到对应的配置类中
// 将 rmgAddressServerDomain、rmgAddressServerSubGroup 属性设置为系统属性
properties2SystemEnv(properties);
// 设置 brokerConfig 的配置信息
MixAll.properties2Object(properties, brokerConfig);
// 设置 nettyservice 的配置信息
MixAll.properties2Object(properties, nettyServerConfig);
// 设置 nettyClient 的配置信息
MixAll.properties2Object(properties, nettyClientConfig);
// 设置 messageStore 的配置信息
MixAll.properties2Object(properties, messageStoreConfig);
}
// 4、加载命令行中的参数
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
if (null == brokerConfig.getRocketmqHome()) {
// 如果从 brokerConfig 配置文件中获取到的 RocketmgHome 为空,打印提示并设置系统状态码为 -2,需要我们自己设置一下环境变量
System.out.printf("Please set the %s variable in your environment " +
"to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// Validate namesrvAddr
// 5、获取Nameserver地址,用 ; 分割解析为数组,Nameserver 可能为集群 所以有多个
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (StringUtils.isNotBlank(namesrvAddr)) {
try {
// 拆分 NameServer 的地址
// 可以指定多个 NameServer 的地址,以";"分隔,形成一个地址宇符串数组
String[] addrArray = namesrvAddr.split(";");
// 将字符串的地址,转换为网络连接的 scoketAddress,检查格式是否正确
for (String addr : addrArray) {
NetworkUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf("The Name Server Address[%s] illegal, please set it as follows, " + "\"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr);
System.exit(-3);
}
}
/**
* 6、Slave-Broker的参数配置信息
* 如果 broker 的角色是 slave (默认 broker 的角色是异步 master),设置命中消息在内存的最大比例
* 超过所设置的最大内存,消息将被置换出内存; 设置成比默认的 40 %,还要小 10 %
*/
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
// Set broker role according to ha config
// 7、不启动控制器模式,不支持自动切换主从角色。
if (!brokerConfig.isEnableControllerMode()) {
// 判断 Broker 角色作相应的处理,这里 master 有两种同步消息方式
// 设置、校验brokerId
// 根据检查 broker 的角色配置 brokerId: 默认角色是 ASYNC_MASTER
// 通过此配置可知 brokerId 为 0 表示 master,非 0 表示 slave
// broker 的角色分为:
// ASYNC_MASTER : 异步同步消息到 slave
// SYNCMASTER : 同步同步消息到 s1ave
// SLAVE
switch (messageStoreConfig.getBrokerRole()) {
// 异步复制:生产者写入消息到 Master 后无需等待消息复制到 slave 即可返回,消息的复制由旁路线程进行异步复制
case ASYNC_MASTER:
/**
* 同步复制的方式,表现出来的是类似同步双写的策略。即 Master 写入完消息之后,需要等待 Slave 的复制成功。
* 注:这里只需要有一个 Slave 复制成功并成功应答即算成功,所以在这种模式下,如果有 3 个 Slave,当生产者获得
* SEND_OK 的应答时,代表消息已经达到 Master 和一个 Slave (注:这里并不代表已经持久化到磁盘,而只能证明
* 肯定到了 PageCache,是否能刷到磁盘取决于刷盘策略是同步刷盘还是异步刷盘),而还有两个 Slave 实际上是无法保证的,
* 并且这里也不支持配置,即不支持如 ”同步半数以上” 之类的设置
*/
case SYNC_MASTER:
// 如果是 master 角色,设置 brokerId = 0
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
/**
* 消息发送的状态除了 SEND_OK 外,还会多出以下的状态:
* FLUSH_SLAVE_TIMEOUT:同步到 slave 等待超时,即一直等 Slave 上报同步的进度,但过了超时时间都没有成功没有同步完。
* SLAVE_NOT_AVAILABLE:当前没有可用的 Slave。注:如果 Slave 落后 Master 实在太多,那个 slave 也会认为是暂时不
* 可用的 Slave,直到它同步到接近的范围为止,这个不可用的阈值由 broker 配置 haSlaveFallbehindMax(默认是1024 * 1024 * 256)决定
*/
case SLAVE:
// 如果是 slave 角色,需要 brokerId > 0
if (brokerConfig.getBrokerId() <= MixAll.MASTER_ID) {
System.out.printf("Slave's brokerId must be > 0%n");
System.exit(-3);
}
break;
default:
break;
}
}
// 7、是否基于 DLeger (即是否启用RocketMQ 主从切换,默认值是false )技术来管理主从同步和 CommitLog,如果需要开启主从切换,则该值需要设置为true,那么设置 brokerId = -1
if (messageStoreConfig.isEnableDLegerCommitLog()) {
brokerConfig.setBrokerId(-1);
}
if (brokerConfig.isEnableControllerMode() && messageStoreConfig.isEnableDLegerCommitLog()) {
System.out.printf("The config enableControllerMode and enableDLegerCommitLog cannot both be true.%n");
System.exit(-4);
}
// 8、设置 Ha 存储监听端口 10912
// 设置高可用通信监听端口,为监听端口+1,默认就是10912
// 该端口主要用于:比如主从同步之类的高可用操作,在配置 broker 集群的时候需要注意,配置集群时可能会抛出: Address already in use
// 这是因为一个 broker 机器会占用三个端口,监所 ip 端口,以及监听 ip 端口 + 1的端口,监听ip 端口 - 2 的端口
if (messageStoreConfig.getHaListenPort() <= 0) {
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
}
// 设置 broker 容器:默认为false,不能进行手动设置,这个和启动模式有关
// 1.通过 Brokerstartup 启动 broker 则为 false
// 2.通过 BrokerContainer 启动 broker 则为 true
brokerConfig.setInBrokerContainer(false);
// 日志相关配置
System.setProperty("brokerLogDir", "");
// isIso1ateLogEnab1e 属性表示在同一台机器上部署多个 broker 时是否区分日志路径,默认 fa1se
if (brokerConfig.isIsolateLogEnable()) {
System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
}
if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
}
// 9、通过 -p 参数打印所有参数项, 启动时候日志打印配置信息
if (commandLine.hasOption('p')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
// 通过 -m 打印重要的参数项,即解析命令行参数"-m”,启动时候日志打印导入的配置信息。
} else if (commandLine.hasOption('m')) {
Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}
// 打印当前 broker 的配置日志
log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
// 10、基于上面的配置参数来构建 BrokerController
final BrokerController controller = new BrokerController(
brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
// 将所有的 -c 的外部信息配置,保存到 NamesrvContro1ler 中的 Configuration 对象屈性的a11Config 属性中
// Remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
看完是不是觉得 BrokerController 构建的逻辑与 NamesrvController 是类似的,构建命令行参数、处理一些配置、创建 BrokerController 等,核心的逻辑如下:
创建「Broker」、「Netty 服务端/客户端」、「消息存储」等配置对象,可以看到,Broker 是既有 NettyServer,又有 NettyClient 的配置对象,因为 Broker会作为客户端调用 NameServer,也会作为服务端被生产者/消费者调用。
Broker 端的 NettyServer 监听端口默认设置为 10911。
构建 Broker 端的命令行 CommandLine,这个就是用来在启动 Broker 时进行命令行参数交互的。
读取命令行 -c 参数指定的 Broker 配置文件加载配置,再读取命令行中的参数。
获取 Nameserver 地址,用 ; 分割解析为数组, Nameserver 可能为集群 所以有多个。
判断 broker 是否为 slave, 如果是则将消息占用内存百度分减去 10 %, 默认 40 %,超过内存的消息将置换出内存。
如果是 Master 节点,设置 brokerId=0,Slave 节点的 bokerId 必须大于 0;而如果启用了 DLeger 技术,brokerId 都设置为 -1。
设置 Ha 存储监听端口 10912。
可以通过 -p 参数打印所有参数项,通过 -m 打印重要的参数项。
最后通过配置对象创建 BrokerController。
别看这么多,其实创建 BrokerController 时,核心就做了2件事情:
解析各种配置 命令行等、创建 BrokerController 需要的各种配置对象:BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig。
BrokerController 相当于 Broker 的一个中央控制类。创建了 BrokerController 的对象后,再调用 BrokerController 对象的 initialize 方法,进行初始化操作。
从构建 BrokerController 的代码中可以看出,BrokerController 的依赖的四个核心配置如下图所示:
这些配置类实际上就是一些普通的 POJO 类。所以此时 Broker 的整个组件结构应该是这样的:
这里来重点看下部分步骤。
2.2.1.1 创建配置对象
这里主要创建了几个重要的配置对象:
创建 Broker 核心配置对象。
创建 Netty 客户端 和 服务端配置对象。
创建存储对象的配置。
设置 Broker 默认端口 10911 以及Ha初始端口 0,后面会设置为 10912。
2.2.1.2 读取配置文件
读取 broker 启动时指定的 broker.conf 配置文件,将配置信息映射到上面的配置类中。
还有一些其他配置:
03 总结
最后,总结下 BrokerStartup 的启动初始化流程,如下图: