走进HSF源码

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 前言本文源自一次组内分享,于是接着这个机会,将HSF的源码阅读过程记录下来,以供自己温故而知新。如果有哪些地方理解不到位的,还请诸位批评指正!简介HSF (High-speed Service Framework),高速服务框架,是在阿里巴巴内部广泛使用的分布式RPC服务框架。众所周知,HSF一款与我们的日常生活密不可分的RPC框架;所谓RPC——远程过程调用,就是指像调用本地方法一样调用远程的方

前言

本文源自一次组内分享,于是接着这个机会,将HSF的源码阅读过程记录下来,以供自己温故而知新。如果有哪些地方理解不到位的,还请诸位批评指正!

简介

HSF (High-speed Service Framework),高速服务框架,是在阿里巴巴内部广泛使用的分布式RPC服务框架。

众所周知,HSF一款与我们的日常生活密不可分的RPC框架;所谓RPC——远程过程调用,就是指像调用本地方法一样调用远程的方法,HSF帮我们实现了远程通讯、序列化、同步/异步调用,极大的便捷了我们的开发。

总体架构

这是HSF的总体架构,其中核心的有三块内容:

  1. 服务提供方
  2. 服务消费方
  3. 注册中心(通常由一些成熟的注册中心中间件代劳了,例如Zookeeper、Eureka、Nacos)

服务提供方

首先,我们就从服务提供方的视角,来详细看看HSF的执行细节。

从使用说起

HSF提供了便捷服务发布方法,示例代码如下:

public interface HelloService {
    
    String hello(String name);
    
}


@HSFProvider
public class HelloServiceImpl implements HelloService{
    @Override
    public String hello(String name) {
        return "Hello,"+name+"!";
    }
}

实现功能接口后,只需要加上@HSFProvider就可以将其发布,下面我们来看看这个注解到底做了哪些事情。

源码解析

源码阅读主要包含两部分内容:

  1. 服务启动,也就是在应用启动时做的一些事情
  2. 服务调用,处理rpc请求时的具体操作

服务启动

在服务启动这个阶段,又可以分为以下几个步骤:

  1. 服务实例化,主要是和spring容器打交道
  2. 服务初始化,准备了一些服务发布环节关键的组件
  3. 服务发布,将服务启动,并注册到注册中心

下面具体看看。

服务实例化

这部分主要是和spring容器打交道比较多一点,借助spring的IOC能力,将服务实例化。

服务启动的源头在@HSFProvider,其定义如下:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
@Import(HsfProviderAnnotationRegistrar.class)
public @interface HSFProvider {
    /**
     * The value may indicate a suggestion for a logical component name,
     * to be turned into a Spring bean in case of an autodetected component.
     *
     * @return the suggested component name, if any
     */
    String value() default "";

    /**
     * HSF service interface
     */
    Class serviceInterface() default Object.class;

    /**
     * HSF service version
     */
    String serviceVersion() default Constants.DEFAULT_VERSION;

    /**
     * HSF service group
     */
    String serviceGroup() default Constants.DEFAULT_GROUP;

    /**
     * HSF service name
     */
    String serviceName() default "";

...

这里主要定义了发布服务的version、group等信息;不过,其中有两点关键信息:

  1. @Component,为了让Spring容器识别出这个Bean
  2. @Import(HsfProviderAnnotationRegistrar.class),这个注解相当重要,服务发布的主要工作都通过这个注解来实现。

小知识

@Import(HsfProviderAnnotationRegistrar.class)的作用:Import实现ImportBeanDefinitionRegistrar的类,可以在bean定义信息进行一些自定义的操作。

BeanDefinition可以理解为Bean的元信息,通过调用Bean工厂的org.springframework.beans.factory.support.AbstractBeanFactory#doGetBean方法,即可根据其元信息来实例化Bean对象;最常见的ApplicationContext其实就是一个BeanFacoty。

重点看一下HsfProviderAnnotationRegistrar.class

@Configuration
@ConditionalOnProperty(name = Constants.ENABLED, matchIfMissing = true)
@EnableConfigurationProperties(HsfProperties.class)
@AutoConfigureBefore(HsfConsumerAutoConfiguration.class)
public class HsfProviderAnnotationRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware,
BeanClassLoaderAware {
  

其主要实现了ImportBeanDefinitionRegistrar类的registerBeanDefinitions方法:

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    HsfProperties hsfProperties = HsfPropertiesUtils.buildHsfProperties(environment);
    if (!hsfProperties.isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("spring.hsf.enabled is false, so skip process @HSFProvider");
        }
        return;
    }

    // 这个是真实服务的bean
    String targetBeanName = BeanNameUtils.beanName(importingClassMetadata, registry);
    // 这里又创建了一个bean,用于对外提供服务能力
    String beanName = targetBeanName + Constants.GENERATE_HSFPROVIDERBEAN_SUFFIX;

    String className = importingClassMetadata.getClassName();
    Class targetClass;
    try {
        targetClass = this.classLoader.loadClass(className);

        // 1. 解析HSFProvider注解的内容
        HSFProvider hsfProvider = AnnotationUtils.findAnnotation(targetClass, HSFProvider.class);

        configIfNecessary(hsfProperties);

        HsfProviderBeanDefinitionBuilder builder = new HsfProviderBeanDefinitionBuilder(targetBeanName, hsfProvider)
                .clazz(targetClass)
                .properties(hsfProperties);

        // 2. 创建beanDefinition【关键点】
        BeanDefinition beanDefinition = builder.build(registry);

        if (beanDefinition != null) {
            if (registry.containsBeanDefinition(beanName)) {
                throw new BeanDefinitionValidationException(
                        "BeanDefinition with the same beanName already existed, please check your config! beanName:"
                                + beanName);
            }
            // 3. 将新创建的beanDefinition注册进去,这里实际维护了一个map
            registry.registerBeanDefinition(beanName, beanDefinition);
            logger.info("[HSF Starter] register HSF provider bean: {}, targetClass: {}", beanName, className);
        }
    } catch (ClassNotFoundException e) {
        throw new BeanCreationException("create hsf provider bean error! beanName:" + beanName, e);
    }
}

其中第二点创建beanDefinition比较关键,具体如下:

BeanDefinition build(BeanDefinitionRegistry registry) {

    // 首先是一些常规的校验
    String serviceInterface = null;
    // find hsf service interface
    if (annotation.serviceInterface().equals(Object.class)) {
        // 检查类是否只实现了一个接口
        Class[] allInterfaces = ClassUtils.getAllInterfacesForClass(clazz);
        if (allInterfaces != null && allInterfaces.length == 1) {
            serviceInterface = allInterfaces[0].getName();
        } else {
            throw new IllegalArgumentException("@HSFProvider class: " + clazz.getName()
                    + " implements more than one interface: " + allInterfaces);
        }
    } else {
        serviceInterface = annotation.serviceInterface().getName();
    }

    // Check after the service interface was resolved from the class
    if ("false".equalsIgnoreCase(properties.getEnables().get(serviceInterface))) {
        logger.warn("HSF service {} is disabled, therefore will not publish.", serviceInterface);
        return null;
    }

	// 这里创建了一个HSFSpringProviderBean的工厂类【关键点】
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(HSFSpringProviderBean.class);
    builder.setInitMethodName(Constants.INIT_METHOD);
    builder.addPropertyValue("serviceInterface", serviceInterface);
    builder.addPropertyValue("serviceVersion", ProviderPropertiesResolver.resolveVersion(annotation, serviceInterface, properties));
    builder.addPropertyValue("serviceGroup", ProviderPropertiesResolver.resolveGroup(annotation, serviceInterface, properties));
    builder.addPropertyValue("serviceName", annotation.serviceName());
    Object clientTimeout = ProviderPropertiesResolver.resolveTimeout(annotation, serviceInterface, properties);
    if (clientTimeout != null) {
        builder.addPropertyValue("clientTimeout", clientTimeout);
    }

    // 接口的具体实现类的引用,可以从Spring容器中找到对应的实现类【重要】
    builder.addPropertyReference("target", targetRef);

	// 维护了builder中其他一些参数,暂时忽略
	...

    return builder.getBeanDefinition();
}

这里透露出一个重要的信息就是,不管发布的服务的具体类型,最终生成的对外提供服务的Bean都是HSFSpringProviderBean

小总结

稍微回顾一下以上所讲内容。

HSFProvider注解通过一个Spring的钩子,最终为容器创建了两个bean实例,一个是原本的服务bean,另一个是对外提供服务的bean,也就是HSFSpringProviderBean

服务初始化

HSFSpringProviderBean实例化完成后,spring会调用一个钩子函数,进行服务自定义的初始化工作。

HSFSpringProviderBean这个对象实现了InitializingBean接口,在创建实例的时候,会执行对应的钩子方法afterPropertiesSet

我们来看看init()方法中究竟做了什么。

/**
 * 对外发布服务
 */
public void init() throws Exception {
    // 避免被初始化多次
    if (!inited.compareAndSet(false, true)) {
        return;
    }
	// 1. 服务初始化
    providerBean.initWithoutPub();
	// 2. 服务发布
    doPublish();
}

