网络异常,图片无法展示
|
网络异常,图片无法展示
|
Cluster 文章中我们提及到会从 Directory 中获取 invoker 列表
网络异常,图片无法展示
|
AbstractDirectory 没有封装什么特别的逻辑
protected RouterChain<T> routerChain; 复制代码
内部持有一个路由链
@Override public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } return doList(invocation); } protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException; 复制代码
实现接口的 list 接口、抽象方法 doList 给子类实现
先看简单的 StaticDirectory
在多注册中心的时候、我们会创建 StaticDirectory 保存
if (registryURL != null) { // registry url is available // for multi-subscription scenario, use 'zone-aware' policy by default URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME); // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker invoker = CLUSTER.join(new StaticDirectory(u, invokers)); } else { // not a registry url, must be direct invoke. invoker = CLUSTER.join(new StaticDirectory(invokers)); } 复制代码
这里要返回的是接口的类型、所以直接获取 invoker 列表的第一个、返回对应的接口就行了
public Class<T> getInterface() { return invokers.get(0).getInterface(); } 复制代码
这里就是返回对应的 invoker 列表、如果路由链不为 null、那么就根据路由配置筛选之后再返回 invoker 列表
@Override protected List<Invoker<T>> doList(Invocation invocation) throws RpcException { List<Invoker<T>> finalInvokers = invokers; if (routerChain != null) { try { finalInvokers = routerChain.route(getConsumerUrl(), invocation); } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } } return finalInvokers == null ? Collections.emptyList() : finalInvokers; } 复制代码
RegistryDirectory 不单继承了 AbstractDirectory 还实现了接口 NotifyListener。因为它是动态更新服务提供者列表以及动态配置的、所以这里就涉及到订阅和监听
class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener { 复制代码
网络异常,图片无法展示
|
订阅和取消订阅
public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); serviceConfigurationListener = new ReferenceConfigurationListener(this, url); registry.subscribe(url, this); } public void unSubscribe(URL url) { setConsumerUrl(null); CONSUMER_CONFIGURATION_LISTENER.removeNotifyListener(this); serviceConfigurationListener.stop(); registry.unsubscribe(url, this); } 复制代码
RegistryDiretory 自己也作为一个监听器传递过去
public synchronized void notify(List<URL> urls) { Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(this::judgeCategory)); List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); // providers List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); /** * 3.x added for extend URL address */ ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class); List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null); if (supportedListeners != null && !supportedListeners.isEmpty()) { for (AddressListener addressListener : supportedListeners) { providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this); } } refreshOverrideAndInvoker(providerURLs); } 复制代码
private void refreshOverrideAndInvoker(List<URL> urls) { // mock zookeeper://xxx?mock=return null overrideDirectoryUrl(); refreshInvoker(urls); } 复制代码
private void refreshInvoker(List<URL> invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null"); if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.invokers = Collections.emptyList(); routerChain.setInvokers(this.invokers); destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls == Collections.<URL>emptyList()) { invokerUrls = new ArrayList<>(); } if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<>(); this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; } Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map /** * If the calculation is wrong, it is not processed. * * 1. The protocol configured by the client is inconsistent with the protocol of the server. * eg: consumer protocol = dubbo, provider only has other protocol services(rest). * 2. The registration center is not robust and pushes illegal specification data. * */ if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls .toString())); return; } List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); // pre-route and build cache, notice that route cache should build on original Invoker list. // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } } 复制代码
这个在消费者启动创建服务代理的时候曾经说过一下
对于 router 的擦不是参数、将其包装成路由规则、然后更新本地路由信息、会忽略 empty 开头的 URL
对于 configurator 类的参数、解释并根线本地的 Configurator 配置
对于 invoker 参数、如果是 empty 协议的 url、则会禁用该服务、并销毁本地缓存的 Invoker。将新的 URL 和本地的进行合并、找出差异的老 Invoker 并将其销毁。