1. 首页
  2. RocketMq源码解析
  3. 图解 RocketMQ 源码之 Broker 启动流程剖析

图解 RocketMQ 源码之 Broker 启动流程剖析

  • 发布于 2024-10-25
  • 15 次阅读

RocketMQ 是一个分布式消息中间件,它具有低延迟、高性能的特点,并且支持发布/订阅和点对点模型。Broker 是 RocketMQ 中的一个核心组件,负责接收来自生产者的消息并存储起来,同时响应消费者的拉取消息请求。下面是 RocketMQ Broker 启动流程的图解及分析:

1. 启动入口

  • 启动命令:通常通过运行 mqbroker 命令来启动 Broker。

  • 主类:启动时会加载 org.apache.rocketmq.broker.BrokerStartup 类。

2. 初始化配置

  • 读取配置:从命令行参数或配置文件中读取必要的配置信息,比如 Broker 的名称、存储路径等。

  • 创建配置对象:使用读取到的配置信息创建 BrokerConfig 对象。

3. 创建 BrokerController

  • BrokerController 实例化:创建 BrokerController 实例,这是控制 Broker 行为的核心控制器。

  • 初始化服务端口监听器:设置用于通信的服务端口监听器,包括 RemotingServer(Netty)。

4. 注册处理器

  • 注册各种处理器:在 RemotingServer 上注册处理不同协议消息的处理器,例如心跳检测处理器、发送消息处理器等。

5. 初始化持久化层

  • 创建 CommitLog 存储:初始化 CommitLog 文件,这是 RocketMQ 消息的主要存储形式。

  • 创建 ConsumerQueue 和 IndexFile:基于 CommitLog 创建索引文件,以便快速查找消息。

6. 启动网络服务

  • 启动 Netty 服务器:启动 NettyServer 来监听客户端连接,准备接受来自 Producer 或 Consumer 的请求。

7. 加载插件系统

  • 加载插件:如果配置了插件,则在此步骤加载这些插件以增强或定制 Broker 功能。

8. 完成启动

  • 通知状态变化:向 Name Server 发送注册信息,告知当前 Broker 已经准备好提供服务。

  • 打印日志:输出启动完成的日志信息,表明 Broker 成功启动。

为了更好地理解 RocketMQ Broker 的启动流程,我们可以使用一个简化的流程图来表示主要步骤。下面是一个文字描述的流程图,以及每个步骤的说明。在实际情况下,你可能需要使用绘图工具(如 Microsoft Visio、Draw.io 或 Lucidchart)来创建一个可视化的流程图。

RocketMQ Broker 启动流程图解

+-----------------+          +--------------------+          +---------------------+
| 读取配置文件    | ---->    | 创建 BrokerController| ---->   | 初始化 Netty 服务端  |
| (mqbroker 命令) |          |                    |          |                     |
+-----------------+          +--------------------+          +---------------------+
        |                           |                               |
        |                           v                               v
+-----------------+          +--------------------+          +---------------------+
| 加载 Broker 配置| <----+   | 初始化存储路径      | ---->   | 注册消息处理器      |
| (BrokerConfig)  |       |   | (CommitLog, etc.)  |          | (SendMessage, etc.)|
+-----------------+       |   +--------------------+          +---------------------+
        |               |               |                               |
        |               |               v                               v
+-----------------+     +--------------------+     +---------------------+  +-------------------+
| 创建 CommitLog  | <---+ 初始化 ConsumerQueue | <---+ 启动 Netty 服务器  |  | 加载插件系统      |
| 和其他索引文件  |     | (ConsumerOffset)     |     | (NettyRemotingServer)|
+-----------------+     +--------------------+     +---------------------+  +-------------------+
        |                   |                               |                       |
        |                   v                               v                       v
+-----------------+     +--------------------+     +---------------------+  +-------------------+
| 注册到 NameServer| <---+ 启动定时任务线程池  | <---+ 监听客户端请求      |  | 完成启动并打印日志 |
| 发送心跳包      |     | (ScheduledExecutor)  |     | (等待生产者/消费者)  |  | (Broker 启动成功)  |
+-----------------+     +--------------------+     +---------------------+  +-------------------+

