1. 首页
  2. RocketMq源码解析
  3. RocketMQ 源码之生产者剖析 一、二

RocketMQ 源码之生产者剖析 一、二

  • 发布于 2024-09-18
  • 27 次阅读

【 第一篇】图解 RocketMQ 源码之生产者启动流程剖析

这是第三篇,我们来剖析下 RocketMQ 源码之生产者启动流程剖析。

这里我将以RocketMQ 4.9.7版本为主,通过场景驱动的方式带大家一点点的对 RocketMQ 源码进行深度剖析,正式开启「RocketMQ 的源码之旅」,跟我一起来掌握 RocketMQ 源码核心架构设计思想吧。


01 总体概述

我们都知道在 RocketMQ 中,我们把产生消息的一方称为生产者即 Producer,它是 RocketMQ 核心组件之一,也是消息的来源所在。那么这些生产者产生的消息是如何传到 RocketMQ 服务端的呢?初始化启动过程是怎么样的呢?接下来会逐一讲解说明。


02 生产者初始化核心组件及流程剖析

2.1 生产者发送示例

在剖析生产者启动流程时,先看下生产者发送消息的示例代码,Producer 发送消息的示例在org.apache.rocketmq.example.simple.Producer 类中,代码如下:

// 同步发送
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1、实例化消息生产者 Producer
        DefaultMQProducer producer = new DefaultMQProducer("rocketmq-test-huazai-group");
        // 2、启动 Producer 实例
        producer.start();
        // 3、发送消息到 broker
        for (int i = 0; i < 128; i++)
            try {
                {
                    // 创建消息,并指定 Topic,Tag 和消息体
                    Message msg = new Message("TopicTestHuaZai",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    // 通过 sendResult 返回消息是否成功送达
                    System.out.printf("%s%n", sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        // 如果不再发送消息,关闭 Producer 实例。
        producer.shutdown();
    }
}

从代码中不难看出,生产者发送消息只需要 3 个步骤:

  1. 创建生产者客户端 DefaultMQProducer对象。

  2. 启动 Producer。

  3. 发送消息。

那么我们就按照这三个步骤逐步分析下它是如何工作的。

2.2 创建生产者客户端对象

// 实例化消息生产者 Producer,指定生产者组名称
DefaultMQProducer producer = new DefaultMQProducer("rocketmq-test-huazai-group");

在实例化 Producer 时,会传入一个 ProducerGroup 的名称,ProducerGroup 代表同一类生产者的合集。其实这个类就是一个外观类,RocketMQ 对于 Producer 有一个默认的实现类DefaultMQProducerImpl。所以在初始化 DefaultMQProducer 时会初始化一个 DefaultMQProducerImpl 对象实例并赋值给 Producer 的成员变量,我们看下它的构造器方法。


源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public DefaultMQProducer() {
    this(null, MixAll.DEFAULT_PRODUCER_GROUP, null);
}

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        // 赋值 ProducerGroup 名称
        this.producerGroup = producerGroup;
        // 它才是真正工作的类,将 defaultMQProducerImpl 对象保存在成员变量中
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

同时,在初始化 DefaultMQProducerImpl 实例时也会将 producer 对象作为成员变量保存在DefaultMQProducerImpl 实例中,那么我们再看下它的构造器方法。


源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
        // 将 defaultMQProducer 对象保存在成员变量中
        this.defaultMQProducer = defaultMQProducer;
        this.rpcHook = rpcHook;
        // 1、创建一个异步发送的队列
        this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
        // 2、创建一个异步发送的线程池
        this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                }
            });
}

从该构造器中我们得出,它主要就是在做两件事:

  1. 创建一个异步发送的队列: asyncSenderThreadPoolQueue

  2. 创建一个异步发送的线程池:defaultAsyncSenderExecutor

然后是给 Producer 设置 NameServer 的地址,因为 NameServer 是一个注册中心,保存着 Broker 信息和 Topic路由信息。

2.3 生产者启动

只需要执行下面代码即可启动上方初始化的 producer 对象。

// 启动生产者客户端服务
producer.start();