初始化方法中主要做了两件事:

  1. 服务初始化
  2. 服务发布

这两件事都非常重要,我们一件一件来看。

首先介绍几个类

  1. HSFSpringProviderBean:Spring容器服务提供者的Bean实例
  2. HSFApiProviderBean:具体进行服务提供的类
  3. ServiceMetadata:服务元信息,包括了元数据信息本身的描述,以及服务信息的发布与订阅,其中的 target字段就是接口对应的实现类
  4. ApplicationModel:HSF的私有容器,其中维护了服务的映射关系

  1. InvocationHandler:调用处理器,通过这个类将具体的请求路由到对应的实现类上去执行。

这里还有一个细节,有两个命名十分相似的Bean,HSFSpringProviderBeanHSFApiProviderBean,个人感觉HSFSpringProviderBean的作用主要是和Spring容器打交道,而实际的操作则交给了HSFApiProviderBean来处理。

服务启动的代码如下:

public void initWithoutPub() throws Exception {
    // 避免被初始化多次
    if (!inited.compareAndSet(false, true)) {
        return;
    }
	// 配置检查
    checkConfig();
	// 服务启动【关键点】
    metadata.init();
}

具体的服务启动逻辑如下:

public synchronized void init() {
    // 防止一个服务被发布多次
    if (!initFlag.compareAndSet(false, true)) {
        return;
    }

    // 服务元信息的初始化,暂时不重要
    List serviceMetadataBeforeInits = applicationModel.getServiceContainer().getInstances(
            ServiceMetadataBeforeInit.class);
    for (ServiceMetadataBeforeInit serviceMetadataBeforeInit : serviceMetadataBeforeInits) {
        serviceMetadataBeforeInit.beforeInit(this);
    }

    //add current app name to url
    addProperty(APPLICATION_NAME_KEY, applicationModel.getName());

    // 创建服务组件,暂时不重要
    List components = applicationModel.getServiceContainer().
            getInstances(ServiceComponent.class, isProvider ? new String[]{"provider"} : new String[]{"consumer"});
    for (ServiceComponent component : components) {
        component.init(this);
        componentMap.put(component.name(), component);
    }

    // 配置应用使用的协议,HSF or DUBBO,为下面创建调用链准备
    if (protocols.isEmpty()) {
        protocols.addAll(applicationModel.getServiceContainer().getInstances(Protocol.class));
    }
    // 创建服务暴露调用链【重要】
    protocolFilterChain = ProtocolInterceptorChainBuilder.build(this);

    InvocationHandlerFactory invocationHandlerFactory = applicationModel.getServiceContainer()
            .getInstance(InvocationHandlerFactory.class);

    if (invocationHandlerFactory == null) {
        throw new IllegalStateException("cannot find one valid implementation for InvocationHandlerFactory");
    }

    // 只看服务提供者的部分,主要创建出一个调用处理器【重要】
    if (isProvider()) {
        InvocationHandler serverInvocationHandler = invocationHandlerFactory.createInvocationHandler(getTarget());
        invocationHandler = InvocationHandlerChainFactory.buildHandlerChain(this, serverInvocationHandler);
    } else {
        invocationHandler = InvocationHandlerChainFactory.buildHandlerChain(this, consumerHandlerHitch);

        //bridge
        SyncInvocationHandler clientSyncInvocationHandler = invocationHandlerFactory.createSyncInvocationHandler(
                invocationHandler);
        syncInvocationHandler = SyncInvocationHandlerChainFactory
                .buildSyncInvocationHandlerChain(this, clientSyncInvocationHandler);
    }
}

这边首先对服务元信息进行初始化,主要成果有两个:

  1. protocolFilterChain,服务发布调用链,用于定义服务的发布过程
  2. InvocationHandler,调用执行Handler

服务发布

com.taobao.hsf.app.spring.util.HSFSpringProviderBean#doPublish

private void doPublish() {
    //非延迟发布,或者是非spring容器初始化的情况,直接发布服务
    if (!providerBean.getMetadata().isDelayedPublish() || !isInSpringContainer) {
        // 服务发布的具体操作【关键点】
        providerBean.publish();
        if(isInSpringContainer) {
            setAppInitedStatus();
        } else {
            String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0051", "BIZ",
                    "HSFSpringProviderBean is not created in spring container");
            LOGGER.warn(errorCodeStr);
        }
    }
}

com.taobao.hsf.app.api.util.HSFApiProviderBean#publish

public void publish() {
    // 防止一个服务被发布多次
    if (!isPublished.compareAndSet(false, true)) {
        return;
    }

    try {
        // HSFServiceContainer.getInstance(ProcessService.class).publish(metadata);

        // Init all service in Spring Context or main thread Make sure init
        // stage is Thread-Safe
        // 创建一个服务提供者对象
        ProviderServiceModel providerServiceModel = new ProviderServiceModel(metadata.getUniqueName(), metadata,
                                                                             metadata.getTarget());
        metadata.setProviderServiceModel(providerServiceModel);
        // 1. 维护服务关系
        applicationModel.initProviderService(metadata.getUniqueName(), providerServiceModel);

        // 2. 服务发布的具体工作【关键点】
        metadata.exportSync();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

这边主要做了两件事:

  1. 维护服务信息
  2. 服务暴露
维护服务信息

将服务全限定名与提供服务信息维护到HSF容器中com.taobao.hsf.model.ApplicationModel#initProviderService

public void initProviderService(String serviceName, ProviderServiceModel serviceModel) {
    // TODO thread safe?
    if (this.providedServices.put(serviceName, serviceModel) != null) {
        String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0051", "BIZ",
                "already register the provider service: " + serviceName);
        LOGGER.warn(errorCodeStr);
        return;
    }
    ApplicationModelFactory.put(serviceName, this);
    ApplicationModelFactory.put(serviceName, serviceModel.getMetadata());

    InvocationHandler invocationHandler = serviceModel.getMetadata().getInvocationHandler();

    //TODO this work should be done in Protocol.export()
    // 这里维护了服务全限定名,与服务处理器的映射关系【关键】
    providerService2Invoker.put(serviceName, invocationHandler);
}

其中invocationHandler就是在上一个环节服务启动中创建出来的。

服务暴露

com.taobao.hsf.model.metadata.ServiceMetadata#exportSync

public List exportSync() {
    // 使用线程池来发布服务
    Future> future = getExportReferExecutorService().submit(new Callable>() {
        @Override
        public List call() throws Exception {
            // 暴露动作【关键】
            return export();
        }
    });

    // 同步阻塞等待服务发布的结果
    try {
        List rawURLs = future.get();
        if (rawURLs != null) {
            LOGGER.info("Interface[" + getUniqueName() + "]Group[" + getGroup() + "]Publish HSF service to ConfigServer success!");
        } else {
            if (!readyToPublish.get()) {
                LOGGER.info("Interface[" + getUniqueName() + "]Group[" + getGroup() + "]use delay publish,wait for online/onlineSmart command to publish!");
            }
        }
        return rawURLs;
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }
}

getExportReferExecutorService()获取到的线程池核心线程数为1,也就是说,这里发布服务的操作是串行的;具体这样做用意何为,还不太清楚。

具体的服务暴露逻辑com.taobao.hsf.model.metadata.ServiceMetadata#export

public List export() {
    try {
        // 保险起见,再次进行初始化,通常情况下,这一步跳过
        init();
        //reset flag for pub
        setRegistryPub(true);
        // 使用调用链来发布服务【关键】
        return protocolFilterChain.export(this, invocationHandler);
    } catch (Throwable t) {
        String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0058", "HSF",
                "export service: " + getUniqueName() + ", group=" + getGroup() + " got error");
        LOGGER.error("HSF-0058", errorCodeStr, t);
        throw new RuntimeException(t);
    }
}

这里就通过上一个服务启动环节创建出来的protocolFilterChain,来将服务进行具体的发布流程。

整个服务暴露调用链大致包含以下部分:

com.taobao.hsf.protocol.DelayPublishProtocolInterceptor#export

->com.taobao.hsf.plugins.spas.SpasProtocolInterceptor#export

->com.taobao.hsf.feature.optimized.hessian.protocolinterceptor.OptimizedHessianProtocolInterceptor#export

->com.taobao.hsf.process.component.CodeployProtocolInterceptor#export

->com.taobao.hsf.plugins.eagleeye.protocol.EagleEyeProtocolInterceptor#export

->com.taobao.hsf.region.service.impl.RegionProtocolInterceptor#export

->com.taobao.hsf.site.SiteProtocolInterceptor#export

->com.taobao.hsf.unit.service.impl.UnitProtocolInterceptor#export

->com.taobao.hsf.machine.group.protocol.MachineGroupProtocolInterceptor#export

->ElemeServicePropertyProtocolInterceptor##export(暂时找不到)

->com.taobao.hsf.grouping.component.GroupingProtocolInterceptor#export【分组策略】

->com.taobao.hsf.dpath.DPathProtocolInterceptor#export

->com.taobao.hsf.service.ServicePubComponent#export

->com.taobao.hsf.registry.RegistryProtocolInterceptor#export【上报服务信息至注册中心】

