从源码全面解析 dubbo 服务订阅的来龙去脉

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 从源码全面解析 dubbo 服务订阅的来龙去脉

一、引言

对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

本期源码文章吸收了之前 SpringKakfaJUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、消费者订阅服务

读过我们上一篇:从源码全面解析 dubbo 注解配置的来龙去脉 的文章的朋友,我们当时留了一个 EnableDubboConfig 注解里面的 ReferenceAnnotationBeanPostProcessor 方法

1、消费端配置

@DubboReference(protocol = "dubbo", timeout = 100)
private IUserService iUserService;

从这个配置我们可以得出一个信息,Spring 不会自动将 IUserService 注入 Bean 工厂中

当然这句话也是一个废话,人家 Dubbo 自定义的注解,Spring 怎么可能扫描到…

ReferenceAnnotationBeanPostProcessor 这个方法是消费端扫描 @Reference 使用的

本篇将正式的介绍下消费端是如何订阅我们服务端注册在 Zookeeper 上的服务的

2、扫描注解

我们先看这个类的实现:

public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor
    implements ApplicationContextAware, BeanFactoryPostProcessor {}

实现了 BeanFactoryPostProcessor 接口,这个时候如果看过博主的 Spring 的源码系列文章,DNA 应该已经开始活动了

没错,基本上这个接口就是为了往我们的 BeanDefinitionMap 里面注册 BeanDefinition 信息的

想必到这里,就算我们不看源码,也能猜到

这个哥们绝对是将 @DubboReference 的注解扫描封装成 BeanDefinition 注册至 BeanDefinitionMap

我们直接看源码

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory){
    // 拿到当前Spring工厂所有的bean名称
    String[] beanNames = beanFactory.getBeanDefinitionNames();
    for (String beanName : beanNames) {
        // 获取bean的类型
        beanType = beanFactory.getType(beanName);
        // 省略一些代码
        if (beanType != null) {
            // 获取元数据信息
            AnnotatedInjectionMetadata metadata = findInjectionMetadata(beanName, beanType, null);
            // 解析@DubboReference注解并注册至BeanDefinitionMap中
            prepareInjection(metadata);
        }
    }
}

我们详细看看这个 prepareInjection 方法是如何解析的 @DubboReference 注解并做了一些什么特殊的操作

protected void prepareInjection(AnnotatedInjectionMetadata metadata) throws BeansException {
    for (AnnotatedFieldElement fieldElement : metadata.getFieldElements()) {
        Class<?> injectedType = fieldElement.field.getType();
        // 配置的参数
        AnnotationAttributes attributes = fieldElement.attributes;
        // 解析&注册
        String referenceBeanName = registerReferenceBean(fieldElement.getPropertyName(), injectedType, attributes, fieldElement.field);
        }
    }
}

这个 registerReferenceBean 里面的逻辑较多,我们只取最关键的,感兴趣的朋友也可以自己去看一看

public String registerReferenceBean(String propertyName, Class<?> injectedType, Map<String, Object> attributes, Member member){
      RootBeanDefinition beanDefinition = new RootBeanDefinition();
      // ReferenceBean.class.getName() = org.apache.dubbo.config.spring.ReferenceBean
        beanDefinition.setBeanClassName(ReferenceBean.class.getName());
        beanDefinition.getPropertyValues().add(ReferenceAttributes.ID, referenceBeanName);
      // referenceProps = 配置的信息
      // interfaceName = com.common.service.IUserService
      // interfaceClass = interface com.common.service.IUserService
        beanDefinition.setAttribute(Constants.REFERENCE_PROPS, attributes);
        beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_CLASS, interfaceClass);
        beanDefinition.setAttribute(ReferenceAttributes.INTERFACE_NAME, interfaceName);
        GenericBeanDefinition targetDefinition = new GenericBeanDefinition();
        targetDefinition.setBeanClass(interfaceClass);
        beanDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, referenceBeanName + "_decorated"));
        beanDefinition.setAttribute(Constants.OBJECT_TYPE_ATTRIBUTE, interfaceClass);
      // 注册至BeanDefinitionMap中
        beanDefinitionRegistry.registerBeanDefinition(referenceBeanName, beanDefinition);
        referenceBeanManager.registerReferenceKeyAndBeanName(referenceKey, referenceBeanName);
        return referenceBeanName;
}