producer#start()
方法具体的源码如下,其源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public void start() throws MQClientException {
        // 基于命名空间对 ProducerGroup 再次封装, 一般用于在不同的业务场景下做隔离
        // 设置生产者组,由于 DefaultMQProducer 继承了 ClientConfig,所以可以直接使用 ClientConfig#withNamespace 方法
        this.setProducerGroup(withNamespace(this.producerGroup));
        // 调用 DefaultMQProducerImpl#start()方法启动生产者服务
        this.defaultMQProducerImpl.start();
        // 用于做消息追踪,消息轨迹追踪功能可以通过 DefaultMQProducer 对象的多个参数的构造器来开启,默认是关闭
        if (null != traceDispatcher) {
            try {
                // 启动消息轨迹追踪
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
}

可以看到 DefaultMQProducer 只是一个门面类,具体的实现都是由DefaultMQProducerImpl去做的,该方法也比较简单,主要做了 3 件事情:

  1. 设置生产者组,基于命名空间对 ProducerGroup 再次封装, 一般用于在不同的业务场景下做隔离。

  2. 生产者启动是通过这个对象来启动的。

  3. 用于做消息追踪,消息轨迹追踪功能可以通过 DefaultMQProducer 对象的多个参数的构造器来开启,默认是关闭。

接下来我们分别看下前两步骤的具体实现逻辑。

2.3.1 设置生产者组


由于Namespace的存在,因此在启动producer时首先会重新设置producerGroup,我们需要重点关注经过withNamespace()方法处理后返回的生产者组名。


源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java

public String withNamespace(String resource) {
        // this.getNamespace()不设置的话返回的是 null
        return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}

可以看出该方法仅仅是调用了NamespaceUtil#wrapNamespace()方法,并将NamespaceproducerGroup作为参数一并传入处理。


源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/common/src/main/java/org/apache/rocketmq/common/protocol/NamespaceUtil.java

public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {
        // 1、如果 namespace 为空或者 resourceWithOutNamespace 为空,则直接返回resourceWithOutNamespace
        if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {
            return resourceWithOutNamespace;
        }
        // 如果 resourceWithOutNamespace 是 SystemResource 或者 resourceWithOutNamespace 已经组合了 Namespace,则直接返回 resourceWithOutNamespace
        if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {
            return resourceWithOutNamespace;
        }

        String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);
        StringBuilder stringBuilder = new StringBuilder();
        // 重试 Topic 处理标识
        if (isRetryTopic(resourceWithOutNamespace)) {
            stringBuilder.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
        }
        // 死信 Topic 处理标识
        if (isDLQTopic(resourceWithOutNamespace)) {
            stringBuilder.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
        }
        // 返回 [RETRY_PREFIX] + [DLQ_PREFIX] + namespace + % + resourceWithoutRetryAndDLQ
        return stringBuilder.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();
}

该方法也比较简单,由于我们并没有设置 Producer 的 Namespace,所以会直接返回 producerGroup。最后的效果就是在这个生产者启动过程中第一行代码没有任何效果。

2.3.2 启动生产者服务

可以看到这里最重要的是第二步,启动生产者。

this.defaultMQProducerImpl.start();

最终调用 defaultMQProducerImpl#start() 方法进行启动生产者。可以看到 DefaultMQProducer 的构造器,send() 和start() 等相关的方法,其实都是围绕 DefaultMQProducerImpl 来转,defaultMQProducerImpl:默认生产者的实现类,其 start() 方法作为生产者启动的核心方法,接下来将核心分析其 start() 方法的实现。

public void start() throws MQClientException {
       this.start(true);
}
/**
 * mq-producer 启动
 * @param startFactory
 * @throws MQClientException
 */
public void start(final boolean startFactory) throws MQClientException {
        // 如果状态为 CREATE_JUST,执行启动逻辑。该对象创建时默认状态为 CREATE_JUST
        switch (this.serviceState) {
            case CREATE_JUST:
                // 1、状态设置启动失败
                this.serviceState = ServiceState.START_FAILED;
                // 2、检查配置,比如生产者组名是否合法,是否超过最大字符限制等
                this.checkConfig();
                // 3、改变生产者的 instanceName 为进程 ID,避免同一个服务器上的多个生产者实例名相同。即将 instanceName 属性更改为 PID,如果实例名为默认值则将生产者的 instanceName设置为 UtilAll.getPid() + "#" + System.nanoTime()
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 4、创建客户端 MQClientInstance 对象实例,使用 MQClientManager.getInstance() 返回一个单例的MQClientManager 对象,defaultMQProducer 继承了 ClientConfig,因此 getOrCreateMQClientInstance 方法的参数可以是 defaultMQProducer,mQClientFactory 是 MQClientInstance 的一个实例,MQClientInstance 是 MQClientManager 的内部类
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // 5、注册生产者信息到本地,即注册 producer 实例:将生产者组名作为key,defaultMQProducerImpl对象作为value保存到MQClientInstance的producerTable中
            // 方便后续调用网络请求、进行心跳检测等
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    // 如果注册失败则将 serviceState 重设为 CREATE_JUST
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                // 6、设置 topic 路由表信息,不过这里的 topic 是“TBW102”
                // 将 defaultMQProducer 的 createTopicKey 作为 key,TopicPublishInfo 作为 value,放入到 defaultMQProducerImpl的topicPublishInfoTable中,createTopicKey 的默认值为 TBW102
               // topicPublishInfoTable 的作用是存储 topic 的路由信息,包括 topic 的 queue 数目、brokerName、brokerId等
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                // 7、启动生产者客户端 MQClientInstance 实例,如果已经启动,则不会执行
                if (startFactory) {
                    // MQClientInstance 的 start 方法会启动 MQClientInstance 的定时任务
                    // 包括定时向所有 broker 发送心跳、定时清理过期的 topic、定时清理过期的consumer、定时清理过期的 producer
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 8、启动成功则将 serviceState 设置为 RUNNING
                this.serviceState = ServiceState.RUNNING;
                break;
            // 其他状态直接抛异常
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
            default:
                break;
        }
        // 9、启动后马上向 NameServer 发送心跳
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 10、开启一个定时任务来处理所有的 Request 状态,对异步的请求根据状态处理回调函数,这个异步请求指的并不是在 send 中的异步回调机制,而是 Request-Reply 特性,用来模拟 RPC 调用
        RequestFutureHolder.getInstance().startScheduledTask(this);
}

该方法比较重要,主要用来启动生产者」,分为几个状态,如下:


  1. 对于 CREATE_JUST 状态,处理逻辑有以下几步:

  2. 状态设置启动失败。

  3. 检查配置,比如 groupName 是否为空,是否超过最大字符限制等。

  4. 将 instanceName 属性更改为 PID。

  5. 创建客户端对象。

  6. 注册生产者信息到本地。

  7. 设置 topic 路由表信息,不过这里的 topic 是TBW102

  8. 启动生产者客户端。

  9. 如果初始化状态不等于 CREATE_JUST,则异常抛出。

  10. 启动后马上向 NameServer 发送心跳。

  11. 开启一个定时任务来处理所有的 Request 状态,对异步的请求根据状态处理回调函数,这个异步请求指的并不是在 send 中的异步回调机制,而是 Request-Reply 特性,用来模拟 RPC 调用。

注意上述代码中有一段注释为 createTopicKey 的默认值为 TBW102,这个 Topic 在自动创建 topic 时有关键作用

最后的 RequestFutureHolder.getInstance().startScheduledTask(this) 用来扫描和处理过期的异步请求,但是需要注意的是这个异步请求指的并不是在 send 中的异步回调机制,而是 Request-Reply 特性,用来模拟 RPC 调用。

RocketMQ 有两种异步请求的方式,一种是在 send 方法中传入一个回调函数,当消息发送成功或失败时,会调用这个回调函数。这种方式不需要等待服务器的响应,只需要等待服务器的确认。

另一种是在 RocketMQ 4.7.0 版本后加入的 Request-Reply 特性,这种方式是模拟 RPC 调用,需要等待服务器的响应,并返回一个结果。这种方式需要使用 RequestResponseFuture 对象来封装请求和响应的信息。

接着,我们来分别看下该方法的每个具体步骤的操作。

2.3.1.1 不同状态

对于一个 Producer 实例来说,总共会有 4 种不同的状态,如下图所示:



我们在看的 Producer 启动时,它需要关心的状态就只有CREATE_JUST 状态,这也是 Producer 实例化之后默认的状态,在初始化时就会设置一个默认值。

// 默认初始化就是 CREATE_JUST 状态
private ServiceState serviceState = ServiceState.CREATE_JUST;

当其调用了start()成功之后,Producer 就会将状态修改为RUNNING 状态,失败了就会变成START_FAILED 状态

2.3.1.2 检测配置

这里主要检测 ProducerGroup 是否合法,主要是:

  1. ProducerGroup 是否为空?

  2. ProducerGroup 名称是否超过了最大长度?这个值 CHARACTER_MAX_LENGTH 默认是 255。

  3. ProducerGroup 名称是否包含非法字符?

源码如下:

// 该方法主要检测 producerGroup 的合法性
private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
        // 生产所属组 不能等于 DEFAULT_PRODUCER,直接抛异常
        if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
            throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
                null);
        }
}

public static void checkGroup(String group) throws MQClientException {
        // 检查 producer 的 groupName 是否为空
        if (UtilAll.isBlank(group)) {
            throw new MQClientException("the specified group is blank", null);
        }
        // 检查 producer 的 groupName 长度是否超过255字符
        if (group.length() > CHARACTER_MAX_LENGTH) {
            throw new MQClientException("the specified group is longer than group max length 255.", null);
        }

        // 检查 producer的 groupName 是否包含特殊字符,否则抛异常
        if (isTopicOrGroupIllegal(group)) {
            throw new MQClientException(String.format(
                    "the specified group[%s] contains illegal characters, allowing only %s", group,
                    "^[%|a-zA-Z0-9_-]+$"), null);
        }
}

2.3.1.3 创建客户端实例

源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

