1. 首页
  2. Dubbo源码解析
  3. Dubbo 源码分析 —— 服务暴露(二)之远程暴露

Dubbo 源码分析 —— 服务暴露(二)之远程暴露

  • 发布于 2024-11-09
  • 2 次阅读

在Dubbo框架中,服务暴露是核心功能之一,它允许服务提供者将服务发布出去,供服务消费者调用。服务暴露主要分为本地暴露和远程暴露两种方式。下面将详细分析Dubbo中的远程暴露过程。

远程暴露流程

  1. 服务暴露的分类

    • Dubbo支持两种服务暴露类型:远程暴露和本地暴露。远程暴露是指将服务发布到远程注册中心,供消费者调用。

  2. 服务暴露全流程

    • 服务暴露的流程大致可以分为三个步骤:检测配置、暴露服务(包括本地和远程)、注册服务至注册中心。

  3. 服务实现类转换

    • 从对象构建转换的角度来看,服务暴露可以分为将服务实现类转换成Invoker,再将Invoker通过具体的协议转换成Exporter

  4. 远程暴露实现原理

    • 服务提供者在启动时,将服务实现类通过协议进行网络传输,发布到注册中心上。服务消费者在启动时,从注册中心上获取到服务提供者的地址,然后通过协议进行网络通信,调用服务提供者的方法。

  5. 远程暴露源码分析

    • ServiceConfig类的doExportUrlsFor1Protocol方法中,根据scope的配置,如果不是local,则会进行远程服务的暴露。

    • 远程服务暴露时,会遍历注册中心的URL,为每个注册中心生成一个Invoker,然后通过协议的export方法将Invoker导出为Exporter,完成服务的远程暴露。

  6. 注册服务至注册中心

    • 在远程暴露的过程中,服务提供者会将服务URL注册到注册中心,这样服务消费者可以从注册中心动态获取服务提供者列表。

  7. 监控中心的集成

    • 在服务暴露的过程中,如果配置了监控中心,Dubbo还会将监控中心的URL添加到服务URL中,以便监控服务的调用情况。

  8. 多协议支持

    • Dubbo允许配置多协议,在不同服务上支持不同协议或者同一服务上同时支持多种协议。

通过上述分析,我们可以看到Dubbo的远程暴露过程涉及到服务的包装、协议的选择、注册中心的集成以及监控中心的配置等多个方面,这些共同确保了服务能够被正确地发布和调用。

Dubbo 是一个高性能的 Java RPC 框架,广泛用于构建分布式服务架构。在 Dubbo 中,服务暴露是将本地服务发布到远程,使得其他服务可以调用的过程。本文将详细分析 Dubbo 服务暴露的远程暴露部分。

1. 服务暴露流程概述

Dubbo 的服务暴露主要包括以下几个步骤:

  1. 本地暴露:将服务注册到本地服务目录。

  2. 远程暴露:将服务注册到注册中心,使得其他服务可以发现并调用。

2. 远程暴露的实现

2.1 服务导出器(Exporter)

在 Dubbo 中,服务暴露的核心类是 Exporter,它负责将服务导出到网络上。Exporter 的实现类通常是 ProviderModel 的一部分,ProviderModel 保存了服务提供者的相关信息。

2.2 服务导出过程

服务导出过程主要涉及以下几个类和方法:

  1. Protocol:协议层接口,负责服务的暴露和引用。

  2. Registry:注册中心接口,负责服务的注册和发现。

  3. Exporter:服务导出器接口,负责将服务导出到网络上。

  4. Invoker:服务调用器接口,负责服务的调用。

2.3 关键类和方法

2.3.1 Protocol 接口

Protocol 接口是 Dubbo 的核心接口之一,负责服务的暴露和引用。主要方法包括:

  • export(Invoker<T> invoker):将服务导出到网络上。

  • refer(Class<T> type, URL url):从网络上引用服务。

public interface Protocol {
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    void destroy();
}
2.3.2 RegistryProtocol 类

RegistryProtocol 类是 Protocol 接口的一个实现,负责将服务注册到注册中心。关键方法包括:

  • export(Invoker<T> invoker):将服务导出到注册中心。

public class RegistryProtocol implements Protocol {
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 创建代理 Invoker
        final Invoker<T> invokerDelegate = new InvokerDelegationWrapper<>(originInvoker);
        
        // 获取服务的 URL
        URL registryUrl = getRegistryUrl(originInvoker);
        
        // 创建注册中心
        Registry registry = registryFactory.getRegistry(registryUrl);
        
        // 创建服务导出器
        ExporterChangeableWrapper<T> exporter = doLocalExport(invokerDelegate, originInvoker.getUrl());
        
        // 注册服务到注册中心
        register(registry, invokerDelegate, registryUrl);
        