->FilterProtocolInterceptor#export(暂时找不到)

->com.taobao.hsf.metrics.ProviderPubStatusProtocolInterceptor#export

->com.taobao.hsf.tps.component.TPSRuleProtocolInterceptor#export

->com.taobao.hsf.protocol.MultiplexingProtocol#export

->com.taobao.hsf.remoting.service.HSFProtocol#export【将服务挂载到Netty上】

其中较为核心的主要有两个部分:

  1. 服务信息注册
  2. 开启Netty服务

其中服务信息注册,主要是对注册中心接口的调用,逻辑并不复杂,有兴趣的同学可以自行阅读。

下面重点看看开启Netty服务的部分。

com.taobao.hsf.remoting.service.HSFProtocol#export

@Override
public List export(ServiceMetadata serviceMetadata, InvocationHandler invocationHandler) {
    // 开启Netty服务【关键点】
    registerProvider(serviceMetadata);
    //register server InvocationHandler is done outside

    Map needExportServers = providerServer.severNeedExport();

    List serviceURLs = new ArrayList();

    for (Map.Entry server : needExportServers.entrySet()) {
        String data = HSFServiceTargetUtil.getTarget(server.getValue(), server.getKey(), serviceMetadata);
        ServiceURLRawData serviceURLRawData = ServiceURLRawData.parseUrl(data);
        HSFServiceURL hsfServiceURL = new HSFServiceURL(serviceMetadata, serviceURLRawData);
        serviceURLs.add(hsfServiceURL);
    }

    return serviceURLs;
}

com.taobao.hsf.remoting.service.HSFProtocol#registerProvider

public void registerProvider(ServiceMetadata metadata) {
    try {
        // 开启HSF服务
        providerServer.startHSFServer();
    } catch (Exception e) {
        throw new RuntimeException(
                LoggerHelper.getErrorCodeStr("hsf", "HSF-0016", "HSF", "start provider server failed"), e);
    }

    // 分配线程池
    threadPoolService.getExecutorManager().allocateExecutor(metadata.getUniqueName(), metadata.getCorePoolSize(),
            metadata.getMaxPoolSize());
}

具体开启Netty服务的地方com.taobao.hsf.io.provider.impl.ProviderServerImpl#startHSFServer

@Override
synchronized public void startHSFServer() throws HSFException {
    // 这边只会启动一个Netty服务
    if (isProviderStarted.compareAndSet(false, true)) {
        try {
            // 创建一个tcp server
            tcp = HSFServiceContainer.getInstance(Server.class, "tcp");
            // 绑定端口【关键点】
            tcp.bind(env.getBindHost(), env.getBindPort());
            if (tcp.isNeedExport()) {
                exportServerInfo.put(env.getPubPort(), env.getPubHost());
            }
            // 如果配置了Http服务,进行一些操作,暂时忽略
            if (config.getBoolean(Server.HSF_HTTP_ENABLE_KEY) || AppInfoUtils.isEnableHttp()) {
                http = HSFServiceContainer.getInstance(Server.class, "http");
                http.bind(env.getBindHost(), env.getHttpBindPort());
                if (http.isNeedExport()) {
                    exportServerInfo.put(env.getHttpPubPort(), env.getPubHost());
                }
                isHttpStarted.compareAndSet(false, true);
            }
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    LoggerInit.LOGGER.info("Shutdown hook invoked. Offline services and stop server");
                    try {
                        stopHSFServer();
                    } catch (Exception e) {
                        LoggerInit.LOGGER.warn("Exception happens during stop server:", e);
                    }
                }
            });
        } catch (Exception e) {
            throw new HSFException("fail to start HSF remoting server module", e);
        }
    }
}

内部的Netty服务启动的动作只会执行一次,也就是说,应用下所有的HSF服务都共用一个Netty服务,来对外提供调用服务。

端口绑定com.taobao.hsf.io.server.AbstractServer#bind

@Override
public void bind(String hostName, int port) {
    if (hostName == null) {
        throw new IllegalArgumentException("hostName is null");
    }
    // log
    this.hostName = hostName;
    this.port = port;
    doBind(this.hostName, this.port);
}

这里具体配置了Netty服务信息com.taobao.hsf.io.netty.tcp.NettyTcpServer#doBind

@Override
public void doBind(String hostName, int port) {


    ServerBootstrap serverBootstrap = new ServerBootstrap();

    Config config = HSFServiceContainer.getInstance(ConfigService.class).getConfig();

    // 可配置的几个
    serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.ALLOCATOR, Holder.byteBufAllocator)
            .option(ChannelOption.SO_BACKLOG, config.getInt(Server.HSF_BACKLOG_KEY))
            .childOption(ChannelOption.ALLOCATOR, Holder.byteBufAllocator)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.AUTO_CLOSE, Boolean.TRUE)
            .childOption(ChannelOption.ALLOW_HALF_CLOSURE, Boolean.FALSE)
            // Netty的启动配置
        	.handler(new ChannelInitializer() {
                @Override
                protected void initChannel(ServerSocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast("serverBindHandler",
                                    new NettyBindHandler(NettyTcpServer.this,
                                            serverStreamLifecycleListeners));
                }
            })
            // Netty的响应请求配置
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast("protocolHandler", new NettyProtocolHandler())
                            .addLast("serverIdleHandler",
                                    new IdleStateHandler(0, 0, serverIdleTimeInSeconds))
                            .addLast("serverHandler",
                                    // 这里定义了处理器【关键】
                                    new NettyServerStreamHandler(NettyTcpServer.this, false,
                                            serverStreamLifecycleListeners, serverStreamMessageListeners));
                }
            });

    if (isWaterMarkEnabled()) {
        serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark(lowWaterMark, highWaterMark));
    }

    ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostName, port));
    future.syncUninterruptibly();
}

小总结

再来回顾一下之前所讲的内容,首先将所有用户自定义的服务实例化为HSFSpringProviderBean,然后在Bean初始化时进行服务初始化和发布工作,最终启动一个Netty服务对外提供远程调用服务。

以上从服务启动的角度分析了HSF服务发布的大致链路,下面着重分析Netty服务是如何提供HSF远程调用能力的。

服务调用

服务调用的入口就是刚才Netty启动时配置的NettyServerStreamHandler,其处理逻辑主要体现在实现了ChannelInboundHandlerAdapter中的方法channelRead()中,下面具体看看。

小知识

channelRead()会在端口读取到信息的时候调用,其中定义了接受消息的具体逻辑。

数据处理逻辑在这里com.taobao.hsf.io.netty.server.NettyServerStreamHandler#channelRead

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 这里就获取到了刚才绑定的那个服务流
    ServerStream stream = (ServerStream) StreamUtils.streamOfChannel(ctx.channel());
    if (msg instanceof ResponsePacket) {
        // 如果数据是返回包,则直接返回
        ResponsePacket responsePacket = (ResponsePacket) msg;
        MessageAnswerHandler answerHandler = stream.removeAnswerHandler(responsePacket.requestId());
        if (answerHandler != null) {
            answerHandler.setAnswer(responsePacket);
        } else {
            log.warn("Receive response which requestId has not been stored, maybe some problem happened on network." + stream);
        }
    } else if (msg instanceof RequestPacket) {
        // 如果是请求数据,则进行处理【关键】
        callConnectionReceivedListeners(server, stream, (RequestPacket) msg);
    } else {
        log.warn("Receive unrecognized msg {}", msg);
    }
}

请求的处理关键逻辑com.taobao.hsf.io.netty.server.NettyServerStreamHandler#callConnectionReceivedListeners

private void callConnectionReceivedListeners(Server server, ServerStream stream, RequestPacket message) {
    for (ServerStreamMessageListener listener : serverStreamMessageListeners) {
        try {
            listener.received(server, stream, message);
        } catch (Exception ex) {
            error(listener.getClass().getName(), "received", ex);
        }
    }
}

这里会依次调用多个消息处理器来处理消息,其中用于处理请求的是com.taobao.hsf.io.stream.support.server.HandleRequest

其处理逻辑com.taobao.hsf.io.stream.support.server.HandleRequest#received

@Override
@SuppressWarnings("unchecked")
public void received(Server server, ServerStream stream, RequestPacket requestPacket) {
    ServerHandler serverHandler = ServerHandlerSelector.getInstance().select(requestPacket.protocolType());
    serverHandler.process(requestPacket, stream);
}

这里的服务处理器是com.taobao.hsf.io.remoting.hsf.message.HSFServerHandler

其处理逻辑com.taobao.hsf.io.remoting.hsf.message.HSFServerHandler#process

其中代码较多,不过核心大致可以大致拆分成两部分内容:

  1. 创建请求
  2. 处理请求

下面依次来看。

创建请求