流程说明

  1. 读取配置文件:通过 mqbroker 命令行参数或配置文件读取 Broker 运行所需的配置信息。

  2. 创建 BrokerController:根据配置信息创建 BrokerController 实例,这是 Broker 的核心控制器。

  3. 加载 Broker 配置:将读取到的配置信息应用到 BrokerController 中。

  4. 初始化存储路径:设置用于存储消息数据的物理路径,并初始化相应的文件结构,比如 CommitLog。

  5. 创建 CommitLog 和其他索引文件:初始化 CommitLog 文件及相关的 ConsumerQueue 和 IndexFile。

  6. 注册消息处理器:为 Netty 服务端注册处理不同协议消息的处理器,如发送消息处理器。

  7. 初始化 Netty 服务端:准备网络层,设置监听端口等。

  8. 启动 Netty 服务器:正式启动 Netty 服务端,开始监听指定端口上的连接。

  9. 启动定时任务线程池:启动定时任务线程池,用于执行诸如清理过期文件等定期任务。

  10. 注册到 NameServer:向 NameServer 注册当前 Broker 的信息,并定期发送心跳保持联系。

  11. 监听客户端请求:开始接受来自生产者和消费者的请求。

  12. 加载插件系统:如果有自定义插件,则在此处加载以增强功能。

  13. 完成启动并打印日志:输出启动成功的日志信息。

这个流程图和说明可以帮助你从宏观上把握 RocketMQ Broker 的启动过程。如果你需要更详细的源码分析,可以深入研究 RocketMQ 的源代码,特别是 BrokerStartup 类及其相关类的方法实现。


  1. 整体架构概述

    • RocketMQ 主要由 Producer(生产者)、Consumer(消费者)和 Broker(消息中转角色)组成。Broker 在整个消息传递过程中起到了关键作用,它负责接收生产者发送的消息,存储消息,并将消息推送给消费者。

    • 在启动时,Broker 要完成一系列的初始化工作,包括配置加载、模块初始化等。

  2. 配置加载阶段

    • 加载配置文件

      • Broker 启动时,首先会读取配置文件。配置文件包含了诸如存储路径、监听端口、集群相关配置等众多参数。例如,broker.conf文件中的storePathRootDir参数指定了消息存储的根目录路径。

      • 它会使用Properties类(Java 语言下)或者类似的配置解析工具将配置文件中的键值对读取到内存中。这个过程就像是给 Broker 的运行设定了基本规则,告诉它应该在哪里存储消息,如何与外界通信等。

    • 解析命令行参数

      • 除了配置文件,Broker 也会解析命令行参数。这些参数可以在启动脚本中指定,用于覆盖配置文件中的某些设置。比如,可以通过命令行参数指定特定的日志级别,这样在调试或者运行特定环境时可以灵活调整。

  3. 存储模块初始化

    • 消息存储路径创建与检查

      • 根据配置文件中指定的存储路径,Broker 会检查路径是否存在。如果不存在,会创建相应的目录结构。例如,对于消息存储目录,会创建像commitlog(存储消息主体)、consumequeue(存储消息索引)等子目录。

      • 这一步就好比为仓库(存储模块)建造合适的房间(目录)来存放不同类型的货物(消息和索引)。

    • 存储组件初始化

      • 初始化消息存储相关的组件,如CommitLog对象。CommitLog是 RocketMQ 存储消息的核心组件,它负责将生产者发送的消息顺序写入磁盘文件。

      • 同时会初始化ConsumeQueue组件,它的主要作用是为消费者快速定位消息提供索引。例如,一个消费者要获取特定主题的消息,ConsumeQueue可以帮助它快速找到消息在CommitLog中的位置。

  4. 网络通信模块初始化

    • 监听端口设置

      • 根据配置,Broker 会在指定的端口上开启监听。例如,默认情况下会在10911端口监听生产者的连接请求,在10912端口监听消费者的连接请求。

      • 这个过程类似于在港口(Broker)设置码头(监听端口),等待船只(生产者和消费者)的到来。

    • 网络处理器初始化

      • 初始化网络处理器,用于处理生产者和消费者的连接、请求和响应。这些网络处理器会按照 RocketMQ 定义的协议来解析和处理网络请求。

      • 比如,当生产者发送一条消息时,网络处理器会接收消息字节流,按照协议解析出消息的主题、标签等信息,然后将消息交给存储模块进行存储。

  5. 消息过滤服务初始化

    • 加载过滤规则

      • 如果有消息过滤的配置,Broker 会加载相应的过滤规则。这些规则可以是基于消息主题、标签等的过滤条件。例如,可以配置只接收主题为order_topic且标签为new_order的消息。

      • 这就像在仓库(Broker)的入口设置了一个筛选器,只有符合特定条件的货物(消息)才能进入某些区域(消费者接收队列)。

    • 初始化过滤组件

      • 初始化用于执行消息过滤的组件,当消费者请求消息时,这些组件会根据过滤规则筛选出符合条件的消息发送给消费者。

  6. 集群相关模块初始化(如果是集群模式)

    • 发现集群中的其他 Broker

      • 在集群模式下,Broker 会通过配置的集群发现机制(如 Namesrv 地址列表)来查找其他 Broker。例如,它会向 Namesrv 发送请求,获取集群中其他 Broker 的信息,包括它们的地址和状态。

      • 这类似于一个新成员加入团队时,先去了解其他团队成员的位置和职责。

    • 初始化集群通信组件

      • 为了与其他 Broker 进行通信,会初始化集群通信组件。这些组件用于同步消息状态、处理主从复制(如果有主从架构)等操作。例如,在主从架构中,主 Broker 会通过集群通信组件将消息复制到从 Broker,以保证数据的冗余和高可用性。

  7. 启动完成后的检查与等待

    • 检查各模块状态

      • Broker 会检查各个初始化后的模块状态是否正常。例如,检查存储模块是否能够正常读写消息,网络通信模块是否能够正常接收和发送请求等。

    • 进入消息处理循环

      • 当所有检查都通过后,Broker 就进入消息处理循环,等待生产者发送消息和消费者请求消息,开始正常的工作流程。这个过程就像是仓库(Broker)正式开门营业,等待货物(消息)的进出和处理。

