4 服务发现
在学习了服务暴露原理之后 , 接下来重点探讨服务是如何消费的 。 这里主要讲解如何通过注册中心进行服务发现进行远程服务调用等细节 。
4.1 服务发现流程
在详细探讨服务暴露细节之前 , 我们先看一下整体duubo的服务消费原理
在整体上看 , Dubbo 框架做服务消费也分为两大部分 , 第一步通过持有远程服务实例生成Invoker, 这个 Invoker 在客户端是核心的远程代理对象 。 第二步会把 Invoker 通过动态代理转换成实现用户接口的动态代理引用 。
服务消费方引用服务的蓝色初始化链,时序图如下:
4.2 源码分析
(1) 引用入口
服务引用的入口方法为 ReferenceBean 的 getObject 方法,该方法定义在 Spring 的 FactoryBean 接口中,ReferenceBean 实现了这个方法。
public Object getObject() throws Exception { return get(); } public synchronized T get() { // 检测 ref 是否为空,为空则通过 init 方法创建 if (ref == null) { // init 方法主要用于处理配置,以及调用 createProxy 生成代理类 init(); } return ref; }
Dubbo 提供了丰富的配置,用于调整和优化框架行为,性能等。Dubbo 在引用或导出服务时,首先会对这些配置进行检查和处理,以保证配置的正确性。
private void init() { // 创建代理类 ref = createProxy(map); }
此方法代码很长,主要完成的配置加载,检查,以及创建引用的代理对象。这里要从 createProxy 开始看起。从字面意思上来看,createProxy 似乎只是用于创建代理对象的。但实际上并非如此,该方法还会调用其他方法构建以及合并 Invoker 实例。具体细节如下。
private T createProxy(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); ........... isDvmRefer = InjvmProtocol . getlnjvmProtocol( ) . islnjvmRefer(tmpUrl) // 本地引用略 if (isJvmRefer) { } else { // 点对点调用略 if (url != null && url.length() > 0) { } else { // 加载注册中心 url List<URL> us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 添加 refer 参数到 url 中,并将 url 添加到 urls 中 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } } // 单个注册中心或服务提供者(服务直连,下同) if (urls.size() == 1) { // 调用 RegistryProtocol 的 refer 构建 Invoker 实例 invoker = refprotocol.refer(interfaceClass, urls.get(0)); // 多个注册中心或多个服务提供者,或者两者混合 } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; // 获取所有的 Invoker for (URL url : urls) { // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时 // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法 invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; } } if (registryURL != null) { // 如果注册中心链接不为空,则将使用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { invoker = cluster.join(new StaticDirectory(invokers)); } } } //省略无关代码 // 真正根据invoker 生成代理类 return (T) proxyFactory.getProxy(invoker); }
上面代码很多,不过逻辑比较清晰。
1、如果是本地调用,直接jvm 协议从内存中获取实例
2、如果只有一个注册中心,直接通过 Protocol 自适应拓展类构建 Invoker 实例接口
3、如果有多个注册中心,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类
(2) 创建客户端
在服务消费方,Invoker 用于执行远程调用。Invoker 是由 Protocol 实现类构建而来。Protocol 实现类有很多,这里分析DubboProtocol
1、在AbstractProtocol.refer
调用protocolBindingRefer
@Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); }
查看DubboInvoker
中的refer
方法
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // 创建 DubboInvoker DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
上面方法看起来比较简单,创建一个DubboInvoker。通过构造方法传入远程调用的client对象。默认情况下,Dubbo 使用 NettyClient 进行通信。接下来,我们简单看一下 getClients 方法的逻辑。
private ExchangeClient[] getClients(URL url) { // 是否共享连接 boolean service_share_connect = false; // 获取连接数,默认为0,表示未配置 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // 如果未配置 connections,则共享连接 if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { // 获取共享客户端 clients[i] = getSharedClient(url); } else { // 初始化新的客户端 clients[i] = initClient(url); } } return clients; }
这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,getSharedClient 方法中也会调用 initClient 方法,因此下面我们一起看一下这个方法。
private ExchangeClient initClient(URL url) { // 获取客户端类型,默认为 netty String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); //省略无关代码 ExchangeClient client; try { // 获取 lazy 配置,并根据配置值决定创建的客户端类型 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { // 创建懒加载 ExchangeClient 实例 client = new LazyConnectExchangeClient(url, requestHandler); } else { // 创建普通 ExchangeClient 实例 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service..."); } return client; }
initClient 方法首先获取用户配置的客户端类型,默认为 netty。下面我们分析一下 Exchangers 的 connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 获取 Exchanger 实例,默认为 HeaderExchangeClient return getExchanger(url).connect(url, handler); }
如上,getExchanger 会通过 SPI 加载 HeaderExchangeClient 实例,这个方法比较简单,大家自己看一下吧。接下来分析 HeaderExchangeClient 的实现。
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 这里包含了多个调用,分别如下: // 1. 创建 HeaderExchangeHandler 对象 // 2. 创建 DecodeHandler 对象 // 3. 通过 Transporters 构建 Client 实例 // 4. 创建 HeaderExchangeClient 对象 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
这里的调用比较多,我们这里重点看一下 Transporters 的 connect 方法。如下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { // 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器 handler = new ChannelHandlerDispatcher(handlers); } // 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例 return getTransporter().connect(url, handler); }
如上,getTransporter 方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的 Transporter 实现类。若用户未配置客户端类型,则默认加载 NettyTransporter,并调用该类的 connect 方法。
public Client connect(URL url, ChannelHandler listener) throws RemotingException { // 创建 NettyClient 对象 return new NettyClient(url, listener); }
(3) 注册
这里就已经创建好了NettyClient对象。关于 DubboProtocol 的 refer 方法就分析完了。接下来,继续分析 RegistryProtocol 的 refer 方法逻辑。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 取 registry 参数值,并将其设置为协议头 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 获取注册中心实例 Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 将 url 查询字符串转为 Map Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); // 获取 group 配置 String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑 return doRefer(getMergeableCluster(), registry, type, url); } } // 调用 doRefer 继续执行服务引用逻辑 return doRefer(cluster, registry, type, url); }
上面代码首先为 url 设置协议头,然后根据 url 参数加载注册中心实例。然后获取 group 配置,根据 group 配置决定 doRefer 第一个参数的类型。这里的重点是 doRefer 方法,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 创建 RegistryDirectory 实例 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 设置注册中心和协议 directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 生成服务消费者链接 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // 注册服务消费者,在 consumers 目录下新节点 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 订阅 providers、configurators、routers 等节点数据 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个 Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
如上,doRefer 方法创建一个 RegistryDirectory 实例,然后生成服务者消费者链接,并向注册中心进行注册。注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。
(4)创建代理对象
Invoker 创建完毕后,接下来要做的事情是为服务接口生成代理对象。有了代理对象,即可进行远程调用。代理对象生成的入口方法为 ProxyFactory 的 getProxy,接下来进行分析。
public <T> T getProxy(Invoker<T> invoker) throws RpcException { // 调用重载方法 return getProxy(invoker, false); } public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { Class<?>[] interfaces = null; // 获取接口列表 String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { // 切分接口列表 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class<?>[types.length + 2]; // 设置服务接口类和 EchoService.class 到 interfaces 中 interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { // 加载接口类 interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } // 为 http 和 hessian 协议提供泛化调用支持,参考 pull request #1827 if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class<?>[] temp = interfaces; // 创建新的 interfaces 数组 interfaces = new Class<?>[len + 1]; System.arraycopy(temp, 0, interfaces, 0, len); // 设置 GenericService.class 到数组中 interfaces[len] = GenericService.class; } // 调用重载方法 return getProxy(invoker, interfaces); } public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
如上,上面大段代码都是用来获取 interfaces 数组的,我们继续往下看。getProxy(Invoker, Class<?>[]) 这个方法是一个抽象方法,下面我们到 JavassistProxyFactory 类中看一下该方法的实现代码。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { // 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
上面代码并不多,首先是通过 Proxy 的 getProxy 方法获取 Proxy 子类,然后创建 InvokerInvocationHandler 对象,并将该对象传给 newInstance 生成 Proxy 实例。InvokerInvocationHandler 实现 JDK 的 InvocationHandler 接口,具体的用途是拦截接口类调用。下面以 org.apache.dubbo.demo.DemoService 这个接口为例,来看一下该接口代理类代码大致是怎样的(忽略 EchoService 接口)。
package org.apache.dubbo.common.bytecode; public class proxy0 implements org.apache.dubbo.demo.DemoService { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public proxy0() { } public proxy0(java.lang.reflect.InvocationHandler arg0) { handler = $1; } public java.lang.String sayHello(java.lang.String arg0) { Object[] args = new Object[1]; args[0] = ($w) $1; Object ret = handler.invoke(this, methods[0], args); return (java.lang.String) ret; } }
好了,到这里代理类生成逻辑就分析完了。整个过程比较复杂,大家需要耐心看一下。
4.3 总结
- 从注册中心发现引用服务:在有注册中心,通过注册中心发现提供者地址的情况下,ReferenceConfig 解析出的 URL 格式为:
registry://registry-host:/org.apache.registry.RegistryService?refer=URL.encode("conumer-host/com.foo.FooService?version=1.0.0")
。
通过 URL 的registry://协议头识别,就会调用RegistryProtocol#refer()方法
- 查询提供者 URL,如
dubbo://service-host/com.foo.FooService?version=1.0.0
,来获取注册中心
- 创建一个 RegistryDirectory 实例并设置注册中心和协议
- 生成 conusmer 连接,在 consumer 目录下创建节点,向注册中心注册
- 注册完毕后,订阅 providers,configurators,routers 等节点的数据
- 通过 URL 的
dubbo://
协议头识别,调用DubboProtocol#refer()
方法,创建一个 ExchangeClient 客户端并返回 DubboInvoker 实例
- 由于一个服务可能会部署在多台服务器上,这样就会在 providers 产生多个节点,这样也就会得到多个 DubboInvoker 实例,就需要 RegistryProtocol 调用 Cluster 将多个服务提供者节点伪装成一个节点,并返回一个 Invoker
- Invoker 创建完毕后,调用 ProxyFactory 为服务接口生成代理对象,返回提供者引用