        return exporter;
    }
    
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL url) {
        // 创建本地导出器
        final Exporter<T> exporter = protocol.export(new Invoker<T>() {
            @Override
            public Class<T> getInterface() {
                return originInvoker.getInterface();
            }
​
            @Override
            public URL getUrl() {
                return url;
            }
​
            @Override
            public boolean isAvailable() {
                return originInvoker.isAvailable();
            }
​
            @Override
            public Result invoke(Invocation invocation) throws RpcException {
                return originInvoker.invoke(invocation);
            }
​
            @Override
            public void destroy() {
                originInvoker.destroy();
            }
        });
        
        return new ExporterChangeableWrapper<>(exporter, originInvoker);
    }
    
    private <T> void register(Registry registry, Invoker<T> invoker, URL registryUrl) {
        // 注册服务到注册中心
        registry.register(buildRegisterUrl(invoker));
    }
}
2.3.3 InvokerDelegationWrapper 类

InvokerDelegationWrapper 类是对 Invoker 的包装,用于在服务调用时添加额外的逻辑。

public class InvokerDelegationWrapper<T> implements Invoker<T> {
    private final Invoker<T> invoker;
​
    public InvokerDelegationWrapper(Invoker<T> invoker) {
        this.invoker = invoker;
    }
​
    @Override
    public Class<T> getInterface() {
        return invoker.getInterface();
    }
​
    @Override
    public URL getUrl() {
        return invoker.getUrl();
    }
​
    @Override
    public boolean isAvailable() {
        return invoker.isAvailable();
    }
​
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        return invoker.invoke(invocation);
    }
​
    @Override
    public void destroy() {
        invoker.destroy();
    }
}
2.3.4 ExporterChangeableWrapper 类

ExporterChangeableWrapper 类是对 Exporter 的包装,用于在服务导出时添加额外的逻辑。

public class ExporterChangeableWrapper<T> implements Exporter<T> {
    private final Exporter<T> exporter;
    private final Invoker<T> invoker;
​
    public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> invoker) {
        this.exporter = exporter;
        this.invoker = invoker;
    }
​
    @Override
    public Invoker<T> getInvoker() {
        return invoker;
    }
​
    @Override
    public void unexport() {
        exporter.unexport();
    }
}

3. 服务注册到注册中心

RegistryProtocol 类的 export 方法中,服务会被注册到注册中心。注册中心的实现可以是 Zookeeper、Nacos 等。

private <T> void register(Registry registry, Invoker<T> invoker, URL registryUrl) {
    // 构建注册 URL
    URL registerUrl = buildRegisterUrl(invoker);
    
    // 注册服务到注册中心
    registry.register(registerUrl);
}

4. 总结

通过上述分析,我们可以看到 Dubbo 的服务暴露过程主要分为以下几个步骤:

  1. 创建代理 Invoker:对原始 Invoker 进行包装,添加额外的逻辑。

  2. 获取注册中心 URL:根据服务的 URL 获取注册中心的 URL。

  3. 创建注册中心:根据注册中心的 URL 创建注册中心实例。

  4. 创建本地导出器:将服务导出到本地,创建本地导出器。

  5. 注册服务到注册中心:将服务注册到注册中心,使得其他服务可以发现并调用。

通过这些步骤,Dubbo 实现了服务的远程暴露,使得分布式系统中的各个服务可以互相调用。希望这些内容对你理解 Dubbo 的服务暴露机制有所帮助。

以下是对 Dubbo 源码中服务暴露之远程暴露部分的详细分析:

一、远程暴露整体流程概述


Dubbo 的远程暴露是将本地服务以可被远程调用的方式发布出去的过程,主要涉及到多个关键步骤和组件的协同工作。


  1. 配置加载与解析:首先,Dubbo 会从配置文件(如 XML 配置、properties 配置等)或者通过注解等方式获取服务相关的配置信息。这些配置包括服务接口、实现类、注册中心地址、通信协议、端口号等。例如,对于一个简单的 HelloWorld 服务,配置可能指定了服务接口为 HelloWorldService,实现类为 HelloWorldServiceImpl,注册中心使用 Zookeeper 且给出了其具体地址,通信协议为 Dubbo 协议,端口号为 20880 等。

  2. 服务包装与代理生成:Dubbo 会对服务实现类进行包装,生成一个代理类。这个代理类实现了服务接口,并且在内部封装了远程调用的逻辑。当客户端调用服务接口的方法时,实际上是调用了这个代理类的相应方法。代理类的生成通常使用了 Java 的动态代理机制(如果是接口方式)或者字节码生成技术(如 CGLIB,如果是类方式)。例如,对于 HelloWorldService 实现类,会生成一个类似 HelloWorldServiceProxy 的代理类,它隐藏了服务实际是如何进行远程通信和调用的细节,对外呈现的就是一个普通的服务接口实现的样子。

  3. 注册中心交互:接着,服务提供者会将服务的相关信息(如服务名称、接口、可用的 IP 地址和端口等)注册到注册中心(如 Zookeeper)。这一步骤使得服务消费者能够在注册中心发现该服务的存在。例如,在 Zookeeper 中,服务提供者会在特定的路径下创建节点来存储服务信息,以便消费者后续查找。

  4. 网络通信层准备:Dubbo 会根据配置的通信协议(如 Dubbo 协议、HTTP 协议等)准备相应的网络通信层。这涉及到创建服务器端的监听端口,等待客户端的连接请求。以 Dubbo 协议为例,会创建一个基于 Netty 等网络框架的服务器端监听程序,设置好端口号(如 20880),准备接收客户端的远程调用请求。

  5. 服务暴露完成:当上述步骤都完成后,服务就成功地以远程可调用的方式暴露出去了。此时,服务消费者就可以通过注册中心发现该服务,并通过网络通信层与服务提供者进行远程交互,实现对服务的调用。

