上一篇文章中、我们已经知道 Dubbo 会额外注册 ServiceBean 到 Spring 容器中、因为需要借助这个 ServiceBean 将服务注册到注册中心
@Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { // @since 2.7.5 registerBeans(registry, DubboBootstrapApplicationListener.class); Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan); if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) { registerServiceBeans(resolvedPackagesToScan, registry); } } } 复制代码
registerBeans(registry, DubboBootstrapApplicationListener.class) 注册 DubboBootstrapApplicationListener 到 Spring 容器中
@Override public void onApplicationContextEvent(ApplicationContextEvent event) { if (event instanceof ContextRefreshedEvent) { onContextRefreshedEvent((ContextRefreshedEvent) event); } else if (event instanceof ContextClosedEvent) { onContextClosedEvent((ContextClosedEvent) event); } } private void onContextRefreshedEvent(ContextRefreshedEvent event) { dubboBootstrap.start(); } 复制代码
/** * Start the bootstrap */ public DubboBootstrap start() { if (started.compareAndSet(false, true)) { ready.set(false); initialize(); if (logger.isInfoEnabled()) { logger.info(NAME + " is starting..."); } // 1. export Dubbo Services exportServices(); // Not only provider register if (!isOnlyRegisterProvider() || hasExportedServices()) { // 2. export MetadataService exportMetadataService(); //3. Register the local ServiceInstance if required registerServiceInstance(); } referServices(); if (asyncExportingFutures.size() > 0) { new Thread(() -> { try { this.awaitFinish(); } catch (Exception e) { logger.warn(NAME + " exportAsync occurred an exception."); } ready.set(true); }).start(); } else { ready.set(true); } } return this; } 复制代码
其中 exportServices(); 就是将服务注册到注册中心
private void exportServices() { configManager.getServices().forEach(sc -> { // TODO, compatible with ServiceConfig.export() ServiceConfig serviceConfig = (ServiceConfig) sc; serviceConfig.setBootstrap(this); if (exportAsync) { ExecutorService executor = executorRepository.getServiceExporterExecutor(); Future<?> future = executor.submit(() -> { sc.export(); exportedServices.add(sc); }); asyncExportingFutures.add(future); } else { sc.export(); exportedServices.add(sc); } }); } 复制代码
那么什么时候 ServiceBean 加入到这个 configManager 里面?
在 AbstractConfig#addIntoConfigManager 中
@PostConstruct public void addIntoConfigManager() { ApplicationModel.getConfigManager().addConfig(this); } 复制代码
回到 exportServices 方法中、我们得知它会调用 ServiceConfig 的 export 方法
public synchronized void export() { if (!shouldExport()) { return; } if (bootstrap == null) { bootstrap = DubboBootstrap.getInstance(); bootstrap.init(); } checkAndUpdateSubConfigs(); //init serviceMetadata serviceMetadata.setVersion(version); serviceMetadata.setGroup(group); serviceMetadata.setDefaultGroup(group); serviceMetadata.setServiceType(getInterfaceClass()); serviceMetadata.setServiceInterfaceName(getInterface()); serviceMetadata.setTarget(getRef()); if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { doExport(); } exported(); } 复制代码
这里我们没有进行延迟暴露、直接进入到 doExport 方法中
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } doExportUrls(); } 复制代码
没啥好说的、doExport 最后会调用 doExportUrls、我们进入到 doExportUrls 中
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } doExportUrls(); } 复制代码
针对不同的协议将其注册到服务注册中心上
private void doExportUrls() { ServiceRepository repository = ApplicationModel.getServiceRepository(); ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass()); repository.registerProvider( getUniqueServiceName(), ref, serviceDescriptor, this, serviceMetadata ); List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true); for (ProtocolConfig protocolConfig : protocols) { String pathKey = URL.buildKey(getContextPath(protocolConfig) .map(p -> p + "/" + path) .orElse(path), group, version); // In case user specified path, register service one more time to map it to path. repository.registerService(pathKey, interfaceClass); // TODO, uncomment this line once service key is unified serviceMetadata.setServiceKey(pathKey); doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } 复制代码
主要涉及的就是下面这三行代码
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { ......... Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); ......... } 复制代码
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); 复制代码
这个主要就是生成一个 Wrapper 。这个 Wrapper 比反射更加高效、因为它是强转类型然后直接调用方法。
当服务消费者来消费的时候就可以直接调用提供服务的 service 了、不需要进行反射
JavassistProxyFactory @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } 复制代码
生成的 Wrapper 部分代码如下
/*
 * Generated wrapper source for com.demo.api.DemoService: dispatches a call by
 * method name and argument count to a direct, statically typed invocation.
 * NOTE(review): $1..$4 and $w look like Javassist source-template
 * placeholders (presumably $1..$4 map to the parameters o, n, p, v, and $w
 * wraps the result) - confirm against the Javassist documentation.
 */
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
    com.demo.api.DemoService w;
    try {
        /* Cast the target once; a failing cast is reported as an illegal argument. */
        w = ((com.demo.api.DemoService) $1);
    } catch (Throwable e) {
        throw new IllegalArgumentException(e);
    }
    try {
        /* Match on the method name and on the length of $3. */
        if ("sayHello".equals($2) && $3.length == 1) {
            return ($w) w.sayHello((java.lang.String) $4[0]);
        }
    } catch (Throwable e) {
        /* Anything thrown by the target method is wrapped. */
        throw new java.lang.reflect.InvocationTargetException(e);
    }
    /* No branch matched the requested method. */
    throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.demo.api.DemoService.");
}
复制代码
至于 PROTOCOL.export(wrapperInvoker); 里面涉及到 Dubbo SPI 机制
RegistryProtocol
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); // url to export locally URL providerUrl = getProviderUrl(originInvoker); final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); // decide if we need to delay publish boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { register(registryUrl, registeredProviderUrl); } // register stated url on provider model registerStatedUrl(registryUrl, registeredProviderUrl, register); // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); notifyExport(exporter); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); } 复制代码
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
这里去启动 NettyServer
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); } 复制代码
ProtocolFilterWrapper 这里为这个 Invoker 添加一系列的 Filter 在真正执行服务代码前执行
@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (UrlUtils.isRegistry(invoker.getUrl())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER)); } 复制代码
@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); } openServer(url); optimizeSerialization(url); return exporter; } 复制代码
private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { ProtocolServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } } } 复制代码
private ProtocolServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported server type: " + str + ", url: " + url); } ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return new DubboProtocolServer(server); } 复制代码
当我们收到消费者的消息时、通过 requestHandler 处理
server = Exchangers.bind(url, requestHandler); 复制代码
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); .......... RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); } 复制代码
服务启动介绍之后我们回到服务注册
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); // url to export locally URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); // decide if we need to delay publish boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { register(registryUrl, registeredProviderUrl); } ......... 复制代码
register(registryUrl, registeredProviderUrl);
private void register(URL registryUrl, URL registeredProviderUrl) { Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registeredProviderUrl); } 复制代码
根据你配置的注册中心地址、选择出 Registry
最终去到 ZookeeperRegistry 的 doRegister 中
@Override public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } 复制代码
到此服务注册基本结束了、只是细节还没去详细讲