01 总体概述

Broker 是 RocketMQ 中最为复杂的一个模块,绝大部分消息队列的特性都在这个模块中实现,它主要负责消息的存储投递查询以及服务高可用保证。

单机可以支撑上万队列规模,具有上亿级消息堆积能力,可以严格保证消息的有序性

Broker 相关代码集中在 rocketmq-broker 模块下,如下:

模块如下:

  1. client:生产者管理器、消费者管理器、Broker-Client 调用器等。

  2. Controller:ReplicasManager 相关,controller 模式。

  3. dledger:基于 DLedger 技术的 Broker 高可用。

  4. filter:ConsumerFilter 相关,用于消费者消费消息过滤

  5. longpolling:PullRequest 相关,消费者读取消息。

  6. offset:ComsumerOffset 相关,消费偏移量管理。

  7. processor:Broker 的 Netty 处理器。

  8. schedule:定时任务相关。

  9. salve:主从同步组件。

  10. subscription:订阅组管理。

  11. topic:Topic 管理。

  12. transaction:事务管理。

重要模块组成如下:

  1. 远程连接模块指整个 Broker 实体,负责处理来自客户端请求。

  2. 客户端管理模块负责管理客户端Producer」、「Consumer和维护 Consumer 上的 Topic 订阅信息。

  3. 存储服务模块提供了方便简单的 API 接口来处理消息,存储到物理硬盘以及查询相关功能。

  4. HA 服务模块提供高可用服务,Master Broker 和 Slave Broker 之间的数据同步功能。

  5. 索引服务创建消息的索引,以提供快速查询功能。


02 Broker 启动流程

在剖析NameServer」时, 我们知道它的启动类和控制器分别为NameSrvStartupNameSrvController,对应的 Broker 的启动类和控制器分别为BrokerStartup」、「BrokerController,这就是 Broker 源码的入口。

NameSrvStartup类似,BrokerStartup也是会先构建控制器BrokerController,再调用其 start() 方法进行启动 Broker。

换句话说就是 Broker 启动的大部分逻辑都在BrokerController中。

源码位置:https://github.com/apache/rocketmq/blob/release-5.1.2/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java

2.1 Broker 启动入口

// 创建 Broker 控制器并启动 Broker 控制器
public static void main(String[] args) {
        start(createBrokerController(args));
}