到这里,我们的 ReferenceAnnotationBeanPostProcessor 方法将 @DubboReference 扫描组装成 BeanDefinition 注册到了 BeanDefinitionMap

3、创建代理对象

在我们上面注册的时候,有这么一行代码:

beanDefinition.setBeanClassName(ReferenceBean.class.getName());

表明我们当前注册的 BeanClass 类型为 org.apache.dubbo.config.spring.ReferenceBean

当我们的 Spring 去实例化 BeanDefinitionMap 中的对象时,这个时候会调用 ReferenceBeangetObject 方法

Spring 在实例化时会获取每一个 BeanDefinition 的 Object,不存在则创建

我们发现,在 ReferenceBean 里面实际上是重写了 getObject 的方法:

public T getObject() {
    if (lazyProxy == null) {
        createLazyProxy();
    }
    return (T) lazyProxy;
}
private void createLazyProxy() {
    // 创建代理对象
    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
    proxyFactory.addInterface(interfaceClass);
    Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
    for (Class<?> anInterface : internalInterfaces) {
        proxyFactory.addInterface(anInterface);
    }
    // 进行动态代理(生成动态代理的对象)
    // 这里动态代理用的是JdkDynamicAopProxy
    this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
}

我们看下 JdkDynamicAopProxy 里面做了什么?

final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {
    // proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
    // 这里的targetSource = DubboReferenceLazyInitTargetSource
    TargetSource targetSource = this.advised.targetSource;
    Object target = targetSource.getTarget()
}
public synchronized Object getTarget() throws Exception {
   // 第一次为null,未初始化
   if (this.lazyTarget == null) {
      logger.debug("Initializing lazy target object");
      this.lazyTarget = createObject();
   }
   return this.lazyTarget;
}

我们看下 DubboReferenceLazyInitTargetSourcecreateObject

// 第一次调用时,会初始化该方法
private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
    @Override
    protected Object createObject() throws Exception {
        return getCallProxy();
    }
    @Override
    public synchronized Class<?> getTargetClass() {
        return getInterfaceClass();
    }
}

这个 getCallProxy 就是我们订阅服务的地方

4、订阅服务

当我们初始化完毕之后,在我们第一次调用的时候,会调用 getCallProxy() 该方法,去进行服务的订阅,这里会执行该方法:

private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {
    @Override
    protected Object createObject() throws Exception {
        return getCallProxy();
    }
    @Override
    public synchronized Class<?> getTargetClass() {
        return getInterfaceClass();
    }
}
private Object getCallProxy() throws Exception {
    synchronized(((DefaultSingletonBeanRegistry)getBeanFactory()).getSingletonMutex()) {
        // 获取reference
        return referenceConfig.get();
    }
}

我们继续向下看,这里会来到 ReferenceConfiginit 方法

protected synchronized void init() {
    ref = createProxy(referenceParameters);
}
private T createProxy(Map<String, String> referenceParameters) {
    // 1、监听注册中心
    // 2、本地保存服务
    createInvokerForRemote();
    URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
            referenceParameters.get(INTERFACE_KEY), referenceParameters);
    consumerUrl = consumerUrl.setScopeModel(getScopeModel());
    consumerUrl = consumerUrl.setServiceModel(consumerModel);
    MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
    // 创建代理类
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
4.1 监听注册中心

private void createInvokerForRemote() {
    if (urls.size() == 1) {
        // URL:
        // registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=2532&qos.enable=true&registry=zookeeper&release=3.1.8&timestamp=1686063555583
        URL curUrl = urls.get(0);
        invoker = protocolSPI.refer(interfaceClass, curUrl);
    }
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=14952&qos.enable=true&release=3.1.8&timestamp=1686063658064
    url = getRegistryUrl(url);
    Registry registry = getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    // group="a,b" or group="*"
    Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
    String group = qs.get(GROUP_KEY);
    if (StringUtils.isNotEmpty(group)) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
        }
    }
    Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
    return doRefer(cluster, registry, type, url, qs);
}

这里的 QS 如下:

我们直接跳到 MigrationRuleHandlerdoMigrate 中:

public synchronized void doMigrate(MigrationRule rule) {
    MigrationStep step = MigrationStep.APPLICATION_FIRST;
    float threshold = -1f;
    step = rule.getStep(consumerURL);
    threshold = rule.getThreshold(consumerURL);
    if (refreshInvoker(step, threshold, rule)) {
        setMigrationRule(rule);
    }
}