二、关键代码分析

(一)服务包装与代理生成

// 假设这是Dubbo中生成代理类的一种常见方式示例
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("jdk");
// 根据服务实现类和接口生成代理类
Object proxy = proxyFactory.getProxy(serviceBean, clazz);

这里首先通过 ExtensionLoader 获取到 ProxyFactory 的扩展实现(这里选择了 jdk 扩展,即使用 Java 动态代理),然后调用其 getProxy 方法,传入服务实现类(serviceBean)和服务接口类(clazz)来生成代理类。


对于使用 CGLIB 生成代理类(针对类方式的服务)的情况,代码逻辑类似,但会使用 CGLIB 相关的类和方法来实现字节码生成和代理创建。

(二)注册中心交互


Dubbo 与注册中心(以 Zookeeper 为例)交互的核心代码在 Registry 相关类和 ZookeeperRegistry 等具体实现类中。

// 获取注册中心实例,假设已经配置好Zookeeper注册中心地址等信息
Registry registry = ExtensionLoader.getExtensionLoader(Registry.class).getExtension("zookeeper");
// 注册服务信息到注册中心
registry.register(serviceUrl);

首先通过 ExtensionLoader 获取到 Registryzookeeper 扩展实现,得到注册中心实例。然后调用其 register 方法,传入服务的 URL 信息(serviceUrl),这个 URL 包含了服务的名称、接口、IP 地址、端口等关键信息,以便在 Zookeeper 中准确地注册该服务。

(三)网络通信层准备


以 Dubbo 协议采用 Netty 作为网络框架为例,在 DubboProtocol 等相关类中会有如下关键代码:

// 创建基于Netty的服务器端监听程序
Server server = Exporter<?> exporter = protocol.export(protocol.createExporter(service, url));
// 设置监听端口等参数
server.bind(url.getPort());

这里首先通过 protocol(如 DubboProtocol)创建一个 Exporter,它将服务和服务 URL 关联起来,然后通过 server(基于 Netty 的服务器)进行绑定操作,设置好监听端口(从 url 中获取),准备接收客户端的远程调用请求。

三、远程暴露过程中的一些重要机制

(一)服务治理机制


在远程暴露过程中,Dubbo 通过注册中心实现了服务治理功能。服务提供者将服务信息注册到注册中心后,服务消费者可以通过注册中心获取服务的最新信息,包括服务是否可用、有多少个提供者等。同时,注册中心还可以实现服务的动态上下线感知。例如,当一个服务提供者出现故障下线时,注册中心能够及时通知到相关的服务消费者,使得消费者可以采取相应的措施,如切换到其他可用的服务提供者。

(二)协议适配机制


Dubbo 支持多种通信协议,如 Dubbo 协议、HTTP 协议等。在远程暴露时,根据配置的协议,Dubbo 会自动适配相应的网络通信层实现。例如,如果配置为 Dubbo 协议,就会采用 Netty 等网络框架来搭建服务器端监听程序;如果配置为 HTTP 协议,就会采用合适的 HTTP 服务器实现(如 Jetty 等)来处理客户端的请求。这种协议适配机制使得 Dubbo 能够灵活地满足不同场景下的远程通信需求。

(三)负载均衡机制


当有多个服务提供者暴露了相同的服务时,Dubbo 会在服务消费者端采用负载均衡机制来分配调用请求。在远程暴露过程中,服务提供者将自己的信息注册到注册中心,服务消费者在发现这些服务提供者后,会根据配置的负载均衡策略(如随机、轮询、加权轮询等)来决定将请求发送到哪个服务提供者。例如,采用随机策略时,服务消费者每次调用会随机选择一个服务提供者进行远程调用。


通过以上对 Dubbo 源码中服务暴露之远程暴露部分的分析,可以更深入地了解 Dubbo 是如何将本地服务以远程可调用的方式发布出去的,以及其中涉及的关键技术和机制。

1. 概述

《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》 一文中,我们已经分享了本地暴露服务。在本文中,我们来分享远程暴露服务。在 Dubbo 中提供多种协议( Protocol ) 的实现,大体流程一致,本文以 Dubbo Protocol 为例子,这也是 Dubbo 的默认协议。

特性

缺省协议,使用基于 mina 1.1.7 和 hessian 3.2.1 的 remoting 交互。

  • 连接个数:单连接

  • 连接方式:长连接

  • 传输协议:TCP

  • 传输方式:NIO 异步传输

  • 序列化:Hessian 二进制序列化

  • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用 dubbo 协议传输大文件或超大字符串。

  • 适用场景:常规远程服务方法调用

相比本地暴露远程暴露会多做如下几件事情:

  • 启动通信服务器,绑定服务端口,提供远程调用。

  • 向注册中心注册服务提供者,提供服务消费者从注册中心发现服务。