public class MQClientInstance {
    ....
    // 生产者表,producer 启动时创建一个新的 MQClientInstance 实例对象,将生产者信息注册到这里。生产者实例对象中消费者信息是空
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    // 消费者表,consumer 启动时创建一个新的 MQClientInstance 实例对象,将消费者信息注册到这里。消费者实例对象中生产者信息是空
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    ....
    // topic 路由信息,producer 和 consumer 都会使用
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    // broker 信息,producer 和 consumer 都会用到
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();
    // 定时执行器
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    // 客户端处理器
    private final ClientRemotingProcessor clientRemotingProcessor;
    // 拉取消息的服务
    private final PullMessageService pullMessageService;
    // 重平衡服务
    private final RebalanceService rebalanceService;
    private final DefaultMQProducer defaultMQProducer;
    // 消费者状态
    private final ConsumerStatsManager consumerStatsManager;
    ....
    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
         // 客户端处理器,比如在集群消费模式下,有新的消费者加入,则通知消费者客户端重平衡,主要是给消费者用的,这里可以忽略
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
         // 它的内部会创建 netty 客户端对象(NettyRemotingClient),用于和 broker 通信
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
        }

        this.clientId = clientId;

        this.mQAdminImpl = new MQAdminImpl(this);
        // 拉取消息的服务,和消费者相关,我们这里启动的是生产者实例,和消费者无关,忽略
        this.pullMessageService = new PullMessageService(this);
        // 重平衡服务,和消费者相关,我们这里启动的是生产者实例,和消费者无关,忽略
        this.rebalanceService = new RebalanceService(this);
        // 实例化内部的 producer,用来消费失败或超时的消息,sendMessageBack 回发给 broker,放到retry topic 中重试消费。
        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);

        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

        log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
    }
}

注意:客户端实例是 MQClientInstance 类, 生产者和消费者都作为客户端,所以它保存了 producer 和consumer 相关的所有信息,后面我们在分析消费者启动时,还会看到根据这个类创建消费者实例。这里我们只关注生产者,所以消费者相关的内容可以忽略。

这里我们关注新初始化的 DefaultMQProducer 实例的 producerGroup="CLIENT_INNER_PRODUCER",instanceName = "DEFAULT"

接着,在初始化实例后又执行了 this.defaultMQProducer.resetClientConfig(clientConfig) 这行代码。

2.3.1.4 注册生产者信息到本地

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }
        // 这里的 this 指的就是 MQClientInstance
        // 很简单,就是将生产者信息注册到 MQClientInstance 对象中的 producerTable 表中。
        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }

        return true;
}

该方法很简单,就是将生产者信息注册到 MQClientInstance 对象中维护的 producerTable 表中,里面包含了会存储当前客户端中 Producer 的一些信息。但大家应该还记得前面提到过 Consumer 内部也会使用 MQClientInstance,在其内部还有一个consumerTable,用于存储消费者客户端里的所有 Consumer 信息。

2.3.1.5 设置 Topic 路由表

// 先有个印象就行,它放置的 topic=“TBW102”,等后面分析 producer 发送消息时再详细说明。
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

2.3.1.6 启动生产者客户端

源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

if (startFactory) {
    // 7、启动生产者客户端
    mQClientFactory.start();
}

/**
 * 启动客户端代理
 *
 * @throws MQClientException
 */
public void start() throws MQClientException {
        // 用synchronized修饰保证线程安全性与内存可见性
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 我们一般都为生产者指定 nameserver 地址,所以这里为 false
                    // 由于传入了 NameServer 的地址,因此不进入分支
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // 启动用于和 broker 通信的 netty 客户端 
                    this.mQClientAPIImpl.start();
                    // 启动定时任务,包括心跳,拉取topic路由信息,更新broker信息,清理过期消息等
                    this.startScheduledTask();
                    // 启动拉取消息的服务,这个是和消费者相关的,我们这里是 producer,忽略
                    this.pullMessageService.start();
                    // 启动重平衡服务,它和消费者相关,忽略
                    this.rebalanceService.start();
                    // Start push service
                    // 当消费失败的时候,需要把消息发回去
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
}

启动方法也比较简单,主要做了以下几件事情:

  1. 为生产者指定 nameserver 地址。

  2. 启动 netty 客户端。

  3. 启动定时任务。

  4. 启动拉取消息的服务,这个是和消费者相关的,我们这里是 producer,忽略。

  5. 启动 push 服务。

此时我们先来看下启动定时任务都做了哪些?

2.3.1.7 启动定时任务

/**
 * 启动定时任务
 */
private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
             // 如果没有指定namesrv地址,则定时获取namesrv地址
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        /**
          * 默认每隔 30s 从 NameServer 获取 Topic 路由信息
          * 包括 生产者和消费者
          */
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        /**
          * 每 30s 向 Broker 端发送心跳包,
          * 1. 清除离线的 Broker
          * 2. 汇报心跳给 Broker
          */
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);


        /**
         * 每隔 5s 持久化消费者当前消费进度 offset 持久化(对MQConsumer有效)
         */
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        /**
         * 每隔 60s 根据当前的积压调优线程池的核心线程数
         */
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
}

分别有以下5大定时任务:

  1. 获取 NameServer 地址:这里值得注意的是,只有在 Producer 没有指定 NameServer 地址时才会注册、运行这个定时任务,也就是以下的代码:

producer.setNamesrvAddr("127.0.0.1:9876");
  1. 定时从 NameServer 更新本地维护的 Topic 相关数据:这里是批量地运行,也就是说 MQClientInstance 在运行这部分更新数据的逻辑是不会关心是 Producer 还是 Consumer,它会从两个 Table 中解析出所有的 Topic 的列表,然后批量地去 NameServer 更新数据,因为无论是 Producer 还是 Consumer 都需要使用到这些元数据。

  1. 定时清理无效的、下线的 Broker:这里就根据拿到 Topic 元数据当中的 Broker 相关数据,和本地维护的 Broker 数据进行对比,清理掉在 Topic 元数据中不存在的 Broker。

  2. 定时向所有的 Broker 发送心跳:简单来看有两层含义:一是告诉 Broker 我还活着,二是定时刷新 Broker 存的客户端数据。发送心跳的可以是 Producer,也可以是 Consumer,具体看谁在使用 MQClientInstance

  3. 如果是 Producer,那么心跳所包含的数据很少,就只有当前客户端的所有生产者组。

  4. 如果是 Consumer,那数据就多了,比如都有哪些消费者组的名称、消费的模式是广播还是集群、从哪里开始消费数据、消费者消费的 Topic 的简要数据等。

  5. 定时持久化 Offset(只针对 Consumer):如果当前客户端是 Consumer,就会将当前消费到哪儿了持久化起来,不然下次重启就不知道从哪里开始,从头开始?那已经消费过的消息再消费一次不就变成重复消费了吗?所以定时持久化 Offset 是非常必要的一个操作。

剖析完生产者启动大致流程后,我们来看看消息轨迹启动」。



2.4 消息轨迹追踪

traceDispatcher 又是个什么东西?

traceDispatcher 的作用是追踪消息的发送和消费轨迹,它是一个AsyncTraceDispatcher对象,它实现了TraceDispatcher接口,用来异步地发送追踪消息到 Broker。