public static BrokerController start(BrokerController controller) {
        try {
            // 启动 Broker 控制器
            controller.start();

            String tip = String.format("The broker[%s, %s] boot success. serializeType=%s",
                controller.getBrokerConfig().getBrokerName(), controller.getBrokerAddr(),
                RemotingCommand.getSerializeTypeConfigInThisServer());

            if (null != controller.getBrokerConfig().getNamesrvAddr()) {
                tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
            }
            // 打印日志与输出
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
}

知道了入口之后,接下来就来剖析下它是如何启动的,启动的过程中都做了什么?

从上面的部分代码中,不难看出,它首先就是创建BrokerController对象,然后调用 start() 方法启动,那就一步一步走进去看看细节吧。

2.2 BrokerController 对象是如何创建的


总共 5 个操作步骤:

  1. 构建 Broker 控制器。

  2. 初始化 Broker 控制器。

  3. 如果初始化失败,则退出表示 Broker 启动失败。

  4. 添加 JVM 钩子函数在 JVM 停止时关闭 BrokerController

  5. 返回控制器对象。

我们来分别看下每一步的操作。

public static BrokerController buildBrokerController(String[] args) throws Exception {
        // 利用 System.setProperty() 设置 RocketMQ 的版本号
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        // 1、创建 broker 的相关配置类对象
        // Broker核心配置类 : 用来封装其绝大多数基本配置信息,比如ROCKETMQ_HOME
        final BrokerConfig brokerConfig = new BrokerConfig(); 
        // Netty服务器配置类:Broker 作为服务端,比如接受来自客户端(Producer/Consumer)的消息的时候
        final NettyServerConfig nettyServerConfig = new NettyServerConfig(); 
        // Netty 客户端配置类:Broker 作为客户端,比如连接 NameServer 的时候
        final NettyClientConfig nettyClientConfig = new NettyClientConfig(); 
        // Broker 存储对象配置类:例如各种文件大小等
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); 
        // 2、设置 netty 的服务端监听的端口号 10911,对外提供消息读写服务的端口,Broker 存储对象配置端口 10912
        nettyServerConfig.setListenPort(10911);
        messageStoreConfig.setHaListenPort(0);
        // 3、构建命令行操作
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        // 解析命令行为 'mgbroker’的参数
        CommandLine commandLine = ServerUtil.parseCmdLine(
            "mqbroker", args, buildCommandlineOptions(options), new DefaultParser());
        if (null == commandLine) {
            // 如果为空,直接退出,退出状态码为-1   
            System.exit(-1);
        }

        Properties properties = null;
        // 解析外部配置文件,读取 broker.conf 配置文件,-c 参数指定 Broker 配置文件
        // 判断命令行中是否包含字符'-c’(即为否包含通过命令行指定配置文件的命令)
        // 例如 broker 启动的时候添加的 -c /home/wangjianghua/src/rocketmq/rocketmq-all-4.9.4-bin-release/conf/broker.conf命令
        if (commandLine.hasOption('c')) {
            // 获取该命令指定的配置文件
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                CONFIG_FILE_HELPER.setFile(file);
                BrokerPathConfigHelper.setBrokerConfigPath(file);
                // 加载配置文件到 properties,基于缓冲输入流
                properties = CONFIG_FILE_HELPER.loadConfig();
            }
        }
        // 4、加载配置文件中的配置
        if (properties != null) { // 得到的 properties 配置不为空
            // 将配置文件内容利用反射设置到对应的配置类中
            // 将 rmgAddressServerDomain、rmgAddressServerSubGroup 属性设置为系统属性
            properties2SystemEnv(properties);
            // 设置 brokerConfig 的配置信息
            MixAll.properties2Object(properties, brokerConfig);
            // 设置 nettyservice 的配置信息
            MixAll.properties2Object(properties, nettyServerConfig);
            // 设置 nettyClient 的配置信息
            MixAll.properties2Object(properties, nettyClientConfig);
            // 设置 messageStore 的配置信息
            MixAll.properties2Object(properties, messageStoreConfig);
        }
        // 4、加载命令行中的参数 
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
        if (null == brokerConfig.getRocketmqHome()) {
            // 如果从 brokerConfig 配置文件中获取到的 RocketmgHome 为空,打印提示并设置系统状态码为 -2,需要我们自己设置一下环境变量
            System.out.printf("Please set the %s variable in your environment " +
                "to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // Validate namesrvAddr
        // 5、获取Nameserver地址,用 ; 分割解析为数组,Nameserver 可能为集群 所以有多个
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (StringUtils.isNotBlank(namesrvAddr)) {
            try {
                // 拆分 NameServer 的地址
                // 可以指定多个 NameServer 的地址,以";"分隔,形成一个地址宇符串数组
                String[] addrArray = namesrvAddr.split(";");
                // 将字符串的地址,转换为网络连接的 scoketAddress,检查格式是否正确
                for (String addr : addrArray) {
                    NetworkUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf("The Name Server Address[%s] illegal, please set it as follows, " + "\"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr);
                System.exit(-3);
            }
        }
        /**
         * 6、Slave-Broker的参数配置信息
         * 如果 broker 的角色是 slave (默认 broker 的角色是异步 master),设置命中消息在内存的最大比例 
         * 超过所设置的最大内存,消息将被置换出内存; 设置成比默认的 40 %,还要小 10 %
         */
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }

        // Set broker role according to ha config
        // 7、不启动控制器模式,不支持自动切换主从角色。
        if (!brokerConfig.isEnableControllerMode()) {
            // 判断 Broker 角色作相应的处理,这里 master 有两种同步消息方式
            // 设置、校验brokerId
            // 根据检查 broker 的角色配置 brokerId: 默认角色是 ASYNC_MASTER 
            // 通过此配置可知 brokerId 为 0 表示 master,非 0 表示 slave
            // broker 的角色分为: 
            // ASYNC_MASTER : 异步同步消息到 slave
            // SYNCMASTER : 同步同步消息到 s1ave
            // SLAVE
            switch (messageStoreConfig.getBrokerRole()) {
                // 异步复制:生产者写入消息到 Master 后无需等待消息复制到 slave 即可返回,消息的复制由旁路线程进行异步复制
                case ASYNC_MASTER:
                /**
                 * 同步复制的方式,表现出来的是类似同步双写的策略。即 Master 写入完消息之后,需要等待 Slave 的复制成功。
                 * 注:这里只需要有一个 Slave 复制成功并成功应答即算成功,所以在这种模式下,如果有 3 个 Slave,当生产者获得
                 * SEND_OK 的应答时,代表消息已经达到 Master 和一个 Slave (注:这里并不代表已经持久化到磁盘,而只能证明
                 * 肯定到了 PageCache,是否能刷到磁盘取决于刷盘策略是同步刷盘还是异步刷盘),而还有两个 Slave 实际上是无法保证的,
                 * 并且这里也不支持配置,即不支持如 ”同步半数以上” 之类的设置
                 */
                case SYNC_MASTER:
                    // 如果是 master 角色,设置 brokerId = 0
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
                    break;
                /**
                 * 消息发送的状态除了 SEND_OK 外,还会多出以下的状态:
                 * FLUSH_SLAVE_TIMEOUT:同步到 slave 等待超时,即一直等 Slave 上报同步的进度,但过了超时时间都没有成功没有同步完。
                 * SLAVE_NOT_AVAILABLE:当前没有可用的 Slave。注:如果 Slave 落后 Master 实在太多,那个 slave 也会认为是暂时不
                 * 可用的 Slave,直到它同步到接近的范围为止,这个不可用的阈值由 broker 配置 haSlaveFallbehindMax(默认是1024 * 1024 * 256)决定
                 */
                case SLAVE:
                    // 如果是 slave 角色,需要 brokerId > 0 
                    if (brokerConfig.getBrokerId() <= MixAll.MASTER_ID) {
                        System.out.printf("Slave's brokerId must be > 0%n");
                        System.exit(-3);
                    }
                    break;
                default:
                    break;
            }
        }
        // 7、是否基于 DLeger (即是否启用RocketMQ 主从切换,默认值是false )技术来管理主从同步和 CommitLog,如果需要开启主从切换,则该值需要设置为true,那么设置 brokerId = -1
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }

        if (brokerConfig.isEnableControllerMode() && messageStoreConfig.isEnableDLegerCommitLog()) {
            System.out.printf("The config enableControllerMode and enableDLegerCommitLog cannot both be true.%n");
            System.exit(-4);
        }
        // 8、设置 Ha 存储监听端口 10912
        // 设置高可用通信监听端口,为监听端口+1,默认就是10912
        // 该端口主要用于:比如主从同步之类的高可用操作,在配置 broker 集群的时候需要注意,配置集群时可能会抛出: Address already in use
        // 这是因为一个 broker 机器会占用三个端口,监所 ip 端口,以及监听 ip 端口 + 1的端口,监听ip 端口 - 2 的端口
        if (messageStoreConfig.getHaListenPort() <= 0) {
            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
        }
        // 设置 broker 容器:默认为false,不能进行手动设置,这个和启动模式有关
        // 1.通过 Brokerstartup 启动 broker 则为 false
        // 2.通过 BrokerContainer 启动 broker 则为 true
        brokerConfig.setInBrokerContainer(false);
        // 日志相关配置
        System.setProperty("brokerLogDir", "");
        // isIso1ateLogEnab1e 属性表示在同一台机器上部署多个 broker 时是否区分日志路径,默认 fa1se
        if (brokerConfig.isIsolateLogEnable()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
        }
        if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog()) {
            System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
        }
        // 9、通过 -p 参数打印所有参数项, 启动时候日志打印配置信息
        if (commandLine.hasOption('p')) {
            Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        // 通过 -m 打印重要的参数项,即解析命令行参数"-m”,启动时候日志打印导入的配置信息。
        } else if (commandLine.hasOption('m')) {
            Logger console = LoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }
        // 打印当前 broker 的配置日志
        log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);
        // 10、基于上面的配置参数来构建 BrokerController
        final BrokerController controller = new BrokerController(
            brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
        // 将所有的 -c 的外部信息配置,保存到 NamesrvContro1ler 中的 Configuration 对象屈性的a11Config 属性中
        // Remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
}