Invocation ioInvocation;
if (packet.serializeType() == SerializeType.OPTIMIZED_HESSIAN2.getCode()) {
    try {
        // 服务元信息
        ServiceMetadata serviceMetadata = locateAndSetApplication(serviceUniqueName);
        // 组装一个请求对象【关键】
        ioInvocation = PacketFactorySelector.getInstance().select(packet.protocolType()).serverGet(
                packet, serverStream);
        if (serviceMetadata != null) {
            ioInvocation.setApplicationModel(serviceMetadata.getApplicationModel());
            ioInvocation.getServerInvocationContext().setMetadata(serviceMetadata);
        } else {
            ioInvocation.setApplicationModel(ApplicationModelFactory.getMainApplicationModel());
        }

    } catch (Throwable t) {
        log.error("HSF-0037", t.getMessage() + "on channel: " + serverStream, t);
        processSerializationError(packet, serverStream);
        //just close to fallback to normal serialization
        serverStream.close();
        return;
    }
} else {
    ioInvocation = null;
}

看一下构建请求的过程com.taobao.hsf.io.remoting.hsf.HSFPacketFactory#serverGet

@Override
public Invocation serverGet(RequestPacket requestPacket, Stream serverStream) {
    Invocation invocation = new Invocation();
    try {
        HSFRequestPacket hsfRequestPacket = (HSFRequestPacket) requestPacket;
        invocation.setSerializeType(hsfRequestPacket.serializeType());
        // 从请求包中解析出调用类名,方法名,并设置到调用器上
        invocation.setTargetServiceUniqueName(hsfRequestPacket.serviceUniqueName());
        invocation.setMethodName(ThreadLocalCache.getString(hsfRequestPacket.getTargetMethod()));

        // 然后设置调用的入参
        String[] argTypeStrings = new String[hsfRequestPacket.getArgTypes().length];
        for (int i = 0; i < argTypeStrings.length; i++) {
            argTypeStrings[i] = ThreadLocalCache.getString(hsfRequestPacket.getArgTypes()[i]);
        }
        invocation.setMethodArgSigs(argTypeStrings);

        Serializer serializer = SerializerSelector.getInstance().select(hsfRequestPacket.serializeType());

        Object[] argsObjects = new Object[hsfRequestPacket.getArgs().length];
        for (int i = 0; i < argsObjects.length; i++) {
            argsObjects[i] = serializer.deserialize(hsfRequestPacket.getArgs()[i],
                    ClassUtils.forName(argTypeStrings[i]), serverStream);
        }
        invocation.setMethodArgs(argsObjects);

        Map props = null;
        if (hsfRequestPacket.getRequestProps() != null && hsfRequestPacket.getRequestProps().length > 0) {
            props = (Map) (serializer.deserialize(hsfRequestPacket.getRequestProps(), Map.class,
                    serverStream));
        }
        // 刷新了一下一些参数,暂时不重要
        invocation.refreshRequestProp(props);

        // 设置了一些调用的上下文信息
        invocation.getInvokerContext().setSerializeType(hsfRequestPacket.serializeType());
        invocation.getInvokerContext().setTimeout(hsfRequestPacket.timeout());
        invocation.getInvokerContext().setRequestId(hsfRequestPacket.requestId());
        invocation.getInvokerContext().setProtocolType(hsfRequestPacket.protocolType());
        invocation.getInvokerContext().setBiDirectionalInvocation(hsfRequestPacket.isBiDirectionInvocation());
        if (serverStream.supportBiDirection() != hsfRequestPacket.isBiDirectionSupported()) {
            serverStream.setBiDirectionSupport(hsfRequestPacket.isBiDirectionSupported());
        }

        if (hsfRequestPacket.isResponseAttachmentSupported()) {
            invocation.getInvokerContext().setResponseAttachmentSupported(true);
        }
        return invocation;
    } catch (Throwable t) {
        throw new HSFSerializeException(SerializationConstants.SERIALIZE_REQUEST_ERROR_SERVER
                + invocation.getTargetServiceUniqueName()
                + "#" + invocation.getMethodName(), t);
    }
}

维护上请求的类名,方法名,请求参数,以及一些附加的上下文信息。

执行请求

final Invocation finalIoInvocation = ioInvocation;
executor.execute(new Runnable() {
    @Override
    public void run() {
        try {
            Invocation invocation;
            if (finalIoInvocation == null) {
                // 拿到服务元信息
                ServiceMetadata serviceMetadata = locateAndSetApplication(serviceUniqueName);
                invocation = PacketFactorySelector.getInstance().select(packet.protocolType()).serverGet(
                        packet, serverStream);
                if (serviceMetadata != null) {
                    invocation.setApplicationModel(serviceMetadata.getApplicationModel());
                    invocation.getServerInvocationContext().setMetadata(serviceMetadata);
                } else {
                    invocation.setApplicationModel(ApplicationModelFactory.getMainApplicationModel());
                }
            } else {
                invocation = finalIoInvocation;
                // IO已经完成了应用选择,直接设置到当前业务处理线程中
                ApplicationModel applicationModel = invocation.getApplicationModel();
                ApplicationModelFactory.setCurrentApplication(applicationModel);
            }
            //for dubbo RpcContext
            invocation.put("local_address", serverStream.getLocalAddress());
            invocation.put("remote_address", serverStream.getRemoteAddress());

            //used to call client-side service
            ThreadLocalUtil.set(HSFConstants.IO_STREAM_SERVER_KEY, serverStream);
            //for identification
            invocation.put(STREAM_KEY, serverStream);
            // 真正执行的地方【关键】
            processor.handleRequest(invocation, new RpcOutput(invocation, serverStream));
        } catch (Throwable e) {
            log.error("HSF-0037", "decode error on channel " + serverStream, e);
            processSerializationError(packet, serverStream);
        }
    }
});

这个executor主要的工作就是提交一个线程来处理请求,直接看到具体的处理逻辑com.taobao.hsf.remoting.provider.ProviderProcessor#handleRequest

@Override
public ResponseStatus handleRequest(Invocation invocation, Output output) {
    String serviceUniqueName = invocation.getTargetServiceUniqueName();
    // 客户端没有设置这个值,所以这里设置客户端ip
    invocation.setPeerIP(output.targetAddress());

    // TODO block spas tps unit top $echo $generic sph eagleeye normal
    ServiceMetadata providerMetadata = invocation.getServerInvocationContext().getMetadata();

    if (providerMetadata == null) {
        ApplicationModel applicationModel = invocation.getApplicationModel();
        if (applicationModel == null) {
            return serviceNotFound(output, invocation, serviceUniqueName);
        }
        InvocationHandler directService = applicationModel.getServerInvocationHandler(serviceUniqueName);
        if (directService != null) {
            try {
                ListenableFuture rpcFuture = directService.invoke(invocation);
                rpcFuture.addListener(new OutputCallback(invocation, rpcFuture, output));
            } catch (Throwable throwable) {
                onServerException(output, invocation, throwable);
            }
            return invocation.getResponseStatus();
        } else {
            return serviceNotFound(output, invocation, serviceUniqueName);
        }
    } else {
        // 主要链路在这里
        try {
            // 从服务元信息中,拿到服务处理器,然后调用请求【关键】
            ListenableFuture rpcFuture = providerMetadata.getInvocationHandler().invoke(
                invocation);
            // 给调用结果注册监听
            rpcFuture.addListener(new OutputCallback(invocation, rpcFuture, output));
        } catch (Throwable throwable) {
            onServerException(output, invocation, throwable);
        }

        return invocation.getResponseStatus();
    }
}

这里的调用处理器就是在服务初始化环节创建出来的调用处理器,其也是一个调用责任链,其中的末端,也就是真实处理请求的服务处理器是com.taobao.hsf.remoting.provider.ReflectInvocationHandler,具体的调用逻辑com.taobao.hsf.remoting.provider.ReflectInvocationHandler#invoke

@Override
public ListenableFuture invoke(Invocation invocation) throws Throwable {
    SettableFuture defaultRPCFuture = Futures.createSettableFuture();
    if (serverAsync) {
        invocation.addContextAware(serverContextAware);
    }
    // 具体的请求执行方法【关键】
    HSFResponse hsfResponse = handleRequest0(invocation, invocation.getHsfRequest(), defaultRPCFuture);
    if (hsfResponse == null) {
        return defaultRPCFuture;
    }

    RPCResult rpcResult = new RPCResult();
    rpcResult.setHsfResponse(hsfResponse);

    defaultRPCFuture.set(rpcResult);
    return defaultRPCFuture;
}

com.taobao.hsf.remoting.provider.ReflectInvocationHandler#handleRequest0