2. 远程暴露

远程暴露服务的顺序图如下:

#doExportUrlsFor1Protocol(protocolConfig, registryURLs) 方法中,涉及远程暴露服务的代码如下:

 1: // 服务远程暴露
 2: // export to remote if the config is not local (export to local only when config is local)
 3: if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
 4:     if (logger.isInfoEnabled()) {
 5:         logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
 6:     }
 7:     if (registryURLs != null && !registryURLs.isEmpty()) {
 8:         for (URL registryURL : registryURLs) {
 9:             // "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
10:             url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
11:             // 获得监控中心 URL
12:             URL monitorUrl = loadMonitor(registryURL); // TODO 芋艿,监控
13:             if (monitorUrl != null) {
14:                 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
15:             }
16:             if (logger.isInfoEnabled()) {
17:                 logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
18:             }
19:             // 使用 ProxyFactory 创建 Invoker 对象
20:             Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
21: 
22:             // 创建 DelegateProviderMetaDataInvoker 对象
23:             DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
24: 
25:             // 使用 Protocol 暴露 Invoker 对象
26:             Exporter<?> exporter = protocol.export(wrapperInvoker);
27:             // 添加到 `exporters`
28:             exporters.add(exporter);
29:         }
30:     } else { // 用于被服务消费者直连服务提供者,参见文档 http://dubbo.apache.org/zh-cn/docs/user/demos/explicit-target.html 。主要用于开发测试环境使用。
31:         // 使用 ProxyFactory 创建 Invoker 对象
32:         Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
33: 
34:         // 创建 DelegateProviderMetaDataInvoker 对象
35:         DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
36: 
37:         // 使用 Protocol 暴露 Invoker 对象
38:         Exporter<?> exporter = protocol.export(wrapperInvoker);
39:         // 添加到 `exporters`
40:         exporters.add(exporter);
41:     }
42: }
  • 第 30 至 41 行:大体和【第 7 至 29 行】逻辑相同。差别在于,当配置注册中心为 "N/A" 时,表示即使远程暴露服务,也不向注册中心注册。这种方式用于被服务消费者直连服务提供者,参见 《Dubbo 用户指南 —— 直连提供者》 文档。

    开发及测试环境下,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直联方式,将以服务接口为单位,忽略注册中心的提供者列表,A 接口配置点对点,不影响 B 接口从注册中心获取列表。

  • 第 8 行:循环祖册中心 URL 数组 registryURLs

  • 第 10 行:"dynamic" 配置项,服务是否动态注册。如果设为 false ,注册后将显示后 disable 状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。

  • 第 12 行:调用 #loadMonitor(registryURL) 方法,获得监控中心 URL 。

  • 第 13 至 15 行:调用 URL#addParameterAndEncoded(key, value) 方法,将监控中心的 URL 作为 "monitor" 参数添加到服务提供者的 URL 中,并且需要编码。通过这样的方式,服务提供者的 URL 中,包含了监控中心的配置

  • 第 20 行:调用 URL#addParameterAndEncoded(key, value) 方法,将服务体用这的 URL 作为 "export" 参数添加到注册中心的 URL 中。通过这样的方式,注册中心的 URL 中,包含了服务提供者的配置

  • 第 20 行:调用 ProxyFactory#getInvoker(proxy, type, url) 方法,创建 Invoker 对象。该 Invoker 对象,执行 #invoke(invocation) 方法时,内部会调用 Service 对象( ref )对应的调用方法。

    • 🙂 详细的实现,后面单独写文章分享。

    • 😈 为什么传递的是注册中心的 URL 呢?下文会详细解析。

  • 第 23 行:创建 com.alibaba.dubbo.config.invoker.DelegateProviderMetaDataInvoker 对象。该对象在 Invoker 对象的基础上,增加了当前服务提供者 ServiceConfig 对象。

  • 第 26 行:调用 Protocol#export(invoker) 方法,暴露服务。

    • 此处 Dubbo SPI 自适应的特性的好处就出来了,可以自动根据 URL 参数,获得对应的拓展实现。例如,invoker 传入后,根据 invoker.url 自动获得对应 Protocol 拓展实现为 DubboProtocol 。

    • 实际上,Protocol 有两个 Wrapper 拓展实现类: ProtocolFilterWrapper、ProtocolListenerWrapper 。所以,#export(...) 方法的调用顺序是:

      • Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol

      • =>

      • Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol

      • 也就是说,这一条大的调用链,包含两条小的调用链。原因是:

        • 首先,传入的是注册中心的 URL ,通过 Protocol$Adaptive 获取到的是 RegistryProtocol 对象。

        • 其次,RegistryProtocol 会在其 #export(...) 方法中,使用服务提供者的 URL ( 即注册中心的 URL 的 export 参数值),再次调用 Protocol$Adaptive 获取到的是 DubboProtocol 对象,进行服务暴露。

      • 为什么是这样的顺序?通过这样的顺序,可以实现类似 AOP 的效果,在本地服务器启动完成后,再向注册中心注册。伪代码如下:

RegistryProtocol#export(...) {
    
    // 1. 启动本地服务器
    DubboProtocol#export(...);
    
    // 2. 向注册中心注册。 
}
  • 这也是为什么上文提到的 “为什么传递的是注册中心的 URL 呢?” 的原因。

  • 第 28 行:添加到 exporters 集合中。

2.1 loadMonitor

友情提示,监控中心不是本文的重点,简单分享下该方法的逻辑。
可直接跳过。

#loadMonitor(registryURL) 方法,加载监控中心 com.alibaba.dubbo.common.URL 数组。代码如下:

 1: /**
 2:  * 加载监控中心 URL
 3:  *
 4:  * @param registryURL 注册中心 URL
 5:  * @return 监控中心 URL
 6:  */
 7: protected URL loadMonitor(URL registryURL) {
 8:     // 从 属性配置 中加载配置到 MonitorConfig 对象。
 9:     if (monitor == null) {
10:         String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address");
11:         String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol");
12:         if ((monitorAddress == null || monitorAddress.length() == 0) && (monitorProtocol == null || monitorProtocol.length() == 0)) {
13:             return null;
14:         }
15: 
16:         monitor = new MonitorConfig();
17:         if (monitorAddress != null && monitorAddress.length() > 0) {
18:             monitor.setAddress(monitorAddress);
19:         }
20:         if (monitorProtocol != null && monitorProtocol.length() > 0) {
21:             monitor.setProtocol(monitorProtocol);
22:         }
23:     }
24:     appendProperties(monitor);
25:     // 添加 `interface` `dubbo` `timestamp` `pid` 到 `map` 集合中
26:     Map<String, String> map = new HashMap<String, String>();
27:     map.put(Constants.INTERFACE_KEY, MonitorService.class.getName());
28:     map.put("dubbo", Version.getVersion());
29:     map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
30:     if (ConfigUtils.getPid() > 0) {
31:         map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
32:     }
33:     // 将 MonitorConfig ,添加到 `map` 集合中。
34:     appendParameters(map, monitor);
35:     // 获得地址
36:     String address = monitor.getAddress();
37:     String sysaddress = System.getProperty("dubbo.monitor.address");
38:     if (sysaddress != null && sysaddress.length() > 0) {
39:         address = sysaddress;
40:     }
41:     // 直连监控中心服务器地址
42:     if (ConfigUtils.isNotEmpty(address)) {
43:         // 若不存在 `protocol` 参数,默认 "dubbo" 添加到 `map` 集合中。
44:         if (!map.containsKey(Constants.PROTOCOL_KEY)) {
45:             if (ExtensionLoader.getExtensionLoader(MonitorFactory.class).hasExtension("logstat")) {
46:                 map.put(Constants.PROTOCOL_KEY, "logstat");
47:             } else {
48:                 map.put(Constants.PROTOCOL_KEY, "dubbo");
49:             }
50:         }
51:         // 解析地址,创建 Dubbo URL 对象。
52:         return UrlUtils.parseURL(address, map);
53:     // 从注册中心发现监控中心地址
54:     } else if (Constants.REGISTRY_PROTOCOL.equals(monitor.getProtocol()) && registryURL != null) {
55:         return registryURL.setProtocol("dubbo").addParameter(Constants.PROTOCOL_KEY, "registry").addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map));
56:     }
57:     return null;
58: }
  • 第 8 至 24 行:从属性配置中,加载配置到 MonitorConfig 对象。

  • 第 25 至 32 行:添加 interface dubbo timestamp pidmap 集合中。

  • 第 34 行:调用 #appendParameters(map, config) 方法,将 MonitorConfig ,添加到 map 集合中。

  • 第 35 至 40 行:获得监控中心的地址。

  • 第 42 行:当 address 非空时,直连监控中心服务器地址的情况

    • 第 43 至 50 行:若不存在 protocol 参数,缺省默认为 “dubbo” ,并添加到 map 集合中。

      • 第 44 至 55 行:可以忽略。因为,logstat 这个拓展实现已经不存在。

    • 第 52 行:调用 UrlUtils#parseURL(address, map) 方法,解析 address ,创建 Dubbo URL 对象。

      • 🙂 已经添加了代码注释,胖友点击链接查看。

  • 第 54 至 56 行:当 protocol = registry 时,并且注册中心 URL 非空时,从注册中心发现监控中心地址。以 registryURL 为基础,创建 URL :

    • protocol = dubbo

    • parameters.protocol = registry

    • parameters.refer = map

  • 第 57 行:无注册中心,返回空。

  • ps :后续会有文章,详细分享。

3. Protocol

本文涉及的 Protocol 类图如下:

3.1 ProtocolFilterWrapper

《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》「 3.2 ProtocolFilterWrapper」 小节。

#export(invoker) 方法,代码如下:

1: public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2:     // 注册中心
3:     if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4:         return protocol.export(invoker);
5:     }
6:     // 建立带有 Filter 过滤链的 Invoker ,再暴露服务。
7:     return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
8: }
  • 第 2 至 5 行:当 invoker.url.protocl = registry注册中心的 URL ,无需创建 Filter 过滤链。

  • 第 7 行:调用 #buildInvokerChain(invoker, key, group) 方法,创建带有 Filter 过滤链的 Invoker 对象。

  • 第 7 行:调用 protocol#export(invoker) 方法,继续暴露服务。

  • 在 RegistryProtocol 中,会调用 DubboProtocol#export(...) 方法时,会走【第 7 行】的流程。