它可以帮助用户查看每条消息的完整链路数据,包括发送时间消费时间存储时间等。我们可以通过使用下面的构造函数构造出一个含有 traceDispatcherDefaultMQProducer 实例。

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
        boolean enableMsgTrace, final String customizedTraceTopic) {
        this.namespace = namespace;
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        //if client open the message trace feature
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
                dispatcher.setHostProducer(this.defaultMQProducerImpl);
                traceDispatcher = dispatcher;
                this.defaultMQProducerImpl.registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
                this.defaultMQProducerImpl.registerEndTransactionHook(
                    new EndTransactionTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
}

由于我们在初始化 DefaultMQProducer 实例时没有生成 traceDispatcher 实例,因此 null != traceDispatcher 返回 FALSE,不调用 traceDispatcher#start 方法。



03 总结

执行 producer.start() 启动一个 producer。

  1. 重新设置生产者组名

  2. 调用 defaultMQProducerImpl 的 start() 方法,是进行启动实现的入口

  3. 检查当前状态,如果是 CREATE_JUST 则进入启动流程

  4. 检查生产者组名称是否合法

  5. 更改 producer 的 instanceName

  6. 创建一个 MQClientInstance 类型的 mQClinetFactory 实例

  7. 创建一个新的 defaultMQProducer 实例作为 mQClinetFactory 实例的成员变量

  8. 新的 defaultMQProducer 实例内部又会创建一个新的 defaultMQProducerImpl 实例

  9. 将新的 defaultMQProducer 实例的 ClinetConfig 属性复制粘贴为 producer 的 ClinetConfig 属性

  10. 将 producer 实例放入 mQClinetFactory 的 producerTable 中,key为 producer 的生产者组名

  11. 将 defaultMQProducer 的 createTopicKey 作为key,TopicPublishInfo 作为value,放入到 defaultMQProducerImpl 的 topicPublishInfoTable 中

  12. 启动 mQClinetFactory

  13. 如果没有 NameServer 地址则尝试获取

  14. 启动用于和 Broker 通信的 netty 客户端

  15. 启动定时任务

  16. 如果没有指定 NameServer 地址,则定时获取 NameServer 地址

  17. 定期从 NameServer 更新 topic 路由信息

  18. 定期清除离线 Broker,并向所有 Broker 发送心跳包

  19. 定时持久化消费者当前消费进度(对 MQConsumer 有效)

  20. 定时根据当前的积压调优线程池的核心线程数,但是实现是空的

  21. 启动 pullMessageService 从 Broker 拉取消息

  22. 启动消费者客户端的负载均衡服务

  23. 启动 mQClinetFactory 内部的 defaultMQProducerImpl 实例

  24. 检查当前状态,如果是 CREATE_JUST 则进入启动流程

  25. 检查生产者组名称是否合法

  26. 由于其 instanceName 等于 MixAll.CLIENT_INNER_PRODUCER_GROUP,因此不更改

  27. 创建一个 MQClientInstance 类型的 mQClinetFactory 实例

  28. 将 producer 实例放入 mQClinetFactory 的 producerTable 中,key为 producer 的生产者组名

  29. 将 defaultMQProducer 的 createTopicKey 作为key,TopicPublishInfo 作为value,放入到 defaultMQProducerImpl 的 topicPublishInfoTable 中

  30. 将当前状态设置为 RUNNING

  31. 如果上述都成功,则立即发送心跳到所有的 Broker

  32. 启动定时任务扫描和处理过期的异步请求

  33. 将当前状态设置为 RUNNING

  34. 如果上述都成功,则立即发送心跳到所有的 Broker

  35. 启动定时任务扫描和处理过期的异步请求

  36. 如果 traceDispatch 不为空则启动 traceDispatcher

【第二篇】图解 RocketMQ 源码之生产者发送消息核心流程剖析


今天开始我将为大家奉上 RocketMQ 生产者源码剖析系列文章,正式开启「RocketMQ 的生产者源码之旅」,这是第四篇,我们来剖析下 RocketMQ 源码之生产者发送消息核心流程剖析。

这里我将以RocketMQ 4.9.7版本为主,通过场景驱动的方式带大家一点点的对 RocketMQ 源码进行深度剖析,正式开启「RocketMQ 的源码之旅」,跟我一起来掌握 RocketMQ 源码核心架构设计思想吧。

01 总体概述

我们都知道在 RocketMQ 中,我们把产生消息的一方称为生产者即 Producer,它是 RocketMQ 核心组件之一,也是消息的来源所在。那么这些生产者产生的消息是如何传到 RocketMQ 服务端的呢?本篇我们主要来聊聊生产者发送消息的整个流程是怎么样的呢?接下来会逐一讲解说明。


02 生产者发送消息流程

2.1 生产者发送示例

在剖析生产者启动流程时,先看下生产者发送消息的示例代码,Producer 发送消息的示例在org.apache.rocketmq.example.simple.Producer 类中,代码如下:

// 同步发送
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1、实例化消息生产者 Producer
        DefaultMQProducer producer = new DefaultMQProducer("rocketmq-test-huazai-group");
        // 2、启动 Producer 实例
        producer.start();
        // 3、发送消息到 broker
        for (int i = 0; i < 128; i++)
            try {
                {
                    // 创建消息,并指定 Topic,Tag 和消息体
                    Message msg = new Message("TopicTestHuaZai",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    // 通过 sendResult 返回消息是否成功送达
                    System.out.printf("%s%n", sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        // 如果不再发送消息,关闭 Producer 实例。
        producer.shutdown();
    }
}

从代码中不难看出,生产者发送消息只需要 3 个步骤:

  1. 创建生产者客户端 DefaultMQProducer对象。

  2. 启动 Producer。

  3. 发送消息。

2.2 基础知识梳理

2.2.1 三种消息发送方式

RocketMQ 支持 3 种消息发送方式:同步、异步和单向。

  1. 同步 Sync

  2. 发送消息时,同步等待,直到 Broker 返回发送结果。

  3. 这种方式最为可靠,但是发送性能最差。

  4. 在一些可靠性要求非常高的场景下,推荐使用同步方式。比如:重要的消息通知,短信通知。

  5. 异步 Async

  6. 发送消息时,无需等待 Broker 返回发送结果,发送线程不阻塞。执行发送 API 时指定消息发送成功后的回调函数。

  7. 这种方式相比于同步发送,性能可以提升多个数量级,但可靠性不如同步发送。

  8. 在对响应时间敏感、流量较大的场景下,推荐使用异步方式。异步发送是使用最广泛的发送方式。

  9. 单向 One-way

  10. 发送消息时,直接返回,不等待 Broker 返回发送结果,也不注册回调函数。

  11. 这种发送方式性能最高,可靠性最差。它不关心发送结果,不在乎消息是否成功存储在 Broker 中。

  12. 适用于消息丢失也没有太大影响的场景,例如发送日志。

这三种发送方式中,异步发送时最为广泛使用的发送方式。配合一些重试和补偿机制,可以达成较高的可靠性和很高的性能。

2.2.2 特殊消息类型梳理

下面简单介绍一下几种特殊消息类型。

  1. 普通消息:发送效率最高、使用场景最广泛的消息类型。普通消息可以由客户端并发发送。不保证普通消息消费的顺序。单 Broker 性能可以达到十万级别。(视 Broker 配置而变)

  2. 队列级别顺序消息:RocketMQ 将一个 Topic 分为多个队列,以提高消费速度。每隔分区内的消息遵循先生产先消费的顺序。

  3. Topic 级别顺序消息:如果把一个 Topic 的队列数量设为 1,那么该 Topic 中的消息也遵循先生产先消费。

  4. 延迟消息:消息发送后,消费者并不马上收到消息,而是等待一段指定的时间之后才能消费到该消息。

  5. 事务消息:提供分布式事务功能,可以保证发送消息和另外的操作同时成功或者同时失败。

  6. 批量消息:将多个消息包装成一个批量消息,一起发送。降低网络传输次数,提升传输效率。

2.2.3 路由机制

RocketMQ 的 Topic 可以分为多个队列,每个队列可能分布在不同 Broker 上。

消息的路由指的是发送消息时需要先获取 Topic 的路由信息(其中包含每个 Topic 的队列和它们所在的 Broker 地址),然后选择一个队列进行发送。

消息发送的 API 提供了参数,可以传入要发送的队列信息,或者传入队列选择方法,以供用户选择发往某个 Broker 的具体队列。

2.3 生产者发送消息过程

消息发送的流程涉及到 RocketMQ 的三个组件:生产者BrokerNameServer。其中生产者负责发送消息,Broker 负责处理消息发送请求,NameServer 负责更新和提供路由信息。

2.3.1 消息类型梳理

在 RocketMQ 中的消息类主要有 3 个:

  1. Message为客户端需要使用的消息类。

  2. MessageExt为消息扩展属性类,它扩展了Message,在 Broker 上产生此对象。

  3. MessageExtBrokerInner是存储内部使用的 Message 对象,在 rocketmq-store 模块使用。

在发送消息时,用到的是Message,可以指定消息的属性消息体」和 「flag

2.3.2 生产者类图梳理

  1. 首先,DefaultMQProducer是 RocketMQ 中默认的生产者实现,它实现了 MQAdmin 接口。

  2. DefaultMQProducer内部包装了一个DefaultMQProducerImpl 字段,它是生产者的具体实现类,DefaultMQProducer调用它内部的DefaultMQProducerImpl来发送命令。

  3. DefaultMQProducerImpl内部注册了一个MQClientInstance字段。MQClientInstance是与 NameServer 和 Broker 通信的桥梁。MQClientInstanceClientId是一一对应的,而 ClientIdclientIpinstanceNameunitName组成。如果不手动修改,一般来说一个启动的客户端进程只有一个MQClientInstance实例,这样可以节省客户端资源。

  4. MQClientInstance内部的producerTable注册了 ClientIdDefaultMQProducerImpl的对应关系。

  5. MQClientAPIImpl提供了发送消息的 API,它调用RemotingClient执行发送。

2.3.3 生产者发送流程

源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java

@Override
public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 1、如果设置了 namespace 则对 topic 进行包装,一般都不会设置 namespace,所以返回源生 topic
        msg.setTopic(withNamespace(msg.getTopic()));
        // 2、调用 DefaultMQProducerImpl 对象的 send(msg) 方法发送消息
        return this.defaultMQProducerImpl.send(msg);
}

可以看到在发送消息时 DefaultMQProducer 也只是一个门面类,具体的实现都是由DefaultMQProducerImpl去做的,主要两个步骤:

  1. 如果设置了 namespace,则对 topic 进行包装,一般都不会设置 namespace,所以返回源生 topic。

  2. 调用 DefaultMQProducerImpl 对象的 send(msg) 方法发送消息。

接着我们来看下 DefaultMQProducerImpl 类的实现。

源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

/**
 * DEFAULT SYNC -------------------------------------------------------
 */
public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 发送超时时间
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

// SYNC 同步
public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 可以看到这里默认采用 SYNC 的方式
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

总共有 4 个参数:

  1. msg:就是我们最开始打包好的 Message。

  2. communicationMode:发送模式的枚举类,其实就是刚刚讲过的SYNC、ASYNC、ONEWAY。

  3. sendCallback:当 CommunicationMode 为ASYNC时,才会传入的回调函数。

  4. timeout:发送的超时时间,如果发送流程卡住了,Producer 不可能一直在这里等待,等到超过了指定的超时时间,就会抛出异常。


可以看到,基本就是继续调用了几个函数以补齐缺失的参数如超时时间、发送消息的类型和回调函数(由于是同步发送因此回调函数为 null),发送消息的逻辑则主要是在sendDefaultImpl方法中实现的。


源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

/**
 * 发送消息实现
 * - 验证合法性 checkMessage
 * - 查找主题路由信息 tryToFindTopicPublishInfo
 * - 选择消息队列 selectOneMessageQueue
 * - 发送消息 sendKernelImpl
 *
 * @param msg
 * @param communicationMode
 * @param sendCallback
 * @param timeout
 * @return
 * @throws MQClientException
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 1、检查生产者处于运行状态,确认生产者处于 RUNNING 状态 
        this.makeSureStateOK();
        // 2、检查消息是否合法
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 3、从 nameserever 获取 topic 的路由信息 
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        // 4、topicPublishInfo不为空且可用
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 4.1、计算重试次数,CommunicationMode.SYNC 为 3 次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            // 循环执行发送,处理同步发送重试。同步发送共重试 timesTotal 次,默认为 2 次,异步发送只执行一次
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 选择一个 queue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        // 4.1.2、发送消息  
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        // 处理发送异常,更新失败条目
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                       ....
                    } catch (MQClientException e) {
                       ....
                    } catch (MQBrokerException e) {
                       ....
                    } catch (InterruptedException e) {
                       ....
                    }
                } else {
                    break;
                }
            }
            // 发送成功,返回发送结果
            if (sendResult != null) {
                return sendResult;
            }
            ....
        }
        // 5、校验 NameServer 配置是否正确
        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);

这个方法代码比较多,其实主要就是三件事:


  1. 确认生产者处于 RUNNING 状态。

  2. 检查消息是否合法。

  3. Nameserever 获取 topic 的路由信息。

  4. topicPublishInfo 不为空且可用,计算重试次数。

  5. 同步发送 CommunicationMode.SYNC 失败重试时,对于普通消息,消息发送默认采用round-robin策略来选择所发送到的队列。如果发送失败, 默认重试3次。但在重试时是不会选择上次发送失败的Broker,而是选择其它Broker,这是因为它具有失败隔离功能,使Producer尽量选择未发生过发送失败的Broker作为目标Broker。其可以保证其它消息尽量不发送到问题Broker,为了提升消息发送效率,降低消息发送耗时。

  6. 异步发送失败重试时,异步重试不会选择其他broker,仅在同一个broker上做重试,所以该策略无法保证消息不丢。

  7. 选择一个 MessageQueue,其中 MessageQueue 是逻辑消费队列,可以暂时理解为它就是消费者的 ConsumeQueue

  8. 发送消息。

  9. 最后如果 NameServer 配置为空则抛出 No name server address 异常,否则抛出 No route info of this topic 异常。

2.3.3.1 消息校验

这里先来看下校验方法。

// 源码位置:
// 子项目: client
// 包名: org.apache.rocketmq.client;
// 文件: Validators
// 行数: 56
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
        // 最基本的检查
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // 检查 Topic 是否合法
        Validators.checkTopic(msg.getTopic());
        Validators.isNotAllowedSendTopic(msg.getTopic());

        // 判断 body 字段是否是 nil
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
        // 判断 body 是否长度为 0 
        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }
        // 判断 body 的长度是否超过最大的长度, MaxMessageSize 默认是 4M
        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
}

2.3.3.2 消息重投机制

消息重投机制 是 RocketMQ 自带的、在投递消息失败的情况下的重试机制。该机制能够最大限度地保证消息的成功投递,并保证消息不丢失。

在享受消息重投带来成功投递保障的同时,在某些极端情况下也可能会导致消息重复的问题。

比如 Producer 以为投递失败了,但实际上可能只是响应超时导致了报错,消息实际上已经成功地存储在了 Broker 上,此时再进行重试就会导致重复。

说了这么多,那么消息重投到底会重投多少次呢?废话不多说,上关键源码

// 源码位置:
// 子项目: client
// 包名: org.apache.rocketmq.client.impl.producer;
// 文件: DefaultMQProducerImpl
// 行数: 554
// 计算重试次数,CommunicationMode.SYNC 为 3 次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

这里的communicationMode默认都是SYNC,所以这里数值的计算就由1 + this.defaultMQProducer.getRetryTimesWhenSendFailed()来确定了,至于 retryTimesWhenSendFailed,它是可配置的,其默认值为2

// 源码位置:
// 子项目: client
// 包名: org.apache.rocketmq.client.producer;
// 文件: DefaultMQProducer
// 行数: 112
private int retryTimesWhenSendFailed = 2;

所以,在默认情况下timesTotal计算的结果就为3


上面的timesTotal严格来说不代表重投次数,而是总的投递次数,因为无论怎么样,始终都会发送一次,所以留给重试的次数就只有3 - 1 = 2次。

2.3.4 从 NameServer 获取 Topic 路由消息

下面我们重点看下从 Nameserver 获取 Topic 路由消息的过程,简单的示意图如下:

DefaultMQProducerImpl#sendDefaultImpl() 方法里,有一行非常关键的代码,如下:

// 尝试从本地缓存 TopicPublishInfo 中获取,如果不存在,再从 NameServer 中拉取
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());


其实看到这行代码,或许你已经什么都明白了,也就是每次当发送消息的时候,都会先去检查一下要发送消息的Topic 的路由数据是否在你客户端本地缓存 TopicPublishInfo 表中,如果不存在的话会发送请求到NameServer那里去拉取一下的,然后缓存在客户端本地缓存 TopicPublishInfo 表中。


源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

在发送消息的源码中所指定的 topic 并没有在发送前去创建,那么它是如何选择的呢? 我们先看它获取 topic 路由信息的源码实现:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // 1、从本地缓存 TopicPublishInfo 中尝试获取,第一次肯定为空   
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        // 如果路由信息没有找到,则从 NameServer 上获取
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            // 2、先写入一条数据到 topicPublishInfoTable 表中
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 3、尝试从 NameServer 获取特定 topic 路由信息并更新本地缓存配置
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            // 再重新获取一次
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        // 4、如果找到可用的路由信息并返回
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // 5、如果第一次未找到路由信息,则再次尝试使用默认的 topic 获取路由信息
            // 核心,注意看,第二个参数是true
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
}

该方法比较重要,假如第一次对 topic 为 test-huazai-topic 进行发送消息,但没有通过其他方式创建过该 topic,流程如下:

  1. 首先会从本地的 topicPublishInfoTable 表中获取 Topic 路由信息,由于之前没有向 topic 发送过消息,所以第一次从本地找不到。

  2. 此时向 topicPublishInfoTable 表中添加空白 topicPublishInfo

  3. 尝试从 NameServer 获取特定 topic 路由信息并更新本地缓存配置。

  4. 再次从本地的 topicPublishInfoTable 表中获取 Topic 路由信息。

  5. 如果找到可用的路由信息并返回。

  6. 如果第一次未找到路由信息,再次从 NameServer 中使用默认的 topic 获取路由信息,但是,请注意第二个参数为 true。

2.3.5 获取并更新 Topic 路由消息

从上面源码可知此方法被调用了两次,第一次尝试从 NameServer 获取特定 topic 路由信息并更新本地缓存配置失败,第二次尝试使用默认的 topic 获取路由信息。

// 尝试使用特定的 topic 从 NameServer 获取路由信息并更新本地缓存配置
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
// 尝试使用默认的 topic 从 NameServer 获取获取路由信息并更新本地缓存配置。核心,注意看,第二个参数是true
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);

该方法根据参数不同会分为两种情况来处理,接下来我们来分别看下两种情况下的处理逻辑。


2.3.5.1 使用特定 Topic 获取路由消息

第一次尝试使用特定 topic 获取路由信息,调用方法为 updateTopicRouteInfoFromNameServer(topic)

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        // 此方法又会调用其重载方法,其中 isDefault 传入的值为 false
        return updateTopicRouteInfoFromNameServer(topic, false, null);
}

// 由于方法的代码比较多,这里我们只看代码执行过的部分
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        TopicRouteData topicRouteData;
        // 默认第一次不会执行 if 分支
        if (isDefault && defaultMQProducer != null) {
            // 如果 isDefault 是 true,则获取 Rocketmq 内部默认的 topic 路由信息
            ....
        } else {
           // 从 nameserver 获取用户指定的topic的路由信息,其中 getTopicRouteInfoFromNameServer 方法通过 Netty 使用 RPC 调用获取 Topic 路由信息
           topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
        }
}

可以看到这里第一次执行的是 else 分支代码,其中getTopicRouteInfoFromNameServer方法通过 Netty 使用 RPC 调用获取 Topic 路由信息。

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
        // allowTopicNotExist = true
        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        // 请求头中设置 topic 信息
        requestHeader.setTopic(topic);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            // 当我们向一个不存在的 topic 发送消息时,会进入 case ResponseCode.TOPIC_NOT_EXIST 分支。又因为 allowTopicNotExist 传入的值为 true,所以打印警告并抛出异常,方法结束
            case ResponseCode.TOPIC_NOT_EXIST: {
                if (allowTopicNotExist) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break;
            }
            // 如果成功的话
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                if (body != null) {
                    // 解析路由信息数据
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
}

这里当我们向一个不存在的 topic 发送消息时,会进入 case ResponseCode.TOPIC_NOT_EXIST 分支。又因为 allowTopicNotExist 传入的值为 true,所以打印警告并抛出异常,方法结束。


2.3.5.2 使用默认 Topic 获取路由消息

第二次获取时调用了 updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) ,注意:其中 isDefault 传入的值为 true。

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
   TopicRouteData topicRouteData;
   
   if (isDefault && defaultMQProducer != null) {
        // 此时 isDefault 是 true,则从 NameServer 获取默认的topic路由信息
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            clientConfig.getMqClientApiTimeout());
        // 如果可以获取到 topic 路由信息
        if (topicRouteData != null) { 
            // 修正topic路由信息中的读写队列数,使其最大不超过默认的topic队列数
            for (QueueData data : topicRouteData.getQueueDatas()) {
                // broker 端模板配置的读写队列都是 8,生产者端默认配置的是4,所以取二者小的,就是4
                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                 data.setReadQueueNums(queueNums);
                 data.setWriteQueueNums(queueNums);
             }
         }
    }
}


可以看到该部分代码分为两个步骤:

  1. 从 NameServer 中获取默认 topic 即 TBW102 的路由信息。

  2. 修正获取到的默认 topic 路由信息中的读写队列数,使其最大不超过默认的 topic 队列数。

此时我们的 topicRouteData 不为空,且其 QueueData 属性也经过了修正,具体内容如下:

TopicRouteData [
	orderTopicConf=null, 
	queueDatas=[
		QueueData [
			brokerName=broker-a, 
			readQueueNums=4, 
			writeQueueNums=4, 
			perm=6, 
			topicSysFlag=0
		]
	], 
	brokerDatas=[
		BrokerData [
			brokerName=broker-a, 
			brokerAddrs={0=192.168.56.1:10911}, 
			enableActingMaster=false
		]
	], 
	filterServerTable={}, 
	topicQueueMappingInfoTable=null
]

当这两个分支代码剖析完成后,我们来看下该方法的剩下的源码:

if (topicRouteData != null) {
   TopicRouteData old = this.topicRouteTable.get(topic);
   // 与本地缓存中的 topic 发布信息进行比较,如果有变化,则需要同步更新生产者、消费者关于该 topic 的缓存
   boolean changed = topicRouteDataIsChange(old, topicRouteData);
   if (!changed) {
      changed = this.isNeedUpdateTopicRouteInfo(topic);
   } else {
      log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
   }
   // 如果有变化,则需要同步更新生产者、消费者关于该 topic 的缓存
   if (changed) {
      // 将模板克隆一份,这个将用作我们自己的 topic 配置信息
      TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

      for (BrokerData bd : topicRouteData.getBrokerDatas()) {
          // 更新 broker 地址
          this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
      }

      // Update Pub info
      if (!producerTable.isEmpty()) {
          // 根据 topic 路由信息组装 TopicPublishInfo 对象
          TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
          publishInfo.setHaveTopicRouterInfo(true);
          Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
          while (it.hasNext()) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                       // 更新 DefaultMQProducerImpl#topicPublishInfoTable 表
                       impl.updateTopicPublishInfo(topic, publishInfo);
                }
           }
     }

     // Update sub info  消费者此时为空 忽略该部分源码
     if (!consumerTable.isEmpty()) {
          ....
     }
     log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
     this.topicRouteTable.put(topic, cloneTopicRouteData);
     return true;
  }
}

该部分源码逻辑如下:

  1. 与本地缓存中的 topic 发布信息进行比较,如果有变化,则需要同步更新生产者、消费者关于该 topic 的缓存。

  2. 将模板克隆一份,这个将用作我们自己的 topic 配置信息。

  3. 接着会根据 topicRouteData 组装 TopicPublishInfo 对象,并将其保存到 DefaultMQProducerImpltopicPublishInfoTable 中,key 为 topic 名称,value 为 TopicPublishInfo 对象。

  4. 最后将 topicRouteData 保存在 topicRouteTable 中。

TopicPublishInfo内部维护着队列数据(MessageQueue)以及路由数据(比如broker数据,以及queue属于哪个broker数据)

到这里,本地 topic 路由信息就有了,但 Broker 和 NameServer 还没有。

2.3.6 选择 MessageQueue

源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java

在上面获取的 topic 路由信息对象 TopicPublishInfo 中,其内部包含了队列的信息:

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // 保存着队列信息,默认是 4 个 queue
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
    ....
}

queue 的选择算法主要有两种:

  1. 轮询算法:该算法保证了每个 Queue 中可以均匀的获取到消息。

  2. 最小投递延迟算法:该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的 Queue。如果延迟相同,则采用轮询算法投递。该算法可以有效提升消息的投递性能。

  3. 默认使用的是轮询算法

所以,通过轮询算法从 TopicPublishInfo 对象的 queue 集合 messageQueueList 中获取一个 MessageQueue

// 选择一个 queue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

其中selectOneMessageQueue方法就是选择一个可用的MessageQueue发送消息。


如上图所示,MessageQueue有一个三元组标识唯一一个队列,即(topic, brokerName, queueId),最上方的MessageQueue 的三元组可能是(TopicTestHuaZai, broker-a, 0)

2.3.6.1 默认机制 故障延迟机制关闭

源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java

/**
 * 选择队列
 * 上一次发送成功则选择下一个队列,上一次发送失败会规避上次发送的 MessageQueue 所在的 Broker
 *
 * @param lastBrokerName 上次发送的 Broker 名称,如果为空表示上次发送成功
 * @return
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            // 轮询队列,选择下一个队列
            return selectOneMessageQueue();
        } else {
            // 上次发送失败,规避上次发送的 MessageQueue 所在的 Broker
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
}


2.3.6.2 故障延迟机制

源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java

/**
 * 选择发送的队列,根据是否启用 Broker 故障延迟机制走不同逻辑
 *
 * sendLatencyFaultEnable=false,默认不启用 Broker 故障延迟机制
 * sendLatencyFaultEnable=true,启用 Broker 故障延迟机制
 *
 * @param tpInfo
 * @param lastBrokerName
 * @return
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 启用 Broker 故障延迟机制
        if (this.sendLatencyFaultEnable) {
            try {
                // 轮询获取一个消息队列
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 验证该消息队列是否可用,规避注册过不可用的 Broker。
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                // 如果没有可用的 Broker,尝试从规避的 Broker 中选择一个可用的 Broker,如果没有找到,返回 null
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
}

当我们得到了要发送的 MessageQueue 后就开始执行发送消息的步骤。

// DefaultMQProducerImpl#sendDefaultImpl 方法部分源码
// 向 MessageQueue 发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
        case ASYNC:
            return null;
        case ONEWAY:
            return null;
        case SYNC:
            // 同步调用方式(SYNC)下如果发送失败则执行失败重试策略,默认重试两次,即最多发送三次
            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                    continue;
                }
            }

            return sendResult;
        default:
            break;
}


当有了以上信息后,就可以将消息发往 Broker 了。

2.4 发送消息给 Broker

可以看到上面 DefaultMQProducerImpl#sendDefaultImpl 方法中会调用 sendKernelImpl 方法去发送消息。

首先将消息内容封装到 SendMessageRequestHeader 对象中,完整源码如下:

/**
 * 消息发送 API 核心入口
 * 1. 根据 MessageQueue 获取 Broker 地址
 * 2. 为消息分配全局唯一 ID,执行消息压缩和事务
 * 3. 如果注册了发送钩子函数,则执行发送之前的钩子函数
 * 4. 构建消息发送请求包
 * 5. 根据消息发送方式(同步、异步、单项)进行网络传输
 * 6. 如果注册了发送钩子函数,执行发送之后的钩子函数
 *
 * @param msg 待发送消息
 * @param mq 发送的消息队列
 * @param communicationMode 消息发送模式:SYNC、ASYNC、ONEWAY
 * @param sendCallback 异步发送回调函数
 * @param topicPublishInfo 主题路由信息
 * @param timeout 消息发送超时时间
 * @return 消息发送结果
 */
private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        // 根据 MessageQueue 获取 Broker 的网络地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            // 如果 MQClientInstance 的 brokerAddrTable 未缓存该 Broker 信息,则尝试从 nameserever 主动获取 topic 的路由信息
            tryToFindTopicPublishInfo(mq.getTopic());
            // 重新设置 broker 地址信息
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        // 找到 topic 的路由信息
        if (brokerAddr != null) {
             // 根据配置判断是否使用 VIP 通道
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                // 检查消息是否为 MessageBatch 类型 
                if (!(msg instanceof MessageBatch)) {
                    // 设置消息的全局唯一 ID(UNIQUE_ID),对于批量消息,在生成过程中已经设置了 ID
                    MessageClientIDSetter.setUniqID(msg);
                }
                // 处理命名空间逻辑
                boolean topicWithNamespace = false;
                // 检查客户端配置中是否设置了命名空间
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }
                // sysFlag 是消息的系统标志位,包含压缩标志位、事务标志位、批量标志位、多队列标志位等
                // 处理压缩,默认消息体超过 4KB 的消息进行 zip 压缩,并设置压缩标识
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                // 尝试压缩消息体
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    sysFlag |= compressType.getCompressionFlag();
                    msgBodyCompressed = true;
                }
                // 处理事务 Prepared 消息,并设置事务标识
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                // 检查消息是否为事务消息
                if (Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                // 校验禁用钩子
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                // 发送消息前的钩子 
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                // 构建消息发送请求
                // 设置发送消息的请求头
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // 设置生产者组
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                // 设置 topic
                requestHeader.setTopic(msg.getTopic());
                // 设置默认的 topic:TBW102
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                // 设置默认的队列数量:4
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.
                getDefaultTopicQueueNums());
                // 设置 queueid
                requestHeader.setQueueId(mq.getQueueId());
                // 设置 sysFlag,它是消息的系统标志位,包含压缩标志位、事务标志位、批量标志位、多队列标志位等
                requestHeader.setSysFlag(sysFlag);
                //TODO:设置消息的生产时间
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                // 设置 properties,比如 TAGS,KEYS                                 
                requestHeader.setProperties(MessageDecoder.messageProperties2String
                (msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                // 是否为批量消息
                requestHeader.setBatch(msg instanceof MessageBatch);
                requestHeader.setBname(mq.getBrokerName());
                // 如果是重发消息,则设置重发消息的次数
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    // 重发消息的次数
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        // 设置重发消息的次数
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        // 清除消息的重发次数属性,因为消息的重发次数属性是在消息重发时设置的
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }
                    // 消息的最大重发次数
                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        // 设置消息的最大重发次数
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        // 清除消息的最大重发次数属性,因为消息的最大重发次数属性是在消息重发时设置的
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
                // 根据消息发送方式进行网络传输
                SendResult sendResult = null;
                // 选择发送模式
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            // 防止压缩后的消息体重发时被再次压缩
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                             // 防止设置了命名空间的topic重发时被再次设置命名空间
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        // 异步发送:Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        // 执行客户端同步发送方法:Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
                // 发送消息后的钩子
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }
        // 主动更新后还是找不到路由信息,则抛出异常
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

该方法代码比较长,但相对比较简单,主要做了两件事情:

  1. 先获取 Broker 地址,如果为空则尝试从 nameserever 获取 topic 的路由信息,然后重新赋值 Broker 地址。

  2. 如果此时 Broker 地址不为空,则优先选择 VIP 通道进行发送,在发送前需要处理以下两件事情。

  3. 组装 SendMessageRequestHeader 请求头数据。

  4. 根据不同发送模式进行发送。



这里我们来看下三种发送模式的区别:

  1. 同步发送:Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。

  2. 该方式的消息可靠性最高,但消息发送效率太低。

  3. 异步发送:Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。

  4. 该方式的消息可靠性可以得到保障,消息发送效率也可以。

  5. 单向发送:Producer 仅负责发送消息,不等待、不处理 MQ 的 ACK。该发送方式时 MQ 也不返回 ACK。

  6. 该方式的消息发送效率最高,但消息可靠性较差。

最后我们来看下底层发送消息的方法。

源码位置:https://github.com/apache/rocketmq/blob/release-4.9.7/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

    ....
    // 远程客户端
    private final RemotingClient remotingClient;
    private final TopAddressing topAddressing;
    // 客户端处理器
    private final ClientRemotingProcessor clientRemotingProcessor;
    // nameserver 地址
    private String nameSrvAddr = null;
    // 客户端配置
    private ClientConfig clientConfig;


    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
        if (isReply) {
            if (sendSmartMsg) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                // 创建远程命令,code=RequestCode.SEND_REPLY_MESSAGE_V2
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
                // 创建远程命令,code=RequestCode.SEND_REPLY_MESSAGE
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            if (sendSmartMsg || msg instanceof MessageBatch) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                // 创建远程命令
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                // 创建远程命令,code=RequestCode.SEND_MESSAGE         
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        request.setBody(msg.getBody());

        switch (communicationMode) {
            // 单向发送
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            // 异步发送    
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            // 同步发送
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }
}

该方法主要做了两件事情:

  1. 创建远程命令 RemotingCommand 对象,设置封装了消息内容的 SendMessageRequestHeader,以及区分业务场景的 code 为 RequestCode.SEND_MESSAGE。Broker 端在接收到 producer 发送消息的请求后,根据 code 获取对应的处理器,然后解析封装了消息内容的SendMessageRequestHeader 对象,然后存储消息。

  2. 通过 netty 客户端对象 NettyRemotingClient 向 Broker 发送请求。

03 发送流程总结

整个发送流程如下:

  1. 检查消息是否合法

  2. 获取 topic 路由信息

  3. 先尝试从本地获取路由信息,没有则向 NameServer 获取

  4. 向 NameServer 获取路由信息并更新本地缓存,没有则抛出异常并返回

  5. 从本地获取路由信息

  6. 如果本地扔获取不到路由信息则获取默认路由信息

  7. 向 NameServer 获取默认路由信息,如果获取不到则抛出异常并返回

  8. 修改获取到的默认路由信息为新的 topic 的路由信息

  9. 更新本地路由信息缓存

  10. 获取路由信息成功;失败则跳转到第4步

  11. 选择一个 MessageQueue

  12. 向 MessageQueue 发送消息

  13. 根据配置判断是否使用 VIP 通道

  14. 检查消息是否为 MessageBatch 类型

  15. 检查客户端配置中是否设置了命名空间

  16. 设置消息的标志位 sysFlag

  17. 尝试压缩消息体并更新 sysFlag

  18. 检查消息是否为事务消息并更新 sysFlag

  19. 调用钩子函数

  20. 设置消息请求头

  21. 根据发送消息的方式发送消息

  22. 获取路由信息失败

  23. 校验 NameServer 配置是否正确

  24. 抛出异常结束