private static HSFResponse handleRequest0(Invocation invocation, final HSFRequest hsfRequest,
                                              SettableFuture defaultRPCFuture) {
    ProviderServiceModel serviceModel = invocation.getServerInvocationContext().getServiceModel();
    ProviderMethodModel methodModel = invocation.getServerInvocationContext().getMethodModel();

    HSFResponse hsfResponse = new HSFResponse();
    String remoteHost = invocation.getPeerIP();
    ClassLoader tcl = Thread.currentThread().getContextClassLoader();
    try {
        // 拿到调用的方法
        Method workerMethod = methodModel.getMethod();
        Method injectIpMethod = serviceModel.getInjectConsumerIpMethod();
        if (injectIpMethod != null) {
            injectConsumerIp(injectIpMethod, serviceModel.getServiceInstance(), remoteHost);
        }

        // 拿到调用的参数
        Object[] methodArgs = hsfRequest.getMethodArgs();
        AsyncContext asyncContext = null;
        if (serverAsync) {
            // Different to HSF1.x for the position of AsyncContext start
            asyncContext = new AsyncContext(defaultRPCFuture, invocation);
            // 将AsyncContext放到ThreadLocal中
            asyncContext.saveInContext();
        }

        // 最底层,java的反射调用
        // 调用业务应用时,切换为应用class loader
        ClassLoaderUtil.switchContextLoader(serviceModel.getMetadata().getServicePojoClassLoader());
        // 调用实现类的方法
        Object appResp = workerMethod.invoke(serviceModel.getServiceInstance(), methodArgs);

        if (serverAsync) {
            // 开启了异步
            if (AsyncContext.isAsyncStart()) {
                invocation.put(ASYNC_PROPERTY_KEY, Boolean.TRUE);
                // TODO 本来处理业务异常,但是放在这里似乎就没用了
                if (hsfResponse.getAppResponse() instanceof Throwable) {
                    if (!asyncContext.closeAsync("Business method occur exception", (Throwable) hsfResponse.getAppResponse())) {
                        // 关闭失败,说明异步业务线程已经调用了AsyncContext.write()写了响应
                        return null;
                    }
                } else {
                    // 业务方法正常,异步业务线程会写响应,这里就忽略返回值
                    return null;
                }
            }
        }

        hsfResponse.setAppResponse(appResp);
    } catch (InvocationTargetException ivke) {
        processBizException(invocation, hsfResponse, ivke);
    } catch (Throwable t) {
        processUnknownException(invocation, hsfResponse, t);
    } finally {
        ClassLoaderUtil.switchContextLoader(tcl);
    }
    return hsfResponse;
}

这里就用到了java原生的反射调用,调用实现类的方法,获取返回结果appResp

处理结果监听处理逻辑com.taobao.hsf.remoting.provider.ProviderProcessor.OutputCallback#operationComplete

@Override
protected void operationComplete(RPCResult rpcResult) {
    try {
        long requestId = invocation.getInvokerContext().getRequestId();
        byte serializeType = invocation.getInvokerContext().getSerializeType();
        byte protocolType = invocation.getInvokerContext().getProtocolType();

        rpcResult.getResponseContext().setRequestId(requestId);
        rpcResult.getResponseContext().setProtocolType(protocolType);
        rpcResult.getResponseContext().setSerializeType(serializeType);
        rpcResult.getResponseContext().setResponseAttachmentSupported(invocation.getInvokerContext().isResponseAttachmentSupported());
        // only save the root cause
        if (rpcResult.getAppResponse() instanceof Throwable) {
            CutCauseUtil.cutCause((Throwable) rpcResult.getAppResponse());
        }
    	// 将结果回刷到服务流中
        output.flush(rpcResult);
    } catch (Throwable t) {
        String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0067", "HSF",
                "hsf server write response error.");
        LOGGER.error("HSF-0067", errorCodeStr, t);
    }
}

这里将请求的处理结果回刷到服务流中,完成服务处理。

小总结

这里总结一下服务调用的处理过程

到这里,服务提供者的逻辑就大致讲完啦。

服务消费方

从使用说起

如果我们需要使用上文提供的服务,只需要加上@HSFConsumer即可,示例代码如下:

@Configuration
public class HelloServiceConfig {

    @HSFConsumer
    HelloService helloService;
}

服务消费方以@HSFConsumer为入口,实现远程调用。

源码解析

消费启动

其中包含以下几个部分:

  1. 消费者实例化,主要在和spring打交道
  2. 消费者初始化,专注于消费者的消费配置信息的维护

消费者实例化

首先介绍一下SpringBoot的自动装配原理,在pandora中也一样适用。

在我们应用程序的入口,会有这样一个注解@SpringBootApplication

跟进到这个注解的内部发现,其中使用了@EnableAutoConfiguration

根据Spring的SPI机制,目录下spring.factories文件中声明的配置类,都会进行加载。

这里找到了一个非常关键的配置类HsfConsumerAutoConfiguration,具体看一下:

@Configuration
@ConditionalOnProperty(name = Constants.ENABLED, matchIfMissing = true)
public class HsfConsumerAutoConfiguration {

    @Bean
    public static BeanFactoryPostProcessor hsfConsumerPostProcessor() {
        return new HsfConsumerPostProcessor();
    }

}

其中定义了一个用于对服务消费者进行自定义设置的后置处理器。

BeanFactoryPostProcessor可以自定义修改应用程序上下文的bean定义,调整上下文底层bean工厂的bean属性值。具体是修改时机是在bean的定义信息已经加载完毕,bean还未实例化的时候。

HsfConsumerPostProcessor中对标注了@HSFConsumer注解的bean进行修改,具体的修改逻辑往下看。

private void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory, BeanDefinitionRegistry registry) {
    final HsfProperties properties= HsfPropertiesUtils.buildHsfProperties(environment);

    Map> beanIdentifierMap = new HashMap>();
    // 这里会遍历所有的bean
    for (String beanName : beanFactory.getBeanDefinitionNames()) {
        BeanDefinition definition = beanFactory.getBeanDefinition(beanName);

        String beanClassName = definition.getBeanClassName();
        // 当用 @Bean 返回的类型是Object时,beanClassName是 null
        if(beanClassName != null) {
            Class clazz = ClassUtils.resolveClassName(definition.getBeanClassName(), this.classLoader);
            ReflectionUtils.doWithFields(clazz, new ReflectionUtils.FieldCallback() {
                @Override
                public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                    // 具体的修改工作在这里【关键】
                    HsfConsumerPostProcessor.this.parseElement(field, properties);
                }
            });
        }
    }

    // 最后将修改后的bean定义信息放回容器
    for (String beanName : beanDefinitions.keySet()) {
        if (context.containsBean(beanName)) {
            throw new IllegalArgumentException("[HSF Starter] Spring context already has a bean named " + beanName
                    + ", please change @HSFConsumer field name.");
        }
        registry.registerBeanDefinition(beanName, beanDefinitions.get(beanName));
        logger.info("registered HSFConsumerBean \"{}\" in spring context.", beanName);
    }
}

com.alibaba.boot.hsf.consumer.HsfConsumerPostProcessor#parseElement

private void parseElement(Field field, HsfProperties properties) {
    HSFConsumer annotation = AnnotationUtils.getAnnotation(field, HSFConsumer.class);
    // 首先判断bean上是否有HSFConsumer注解,如果没有,直接返回
    if (annotation == null) {
        return;
    }

    HsfConsumerBeanDefinitionBuilder beanDefinitionBuilder = new HsfConsumerBeanDefinitionBuilder(field.getType(),
            annotation);
    beanDefinitionBuilder.context(context).beanFactory(beanFactory).properties(properties);
    // 这里生成了服务消费者的bean定义信息【关键】
    BeanDefinition beanDefinition = beanDefinitionBuilder.build();
    if (checkFieldNameDuplicate4FirstTime(field.getName(), beanDefinition)) {
        logger.error("registered HSFConsumerBean with duplicate fieldName:{} in spring context.", field.getName());
    }
    beanDefinitions.put(field.getName(), beanDefinition);
}

com.alibaba.boot.hsf.consumer.HsfConsumerBeanDefinitionBuilder#buildbean定义的生成过程:

BeanDefinition build() {

    // 这里定义了创建出的bean是HSFSpringConsumerBean【关键】
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(HSFSpringConsumerBean.class);
    
    // 首先配置服务消费的基本信息
    builder.setInitMethodName(Constants.INIT_METHOD);
    builder.addPropertyValue("interfaceClass", interfaceClass);
    builder.addPropertyValue("group", resolveGroup(annotation, interfaceClass, properties, beanFactory));
    String version = resolveVersion(annotation, interfaceClass, properties, beanFactory);
    builder.addPropertyValue("version", version);
    Object clientTimeout = resolveTimeout(annotation, interfaceClass, properties, beanFactory);
    if (clientTimeout != null) {
        builder.addPropertyValue("clientTimeout", clientTimeout);
    }
    builder.addPropertyValue("maxWaitTimeForCsAddress", ConsumerPropertiesResolver
            .resolveMaxWaitTimeForCsAddress(annotation, interfaceClass, properties, beanFactory));
    builder.addPropertyValue("proxyStyle", annotation.proxyStyle());
    builder.addPropertyValue("generic", annotation.generic());
    builder.addPropertyValue("singleton", annotation.singleton());
    if (annotation.includeFilters().length != 0) {
        builder.addPropertyValue("includeFilters", Arrays.asList(annotation.includeFilters()));
    }
    if (annotation.includeRouters().length != 0) {
        builder.addPropertyValue("includeRouters", Arrays.asList(annotation.includeRouters()));
    }
    if (annotation.configServerCenters().length != 0) {
        builder.addPropertyValue("configserverCenter", Arrays.asList(annotation.configServerCenters()));
    }

    ...

    return builder.getBeanDefinition();
}

bean定义的创建过程主要维护了服务调用的基本信息,其中最终的是指明了创建出来的bean是HSFSpringConsumerBean

小总结