看完是不是觉得 BrokerController 构建的逻辑与 NamesrvController 是类似的,构建命令行参数、处理一些配置、创建 BrokerController 等,核心的逻辑如下:

  1. 创建BrokerNetty 服务端/客户端」、「消息存储等配置对象,可以看到,Broker 是既有 NettyServer,又有 NettyClient 的配置对象,因为 Broker会作为客户端调用 NameServer,也会作为服务端被生产者/消费者调用。

  2. Broker 端的 NettyServer 监听端口默认设置为 10911。

  3. 构建 Broker 端的命令行 CommandLine,这个就是用来在启动 Broker 时进行命令行参数交互的。

  4. 读取命令行 -c 参数指定的 Broker 配置文件加载配置,再读取命令行中的参数。

  5. 获取 Nameserver 地址,用 ; 分割解析为数组, Nameserver 可能为集群 所以有多个。

  6. 判断 broker 是否为 slave, 如果是则将消息占用内存百度分减去 10 %, 默认 40 %,超过内存的消息将置换出内存。

  7. 如果是 Master 节点,设置 brokerId=0,Slave 节点的 bokerId 必须大于 0;而如果启用了 DLeger 技术,brokerId 都设置为 -1。

  8. 设置 Ha 存储监听端口 10912。

  9. 可以通过 -p 参数打印所有参数项,通过 -m 打印重要的参数项。

  10. 最后通过配置对象创建 BrokerController