在这个 refreshInvoker 里面,会判断当前注册的方式

private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
        MigrationStep originStep = currentStep;
        if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
            boolean success = true;
            switch (step) {
                // 接口&应用
                case APPLICATION_FIRST:
                    migrationInvoker.migrateToApplicationFirstInvoker(newRule);
                    break;
                // 应用
                case FORCE_APPLICATION:
                    success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
                    break;
                // 接口
                case FORCE_INTERFACE:
                default:
                    success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
            }
            return success;
        }
        return true;
    }

我们本次只讲 接口注册,我们直接跳到:ZookeeperRegistrydoSubscribe 方法

public void doSubscribe(final URL url, final NotifyListener listener) {
    List<URL> urls = new ArrayList<>();
    for (String path : toCategoriesPath(url)) {
        // 创建目录
        zkClient.create(path, false, true);
        // 增加监听
        // 1、/dubbo/com.msb.common.service.IUserService/providers
        // 2、/dubbo/com.msb.common.service.IUserService/configurators
        // 3、/dubbo/com.msb.common.service.IUserService/routers
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
            urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    // 将Zookeeper服务保存
    notify(url, listener, urls);
}
4.2 本地保存服务

直接跳到 AbstractRegistrydoSaveProperties 方法

  • 创建文件
  • 将服务端数据存入文件中
public void doSaveProperties(long version) {
    File lockfile = null;
    // 创建文件
    lockfile = new File(file.getAbsolutePath() + ".lock");
    tmpProperties = new Properties();
    Set<Map.Entry<Object, Object>> entries = properties.entrySet();
    for (Map.Entry<Object, Object> entry : entries) {
        tmpProperties.setProperty((String) entry.getKey(), (String) entry.getValue());
    }
    try (FileOutputStream outputFile = new FileOutputStream(file)) {
        tmpProperties.store(outputFile, "Dubbo Registry Cache");
    }
}

这里存储的数据如下:简单理解,各种服务端的信息

com.common.service.IUserService -> empty://192.168.0.103/com.common.service.IUserService?application=dubbo-consumer&background=false&category=routers&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100&timestamp=1686064969935&unloadClusterRelated=false empty://192.168.0.103/com.msb.common.service.IUserService?application=dubbo-consumer&background=false&category=configurators&dubbo=2.0.2&interface=com.msb.common.service.IUserService&lazy=true&methods=getUserById&pid=13528&protocol=dubbo&qos.enable=true&release=3.1.8&side=consumer&sticky=false&timeout=100&timestamp=1686064969935&unloadClusterRelated=false dubbo://192.168.0.103:20883/com.msb.common.service.IUserService?anyhost=true&application=dubbo-provider&background=false&check=false&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.msb.common.service.IUserService&metadata-type=remote&methods=getUserById&register-mode=inter

如果后续我们的注册中心(Zookeeper)挂掉之后,我们的系统从本地磁盘读取服务信息也可以正常通信。

只是没有办法及时更新服务

4.3 创建动态代理类

private T createProxy(Map<String, String> referenceParameters) {
    // 省略代码
    // create service proxy
    return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

这里我们直接跳到 JavassistProxyFactorygetProxy 方法

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

这里直接创建代理类,当我们去调用 InvokerInvocationHandler 这个方法

至于为什么要调用 InvokerInvocationHandler ,大家可以看下之前写的动态代理文章:2023年再不会动态代理,就要被淘汰了

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);
        if (serviceModel instanceof ConsumerModel) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
            rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
        }
        return InvocationUtil.invoke(invoker, rpcInvocation);
    }

三、流程图

  • 原图可私信获取

四、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。


相关文章
|
26天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
26天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
26天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2天前
|
自然语言处理 数据处理 索引
mindspeed-llm源码解析(一)preprocess_data
mindspeed-llm是昇腾模型套件代码仓,原来叫"modelLink"。这篇文章带大家阅读一下数据处理脚本preprocess_data.py(基于1.0.0分支),数据处理是模型训练的第一步,经常会用到。
8 0
|
27天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
108 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
93 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
77 0
|
3月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
79 0
|
3月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
107 0

推荐镜像

更多