回顾一下刚才所讲内容,其实主要在做的就是,将标注了HSFConsumer注解的类注入到Spring容器中,并且,由于我们只是指定了服务消费的接口,由HSF的后置处理器将这些需要创建的bean统一实例化为HSFSpringConsumerBean

消费者初始化

HSFSpringConsumerBean中定义了它的初始化方法。

com.taobao.hsf.app.api.util.HSFApiConsumerBean#init()中的代码很长,但是核心内容是亮点

  1. 配置初始化
  2. 维护代理

配置初始化

com.taobao.hsf.model.metadata.ServiceMetadata#init这边初始化操作在服务提供方的源码讲解中也提到过,这里就不过多赘述;不过与服务提供方不同的是,这里主要创建创建了同步调用器syncInvocationHandler,来用于处理调用。

维护代理

com.taobao.hsf.model.metadata.ServiceMetadata#setTarget就是具体执行调用的组件维护进ServiceMetadata中,具体看看com.taobao.hsf.app.api.util.HSFApiConsumerBean#consume的内容。

private Object consume(final ServiceMetadata metadata) {

    //已经使用了 serviceMetaData 的唯一性做了控制
//        if (applicationModel.getConsumedServiceModel(metadata.getUniqueName()) != null) {
//            return applicationModel.getConsumedServiceModel(metadata.getUniqueName()).getProxyObject();
//        }

    // 生成调用远程HSF服务的代理
    ProxyDecoratorGenerator proxyDecoratorGenerator = HSFServiceContainer.getInstance(
            ProxyDecoratorGenerator.class);
    Class[] decorateInterfaces = proxyDecoratorGenerator.getDecorateInterfaces(metadata);

    //proxy
    ProxyFactory proxyFactory = HSFServiceContainer.getInstance(ProxyFactory.class, metadata.getProxyStyle());
    // 1. 根据代理工厂和需要代理的服务信息,生成对应的代理【关键】
    Object proxy = proxyFactory.getProxy(metadata, decorateInterfaces);
    Method[] methods = proxyFactory.getMethods(proxy);

    //model
    ConsumerServiceModel consumerServiceModel = new ConsumerServiceModel(metadata, proxy, methods);
    metadata.setConsumerServiceModel(consumerServiceModel);

    // 这里将consumerServiceModel维护到applicationModel容器中
    //TODO 这里是client的注册,这里应该生成RPCClient
    applicationModel.initConsumerService(metadata.getUniqueName(), consumerServiceModel);

    // 2. 将元信息引用上服务,设置消费调用链信息【关键】
    metadata.referSync();


    return proxy;
}

这边有两种创建代理的方法,一种是java原生的,另一种是基于javassist的,默认使用java原生的。

com.taobao.hsf.proxy.JdkProxyFactory#getProxy

@Override
public Object getProxy(ServiceMetadata metadata, Class... interfacesArray) {
    try {
        // 代理对象,这里定义了实际的调用动作【关键】
        JdkProxyInvocationHandler jdkProxyInvocationHandler = new JdkProxyInvocationHandler(metadata);
        // 主要就是调用java原生的动态代理api,创建一个代理对象
        Object instance = Proxy.newProxyInstance(metadata.getIfClazz().getClassLoader(), interfacesArray, jdkProxyInvocationHandler);
        jdkProxyInvocationHandler.init(instance);
        return instance;
    } catch (Throwable t) {
        throw new HSFException("failed to generate jdk proxy",t);
    }
}

代理类的真实行为暂时按下不表,在消费者的调用中在详细介绍,目前先聚焦在消费者的创建。

上面的步骤2将消费者元信息关联上了调用链信息,下面单独讨论一下。

设置消费调用链

com.taobao.hsf.model.metadata.ServiceMetadata#referSync

public InvocationHandler referSync() {
    Future future = getExportReferExecutorService().submit(new Callable() {
        @Override
        public InvocationHandler call() throws Exception {
            return refer();
        }
    });

    try {
        return future.get();
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }
}

这里还是异步调用了com.taobao.hsf.model.metadata.ServiceMetadata#refer方法,具体如下:

public InvocationHandler refer() {
    try {
        // 通过责任链生成调用处理器
        InvocationHandler protocolInvocationHandler = protocolFilterChain.refer(this);
        consumerHandlerHitch.setInvocationHandler(protocolInvocationHandler);

        return this.invocationHandler;
    } catch (Throwable t) {
        String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0060", "HSF",
                "refer service: " + getUniqueName() + ", group=" + getGroup() + " got error");
        LOGGER.error("HSF-0060", errorCodeStr, t);
        throw new RuntimeException(t);
    }
}

这里的调用链上也有多个组件

其中比较重要的有两个:

  1. RegistryProtocolInterceptor
  2. HSFProtocol

RegistryProtocolInterceptor

InvocationHandler protocolInvocationHandler = protocolFilterChain.refer(this);环节获取到的protocolInvocationHandler也就是RegistryProtocolInterceptor

其主要作用就是从注册中心获取服务的地址信息

com.taobao.hsf.registry.RegistryProtocolInterceptor#refer

@Override
public InvocationHandler refer(ServiceMetadata serviceMetadata) {
    InvocationHandler invocationHandler = protocol.refer(serviceMetadata);
	// 拿到一个用于访问注册中心的处理器
    RegistryInvocationHandler registryHandler = serviceMetadata.getAttachment(REGISTRY_INVOCATION_HANDLER_KEY);
    //TODO remove this null check
    if (registryHandler == null) {
        registryHandler = new RegistryInvocationHandler();
        registryHandler.setServiceMetadata(serviceMetadata);
        registryHandler.setProtocol(protocol);
        registryHandler.start();
        serviceMetadata.putAttachment(REGISTRY_INVOCATION_HANDLER_KEY, registryHandler);
    }

	// 在这里去访问注册中心【关键】
    registryHandler.refer();
    registryHandler.setInvocationHandler(invocationHandler);
    return registryHandler;
}

com.taobao.hsf.registry.RegistryInvocationHandler#refer

/**
 * 将当前元信息注册到对应的注册中心上
 *
 * @return 订阅的注册中心数量
 */
public int refer() {
    int registryNum = 0;
    for (Map.Entry entry : registryClusterStrategyMap.entrySet()) {
        // 主要的工作在这个Registry的订阅方法中
        entry.getKey().subscribe(protocol, serviceMetadata, entry.getValue());
        registryNum++;
    }

    return registryNum;
}

这里主要看一下使用ConfigServer作为注册中心时的交互方式,com.taobao.hsf.registry.cs.ConfigServerRegistry#subscribe

@Override
public void subscribe(Protocol protocol, ServiceMetadata metadata, RawAddressListener listener) {
    synchronized (subscriberLock) {
        if (metadata2DataSource.containsKey(metadata)) {
            LOGGER.info("ConfigServerRegistry", "service [" + metadata.getUniqueName() + "] has already been subscribed.");
        } else {
            List centers = (metadata.getConfigserverCenter() == null || metadata.getConfigserverCenter().isEmpty()) ? new ArrayList() : metadata.getConfigserverCenter();
            if (centers.size() == 0) {
                centers.add(DEFAULT);
            }

            // 创建了一个数据源,其中维护了服务提供者的ip信息
            CSDataSource dataSource;
            if (CSCommonUtils.isClusterEnabled(metadata) || centers.size() > 1) {
                dataSource = new MultiClusterDataSource( this,protocol,metadata, listener, applicationModel);
            } else {
                dataSource = new SingleClusterDataSource(this, protocol, metadata, listener, applicationModel);
            }
            metadata2DataSource.put(metadata, dataSource);

            List protocols = protocol.getSupportedProtocol();
            for (String pro : protocols) {
                //for now, cs accept hsf only
                if (pro.equalsIgnoreCase("hsf")) {
                    for (String center : centers) {
                        if (center != null) {
                            dataSource.subscribe(pro, metadata, center);
                        }
                    }
                }
            }
        }
    }
}

这里代码看似很多,其实最主要的工作就是维护了一个dataSource对象,而这个对象就帮我们对接注册中心,维护了服务提供方的ip信息。

HSFProtocol

com.taobao.hsf.remoting.service.HSFProtocol#refer

@Override
public InvocationHandler refer(ServiceMetadata serviceMetadata) {
    // 这里返回一个用于处理rpc调用的处理器
    return HSFServiceContainer.getInstance(InvocationHandler.class, "HSF");
}

根据声明的名称HSF,找到了对应的实现类RemotingRPCProtocolComponent

当发起调用的时候,最终就通过com.taobao.hsf.remoting.service.RemotingRPCProtocolComponent#invoke方法向服务提供方发起调用。

小总结

消费者的初始化阶段,初始化了配置信息,创建对应的代理对象,并进行了消费方启动的一系列初始化工作,包括监听注册中心、定义rpc调用方法等。

消费调用

上文在服务启动中讲到,对于我们声明创建的接口,HSF统一实例化为HSFSpringConsumerBean,而其中的具体逻辑交由HSFApiConsumerBean,其中的ServiceMetadata中,通过target对象维护了一个代理,真实定义了服务的具体调用方式。

而创建的代理是JdkProxyInvocationHandler,现在我们就来看看其中的具体行为。