3.2 RegistryProtocol

com.alibaba.dubbo.registry.integration.RegistryProtocol ,实现 Protocol 接口,注册中心协议实现类。

3.2.1 属性

属性相关,代码如下:

友情提示,仅包含本文涉及的属性。

// ... 省略部分和本文无关的属性。

/**
 * 单例。在 Dubbo SPI 中,被初始化,有且仅有一次。
 */
private static RegistryProtocol INSTANCE;

/**
 * 绑定关系集合。
 *
 * key:服务 Dubbo URL
 */
// To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
// 用于解决rmi重复暴露端口冲突的问题,已经暴露过的服务不再重新暴露
// providerurl <--> exporter
private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();
/**
 * Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。
 */
private Protocol protocol;
/**
 * RegistryFactory 自适应拓展实现类,通过 Dubbo SPI 自动注入。
 */
private RegistryFactory registryFactory;

public RegistryProtocol() {
    INSTANCE = this;
}

public static RegistryProtocol getRegistryProtocol() {
    if (INSTANCE == null) {
        ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(Constants.REGISTRY_PROTOCOL); // load
    }
    return INSTANCE;
}

  • INSTANCE 静态属性,单例。通过 Dubbo SPI 加载创建,有且仅有一次。

    • #getRegistryProtocol() 静态方法,获得单例。

  • bounds 属性,绑定关系集合。其中,Key 为服务提供者 URL

  • protocol 属性,Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。

  • registryFactory 属性,自适应拓展实现类,通过 Dubbo SPI 自动注入。

    • 用于创建注册中心 Registry 对象。

3.2.2 export

本文涉及的 #export(invoker) 方法,代码如下:

 1: public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
 2:     // 暴露服务
 3:     // export invoker
 4:     final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
 5: 
 6:     // 获得注册中心 URL
 7:     URL registryUrl = getRegistryUrl(originInvoker);
 8: 
 9:     // 获得注册中心对象
10:     // registry provider
11:     final Registry registry = getRegistry(originInvoker);
12: 
13:     // 获得服务提供者 URL
14:     final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
15: 
16:     //to judge to delay publish whether or not
17:     boolean register = registedProviderUrl.getParameter("register", true);
18: 
19:     // 向本地注册表,注册服务提供者
20:     ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
21: 
22:     // 向注册中心注册服务提供者(自己)
23:     if (register) {
24:         register(registryUrl, registedProviderUrl);
25:         ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); // 标记向本地注册表的注册服务提供者,已经注册
26:     } 
27: 
28:     // 使用 OverrideListener 对象,订阅配置规则
29:     // Subscribe the override data
30:     // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
31:     final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
32:     final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
33:     overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
34:     registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
35:     //Ensure that a new exporter instance is returned every time export
36:     return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
37: }
  • 第 4 行:调用 #doLocalExport(invoker) 方法,暴露服务。

  • 第 7 行:调用 #getRegistryUrl(originInvoker) 方法,获得注册中心 URL 。代码如下:

/**
 * 获得注册中心 URL
 *
 * @param originInvoker 原始 Invoker
 * @return URL
 */
private URL getRegistryUrl(Invoker<?> originInvoker) {
    URL registryUrl = originInvoker.getUrl();
    if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { // protocol
        String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
        registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
    }
    return registryUrl;
}

该过程是我们在 《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》「2.1 loadRegistries」 的那张图的反向流程,即红线部分

  • getRegistryUrl


  • 第 11 行:获得注册中心对象。

  • 第 14 行:调用 #getRegistedProviderUrl(originInvoker) 方法,获得服务提供者 URL 。代码如下:

private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
    // 从注册中心的 export 参数中,获得服务提供者的 URL
    URL providerUrl = getProviderUrl(originInvoker);
    //The address you see at the registry
    return providerUrl.removeParameters(getFilteredKeys(providerUrl)) // 移除 .hide 为前缀的参数
            .removeParameter(Constants.MONITOR_KEY) // monitor
            .removeParameter(Constants.BIND_IP_KEY) // bind.ip
            .removeParameter(Constants.BIND_PORT_KEY) // bind.port
            .removeParameter(QOS_ENABLE) // qos.enable
            .removeParameter(QOS_PORT) // qos.port
            .removeParameter(ACCEPT_FOREIGN_IP); // qos.accept.foreign.ip
}

private URL getProviderUrl(final Invoker<?> origininvoker) {
    String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY); // export
    if (export == null || export.length() == 0) {
        throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
    }
    return URL.valueOf(export);
}