别看这么多,其实创建 BrokerController 时,核心就做了2件事情:

  1. 解析各种配置 命令行等、创建 BrokerController 需要的各种配置对象:BrokerConfigNettyServerConfigNettyClientConfigMessageStoreConfig

  2. BrokerController 相当于 Broker 的一个中央控制类。创建了 BrokerController 的对象后,再调用 BrokerController 对象的 initialize 方法,进行初始化操作。

从构建 BrokerController 的代码中可以看出,BrokerController 的依赖的四个核心配置如下图所示:

这些配置类实际上就是一些普通的 POJO 类。所以此时 Broker 的整个组件结构应该是这样的:


这里来重点看下部分步骤。

2.2.1.1 创建配置对象

这里主要创建了几个重要的配置对象:

  1. 创建 Broker 核心配置对象。

  2. 创建 Netty 客户端 和 服务端配置对象。

  3. 创建存储对象的配置。

  4. 设置 Broker 默认端口 10911 以及Ha初始端口 0,后面会设置为 10912。

2.2.1.2 读取配置文件

读取 broker 启动时指定的 broker.conf 配置文件,将配置信息映射到上面的配置类中。


还有一些其他配置:

  1. 比如设置 master slave 通信的端口为 10912即就是 broker 客户端通信端口 + 1。

  2. 设置 brokerId如果是 master 则 brokerid=0 ; slave brokerid > 0。

03 总结

最后,总结下 BrokerStartup 的启动初始化流程,如下图:

未完待续