调用方法定义在父类中com.taobao.hsf.proxy.ProxyInvocationHandler#invoke

@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
    ApplicationModelFactory.setCurrentApplication(serviceMetadata.getApplicationModel());

    ConsumerMethodModel methodModel = serviceMetadata.getConsumerServiceModel().getMethodModel(method);
    // 具体执行调用的地方【关键】
    return InvocationUtil.invoke(methodModel, args);
}

com.taobao.hsf.InvocationUtil#invoke

public static Object invoke(ConsumerMethodModel methodModel, Object... args) throws Throwable {
    Invocation invocation = InvocationUtil.buildInvocation(methodModel, args);
    // 这里就根据元信息中的同步调用器,获取到返回结果【关键】
    RPCResult rpcResult = methodModel.getMetadata().getSyncInvocationHandler().invoke(invocation);

    HSFResponse hsfResp = rpcResult.getHsfResponse();
    //exception handling
    if (hsfResp.isError()) {
        if (hsfResp.getAppResponse() instanceof Throwable) {
            throw new HSFException(hsfResp.getErrorMsg(), (Throwable) hsfResp.getAppResponse());
        } else {
            throw new HSFException(hsfResp.getErrorMsg());
        }
    }

    Object appResponse = hsfResp.getAppResponse();
    // 检查对端返回的业务层对象: 如果返回的是异常对象,则重新抛出异常
    if (appResponse instanceof Throwable) {
        throw (Throwable) appResponse;
    }

    return hsfResp.getAppResponse();
}

这里就用到了在配置初始化阶段创建出来的syncInvocationHandler,这边构造了一个流水线,但其实其中的核心调用动作,还是交给了创建出来的invocationHandler

而在invocationHandler中,最关键的发起调用的部分则交给了RemotingRPCProtocolComponent,下面具体看看调用方法。

com.taobao.hsf.remoting.service.RemotingRPCProtocolComponent#invoke

@Override
public ListenableFuture invoke(Invocation invocation) throws Throwable {
    ConsumerMethodModel methodModel = invocation.getClientInvocationContext().getMethodModel();
    ServiceMetadata serviceMetadata = methodModel.getMetadata();
    String serviceUniqueName = methodModel.getUniqueName();
    InvokerContext invokerContext = invocation.getInvokerContext();

    ListenableFuture rpcFuture;

    // 获取调用的地址
    ServiceURL remoteURL = invocation.getTargetAddress();

    // 如这个时候targetURL仍然为null,抛出异常
    if (remoteURL == null) {
        throw new HSFServiceAddressNotFoundException("", LoggerHelper.getErrorCodeStr(
                "hsf",
                "HSF-0001",
                "Env",
                "[HSF-Consumer] can not find target server address,serviceName:" + serviceUniqueName + " group:"
                        + serviceMetadata.getGroup()));
    }

	// 调用的url信息
    String remoteIp = remoteURL.getHost();
    invokerContext.setUrl(remoteURL);
    invokerContext.setRemoteIp(remoteIp);

    // 设置了本次调用上下文的ip地址
    ThreadLocalUtil.set(HSFConstants.TARGET_SERVER_IP, remoteIp);

    // 发起调用【关键】
    rpcFuture = invokeForOne(methodModel, invocation, remoteURL);

    return rpcFuture;
}

其中拿到的ip实际上是在RegistryInvocationHandler这个前置处理器中维护的,这里不赘述。

com.taobao.hsf.remoting.service.RemotingRPCProtocolComponent#invokeForOne

private ListenableFuture invokeForOne(ConsumerMethodModel consumerMethodModel, Invocation invocation,
                                                 ServiceURL remotingURL) throws Throwable {
    int timeout = InvocationUtil.getReadTimeout(invocation, remotingURL);

    try {
        // 1. 首先获取一个客户端
        ClientStream clientStream = client.of(remotingURL);
        if (clientStream == null) {
            throw new HSFException("Target server [" + remotingURL + "] has become unreachable.");
        }

        //used by sync invoke
        invocation.getInvokerContext().setTimeout(timeout);
        invocation.getInvokerContext().setSerializeType(
                serializeType(consumerMethodModel.getMetadata(), remotingURL, clientStream));
        invocation.getInvokerContext().setProtocolType(remotingURL.getProtocolType());
        invocation.getInvokerContext().setBiDirectionalInvocation(false);
        clientStream.setProtocolType(remotingURL.getProtocolType());

        // 2. 发送请求
        return clientStream.write(invocation);

    } catch (Exception e) {
        throw new HSFException("error on submit request on future invoke:", e);
    }

}

发起调用的过程主要分两个步骤:

  1. 获取客户端
  2. 用刚才获取到的客户端进行发送请求
  3. 处理响应

下面具体看看。

获取客户端

com.taobao.hsf.io.client.AbstractClient#of

@Override
public ClientStream of(ServiceURL serviceURL) {
    if (serviceURL == null) {
        return null;
    } else {
        int connTimeout = serviceURL.getParameter(HSFConstants.CONNECT_TIMEOUT_KEY, 1000);
        long connectionId = AddressUtils.addressToLong(serviceURL.getHost(), serviceURL.getPort());
        if (connectionId == -1) {
            return null;
        }
        // 获取客户端【关键】
        ClientStream clientStream = getClientStream(connectionId, connTimeout);
        return clientStream;
    }
}

com.taobao.hsf.io.client.AbstractClient#getClientStream(long, int)

private ClientStream getClientStream(final long connectionID, int timeout) {
    ClientStream stream = clientStreams.get(connectionID);
    if (stream != null) {
        return stream;
    }
    Lock streamConnectLock = getStreamConnectLock(connectionID);
    try {
        if (streamConnectLock.tryLock(4, TimeUnit.SECONDS)) {
            try {
                stream = clientStreams.get(connectionID);
                if (stream == null) {
                    // 这里真实创建了一个客户端【关键】
                    stream = connect(connectionID, timeout);
                    if (stream != null) {
                        clientStreams.put(connectionID, stream);
                        connectLocks.remove(connectionID, streamConnectLock);
                    }
                }
            } finally {
                streamConnectLock.unlock();
            }
        } else {
            LOGGER.warn("try to connect to " + connectionID + " failed, caused by get lock timeout");
        }
    } catch (InterruptedException e) {
        LOGGER.error("HSF", "getStreamConnectLock " + connectionID + " Interrupted", e);
    }
    return stream;
}

这里代码看似复杂,其实核心在做的是维护了一个key是connectionID的客户端缓存信息。

为什么要对客户端进行池化?因为创建的开销不小。

具体看看创建过程。

com.taobao.hsf.io.netty.tcp.NettyClient#connect

@Override
public ClientStream connect(final long connectionID, int connectTimeout) {
    Bootstrap bootstrap = new Bootstrap();
    // 创建netty客户端
    bootstrap.group(workerGroup)
            .option(ChannelOption.TCP_NODELAY, true)//
            .option(ChannelOption.SO_REUSEADDR, true)//
            .option(ChannelOption.ALLOCATOR, Holder.byteBufAllocator)//
            .option(ChannelOption.ALLOW_HALF_CLOSURE, Boolean.FALSE)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    ch.pipeline()
                            .addLast("protocol", new NettyProtocolHandler())
                            .addLast("clientIdleHandler", new IdleStateHandler(getHbSentInterval(), 0, 0))
                            .addLast("clientHandler",
                                    new NettyClientStreamHandler(NettyClient.this, connectionID,
                                            clientStreamLifecycleListeners, clientStreamMessageListeners));
                }
            });

    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);

	// 下面的代码暂时不重要
    
    if (isWaterMarkEnabled()) {
        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark(getLowWaterMark(), getHighWaterMark()));
    }

    String targetIP = AddressUtils.longToHost(connectionID);
    int targetPort = AddressUtils.longToPort(connectionID);
    ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
    try {
        future.await(connectTimeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        LOGGER.error("HSF", "await connect Interrupted", e);
    }

    ClientStream result = null;
    if (future.isSuccess()) {
        if (StreamUtils.streamOfChannel(future.channel()) == null) {
            NettyClientStream clientStream = new NettyClientStream(connectionID, future.channel());
            clientStream.setClient(this);
            StreamUtils.bindChannel(future.channel(), clientStream);
        }
        result = (ClientStream) StreamUtils.streamOfChannel(future.channel());
    }
    return result;
}

创建netty客户端的核心成果在NettyClientStreamHandler,其中定义了调用结果响应的方式。

发送请求

com.taobao.hsf.io.stream.AbstractStream#write(com.taobao.hsf.invocation.Invocation)