private static String[] getFilteredKeys(URL url) {
    Map<String, String> params = url.getParameters();
    if (params != null && !params.isEmpty()) {
        List<String> filteredKeys = new ArrayList<String>();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) {
                filteredKeys.add(entry.getKey());
            }
        }
        return filteredKeys.toArray(new String[filteredKeys.size()]);
    } else {
        return new String[]{};
    }
}
  • 重点,从注册中心的 URL 中获得 export 参数对应的值,即服务提供者的 URL 。

  • 移除多余的参数。因为,这些参数注册到注册中心没有实际的用途。

public void register(URL registryUrl, URL registedProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
}

3.2.3 doLocalExport

#doLocalExport() 方法,暴露服务。此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。代码如下:

 1: /**
 2:  * 暴露服务。
 3:  *
 4:  * 此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。
 5:  *
 6:  * @param originInvoker 原始 Invoker
 7:  * @param <T> 泛型
 8:  * @return Exporter 对象
 9:  */
10: @SuppressWarnings("unchecked")
11: private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
12:     // 获得在 `bounds` 中的缓存 Key
13:     String key = getCacheKey(originInvoker);
14:     // 从 `bounds` 获得,是不是已经暴露过服务
15:     ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
16:     if (exporter == null) {
17:         synchronized (bounds) {
18:             exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
19:             // 未暴露过,进行暴露服务
20:             if (exporter == null) {
21:                 // 创建 Invoker Delegate 对象
22:                 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
23:                 // 暴露服务,创建 Exporter 对象
24:                 // 使用 创建的Exporter对象 + originInvoker ,创建 ExporterChangeableWrapper 对象
25:                 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
26:                 // 添加到 `bounds`
27:                 bounds.put(key, exporter);
28:             }
29:         }
30:     }
31:     return exporter;
32: }
第  

/**

第 13 行:调用 #getCacheKey(originInvoker) 方法,获得在 bounds 中的缓存 Key 。代码如下:

/**
 * Get the key cached in bounds by invoker
 *
 * 获 取invoker 在 bounds中 缓存的key
 *
 * @param originInvoker 原始 Invoker
 * @return url 字符串
 */
private String getCacheKey(final Invoker<?> originInvoker) {
    URL providerUrl = getProviderUrl(originInvoker);
    return providerUrl.removeParameters("dynamic", "enabled").toFullString();
}
  • 第 14 至 18 行:从 bounds 中,获得已经暴露过的 ExporterChangeableWrapper 对象。

  • 第 20 至 28 行:未暴露过,进行暴露服务。

    • 第 22 行:调用 #getProviderUrl(originInvoker) 方法,获得服务提供者的 URL 。

    • 第 22 行:创建 com.alibaba.dubbo.registry.integration.RegistryProtocol.InvokerDelegete 对象。

      • InvokerDelegete 实现 com.alibaba.dubbo.rpc.protocol.InvokerWrapper 类,主要增加了 #getInvoker() 方法,获得真实的,非 InvokerDelegete 的 Invoker 对象。因为,可能会存在 InvokerDelegete.invoker 也是 InvokerDelegete 类型的情况。

    • 第 25 行:调用 DubboProtocol#export(invoker) 方法,暴露服务,返回 Exporter 对象。

      • 「4.3 DubboProtocol」 中,详细分享。

      • 🙂 若此若是其他协议,若调用对应协议的 XXXProtocol#export(invoker) 方法。

    • 第 25 行:使用【创建的 Exporter 对象】+【originInvoker】,创建 ExporterChangeableWrapper 对象。这样,originInvoker 就和 Exporter 对象,形成了绑定的关系。

    • 第 27 行:添加到 bounds

3.3.3 openServer


友情提示:本小节的内容,胖友先看过 《精尽 Dubbo 源码分析 —— NIO 服务器》 所有的文章。

#openServer(url) 方法,启动通信服务器。代码如下:

#openServer(url) 方法,启动通信服务器。代码如下:

/**
 * 通信客户端集合
 *
 * key: 服务器地址。格式为:host:port
 */
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>

  1: /**
  2:  * 启动服务器
  3:  *
  4:  * @param url URL
  5:  */
  6: private void openServer(URL url) {
  7:     // find server.
  8:     String key = url.getAddress();
  9:     //client can export a service which's only for server to invoke
 10:     boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver
 11:     if (isServer) {
 12:         ExchangeServer server = serverMap.get(key);
 13:         if (server == null) {
 14:             serverMap.put(key, createServer(url));
 15:         } else {
 16:             // server supports reset, use together with override
 17:             server.reset(url);
 18:         }
 19:     }
 20: }
  • 第 8 行:获得服务器地址。

  • 第 10 行:配置项 isserver ,可以暴露一个仅当前 JVM 可调用的服务。目前该配置项已经不存在。

  • 第 12 行:从 serverMap 获得对应服务器地址已存在的通信服务器。即,不重复创建

  • 第 13 至 14 行:通信服务器不存在,调用 #createServer(url) 方法,创建服务器。

  • 第 15 至 18 行:通信服务器已存在,调用 Server#reset(url) 方法,重置服务器的属性。

    • 为什么会存在呢?因为键是 host:port ,那么例如,多个 Service 共用同一个 Protocol ,服务器是同一个对象。

3.3.4 createServer

