在Dubbo框架中,服务暴露是核心功能之一,它允许服务提供者将服务发布出去,供服务消费者调用。服务暴露主要分为本地暴露和远程暴露两种方式。下面将详细分析Dubbo中的远程暴露过程。
远程暴露流程
服务暴露的分类:
Dubbo支持两种服务暴露类型:远程暴露和本地暴露。远程暴露是指将服务发布到远程注册中心,供消费者调用。
服务暴露全流程:
服务暴露的流程大致可以分为三个步骤:检测配置、暴露服务(包括本地和远程)、注册服务至注册中心。
服务实现类转换:
从对象构建转换的角度来看,服务暴露可以分为将服务实现类转换成
Invoker
,再将Invoker
通过具体的协议转换成Exporter
。
远程暴露实现原理:
服务提供者在启动时,将服务实现类通过协议进行网络传输,发布到注册中心上。服务消费者在启动时,从注册中心上获取到服务提供者的地址,然后通过协议进行网络通信,调用服务提供者的方法。
远程暴露源码分析:
在
ServiceConfig
类的doExportUrlsFor1Protocol
方法中,根据scope
的配置,如果不是local
,则会进行远程服务的暴露。远程服务暴露时,会遍历注册中心的URL,为每个注册中心生成一个
Invoker
,然后通过协议的export
方法将Invoker
导出为Exporter
,完成服务的远程暴露。
注册服务至注册中心:
在远程暴露的过程中,服务提供者会将服务URL注册到注册中心,这样服务消费者可以从注册中心动态获取服务提供者列表。
监控中心的集成:
在服务暴露的过程中,如果配置了监控中心,Dubbo还会将监控中心的URL添加到服务URL中,以便监控服务的调用情况。
多协议支持:
Dubbo允许配置多协议,在不同服务上支持不同协议或者同一服务上同时支持多种协议。
通过上述分析,我们可以看到Dubbo的远程暴露过程涉及到服务的包装、协议的选择、注册中心的集成以及监控中心的配置等多个方面,这些共同确保了服务能够被正确地发布和调用。
Dubbo 是一个高性能的 Java RPC 框架,广泛用于构建分布式服务架构。在 Dubbo 中,服务暴露是将本地服务发布到远程,使得其他服务可以调用的过程。本文将详细分析 Dubbo 服务暴露的远程暴露部分。
1. 服务暴露流程概述
Dubbo 的服务暴露主要包括以下几个步骤:
本地暴露:将服务注册到本地服务目录。
远程暴露:将服务注册到注册中心,使得其他服务可以发现并调用。
2. 远程暴露的实现
2.1 服务导出器(Exporter)
在 Dubbo 中,服务暴露的核心类是 Exporter
,它负责将服务导出到网络上。Exporter
的实现类通常是 ProviderModel
的一部分,ProviderModel
保存了服务提供者的相关信息。
2.2 服务导出过程
服务导出过程主要涉及以下几个类和方法:
Protocol:协议层接口,负责服务的暴露和引用。
Registry:注册中心接口,负责服务的注册和发现。
Exporter:服务导出器接口,负责将服务导出到网络上。
Invoker:服务调用器接口,负责服务的调用。
2.3 关键类和方法
2.3.1 Protocol 接口
Protocol
接口是 Dubbo 的核心接口之一,负责服务的暴露和引用。主要方法包括:
export(Invoker<T> invoker)
:将服务导出到网络上。refer(Class<T> type, URL url)
:从网络上引用服务。
public interface Protocol {
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
2.3.2 RegistryProtocol 类
RegistryProtocol
类是 Protocol
接口的一个实现,负责将服务注册到注册中心。关键方法包括:
export(Invoker<T> invoker)
:将服务导出到注册中心。
public class RegistryProtocol implements Protocol {
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 创建代理 Invoker
final Invoker<T> invokerDelegate = new InvokerDelegationWrapper<>(originInvoker);
// 获取服务的 URL
URL registryUrl = getRegistryUrl(originInvoker);
// 创建注册中心
Registry registry = registryFactory.getRegistry(registryUrl);
// 创建服务导出器
ExporterChangeableWrapper<T> exporter = doLocalExport(invokerDelegate, originInvoker.getUrl());
// 注册服务到注册中心
register(registry, invokerDelegate, registryUrl);
return exporter;
}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL url) {
// 创建本地导出器
final Exporter<T> exporter = protocol.export(new Invoker<T>() {
@Override
public Class<T> getInterface() {
return originInvoker.getInterface();
}
@Override
public URL getUrl() {
return url;
}
@Override
public boolean isAvailable() {
return originInvoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return originInvoker.invoke(invocation);
}
@Override
public void destroy() {
originInvoker.destroy();
}
});
return new ExporterChangeableWrapper<>(exporter, originInvoker);
}
private <T> void register(Registry registry, Invoker<T> invoker, URL registryUrl) {
// 注册服务到注册中心
registry.register(buildRegisterUrl(invoker));
}
}
2.3.3 InvokerDelegationWrapper 类
InvokerDelegationWrapper
类是对 Invoker
的包装,用于在服务调用时添加额外的逻辑。
public class InvokerDelegationWrapper<T> implements Invoker<T> {
private final Invoker<T> invoker;
public InvokerDelegationWrapper(Invoker<T> invoker) {
this.invoker = invoker;
}
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
}
2.3.4 ExporterChangeableWrapper 类
ExporterChangeableWrapper
类是对 Exporter
的包装,用于在服务导出时添加额外的逻辑。
public class ExporterChangeableWrapper<T> implements Exporter<T> {
private final Exporter<T> exporter;
private final Invoker<T> invoker;
public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> invoker) {
this.exporter = exporter;
this.invoker = invoker;
}
@Override
public Invoker<T> getInvoker() {
return invoker;
}
@Override
public void unexport() {
exporter.unexport();
}
}
3. 服务注册到注册中心
在 RegistryProtocol
类的 export
方法中,服务会被注册到注册中心。注册中心的实现可以是 Zookeeper、Nacos 等。
private <T> void register(Registry registry, Invoker<T> invoker, URL registryUrl) {
// 构建注册 URL
URL registerUrl = buildRegisterUrl(invoker);
// 注册服务到注册中心
registry.register(registerUrl);
}
4. 总结
通过上述分析,我们可以看到 Dubbo 的服务暴露过程主要分为以下几个步骤:
创建代理 Invoker:对原始 Invoker 进行包装,添加额外的逻辑。
获取注册中心 URL:根据服务的 URL 获取注册中心的 URL。
创建注册中心:根据注册中心的 URL 创建注册中心实例。
创建本地导出器:将服务导出到本地,创建本地导出器。
注册服务到注册中心:将服务注册到注册中心,使得其他服务可以发现并调用。
通过这些步骤,Dubbo 实现了服务的远程暴露,使得分布式系统中的各个服务可以互相调用。希望这些内容对你理解 Dubbo 的服务暴露机制有所帮助。
以下是对 Dubbo 源码中服务暴露之远程暴露部分的详细分析:
一、远程暴露整体流程概述
Dubbo 的远程暴露是将本地服务以可被远程调用的方式发布出去的过程,主要涉及到多个关键步骤和组件的协同工作。
配置加载与解析:首先,Dubbo 会从配置文件(如 XML 配置、properties 配置等)或者通过注解等方式获取服务相关的配置信息。这些配置包括服务接口、实现类、注册中心地址、通信协议、端口号等。例如,对于一个简单的 HelloWorld 服务,配置可能指定了服务接口为
HelloWorldService
,实现类为HelloWorldServiceImpl
,注册中心使用 Zookeeper 且给出了其具体地址,通信协议为 Dubbo 协议,端口号为 20880 等。服务包装与代理生成:Dubbo 会对服务实现类进行包装,生成一个代理类。这个代理类实现了服务接口,并且在内部封装了远程调用的逻辑。当客户端调用服务接口的方法时,实际上是调用了这个代理类的相应方法。代理类的生成通常使用了 Java 的动态代理机制(如果是接口方式)或者字节码生成技术(如 CGLIB,如果是类方式)。例如,对于
HelloWorldService
实现类,会生成一个类似HelloWorldServiceProxy
的代理类,它隐藏了服务实际是如何进行远程通信和调用的细节,对外呈现的就是一个普通的服务接口实现的样子。注册中心交互:接着,服务提供者会将服务的相关信息(如服务名称、接口、可用的 IP 地址和端口等)注册到注册中心(如 Zookeeper)。这一步骤使得服务消费者能够在注册中心发现该服务的存在。例如,在 Zookeeper 中,服务提供者会在特定的路径下创建节点来存储服务信息,以便消费者后续查找。
网络通信层准备:Dubbo 会根据配置的通信协议(如 Dubbo 协议、HTTP 协议等)准备相应的网络通信层。这涉及到创建服务器端的监听端口,等待客户端的连接请求。以 Dubbo 协议为例,会创建一个基于 Netty 等网络框架的服务器端监听程序,设置好端口号(如 20880),准备接收客户端的远程调用请求。
服务暴露完成:当上述步骤都完成后,服务就成功地以远程可调用的方式暴露出去了。此时,服务消费者就可以通过注册中心发现该服务,并通过网络通信层与服务提供者进行远程交互,实现对服务的调用。
二、关键代码分析
(一)服务包装与代理生成
// 假设这是Dubbo中生成代理类的一种常见方式示例
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("jdk");
// 根据服务实现类和接口生成代理类
Object proxy = proxyFactory.getProxy(serviceBean, clazz);
这里首先通过 ExtensionLoader
获取到 ProxyFactory
的扩展实现(这里选择了 jdk
扩展,即使用 Java 动态代理),然后调用其 getProxy
方法,传入服务实现类(serviceBean
)和服务接口类(clazz
)来生成代理类。
对于使用 CGLIB 生成代理类(针对类方式的服务)的情况,代码逻辑类似,但会使用 CGLIB 相关的类和方法来实现字节码生成和代理创建。
(二)注册中心交互
Dubbo 与注册中心(以 Zookeeper 为例)交互的核心代码在 Registry
相关类和 ZookeeperRegistry
等具体实现类中。
// 获取注册中心实例,假设已经配置好Zookeeper注册中心地址等信息
Registry registry = ExtensionLoader.getExtensionLoader(Registry.class).getExtension("zookeeper");
// 注册服务信息到注册中心
registry.register(serviceUrl);
首先通过 ExtensionLoader
获取到 Registry
的 zookeeper
扩展实现,得到注册中心实例。然后调用其 register
方法,传入服务的 URL 信息(serviceUrl
),这个 URL 包含了服务的名称、接口、IP 地址、端口等关键信息,以便在 Zookeeper 中准确地注册该服务。
(三)网络通信层准备
以 Dubbo 协议采用 Netty 作为网络框架为例,在 DubboProtocol
等相关类中会有如下关键代码:
// 创建基于Netty的服务器端监听程序
Server server = Exporter<?> exporter = protocol.export(protocol.createExporter(service, url));
// 设置监听端口等参数
server.bind(url.getPort());
这里首先通过 protocol
(如 DubboProtocol
)创建一个 Exporter
,它将服务和服务 URL 关联起来,然后通过 server
(基于 Netty 的服务器)进行绑定操作,设置好监听端口(从 url
中获取),准备接收客户端的远程调用请求。
三、远程暴露过程中的一些重要机制
(一)服务治理机制
在远程暴露过程中,Dubbo 通过注册中心实现了服务治理功能。服务提供者将服务信息注册到注册中心后,服务消费者可以通过注册中心获取服务的最新信息,包括服务是否可用、有多少个提供者等。同时,注册中心还可以实现服务的动态上下线感知。例如,当一个服务提供者出现故障下线时,注册中心能够及时通知到相关的服务消费者,使得消费者可以采取相应的措施,如切换到其他可用的服务提供者。
(二)协议适配机制
Dubbo 支持多种通信协议,如 Dubbo 协议、HTTP 协议等。在远程暴露时,根据配置的协议,Dubbo 会自动适配相应的网络通信层实现。例如,如果配置为 Dubbo 协议,就会采用 Netty 等网络框架来搭建服务器端监听程序;如果配置为 HTTP 协议,就会采用合适的 HTTP 服务器实现(如 Jetty 等)来处理客户端的请求。这种协议适配机制使得 Dubbo 能够灵活地满足不同场景下的远程通信需求。
(三)负载均衡机制
当有多个服务提供者暴露了相同的服务时,Dubbo 会在服务消费者端采用负载均衡机制来分配调用请求。在远程暴露过程中,服务提供者将自己的信息注册到注册中心,服务消费者在发现这些服务提供者后,会根据配置的负载均衡策略(如随机、轮询、加权轮询等)来决定将请求发送到哪个服务提供者。例如,采用随机策略时,服务消费者每次调用会随机选择一个服务提供者进行远程调用。
通过以上对 Dubbo 源码中服务暴露之远程暴露部分的分析,可以更深入地了解 Dubbo 是如何将本地服务以远程可调用的方式发布出去的,以及其中涉及的关键技术和机制。
1. 概述
在 《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》 一文中,我们已经分享了本地暴露服务。在本文中,我们来分享远程暴露服务。在 Dubbo 中提供多种协议( Protocol ) 的实现,大体流程一致,本文以 Dubbo Protocol 为例子,这也是 Dubbo 的默认协议。
特性
缺省协议,使用基于 mina 1.1.7
和 hessian 3.2.1
的 remoting 交互。
连接个数:单连接
连接方式:长连接
传输协议:TCP
传输方式:NIO 异步传输
序列化:Hessian 二进制序列化
适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用 dubbo 协议传输大文件或超大字符串。
适用场景:常规远程服务方法调用
相比本地暴露,远程暴露会多做如下几件事情:
启动通信服务器,绑定服务端口,提供远程调用。
向注册中心注册服务提供者,提供服务消费者从注册中心发现服务。
2. 远程暴露
远程暴露服务的顺序图如下:
在 #doExportUrlsFor1Protocol(protocolConfig, registryURLs)
方法中,涉及远程暴露服务的代码如下:
1: // 服务远程暴露
2: // export to remote if the config is not local (export to local only when config is local)
3: if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
4: if (logger.isInfoEnabled()) {
5: logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
6: }
7: if (registryURLs != null && !registryURLs.isEmpty()) {
8: for (URL registryURL : registryURLs) {
9: // "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
10: url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
11: // 获得监控中心 URL
12: URL monitorUrl = loadMonitor(registryURL); // TODO 芋艿,监控
13: if (monitorUrl != null) {
14: url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
15: }
16: if (logger.isInfoEnabled()) {
17: logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
18: }
19: // 使用 ProxyFactory 创建 Invoker 对象
20: Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
21:
22: // 创建 DelegateProviderMetaDataInvoker 对象
23: DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
24:
25: // 使用 Protocol 暴露 Invoker 对象
26: Exporter<?> exporter = protocol.export(wrapperInvoker);
27: // 添加到 `exporters`
28: exporters.add(exporter);
29: }
30: } else { // 用于被服务消费者直连服务提供者,参见文档 http://dubbo.apache.org/zh-cn/docs/user/demos/explicit-target.html 。主要用于开发测试环境使用。
31: // 使用 ProxyFactory 创建 Invoker 对象
32: Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
33:
34: // 创建 DelegateProviderMetaDataInvoker 对象
35: DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
36:
37: // 使用 Protocol 暴露 Invoker 对象
38: Exporter<?> exporter = protocol.export(wrapperInvoker);
39: // 添加到 `exporters`
40: exporters.add(exporter);
41: }
42: }
第 30 至 41 行:大体和【第 7 至 29 行】逻辑相同。差别在于,当配置注册中心为
"N/A"
时,表示即使远程暴露服务,也不向注册中心注册。这种方式用于被服务消费者直连服务提供者,参见 《Dubbo 用户指南 —— 直连提供者》 文档。在开发及测试环境下,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直联方式,将以服务接口为单位,忽略注册中心的提供者列表,A 接口配置点对点,不影响 B 接口从注册中心获取列表。
第 8 行:循环祖册中心 URL 数组
registryURLs
。第 10 行:
"dynamic"
配置项,服务是否动态注册。如果设为 false ,注册后将显示后 disable 状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。第 12 行:调用
#loadMonitor(registryURL)
方法,获得监控中心 URL 。🙂 在 「2.1 loadMonitor」 小节,详细解析。
第 13 至 15 行:调用
URL#addParameterAndEncoded(key, value)
方法,将监控中心的 URL 作为"monitor"
参数添加到服务提供者的 URL 中,并且需要编码。通过这样的方式,服务提供者的 URL 中,包含了监控中心的配置。第 20 行:调用
URL#addParameterAndEncoded(key, value)
方法,将服务体用这的 URL 作为"export"
参数添加到注册中心的 URL 中。通过这样的方式,注册中心的 URL 中,包含了服务提供者的配置。第 20 行:调用
ProxyFactory#getInvoker(proxy, type, url)
方法,创建 Invoker 对象。该 Invoker 对象,执行#invoke(invocation)
方法时,内部会调用 Service 对象(ref
)对应的调用方法。🙂 详细的实现,后面单独写文章分享。
😈 为什么传递的是注册中心的 URL 呢?下文会详细解析。
第 23 行:创建
com.alibaba.dubbo.config.invoker.DelegateProviderMetaDataInvoker
对象。该对象在 Invoker 对象的基础上,增加了当前服务提供者 ServiceConfig 对象。第 26 行:调用
Protocol#export(invoker)
方法,暴露服务。此处 Dubbo SPI 自适应的特性的好处就出来了,可以自动根据 URL 参数,获得对应的拓展实现。例如,
invoker
传入后,根据invoker.url
自动获得对应 Protocol 拓展实现为 DubboProtocol 。实际上,Protocol 有两个 Wrapper 拓展实现类: ProtocolFilterWrapper、ProtocolListenerWrapper 。所以,
#export(...)
方法的调用顺序是:Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
=>
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
也就是说,这一条大的调用链,包含两条小的调用链。原因是:
首先,传入的是注册中心的 URL ,通过 Protocol$Adaptive 获取到的是 RegistryProtocol 对象。
其次,RegistryProtocol 会在其
#export(...)
方法中,使用服务提供者的 URL ( 即注册中心的 URL 的export
参数值),再次调用 Protocol$Adaptive 获取到的是 DubboProtocol 对象,进行服务暴露。
为什么是这样的顺序?通过这样的顺序,可以实现类似 AOP 的效果,在本地服务器启动完成后,再向注册中心注册。伪代码如下:
RegistryProtocol#export(...) {
// 1. 启动本地服务器
DubboProtocol#export(...);
// 2. 向注册中心注册。
}
这也是为什么上文提到的 “为什么传递的是注册中心的 URL 呢?” 的原因。
🙂 如果无法理解,在 「3. Protocol」 在解析代码,进一步理顺。
第 28 行:添加到
exporters
集合中。
2.1 loadMonitor
友情提示,监控中心不是本文的重点,简单分享下该方法的逻辑。
可直接跳过。
#loadMonitor(registryURL)
方法,加载监控中心 com.alibaba.dubbo.common.URL
数组。代码如下:
1: /**
2: * 加载监控中心 URL
3: *
4: * @param registryURL 注册中心 URL
5: * @return 监控中心 URL
6: */
7: protected URL loadMonitor(URL registryURL) {
8: // 从 属性配置 中加载配置到 MonitorConfig 对象。
9: if (monitor == null) {
10: String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address");
11: String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol");
12: if ((monitorAddress == null || monitorAddress.length() == 0) && (monitorProtocol == null || monitorProtocol.length() == 0)) {
13: return null;
14: }
15:
16: monitor = new MonitorConfig();
17: if (monitorAddress != null && monitorAddress.length() > 0) {
18: monitor.setAddress(monitorAddress);
19: }
20: if (monitorProtocol != null && monitorProtocol.length() > 0) {
21: monitor.setProtocol(monitorProtocol);
22: }
23: }
24: appendProperties(monitor);
25: // 添加 `interface` `dubbo` `timestamp` `pid` 到 `map` 集合中
26: Map<String, String> map = new HashMap<String, String>();
27: map.put(Constants.INTERFACE_KEY, MonitorService.class.getName());
28: map.put("dubbo", Version.getVersion());
29: map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
30: if (ConfigUtils.getPid() > 0) {
31: map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
32: }
33: // 将 MonitorConfig ,添加到 `map` 集合中。
34: appendParameters(map, monitor);
35: // 获得地址
36: String address = monitor.getAddress();
37: String sysaddress = System.getProperty("dubbo.monitor.address");
38: if (sysaddress != null && sysaddress.length() > 0) {
39: address = sysaddress;
40: }
41: // 直连监控中心服务器地址
42: if (ConfigUtils.isNotEmpty(address)) {
43: // 若不存在 `protocol` 参数,默认 "dubbo" 添加到 `map` 集合中。
44: if (!map.containsKey(Constants.PROTOCOL_KEY)) {
45: if (ExtensionLoader.getExtensionLoader(MonitorFactory.class).hasExtension("logstat")) {
46: map.put(Constants.PROTOCOL_KEY, "logstat");
47: } else {
48: map.put(Constants.PROTOCOL_KEY, "dubbo");
49: }
50: }
51: // 解析地址,创建 Dubbo URL 对象。
52: return UrlUtils.parseURL(address, map);
53: // 从注册中心发现监控中心地址
54: } else if (Constants.REGISTRY_PROTOCOL.equals(monitor.getProtocol()) && registryURL != null) {
55: return registryURL.setProtocol("dubbo").addParameter(Constants.PROTOCOL_KEY, "registry").addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map));
56: }
57: return null;
58: }
第 8 至 24 行:从属性配置中,加载配置到 MonitorConfig 对象。
第 25 至 32 行:添加
interface
dubbo
timestamp
pid
到map
集合中。第 34 行:调用
#appendParameters(map, config)
方法,将 MonitorConfig ,添加到map
集合中。第 35 至 40 行:获得监控中心的地址。
第 42 行:当
address
非空时,直连监控中心服务器地址的情况。第 43 至 50 行:若不存在
protocol
参数,缺省默认为 “dubbo” ,并添加到map
集合中。第 44 至 55 行:可以忽略。因为,
logstat
这个拓展实现已经不存在。
第 52 行:调用
UrlUtils#parseURL(address, map)
方法,解析address
,创建 Dubbo URL 对象。🙂 已经添加了代码注释,胖友点击链接查看。
第 54 至 56 行:当
protocol = registry
时,并且注册中心 URL 非空时,从注册中心发现监控中心地址。以registryURL
为基础,创建 URL :protocol = dubbo
parameters.protocol = registry
parameters.refer = map
第 57 行:无注册中心,返回空。
ps :后续会有文章,详细分享。
3. Protocol
本文涉及的 Protocol 类图如下:
3.1 ProtocolFilterWrapper
接 《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》「 3.2 ProtocolFilterWrapper」 小节。
#export(invoker)
方法,代码如下:
1: public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2: // 注册中心
3: if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
4: return protocol.export(invoker);
5: }
6: // 建立带有 Filter 过滤链的 Invoker ,再暴露服务。
7: return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
8: }
第 2 至 5 行:当
invoker.url.protocl = registry
,注册中心的 URL ,无需创建 Filter 过滤链。第 7 行:调用
#buildInvokerChain(invoker, key, group)
方法,创建带有 Filter 过滤链的 Invoker 对象。第 7 行:调用
protocol#export(invoker)
方法,继续暴露服务。在 RegistryProtocol 中,会调用
DubboProtocol#export(...)
方法时,会走【第 7 行】的流程。
3.2 RegistryProtocol
com.alibaba.dubbo.registry.integration.RegistryProtocol
,实现 Protocol 接口,注册中心协议实现类。
3.2.1 属性
属性相关,代码如下:
友情提示,仅包含本文涉及的属性。
// ... 省略部分和本文无关的属性。
/**
* 单例。在 Dubbo SPI 中,被初始化,有且仅有一次。
*/
private static RegistryProtocol INSTANCE;
/**
* 绑定关系集合。
*
* key:服务 Dubbo URL
*/
// To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
// 用于解决rmi重复暴露端口冲突的问题,已经暴露过的服务不再重新暴露
// providerurl <--> exporter
private final Map<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<String, ExporterChangeableWrapper<?>>();
/**
* Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。
*/
private Protocol protocol;
/**
* RegistryFactory 自适应拓展实现类,通过 Dubbo SPI 自动注入。
*/
private RegistryFactory registryFactory;
public RegistryProtocol() {
INSTANCE = this;
}
public static RegistryProtocol getRegistryProtocol() {
if (INSTANCE == null) {
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(Constants.REGISTRY_PROTOCOL); // load
}
return INSTANCE;
}
INSTANCE
静态属性,单例。通过 Dubbo SPI 加载创建,有且仅有一次。#getRegistryProtocol()
静态方法,获得单例。
bounds
属性,绑定关系集合。其中,Key 为服务提供者 URL 。protocol
属性,Protocol 自适应拓展实现类,通过 Dubbo SPI 自动注入。registryFactory
属性,自适应拓展实现类,通过 Dubbo SPI 自动注入。用于创建注册中心 Registry 对象。
3.2.2 export
本文涉及的 #export(invoker)
方法,代码如下:
1: public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
2: // 暴露服务
3: // export invoker
4: final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
5:
6: // 获得注册中心 URL
7: URL registryUrl = getRegistryUrl(originInvoker);
8:
9: // 获得注册中心对象
10: // registry provider
11: final Registry registry = getRegistry(originInvoker);
12:
13: // 获得服务提供者 URL
14: final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
15:
16: //to judge to delay publish whether or not
17: boolean register = registedProviderUrl.getParameter("register", true);
18:
19: // 向本地注册表,注册服务提供者
20: ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
21:
22: // 向注册中心注册服务提供者(自己)
23: if (register) {
24: register(registryUrl, registedProviderUrl);
25: ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); // 标记向本地注册表的注册服务提供者,已经注册
26: }
27:
28: // 使用 OverrideListener 对象,订阅配置规则
29: // Subscribe the override data
30: // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
31: final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
32: final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
33: overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
34: registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
35: //Ensure that a new exporter instance is returned every time export
36: return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
37: }
第 4 行:调用
#doLocalExport(invoker)
方法,暴露服务。第 7 行:调用
#getRegistryUrl(originInvoker)
方法,获得注册中心 URL 。代码如下:
/**
* 获得注册中心 URL
*
* @param originInvoker 原始 Invoker
* @return URL
*/
private URL getRegistryUrl(Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { // protocol
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryUrl;
}
该过程是我们在 《精尽 Dubbo 源码分析 —— 服务暴露(一)之本地暴露(Injvm)》「2.1 loadRegistries」 的那张图的反向流程,即红线部分 :
getRegistryUrl
第 11 行:获得注册中心对象。
第 14 行:调用
#getRegistedProviderUrl(originInvoker)
方法,获得服务提供者 URL 。代码如下:
private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
// 从注册中心的 export 参数中,获得服务提供者的 URL
URL providerUrl = getProviderUrl(originInvoker);
//The address you see at the registry
return providerUrl.removeParameters(getFilteredKeys(providerUrl)) // 移除 .hide 为前缀的参数
.removeParameter(Constants.MONITOR_KEY) // monitor
.removeParameter(Constants.BIND_IP_KEY) // bind.ip
.removeParameter(Constants.BIND_PORT_KEY) // bind.port
.removeParameter(QOS_ENABLE) // qos.enable
.removeParameter(QOS_PORT) // qos.port
.removeParameter(ACCEPT_FOREIGN_IP); // qos.accept.foreign.ip
}
private URL getProviderUrl(final Invoker<?> origininvoker) {
String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY); // export
if (export == null || export.length() == 0) {
throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
}
return URL.valueOf(export);
}
private static String[] getFilteredKeys(URL url) {
Map<String, String> params = url.getParameters();
if (params != null && !params.isEmpty()) {
List<String> filteredKeys = new ArrayList<String>();
for (Map.Entry<String, String> entry : params.entrySet()) {
if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) {
filteredKeys.add(entry.getKey());
}
}
return filteredKeys.toArray(new String[filteredKeys.size()]);
} else {
return new String[]{};
}
}
重点,从注册中心的 URL 中获得
export
参数对应的值,即服务提供者的 URL 。移除多余的参数。因为,这些参数注册到注册中心没有实际的用途。
第 17 行:配置项
register
,服务提供者是否注册到配置中心。第 20 行:调用
ProviderConsumerRegTable#registerProvider(invoker, registryUrl)
方法,向本地注册表,注册服务提供者。第 24 行:调用
#register(registryUrl, registedProviderUrl)
方法,向注册中心注册服务提供者(自己)。代码如下:
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
调用
RegistryService#register(url)
方法,在 《精尽 Dubbo 源码分析 —— 注册中心(一)之抽象 API》「3. RegistryService」 ,有详细解析。
第 25 行:标记向本地注册表的注册服务提供者,已经注册。
第 28 至 34 行:使用 OverrideListener 对象,订阅配置规则。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》 中。
第 36 行:创建 DestroyableExporter 对象。
3.2.3 doLocalExport
#doLocalExport()
方法,暴露服务。此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。代码如下:
1: /**
2: * 暴露服务。
3: *
4: * 此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。
5: *
6: * @param originInvoker 原始 Invoker
7: * @param <T> 泛型
8: * @return Exporter 对象
9: */
10: @SuppressWarnings("unchecked")
11: private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
12: // 获得在 `bounds` 中的缓存 Key
13: String key = getCacheKey(originInvoker);
14: // 从 `bounds` 获得,是不是已经暴露过服务
15: ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
16: if (exporter == null) {
17: synchronized (bounds) {
18: exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
19: // 未暴露过,进行暴露服务
20: if (exporter == null) {
21: // 创建 Invoker Delegate 对象
22: final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
23: // 暴露服务,创建 Exporter 对象
24: // 使用 创建的Exporter对象 + originInvoker ,创建 ExporterChangeableWrapper 对象
25: exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
26: // 添加到 `bounds`
27: bounds.put(key, exporter);
28: }
29: }
30: }
31: return exporter;
32: }
第
/**
第 13 行:调用 #getCacheKey(originInvoker)
方法,获得在 bounds
中的缓存 Key 。代码如下:
/**
* Get the key cached in bounds by invoker
*
* 获 取invoker 在 bounds中 缓存的key
*
* @param originInvoker 原始 Invoker
* @return url 字符串
*/
private String getCacheKey(final Invoker<?> originInvoker) {
URL providerUrl = getProviderUrl(originInvoker);
return providerUrl.removeParameters("dynamic", "enabled").toFullString();
}
第 14 至 18 行:从
bounds
中,获得已经暴露过的 ExporterChangeableWrapper 对象。第 20 至 28 行:未暴露过,进行暴露服务。
第 22 行:调用
#getProviderUrl(originInvoker)
方法,获得服务提供者的 URL 。第 22 行:创建
com.alibaba.dubbo.registry.integration.RegistryProtocol.InvokerDelegete
对象。InvokerDelegete 实现
com.alibaba.dubbo.rpc.protocol.InvokerWrapper
类,主要增加了#getInvoker()
方法,获得真实的,非 InvokerDelegete 的 Invoker 对象。因为,可能会存在InvokerDelegete.invoker
也是 InvokerDelegete 类型的情况。
第 25 行:调用
DubboProtocol#export(invoker)
方法,暴露服务,返回 Exporter 对象。在 「4.3 DubboProtocol」 中,详细分享。
🙂 若此若是其他协议,若调用对应协议的
XXXProtocol#export(invoker)
方法。
第 25 行:使用【创建的 Exporter 对象】+【
originInvoker
】,创建 ExporterChangeableWrapper 对象。这样,originInvoker
就和 Exporter 对象,形成了绑定的关系。🙂 ExporterChangeableWrapper 在 「4.1 ExporterChangeableWrapper」 看详细代码。
第 27 行:添加到
bounds
。
3.3.3 openServer
友情提示:本小节的内容,胖友先看过 《精尽 Dubbo 源码分析 —— NIO 服务器》 所有的文章。
#openServer(url)
方法,启动通信服务器。代码如下:
#openServer(url)
方法,启动通信服务器。代码如下:
/**
* 通信客户端集合
*
* key: 服务器地址。格式为:host:port
*/
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
1: /**
2: * 启动服务器
3: *
4: * @param url URL
5: */
6: private void openServer(URL url) {
7: // find server.
8: String key = url.getAddress();
9: //client can export a service which's only for server to invoke
10: boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver
11: if (isServer) {
12: ExchangeServer server = serverMap.get(key);
13: if (server == null) {
14: serverMap.put(key, createServer(url));
15: } else {
16: // server supports reset, use together with override
17: server.reset(url);
18: }
19: }
20: }
第 8 行:获得服务器地址。
第 10 行:配置项
isserver
,可以暴露一个仅当前 JVM 可调用的服务。目前该配置项已经不存在。第 12 行:从
serverMap
获得对应服务器地址已存在的通信服务器。即,不重复创建。第 13 至 14 行:通信服务器不存在,调用
#createServer(url)
方法,创建服务器。第 15 至 18 行:通信服务器已存在,调用
Server#reset(url)
方法,重置服务器的属性。为什么会存在呢?因为键是
host:port
,那么例如,多个 Service 共用同一个 Protocol ,服务器是同一个对象。
3.3.4 createServer
#createServer(url)
方法,创建并启动通信服务器。代码如下:
1: private ExchangeServer createServer(URL url) {
2: // 默认开启 server 关闭时发送 READ_ONLY 事件
3: // send readonly event when server closes, it's enabled by default
4: url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
5: // 默认开启 heartbeat
6: // enable heartbeat by default
7: url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
8:
9: // 校验 Server 的 Dubbo SPI 拓展是否存在
10: String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
11: if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
12: throw new RpcException("Unsupported server type: " + str + ", url: " + url);
13: }
14:
15: // 设置编解码器为 `"Dubbo"`
16: url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
17:
18: // 启动服务器
19: ExchangeServer server;
20: try {
21: server = Exchangers.bind(url, requestHandler);
22: } catch (RemotingException e) {
23: throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
24: }
25:
26: // 校验 Client 的 Dubbo SPI 拓展是否存在
27: str = url.getParameter(Constants.CLIENT_KEY);
28: if (str != null && str.length() > 0) {
29: Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
30: if (!supportedTypes.contains(str)) {
31: throw new RpcException("Unsupported client type: " + str);
32: }
33: }
34: return server;
35: }
第 4 行:默认开启 Server 关闭时,发送 READ_ONLY 事件。
第 7 行:默认开启心跳功能。
第 9 至 13 行:校验配置的 Server 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。
第 16 行:设置编解码器为
"Dubbo"
协议,即 DubboCountCodec 。第 18 至 24 行:调用
Exchangers#bind(url, handler)
方法,启动服务器。具体requestHanlder
属性值的实现,我们放在 Dubbo 协议下的 RPC 的文章里分享。requestHanlder
属性的简化代码如下:
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
// .... 省略具体实现代码
return invoker.invoke(inv);
}
}
第 26 至 33 行:校验配置的 Client 的 Dubbo SPI 拓展是否存在。若不存在,抛出 RpcException 异常。默认情况下,未配置,所以不会校验。
第 34 行:返回通信服务器。
4. Exporter
本文涉及的 Exporter 类图如下:
4.1 ExporterChangeableWrapper
com.alibaba.dubbo.registry.integration.RegistryProtocol.ExporterChangeableWrapper
,实现 Exporter 接口,Exporter 可变的包装器。
建立【返回的 Exporter】与【Protocol export 出的 Exporter】的对应关系。
在 override 时可以进行关系修改。
代码如下:
private class ExporterChangeableWrapper<T> implements Exporter<T> {
/**
* 原 Invoker 对象
*/
private final Invoker<T> originInvoker;
/**
* 暴露的 Exporter 对象
*/
private Exporter<T> exporter;
public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
this.exporter = exporter;
this.originInvoker = originInvoker;
}
public Invoker<T> getOriginInvoker() {
return originInvoker;
}
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void setExporter(Exporter<T> exporter) {
this.exporter = exporter;
}
public void unexport() {
String key = getCacheKey(this.originInvoker);
// 移除出 `bounds`
bounds.remove(key);
// 取消暴露
exporter.unexport();
}
}
4.2 DestroyableExporter
com.alibaba.dubbo.registry.integration.RegistryProtocol.DestroyableExporter
,实现 Exporter 接口,可销毁的 Exporter 实现类。
private class ExporterChangeableWrapper<T> implements Exporter<T> {
/**
* 原 Invoker 对象
*/
private final Invoker<T> originInvoker;
/**
* 暴露的 Exporter 对象
*/
private Exporter<T> exporter;
public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
this.exporter = exporter;
this.originInvoker = originInvoker;
}
public Invoker<T> getOriginInvoker() {
return originInvoker;
}
@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
// 可以重新设置 Exporter 对象
public void setExporter(Exporter<T> exporter) {
this.exporter = exporter;
}
@Override
public void unexport() {
String key = getCacheKey(this.originInvoker);
// 移除出 `bounds`
bounds.remove(key);
// 取消暴露
exporter.unexport();
}
}
Exporter 代理,建立原始的 Invoker 与
Protocol#export(Invoker<T> invoker)
方法返回的 Exporter 的对应关系。在配置规则发生变化时,可调用
#setExporter(Exporter<T>)
方法,修改对应关系。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》 。
4.3 DubboExporter
com.alibaba.dubbo.rpc.protocol.dubbo.DubboExporter
,实现 AbstractExporter 抽象类,Dubbo Exporter 实现类。代码如下:
public class DubboExporter<T> extends AbstractExporter<T> {
/**
* 服务键
*/
private final String key;
/**
* Exporter 集合
*
* key: 服务键
*
* 该值实际就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
*/
private final Map<String, Exporter<?>> exporterMap;
public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
}
@Override
public void unexport() {
// 取消暴露
super.unexport();
// 移除
exporterMap.remove(key);
}
}
key
属性,服务键。#exporterMap
属性,Exporter 集合。在上文DubboProtocol#export(invoker)
方法中,我们可以看到,该属性就是AbstractProtocol.exporterMap
属性。构造方法,发起暴露,将自己添加到
exporterMap
中。#unexport()
方法,取消暴露,将自己移除出exporterMap
中。