@Override
public ListenableFuture write(Invocation invocation) {
    if (invocation.getInvokerContext().isBiDirectionalInvocation()) {
        if (!this.supportBiDirection()) {
            LOGGER.warn("", "Ignored request to stream:{} cause the client is not bi-directional", getRemoteAddress());
            return null;
        }
    }
    Executor executor = invocation.getExecutor();
    if (executor == null) {
        executor = threadPoolService.callbackExecutor();
    } else if (executor instanceof UserThreadPreferedExecutor) {
        ((UserThreadPreferedExecutor) executor).activate();
    }
    SettableFuture ioFuture = Futures.createSettableFuture(executor);
    ResponseMessageAnswerHandler handler = new ResponseMessageAnswerHandler(
            invocation.getApplicationModel(), SerializePhase.BIZ, ioFuture, this);
    ListenableFuture resultFuture = Futures.map(ioFuture, handler);
    PacketFactory packetFactory = PacketFactorySelector.getInstance().select(
            invocation.getInvokerContext().getProtocolType());

    RequestPacket requestPacket;
    StreamWriteRequest streamWriteRequest;
    if (invocation.getInvocationType() == InvocationType.BIZ) {
        if (invocation.getInvokerContext().getSerializeType()
                == SerializeType.OPTIMIZED_HESSIAN2.getCode()) {
            streamWriteRequest = new StreamWriteRequest(invocation, handler);
            handler.setSerializePhase(SerializePhase.IO);
        } else {
            requestPacket = packetFactory.clientCreate(invocation, this);
            streamWriteRequest = new StreamWriteRequest(requestPacket, handler);
        }
    } else {
        requestPacket = packetFactory.clientCreateHeartbeatRequest();
        streamWriteRequest = new StreamWriteRequest(requestPacket, handler);
    }

	// 发送请求数据
    send(streamWriteRequest);
    return resultFuture;
}

com.taobao.hsf.io.netty.tcp.NettyClientStream#send

@Override
public void send(Object packet) {
    //TODO 区别responseType
    if (client.isWaterMarkEnabled() && !isWritable()) {
        String errorMsg = MessageFormat.format(
                "write overflow, client gave up writing request to channel {0}, bytesBeforeWritable: {1}, watermark: [{2}-{3}] bytes",
                channel, channel.bytesBeforeUnwritable(), client.getLowWaterMark(), client.getHighWaterMark());
        LOGGER.error("HSF-0102", errorMsg);
        //fail fast: don't let invoker wait for timeout exception
        throw new HSFException(errorMsg);
    } else {
        // 真正的数据发送
        channel.writeAndFlush(packet);
    }
}

处理响应

com.taobao.hsf.io.netty.client.NettyClientStreamHandler#channelRead

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ClientStream stream = (ClientStream) StreamUtils.streamOfChannel(ctx.channel());
    if (msg instanceof RequestPacket) {
        RequestPacket requestPacket = (RequestPacket) msg;
        ClientServiceHandler clientServiceHandler = ClientHandlerSelector.getInstance().select(requestPacket.protocolType());
        if (clientServiceHandler != null) {
            clientServiceHandler.process(requestPacket, stream);
        } else {
            log.warn("No client service found for request. protocolId:{},requestId:{},serializeType:{},messageType:{}", requestPacket.protocolType(),
                    requestPacket.requestId(), requestPacket.serializeType(), requestPacket.messageType());
        }
    } else if (msg instanceof ResponsePacket) {
        // 处理返回结果【关键】
        callConnectionReceivedListeners(client, stream, (ResponsePacket) msg);
    } else {
        log.warn("Received unrecognized msg:{}", msg);
    }
}

com.taobao.hsf.io.netty.client.NettyClientStreamHandler#callConnectionReceivedListeners

private void callConnectionReceivedListeners(Client client, ClientStream stream, ResponsePacket message) {
    for (int i = 0; i < clientStreamMessageListeners.length; i++) {
        ClientStreamMessageListener listener = clientStreamMessageListeners[i];
        try {
            listener.received(client, stream, message);
        } catch (Exception ex) {
            String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0085", "HSF",
                    "invoke MessageListener#received" + listener.getClass() + " got exception");
            log.error("HSF-0085", errorCodeStr, ex);
        }
    }
}

具体处理接受响应的是ReceiveResponse处理器,com.taobao.hsf.io.stream.support.client.ReceiveResponse#received

@Override
public void received(Client client, ClientStream stream, ResponsePacket message) {
    MessageAnswerHandler answerHandler = stream.removeAnswerHandler(message.requestId());
    if (answerHandler != null) {
        if (answerHandler.getSerializePhase() == SerializePhase.IO) {
            RPCResult rpcResult = answerHandler.call(message);
            answerHandler.setAnswer(new RpcResultResponsePacketWrapper(rpcResult));
            if (rpcResult.getHsfResponse().getStatus() == ResponseStatus.CLIENT_DESERIALIZE_ERROR) {
                stream.attributeMap().put(STREAM_OPTIMIZED_HESSIAN_ENABLE_KEY, false);
            }
        } else {
            answerHandler.setAnswer(message);
        }
        //成功收到服务端的业务响应,清除心跳失败的计数
        stream.clearContinuousHbFailedTimes();
    } else {
        log.warn("Receive response which requestId has not been stored, maybe some problem happened on network." + stream);
    }
}

总结

到这里,HSF的核心源码基本就梳理结束了。

参考文献

https://zhuanlan.zhihu.com/p/147025312

目录
相关文章
|
Dubbo Java 应用服务中间件
阿里新框架干掉微服务,换下Dubbo,Spring CloudAlibaba王者降临
tm快了,不知不觉中金九银十的秋招已经快结束了,不少同学现在已经拿到offer了吧~现在的面试可是越来越难了,动不动就是“互联网三高”。
阿里新框架干掉微服务,换下Dubbo,Spring CloudAlibaba王者降临
|
编解码 Dubbo 应用服务中间件
Alibaba开源Dubbo源码解析手册,竟引领出RPC的新潮流
前言 Apache Dubbo,一款由阿里巴巴于2011年开源的高性能Java RPC框架,自开源以来在业界产生了深远影响。有大量公司广泛使用,甚至很多公司的自研RPC框架中都能看到Dubbo的影子。Dubbo在国内服务化体系演进过程中扮演了重要角色。尽管经历了几年的沉寂,但在阿里巴巴重启对Dubbo的开源维护,Dubbo正在从微服务领域的高性能RPC框架逐步演变为一个完整的微服务生态。 对于开发者来说,深入了解Dubbo底层的架构设计和实现是一项挑战。因此,一份完整的、体系化的对Apache Dubbo进行深入原理剖析的手册就显得尤为重要。
|
5月前
|
负载均衡 Dubbo Java
哈啰面试:说说Dubbo运行原理?
哈啰面试:说说Dubbo运行原理?
49 0
哈啰面试:说说Dubbo运行原理?
|
NoSQL Java Redis
爱了!阿里巴巴内部出品“SpringBoot+微服务指南”,理论与实战
爱了!阿里巴巴内部出品“SpringBoot+微服务指南”,理论与实战 有幸从一位朋友那里得到Alibaba内部出品强推的“SpringBoot+微服务学习指南”,秉承好东西的当然要共享的原则,今天就来秀一把,试试这“Springboot+微服务学习指南”是否也能让你事半功倍! Spring Boot 构建小系统到架构分布式大系统(理论+实战)
|
Java 应用服务中间件 HSF
走进HSF源码
前言本文源自一次组内分享,于是接着这个机会,将HSF的源码阅读过程记录下来,以供自己温故而知新。如果有哪些地方理解不到位的,还请诸位批评指正!简介HSF (High-speed Service Framework),高速服务框架,是在阿里巴巴内部广泛使用的分布式RPC服务框架。众所周知,HSF一款与我们的日常生活密不可分的RPC框架;所谓RPC——远程过程调用,就是指像调用本地方法一样调用远程的方
989 0
走进HSF源码
|
Rust Dubbo JavaScript
2023 - Dubbo 谷歌编程之夏报名启动了!
我们很高兴地宣布 Apache Dubbo 已正式参与到 GSoC 2023(2023 谷歌编程夏令营)中,当前贡献者报名阶段也已经正式启动,如果您对 Dubbo、对 GSoC、对开源感兴趣,欢迎报名参与。今年的活动同时对在校大学生、社会员工开放。也就是说,只要是对开源和编码感兴趣的开发者就可以报名参加 Dubbo 项目夏令营。
684 4
阿里毕玄:RPC 框架优化之路,从 37k 到 168k
阿里毕玄:RPC 框架优化之路,从 37k 到 168k
141 0
|
存储 Kubernetes Cloud Native
饿了么 Dubbo3 实践分享 | 学习笔记
快速学习饿了么 Dubbo3 实践分享
饿了么 Dubbo3 实践分享 | 学习笔记
|
XML SpringCloudAlibaba Dubbo
硬核 | 我一个人开发了“Dubbo”框架
大家好,我是冰河~~ 没错,这次冰河又要搞事情了,这次准备下手的是RPC框架项目。为什么要对RPC框架项目下手呢,因为在如今分布式、微服务乃至云原生不断发展的过程中,RPC作为底层必不可少的通信组件,被广泛应用在分布式、微服务和云原生项目中。
144 0
硬核 | 我一个人开发了“Dubbo”框架
|
Rust 负载均衡 Dubbo
Dubbo 编程之夏报名启动了!
今年 Dubbo 编程之夏活动对在校大学生、社会员工同时开放,任何对开源感兴趣的开发者都可以报名参加。
428 2
Dubbo 编程之夏报名启动了!
下一篇
无影云桌面