#createServer(url) 方法,创建并启动通信服务器。代码如下:

 1: private ExchangeServer createServer(URL url) {
 2:     // 默认开启 server 关闭时发送 READ_ONLY 事件
 3:     // send readonly event when server closes, it's enabled by default
 4:     url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
 5:     // 默认开启 heartbeat
 6:     // enable heartbeat by default
 7:     url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
 8: 
 9:     // 校验 Server 的 Dubbo SPI 拓展是否存在
10:     String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
11:     if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
12:         throw new RpcException("Unsupported server type: " + str + ", url: " + url);
13:     }
14: 
15:     // 设置编解码器为 `"Dubbo"` 
16:     url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
17: 
18:     // 启动服务器
19:     ExchangeServer server;
20:     try {
21:         server = Exchangers.bind(url, requestHandler);
22:     } catch (RemotingException e) {
23:         throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
24:     }
25: 
26:     // 校验 Client 的 Dubbo SPI 拓展是否存在
27:     str = url.getParameter(Constants.CLIENT_KEY);
28:     if (str != null && str.length() > 0) {
29:         Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
30:         if (!supportedTypes.contains(str)) {
31:             throw new RpcException("Unsupported client type: " + str);
32:         }
33:     }
34:     return server;
35: }
  • 第 4 行:默认开启 Server 关闭时,发送 READ_ONLY 事件。

  • 第 7 行:默认开启心跳功能。

  • 第 9 至 13 行:校验配置的 Server 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。

  • 第 16 行:设置编解码器为 "Dubbo" 协议,即 DubboCountCodec 。

  • 第 18 至 24 行:调用 Exchangers#bind(url, handler) 方法,启动服务器。具体 requestHanlder 属性值的实现,我们放在 Dubbo 协议下的 RPC 的文章里分享。requestHanlder 属性的简化代码如下:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        // .... 省略具体实现代码
        return invoker.invoke(inv);
    }
}
  • 第 26 至 33 行:校验配置的 Client 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。默认情况下,未配置,所以不会校验。

  • 第 34 行:返回通信服务器。

4. Exporter

本文涉及的 Exporter 类图如下:

4.1 ExporterChangeableWrapper

com.alibaba.dubbo.registry.integration.RegistryProtocol.ExporterChangeableWrapper ,实现 Exporter 接口,Exporter 可变的包装器。

  • 建立【返回的 Exporter】与【Protocol export 出的 Exporter】的对应关系。

  • 在 override 时可以进行关系修改。

代码如下:

private class ExporterChangeableWrapper<T> implements Exporter<T> {

    /**
     * 原 Invoker 对象
     */
    private final Invoker<T> originInvoker;
    /**
     * 暴露的 Exporter 对象
     */
    private Exporter<T> exporter;

    public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
        this.exporter = exporter;
        this.originInvoker = originInvoker;
    }

    public Invoker<T> getOriginInvoker() {
        return originInvoker;
    }

    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }

    public void setExporter(Exporter<T> exporter) {
        this.exporter = exporter;
    }

    public void unexport() {
        String key = getCacheKey(this.originInvoker);
        // 移除出 `bounds`
        bounds.remove(key);
        // 取消暴露
        exporter.unexport();
    }
}

4.2 DestroyableExporter

com.alibaba.dubbo.registry.integration.RegistryProtocol.DestroyableExporter ,实现 Exporter 接口,可销毁的 Exporter 实现类。

private class ExporterChangeableWrapper<T> implements Exporter<T> {

    /**
     * 原 Invoker 对象
     */
    private final Invoker<T> originInvoker;
    /**
     * 暴露的 Exporter 对象
     */
    private Exporter<T> exporter;

    public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
        this.exporter = exporter;
        this.originInvoker = originInvoker;
    }

    public Invoker<T> getOriginInvoker() {
        return originInvoker;
    }

    @Override
    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }

    // 可以重新设置 Exporter 对象
    public void setExporter(Exporter<T> exporter) {
        this.exporter = exporter;
    }

    @Override
    public void unexport() {
        String key = getCacheKey(this.originInvoker);
        // 移除出 `bounds`
        bounds.remove(key);
        // 取消暴露
        exporter.unexport();
    }

}

4.3 DubboExporter

com.alibaba.dubbo.rpc.protocol.dubbo.DubboExporter ,实现 AbstractExporter 抽象类,Dubbo Exporter 实现类。代码如下:

public class DubboExporter<T> extends AbstractExporter<T> {

    /**
     * 服务键
     */
    private final String key;
    /**
     * Exporter 集合
     *
     * key: 服务键
     *
     * 该值实际就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
     */
    private final Map<String, Exporter<?>> exporterMap;

    public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
    }

    @Override
    public void unexport() {
        // 取消暴露
        super.unexport();
        // 移除
        exporterMap.remove(key);
    }

}

  • key 属性,服务键。

  • #exporterMap 属性,Exporter 集合。在上文 DubboProtocol#export(invoker) 方法中,我们可以看到,该属性就是 AbstractProtocol.exporterMap 属性。

    • 构造方法发起暴露,将自己添加到 exporterMap 中。

    • #unexport() 方法,取消暴露,将自己移除出 exporterMap 中。