前言
本文源自一次组内分享,于是接着这个机会,将HSF的源码阅读过程记录下来,以供自己温故而知新。如果有哪些地方理解不到位的,还请诸位批评指正!
简介
HSF (High-speed Service Framework),高速服务框架,是在阿里巴巴内部广泛使用的分布式RPC服务框架。
众所周知,HSF一款与我们的日常生活密不可分的RPC框架;所谓RPC——远程过程调用,就是指像调用本地方法一样调用远程的方法,HSF帮我们实现了远程通讯、序列化、同步/异步调用,极大的便捷了我们的开发。
总体架构
这是HSF的总体架构,其中核心的有三块内容:
-
服务提供方
-
服务消费方
-
注册中心(通常由一些成熟的注册中心中间件代劳了,例如Zookeeper、Eureka、Nacos)
服务提供方
首先,我们就从服务提供方的视角,来详细看看HSF的执行细节。
从使用说起
HSF提供了便捷服务发布方法,示例代码如下:
public interface HelloService {
String hello(String name);
}
@HSFProvider
public class HelloServiceImpl implements HelloService{
@Override
public String hello(String name) {
return "Hello,"+name+"!";
}
}
实现功能接口后,只需要加上@HSFProvider就可以将其发布,下面我们来看看这个注解到底做了哪些事情。
源码解析
源码阅读主要包含两部分内容:
-
服务启动,也就是在应用启动时做的一些事情
-
服务调用,处理rpc请求时的具体操作
服务启动
在服务启动这个阶段,又可以分为以下几个步骤:
-
服务实例化,主要是和spring容器打交道
-
服务初始化,准备了一些服务发布环节关键的组件
-
服务发布,将服务启动,并注册到注册中心
下面具体看看。
服务实例化
这部分主要是和spring容器打交道比较多一点,借助spring的IOC能力,将服务实例化。
服务启动的源头在@HSFProvider,其定义如下:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
@Import(HsfProviderAnnotationRegistrar.class)
public @interface HSFProvider {
/**
* The value may indicate a suggestion for a logical component name,
* to be turned into a Spring bean in case of an autodetected component.
*
* @return the suggested component name, if any
*/
String value() default "";
/**
* HSF service interface
*/
Class serviceInterface() default Object.class;
/**
* HSF service version
*/
String serviceVersion() default Constants.DEFAULT_VERSION;
/**
* HSF service group
*/
String serviceGroup() default Constants.DEFAULT_GROUP;
/**
* HSF service name
*/
String serviceName() default "";
...
这里主要定义了发布服务的version、group等信息;不过,其中有两点关键信息:
-
@Component,为了让Spring容器识别出这个Bean
-
@Import(HsfProviderAnnotationRegistrar.class),这个注解相当重要,服务发布的主要工作都通过这个注解来实现。
小知识
@Import(HsfProviderAnnotationRegistrar.class)的作用:Import实现ImportBeanDefinitionRegistrar的类,可以在bean定义信息进行一些自定义的操作。
BeanDefinition可以理解为Bean的元信息,通过调用Bean工厂的org.springframework.beans.factory.support.AbstractBeanFactory#doGetBean方法,即可根据其元信息来实例化Bean对象;最常见的ApplicationContext其实就是一个BeanFacoty。
重点看一下HsfProviderAnnotationRegistrar.class
@Configuration
@ConditionalOnProperty(name = Constants.ENABLED, matchIfMissing = true)
@EnableConfigurationProperties(HsfProperties.class)
@AutoConfigureBefore(HsfConsumerAutoConfiguration.class)
public class HsfProviderAnnotationRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware,
BeanClassLoaderAware {
其主要实现了ImportBeanDefinitionRegistrar类的registerBeanDefinitions方法:
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
HsfProperties hsfProperties = HsfPropertiesUtils.buildHsfProperties(environment);
if (!hsfProperties.isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("spring.hsf.enabled is false, so skip process @HSFProvider");
}
return;
}
// 这个是真实服务的bean
String targetBeanName = BeanNameUtils.beanName(importingClassMetadata, registry);
// 这里又创建了一个bean,用于对外提供服务能力
String beanName = targetBeanName + Constants.GENERATE_HSFPROVIDERBEAN_SUFFIX;
String className = importingClassMetadata.getClassName();
Class targetClass;
try {
targetClass = this.classLoader.loadClass(className);
// 1. 解析HSFProvider注解的内容
HSFProvider hsfProvider = AnnotationUtils.findAnnotation(targetClass, HSFProvider.class);
configIfNecessary(hsfProperties);
HsfProviderBeanDefinitionBuilder builder = new HsfProviderBeanDefinitionBuilder(targetBeanName, hsfProvider)
.clazz(targetClass)
.properties(hsfProperties);
// 2. 创建beanDefinition【关键点】
BeanDefinition beanDefinition = builder.build(registry);
if (beanDefinition != null) {
if (registry.containsBeanDefinition(beanName)) {
throw new BeanDefinitionValidationException(
"BeanDefinition with the same beanName already existed, please check your config! beanName:"
+ beanName);
}
// 3. 将新创建的beanDefinition注册进去,这里实际维护了一个map
registry.registerBeanDefinition(beanName, beanDefinition);
logger.info("[HSF Starter] register HSF provider bean: {}, targetClass: {}", beanName, className);
}
} catch (ClassNotFoundException e) {
throw new BeanCreationException("create hsf provider bean error! beanName:" + beanName, e);
}
}
其中第二点创建beanDefinition比较关键,具体如下:
BeanDefinition build(BeanDefinitionRegistry registry) {
// 首先是一些常规的校验
String serviceInterface = null;
// find hsf service interface
if (annotation.serviceInterface().equals(Object.class)) {
// 检查类是否只实现了一个接口
Class[] allInterfaces = ClassUtils.getAllInterfacesForClass(clazz);
if (allInterfaces != null && allInterfaces.length == 1) {
serviceInterface = allInterfaces[0].getName();
} else {
throw new IllegalArgumentException("@HSFProvider class: " + clazz.getName()
+ " implements more than one interface: " + allInterfaces);
}
} else {
serviceInterface = annotation.serviceInterface().getName();
}
// Check after the service interface was resolved from the class
if ("false".equalsIgnoreCase(properties.getEnables().get(serviceInterface))) {
logger.warn("HSF service {} is disabled, therefore will not publish.", serviceInterface);
return null;
}
// 这里创建了一个HSFSpringProviderBean的工厂类【关键点】
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(HSFSpringProviderBean.class);
builder.setInitMethodName(Constants.INIT_METHOD);
builder.addPropertyValue("serviceInterface", serviceInterface);
builder.addPropertyValue("serviceVersion", ProviderPropertiesResolver.resolveVersion(annotation, serviceInterface, properties));
builder.addPropertyValue("serviceGroup", ProviderPropertiesResolver.resolveGroup(annotation, serviceInterface, properties));
builder.addPropertyValue("serviceName", annotation.serviceName());
Object clientTimeout = ProviderPropertiesResolver.resolveTimeout(annotation, serviceInterface, properties);
if (clientTimeout != null) {
builder.addPropertyValue("clientTimeout", clientTimeout);
}
// 接口的具体实现类的引用,可以从Spring容器中找到对应的实现类【重要】
builder.addPropertyReference("target", targetRef);
// 维护了builder中其他一些参数,暂时忽略
...
return builder.getBeanDefinition();
}
这里透露出一个重要的信息就是,不管发布的服务的具体类型,最终生成的对外提供服务的Bean都是HSFSpringProviderBean。
小总结
稍微回顾一下以上所讲内容。
HSFProvider注解通过一个Spring的钩子,最终为容器创建了两个bean实例,一个是原本的服务bean,另一个是对外提供服务的bean,也就是HSFSpringProviderBean。
服务初始化
在HSFSpringProviderBean实例化完成后,spring会调用一个钩子函数,进行服务自定义的初始化工作。
HSFSpringProviderBean这个对象实现了InitializingBean接口,在创建实例的时候,会执行对应的钩子方法afterPropertiesSet。
我们来看看init()方法中究竟做了什么。
/**
* 对外发布服务
*/
public void init() throws Exception {
// 避免被初始化多次
if (!inited.compareAndSet(false, true)) {
return;
}
// 1. 服务初始化
providerBean.initWithoutPub();
// 2. 服务发布
doPublish();
}
初始化方法中主要做了两件事:
-
服务初始化
-
服务发布
这两件事都非常重要,我们一件一件来看。
首先介绍几个类
-
HSFSpringProviderBean:Spring容器服务提供者的Bean实例
-
HSFApiProviderBean:具体进行服务提供的类
-
ServiceMetadata:服务元信息,包括了元数据信息本身的描述,以及服务信息的发布与订阅,其中的 target字段就是接口对应的实现类
-
ApplicationModel:HSF的私有容器,其中维护了服务的映射关系
-
InvocationHandler:调用处理器,通过这个类将具体的请求路由到对应的实现类上去执行。
这里还有一个细节,有两个命名十分相似的Bean,HSFSpringProviderBean和HSFApiProviderBean,个人感觉HSFSpringProviderBean的作用主要是和Spring容器打交道,而实际的操作则交给了HSFApiProviderBean来处理。
服务启动的代码如下:
public void initWithoutPub() throws Exception {
// 避免被初始化多次
if (!inited.compareAndSet(false, true)) {
return;
}
// 配置检查
checkConfig();
// 服务启动【关键点】
metadata.init();
}
具体的服务启动逻辑如下:
public synchronized void init() {
// 防止一个服务被发布多次
if (!initFlag.compareAndSet(false, true)) {
return;
}
// 服务元信息的初始化,暂时不重要
List serviceMetadataBeforeInits = applicationModel.getServiceContainer().getInstances(
ServiceMetadataBeforeInit.class);
for (ServiceMetadataBeforeInit serviceMetadataBeforeInit : serviceMetadataBeforeInits) {
serviceMetadataBeforeInit.beforeInit(this);
}
//add current app name to url
addProperty(APPLICATION_NAME_KEY, applicationModel.getName());
// 创建服务组件,暂时不重要
List components = applicationModel.getServiceContainer().
getInstances(ServiceComponent.class, isProvider ? new String[]{"provider"} : new String[]{"consumer"});
for (ServiceComponent component : components) {
component.init(this);
componentMap.put(component.name(), component);
}
// 配置应用使用的协议,HSF or DUBBO,为下面创建调用链准备
if (protocols.isEmpty()) {
protocols.addAll(applicationModel.getServiceContainer().getInstances(Protocol.class));
}
// 创建服务暴露调用链【重要】
protocolFilterChain = ProtocolInterceptorChainBuilder.build(this);
InvocationHandlerFactory invocationHandlerFactory = applicationModel.getServiceContainer()
.getInstance(InvocationHandlerFactory.class);
if (invocationHandlerFactory == null) {
throw new IllegalStateException("cannot find one valid implementation for InvocationHandlerFactory");
}
// 只看服务提供者的部分,主要创建出一个调用处理器【重要】
if (isProvider()) {
InvocationHandler serverInvocationHandler = invocationHandlerFactory.createInvocationHandler(getTarget());
invocationHandler = InvocationHandlerChainFactory.buildHandlerChain(this, serverInvocationHandler);
} else {
invocationHandler = InvocationHandlerChainFactory.buildHandlerChain(this, consumerHandlerHitch);
//bridge
SyncInvocationHandler clientSyncInvocationHandler = invocationHandlerFactory.createSyncInvocationHandler(
invocationHandler);
syncInvocationHandler = SyncInvocationHandlerChainFactory
.buildSyncInvocationHandlerChain(this, clientSyncInvocationHandler);
}
}
这边首先对服务元信息进行初始化,主要成果有两个:
-
protocolFilterChain,服务发布调用链,用于定义服务的发布过程
-
InvocationHandler,调用执行Handler
服务发布
com.taobao.hsf.app.spring.util.HSFSpringProviderBean#doPublish
private void doPublish() {
//非延迟发布,或者是非spring容器初始化的情况,直接发布服务
if (!providerBean.getMetadata().isDelayedPublish() || !isInSpringContainer) {
// 服务发布的具体操作【关键点】
providerBean.publish();
if(isInSpringContainer) {
setAppInitedStatus();
} else {
String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0051", "BIZ",
"HSFSpringProviderBean is not created in spring container");
LOGGER.warn(errorCodeStr);
}
}
}
com.taobao.hsf.app.api.util.HSFApiProviderBean#publish
public void publish() {
// 防止一个服务被发布多次
if (!isPublished.compareAndSet(false, true)) {
return;
}
try {
// HSFServiceContainer.getInstance(ProcessService.class).publish(metadata);
// Init all service in Spring Context or main thread Make sure init
// stage is Thread-Safe
// 创建一个服务提供者对象
ProviderServiceModel providerServiceModel = new ProviderServiceModel(metadata.getUniqueName(), metadata,
metadata.getTarget());
metadata.setProviderServiceModel(providerServiceModel);
// 1. 维护服务关系
applicationModel.initProviderService(metadata.getUniqueName(), providerServiceModel);
// 2. 服务发布的具体工作【关键点】
metadata.exportSync();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
这边主要做了两件事:
-
维护服务信息
-
服务暴露
维护服务信息
将服务全限定名与提供服务信息维护到HSF容器中com.taobao.hsf.model.ApplicationModel#initProviderService
public void initProviderService(String serviceName, ProviderServiceModel serviceModel) {
// TODO thread safe?
if (this.providedServices.put(serviceName, serviceModel) != null) {
String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0051", "BIZ",
"already register the provider service: " + serviceName);
LOGGER.warn(errorCodeStr);
return;
}
ApplicationModelFactory.put(serviceName, this);
ApplicationModelFactory.put(serviceName, serviceModel.getMetadata());
InvocationHandler invocationHandler = serviceModel.getMetadata().getInvocationHandler();
//TODO this work should be done in Protocol.export()
// 这里维护了服务全限定名,与服务处理器的映射关系【关键】
providerService2Invoker.put(serviceName, invocationHandler);
}
其中invocationHandler就是在上一个环节服务启动中创建出来的。
服务暴露
com.taobao.hsf.model.metadata.ServiceMetadata#exportSync
public List exportSync() {
// 使用线程池来发布服务
Future> future = getExportReferExecutorService().submit(new Callable>() {
@Override
public List call() throws Exception {
// 暴露动作【关键】
return export();
}
});
// 同步阻塞等待服务发布的结果
try {
List rawURLs = future.get();
if (rawURLs != null) {
LOGGER.info("Interface[" + getUniqueName() + "]Group[" + getGroup() + "]Publish HSF service to ConfigServer success!");
} else {
if (!readyToPublish.get()) {
LOGGER.info("Interface[" + getUniqueName() + "]Group[" + getGroup() + "]use delay publish,wait for online/onlineSmart command to publish!");
}
}
return rawURLs;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
getExportReferExecutorService()获取到的线程池核心线程数为1,也就是说,这里发布服务的操作是串行的;具体这样做用意何为,还不太清楚。
具体的服务暴露逻辑com.taobao.hsf.model.metadata.ServiceMetadata#export
public List export() {
try {
// 保险起见,再次进行初始化,通常情况下,这一步跳过
init();
//reset flag for pub
setRegistryPub(true);
// 使用调用链来发布服务【关键】
return protocolFilterChain.export(this, invocationHandler);
} catch (Throwable t) {
String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0058", "HSF",
"export service: " + getUniqueName() + ", group=" + getGroup() + " got error");
LOGGER.error("HSF-0058", errorCodeStr, t);
throw new RuntimeException(t);
}
}
这里就通过上一个服务启动环节创建出来的protocolFilterChain,来将服务进行具体的发布流程。
整个服务暴露调用链大致包含以下部分:
com.taobao.hsf.protocol.DelayPublishProtocolInterceptor#export
->com.taobao.hsf.plugins.spas.SpasProtocolInterceptor#export
->com.taobao.hsf.feature.optimized.hessian.protocolinterceptor.OptimizedHessianProtocolInterceptor#export
->com.taobao.hsf.process.component.CodeployProtocolInterceptor#export
->com.taobao.hsf.plugins.eagleeye.protocol.EagleEyeProtocolInterceptor#export
->com.taobao.hsf.region.service.impl.RegionProtocolInterceptor#export
->com.taobao.hsf.site.SiteProtocolInterceptor#export
->com.taobao.hsf.unit.service.impl.UnitProtocolInterceptor#export
->com.taobao.hsf.machine.group.protocol.MachineGroupProtocolInterceptor#export
->ElemeServicePropertyProtocolInterceptor##export(暂时找不到)
->com.taobao.hsf.grouping.component.GroupingProtocolInterceptor#export【分组策略】
->com.taobao.hsf.dpath.DPathProtocolInterceptor#export
->com.taobao.hsf.service.ServicePubComponent#export
->com.taobao.hsf.registry.RegistryProtocolInterceptor#export【上报服务信息至注册中心】
->FilterProtocolInterceptor#export(暂时找不到)
->com.taobao.hsf.metrics.ProviderPubStatusProtocolInterceptor#export
->com.taobao.hsf.tps.component.TPSRuleProtocolInterceptor#export
->com.taobao.hsf.protocol.MultiplexingProtocol#export
->com.taobao.hsf.remoting.service.HSFProtocol#export【将服务挂载到Netty上】
其中较为核心的主要有两个部分:
-
服务信息注册
-
开启Netty服务
其中服务信息注册,主要是对注册中心接口的调用,逻辑并不复杂,有兴趣的同学可以自行阅读。
下面重点看看开启Netty服务的部分。
com.taobao.hsf.remoting.service.HSFProtocol#export
@Override
public List export(ServiceMetadata serviceMetadata, InvocationHandler invocationHandler) {
// 开启Netty服务【关键点】
registerProvider(serviceMetadata);
//register server InvocationHandler is done outside
Map needExportServers = providerServer.severNeedExport();
List serviceURLs = new ArrayList();
for (Map.Entry server : needExportServers.entrySet()) {
String data = HSFServiceTargetUtil.getTarget(server.getValue(), server.getKey(), serviceMetadata);
ServiceURLRawData serviceURLRawData = ServiceURLRawData.parseUrl(data);
HSFServiceURL hsfServiceURL = new HSFServiceURL(serviceMetadata, serviceURLRawData);
serviceURLs.add(hsfServiceURL);
}
return serviceURLs;
}
com.taobao.hsf.remoting.service.HSFProtocol#registerProvider
public void registerProvider(ServiceMetadata metadata) {
try {
// 开启HSF服务
providerServer.startHSFServer();
} catch (Exception e) {
throw new RuntimeException(
LoggerHelper.getErrorCodeStr("hsf", "HSF-0016", "HSF", "start provider server failed"), e);
}
// 分配线程池
threadPoolService.getExecutorManager().allocateExecutor(metadata.getUniqueName(), metadata.getCorePoolSize(),
metadata.getMaxPoolSize());
}
具体开启Netty服务的地方com.taobao.hsf.io.provider.impl.ProviderServerImpl#startHSFServer
@Override
synchronized public void startHSFServer() throws HSFException {
// 这边只会启动一个Netty服务
if (isProviderStarted.compareAndSet(false, true)) {
try {
// 创建一个tcp server
tcp = HSFServiceContainer.getInstance(Server.class, "tcp");
// 绑定端口【关键点】
tcp.bind(env.getBindHost(), env.getBindPort());
if (tcp.isNeedExport()) {
exportServerInfo.put(env.getPubPort(), env.getPubHost());
}
// 如果配置了Http服务,进行一些操作,暂时忽略
if (config.getBoolean(Server.HSF_HTTP_ENABLE_KEY) || AppInfoUtils.isEnableHttp()) {
http = HSFServiceContainer.getInstance(Server.class, "http");
http.bind(env.getBindHost(), env.getHttpBindPort());
if (http.isNeedExport()) {
exportServerInfo.put(env.getHttpPubPort(), env.getPubHost());
}
isHttpStarted.compareAndSet(false, true);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
LoggerInit.LOGGER.info("Shutdown hook invoked. Offline services and stop server");
try {
stopHSFServer();
} catch (Exception e) {
LoggerInit.LOGGER.warn("Exception happens during stop server:", e);
}
}
});
} catch (Exception e) {
throw new HSFException("fail to start HSF remoting server module", e);
}
}
}
内部的Netty服务启动的动作只会执行一次,也就是说,应用下所有的HSF服务都共用一个Netty服务,来对外提供调用服务。
端口绑定com.taobao.hsf.io.server.AbstractServer#bind
@Override
public void bind(String hostName, int port) {
if (hostName == null) {
throw new IllegalArgumentException("hostName is null");
}
// log
this.hostName = hostName;
this.port = port;
doBind(this.hostName, this.port);
}
这里具体配置了Netty服务信息com.taobao.hsf.io.netty.tcp.NettyTcpServer#doBind
@Override
public void doBind(String hostName, int port) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
Config config = HSFServiceContainer.getInstance(ConfigService.class).getConfig();
// 可配置的几个
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.ALLOCATOR, Holder.byteBufAllocator)
.option(ChannelOption.SO_BACKLOG, config.getInt(Server.HSF_BACKLOG_KEY))
.childOption(ChannelOption.ALLOCATOR, Holder.byteBufAllocator)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.AUTO_CLOSE, Boolean.TRUE)
.childOption(ChannelOption.ALLOW_HALF_CLOSURE, Boolean.FALSE)
// Netty的启动配置
.handler(new ChannelInitializer() {
@Override
protected void initChannel(ServerSocketChannel ch) throws Exception {
ch.pipeline()
.addLast("serverBindHandler",
new NettyBindHandler(NettyTcpServer.this,
serverStreamLifecycleListeners));
}
})
// Netty的响应请求配置
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline()
.addLast("protocolHandler", new NettyProtocolHandler())
.addLast("serverIdleHandler",
new IdleStateHandler(0, 0, serverIdleTimeInSeconds))
.addLast("serverHandler",
// 这里定义了处理器【关键】
new NettyServerStreamHandler(NettyTcpServer.this, false,
serverStreamLifecycleListeners, serverStreamMessageListeners));
}
});
if (isWaterMarkEnabled()) {
serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(lowWaterMark, highWaterMark));
}
ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostName, port));
future.syncUninterruptibly();
}
小总结
再来回顾一下之前所讲的内容,首先将所有用户自定义的服务实例化为HSFSpringProviderBean,然后在Bean初始化时进行服务初始化和发布工作,最终启动一个Netty服务对外提供远程调用服务。
以上从服务启动的角度分析了HSF服务发布的大致链路,下面着重分析Netty服务是如何提供HSF远程调用能力的。
服务调用
服务调用的入口就是刚才Netty启动时配置的NettyServerStreamHandler,其处理逻辑主要体现在实现了ChannelInboundHandlerAdapter中的方法channelRead()中,下面具体看看。
小知识
channelRead()会在端口读取到信息的时候调用,其中定义了接受消息的具体逻辑。
数据处理逻辑在这里com.taobao.hsf.io.netty.server.NettyServerStreamHandler#channelRead。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 这里就获取到了刚才绑定的那个服务流
ServerStream stream = (ServerStream) StreamUtils.streamOfChannel(ctx.channel());
if (msg instanceof ResponsePacket) {
// 如果数据是返回包,则直接返回
ResponsePacket responsePacket = (ResponsePacket) msg;
MessageAnswerHandler answerHandler = stream.removeAnswerHandler(responsePacket.requestId());
if (answerHandler != null) {
answerHandler.setAnswer(responsePacket);
} else {
log.warn("Receive response which requestId has not been stored, maybe some problem happened on network." + stream);
}
} else if (msg instanceof RequestPacket) {
// 如果是请求数据,则进行处理【关键】
callConnectionReceivedListeners(server, stream, (RequestPacket) msg);
} else {
log.warn("Receive unrecognized msg {}", msg);
}
}
请求的处理关键逻辑com.taobao.hsf.io.netty.server.NettyServerStreamHandler#callConnectionReceivedListeners
private void callConnectionReceivedListeners(Server server, ServerStream stream, RequestPacket message) {
for (ServerStreamMessageListener listener : serverStreamMessageListeners) {
try {
listener.received(server, stream, message);
} catch (Exception ex) {
error(listener.getClass().getName(), "received", ex);
}
}
}
这里会依次调用多个消息处理器来处理消息,其中用于处理请求的是com.taobao.hsf.io.stream.support.server.HandleRequest
其处理逻辑com.taobao.hsf.io.stream.support.server.HandleRequest#received
@Override
@SuppressWarnings("unchecked")
public void received(Server server, ServerStream stream, RequestPacket requestPacket) {
ServerHandler serverHandler = ServerHandlerSelector.getInstance().select(requestPacket.protocolType());
serverHandler.process(requestPacket, stream);
}
这里的服务处理器是com.taobao.hsf.io.remoting.hsf.message.HSFServerHandler
其处理逻辑com.taobao.hsf.io.remoting.hsf.message.HSFServerHandler#process
其中代码较多,不过核心大致可以大致拆分成两部分内容:
-
创建请求
-
处理请求
下面依次来看。
创建请求
Invocation ioInvocation;
if (packet.serializeType() == SerializeType.OPTIMIZED_HESSIAN2.getCode()) {
try {
// 服务元信息
ServiceMetadata serviceMetadata = locateAndSetApplication(serviceUniqueName);
// 组装一个请求对象【关键】
ioInvocation = PacketFactorySelector.getInstance().select(packet.protocolType()).serverGet(
packet, serverStream);
if (serviceMetadata != null) {
ioInvocation.setApplicationModel(serviceMetadata.getApplicationModel());
ioInvocation.getServerInvocationContext().setMetadata(serviceMetadata);
} else {
ioInvocation.setApplicationModel(ApplicationModelFactory.getMainApplicationModel());
}
} catch (Throwable t) {
log.error("HSF-0037", t.getMessage() + "on channel: " + serverStream, t);
processSerializationError(packet, serverStream);
//just close to fallback to normal serialization
serverStream.close();
return;
}
} else {
ioInvocation = null;
}
看一下构建请求的过程com.taobao.hsf.io.remoting.hsf.HSFPacketFactory#serverGet
@Override
public Invocation serverGet(RequestPacket requestPacket, Stream serverStream) {
Invocation invocation = new Invocation();
try {
HSFRequestPacket hsfRequestPacket = (HSFRequestPacket) requestPacket;
invocation.setSerializeType(hsfRequestPacket.serializeType());
// 从请求包中解析出调用类名,方法名,并设置到调用器上
invocation.setTargetServiceUniqueName(hsfRequestPacket.serviceUniqueName());
invocation.setMethodName(ThreadLocalCache.getString(hsfRequestPacket.getTargetMethod()));
// 然后设置调用的入参
String[] argTypeStrings = new String[hsfRequestPacket.getArgTypes().length];
for (int i = 0; i < argTypeStrings.length; i++) {
argTypeStrings[i] = ThreadLocalCache.getString(hsfRequestPacket.getArgTypes()[i]);
}
invocation.setMethodArgSigs(argTypeStrings);
Serializer serializer = SerializerSelector.getInstance().select(hsfRequestPacket.serializeType());
Object[] argsObjects = new Object[hsfRequestPacket.getArgs().length];
for (int i = 0; i < argsObjects.length; i++) {
argsObjects[i] = serializer.deserialize(hsfRequestPacket.getArgs()[i],
ClassUtils.forName(argTypeStrings[i]), serverStream);
}
invocation.setMethodArgs(argsObjects);
Map props = null;
if (hsfRequestPacket.getRequestProps() != null && hsfRequestPacket.getRequestProps().length > 0) {
props = (Map) (serializer.deserialize(hsfRequestPacket.getRequestProps(), Map.class,
serverStream));
}
// 刷新了一下一些参数,暂时不重要
invocation.refreshRequestProp(props);
// 设置了一些调用的上下文信息
invocation.getInvokerContext().setSerializeType(hsfRequestPacket.serializeType());
invocation.getInvokerContext().setTimeout(hsfRequestPacket.timeout());
invocation.getInvokerContext().setRequestId(hsfRequestPacket.requestId());
invocation.getInvokerContext().setProtocolType(hsfRequestPacket.protocolType());
invocation.getInvokerContext().setBiDirectionalInvocation(hsfRequestPacket.isBiDirectionInvocation());
if (serverStream.supportBiDirection() != hsfRequestPacket.isBiDirectionSupported()) {
serverStream.setBiDirectionSupport(hsfRequestPacket.isBiDirectionSupported());
}
if (hsfRequestPacket.isResponseAttachmentSupported()) {
invocation.getInvokerContext().setResponseAttachmentSupported(true);
}
return invocation;
} catch (Throwable t) {
throw new HSFSerializeException(SerializationConstants.SERIALIZE_REQUEST_ERROR_SERVER
+ invocation.getTargetServiceUniqueName()
+ "#" + invocation.getMethodName(), t);
}
}
维护上请求的类名,方法名,请求参数,以及一些附加的上下文信息。
执行请求
final Invocation finalIoInvocation = ioInvocation;
executor.execute(new Runnable() {
@Override
public void run() {
try {
Invocation invocation;
if (finalIoInvocation == null) {
// 拿到服务元信息
ServiceMetadata serviceMetadata = locateAndSetApplication(serviceUniqueName);
invocation = PacketFactorySelector.getInstance().select(packet.protocolType()).serverGet(
packet, serverStream);
if (serviceMetadata != null) {
invocation.setApplicationModel(serviceMetadata.getApplicationModel());
invocation.getServerInvocationContext().setMetadata(serviceMetadata);
} else {
invocation.setApplicationModel(ApplicationModelFactory.getMainApplicationModel());
}
} else {
invocation = finalIoInvocation;
// IO已经完成了应用选择,直接设置到当前业务处理线程中
ApplicationModel applicationModel = invocation.getApplicationModel();
ApplicationModelFactory.setCurrentApplication(applicationModel);
}
//for dubbo RpcContext
invocation.put("local_address", serverStream.getLocalAddress());
invocation.put("remote_address", serverStream.getRemoteAddress());
//used to call client-side service
ThreadLocalUtil.set(HSFConstants.IO_STREAM_SERVER_KEY, serverStream);
//for identification
invocation.put(STREAM_KEY, serverStream);
// 真正执行的地方【关键】
processor.handleRequest(invocation, new RpcOutput(invocation, serverStream));
} catch (Throwable e) {
log.error("HSF-0037", "decode error on channel " + serverStream, e);
processSerializationError(packet, serverStream);
}
}
});
这个executor主要的工作就是提交一个线程来处理请求,直接看到具体的处理逻辑com.taobao.hsf.remoting.provider.ProviderProcessor#handleRequest
@Override
public ResponseStatus handleRequest(Invocation invocation, Output output) {
String serviceUniqueName = invocation.getTargetServiceUniqueName();
// 客户端没有设置这个值,所以这里设置客户端ip
invocation.setPeerIP(output.targetAddress());
// TODO block spas tps unit top $echo $generic sph eagleeye normal
ServiceMetadata providerMetadata = invocation.getServerInvocationContext().getMetadata();
if (providerMetadata == null) {
ApplicationModel applicationModel = invocation.getApplicationModel();
if (applicationModel == null) {
return serviceNotFound(output, invocation, serviceUniqueName);
}
InvocationHandler directService = applicationModel.getServerInvocationHandler(serviceUniqueName);
if (directService != null) {
try {
ListenableFuture rpcFuture = directService.invoke(invocation);
rpcFuture.addListener(new OutputCallback(invocation, rpcFuture, output));
} catch (Throwable throwable) {
onServerException(output, invocation, throwable);
}
return invocation.getResponseStatus();
} else {
return serviceNotFound(output, invocation, serviceUniqueName);
}
} else {
// 主要链路在这里
try {
// 从服务元信息中,拿到服务处理器,然后调用请求【关键】
ListenableFuture rpcFuture = providerMetadata.getInvocationHandler().invoke(
invocation);
// 给调用结果注册监听
rpcFuture.addListener(new OutputCallback(invocation, rpcFuture, output));
} catch (Throwable throwable) {
onServerException(output, invocation, throwable);
}
return invocation.getResponseStatus();
}
}
这里的调用处理器就是在服务初始化环节创建出来的调用处理器,其也是一个调用责任链,其中的末端,也就是真实处理请求的服务处理器是com.taobao.hsf.remoting.provider.ReflectInvocationHandler,具体的调用逻辑com.taobao.hsf.remoting.provider.ReflectInvocationHandler#invoke
@Override
public ListenableFuture invoke(Invocation invocation) throws Throwable {
SettableFuture defaultRPCFuture = Futures.createSettableFuture();
if (serverAsync) {
invocation.addContextAware(serverContextAware);
}
// 具体的请求执行方法【关键】
HSFResponse hsfResponse = handleRequest0(invocation, invocation.getHsfRequest(), defaultRPCFuture);
if (hsfResponse == null) {
return defaultRPCFuture;
}
RPCResult rpcResult = new RPCResult();
rpcResult.setHsfResponse(hsfResponse);
defaultRPCFuture.set(rpcResult);
return defaultRPCFuture;
}
com.taobao.hsf.remoting.provider.ReflectInvocationHandler#handleRequest0
private static HSFResponse handleRequest0(Invocation invocation, final HSFRequest hsfRequest,
SettableFuture defaultRPCFuture) {
ProviderServiceModel serviceModel = invocation.getServerInvocationContext().getServiceModel();
ProviderMethodModel methodModel = invocation.getServerInvocationContext().getMethodModel();
HSFResponse hsfResponse = new HSFResponse();
String remoteHost = invocation.getPeerIP();
ClassLoader tcl = Thread.currentThread().getContextClassLoader();
try {
// 拿到调用的方法
Method workerMethod = methodModel.getMethod();
Method injectIpMethod = serviceModel.getInjectConsumerIpMethod();
if (injectIpMethod != null) {
injectConsumerIp(injectIpMethod, serviceModel.getServiceInstance(), remoteHost);
}
// 拿到调用的参数
Object[] methodArgs = hsfRequest.getMethodArgs();
AsyncContext asyncContext = null;
if (serverAsync) {
// Different to HSF1.x for the position of AsyncContext start
asyncContext = new AsyncContext(defaultRPCFuture, invocation);
// 将AsyncContext放到ThreadLocal中
asyncContext.saveInContext();
}
// 最底层,java的反射调用
// 调用业务应用时,切换为应用class loader
ClassLoaderUtil.switchContextLoader(serviceModel.getMetadata().getServicePojoClassLoader());
// 调用实现类的方法
Object appResp = workerMethod.invoke(serviceModel.getServiceInstance(), methodArgs);
if (serverAsync) {
// 开启了异步
if (AsyncContext.isAsyncStart()) {
invocation.put(ASYNC_PROPERTY_KEY, Boolean.TRUE);
// TODO 本来处理业务异常,但是放在这里似乎就没用了
if (hsfResponse.getAppResponse() instanceof Throwable) {
if (!asyncContext.closeAsync("Business method occur exception", (Throwable) hsfResponse.getAppResponse())) {
// 关闭失败,说明异步业务线程已经调用了AsyncContext.write()写了响应
return null;
}
} else {
// 业务方法正常,异步业务线程会写响应,这里就忽略返回值
return null;
}
}
}
hsfResponse.setAppResponse(appResp);
} catch (InvocationTargetException ivke) {
processBizException(invocation, hsfResponse, ivke);
} catch (Throwable t) {
processUnknownException(invocation, hsfResponse, t);
} finally {
ClassLoaderUtil.switchContextLoader(tcl);
}
return hsfResponse;
}
这里就用到了java原生的反射调用,调用实现类的方法,获取返回结果appResp。
处理结果监听处理逻辑com.taobao.hsf.remoting.provider.ProviderProcessor.OutputCallback#operationComplete
@Override
protected void operationComplete(RPCResult rpcResult) {
try {
long requestId = invocation.getInvokerContext().getRequestId();
byte serializeType = invocation.getInvokerContext().getSerializeType();
byte protocolType = invocation.getInvokerContext().getProtocolType();
rpcResult.getResponseContext().setRequestId(requestId);
rpcResult.getResponseContext().setProtocolType(protocolType);
rpcResult.getResponseContext().setSerializeType(serializeType);
rpcResult.getResponseContext().setResponseAttachmentSupported(invocation.getInvokerContext().isResponseAttachmentSupported());
// only save the root cause
if (rpcResult.getAppResponse() instanceof Throwable) {
CutCauseUtil.cutCause((Throwable) rpcResult.getAppResponse());
}
// 将结果回刷到服务流中
output.flush(rpcResult);
} catch (Throwable t) {
String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0067", "HSF",
"hsf server write response error.");
LOGGER.error("HSF-0067", errorCodeStr, t);
}
}
这里将请求的处理结果回刷到服务流中,完成服务处理。
小总结
这里总结一下服务调用的处理过程
到这里,服务提供者的逻辑就大致讲完啦。
服务消费方
从使用说起
如果我们需要使用上文提供的服务,只需要加上@HSFConsumer即可,示例代码如下:
@Configuration
public class HelloServiceConfig {
@HSFConsumer
HelloService helloService;
}
服务消费方以@HSFConsumer为入口,实现远程调用。
源码解析
消费启动
其中包含以下几个部分:
-
消费者实例化,主要在和spring打交道
-
消费者初始化,专注于消费者的消费配置信息的维护
消费者实例化
首先介绍一下SpringBoot的自动装配原理,在pandora中也一样适用。
在我们应用程序的入口,会有这样一个注解@SpringBootApplication。
跟进到这个注解的内部发现,其中使用了@EnableAutoConfiguration。
根据Spring的SPI机制,目录下spring.factories文件中声明的配置类,都会进行加载。
这里找到了一个非常关键的配置类HsfConsumerAutoConfiguration,具体看一下:
@Configuration
@ConditionalOnProperty(name = Constants.ENABLED, matchIfMissing = true)
public class HsfConsumerAutoConfiguration {
@Bean
public static BeanFactoryPostProcessor hsfConsumerPostProcessor() {
return new HsfConsumerPostProcessor();
}
}
其中定义了一个用于对服务消费者进行自定义设置的后置处理器。
BeanFactoryPostProcessor可以自定义修改应用程序上下文的bean定义,调整上下文底层bean工厂的bean属性值。具体是修改时机是在bean的定义信息已经加载完毕,bean还未实例化的时候。
HsfConsumerPostProcessor中对标注了@HSFConsumer注解的bean进行修改,具体的修改逻辑往下看。
private void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory, BeanDefinitionRegistry registry) {
final HsfProperties properties= HsfPropertiesUtils.buildHsfProperties(environment);
Map> beanIdentifierMap = new HashMap>();
// 这里会遍历所有的bean
for (String beanName : beanFactory.getBeanDefinitionNames()) {
BeanDefinition definition = beanFactory.getBeanDefinition(beanName);
String beanClassName = definition.getBeanClassName();
// 当用 @Bean 返回的类型是Object时,beanClassName是 null
if(beanClassName != null) {
Class clazz = ClassUtils.resolveClassName(definition.getBeanClassName(), this.classLoader);
ReflectionUtils.doWithFields(clazz, new ReflectionUtils.FieldCallback() {
@Override
public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
// 具体的修改工作在这里【关键】
HsfConsumerPostProcessor.this.parseElement(field, properties);
}
});
}
}
// 最后将修改后的bean定义信息放回容器
for (String beanName : beanDefinitions.keySet()) {
if (context.containsBean(beanName)) {
throw new IllegalArgumentException("[HSF Starter] Spring context already has a bean named " + beanName
+ ", please change @HSFConsumer field name.");
}
registry.registerBeanDefinition(beanName, beanDefinitions.get(beanName));
logger.info("registered HSFConsumerBean \"{}\" in spring context.", beanName);
}
}
com.alibaba.boot.hsf.consumer.HsfConsumerPostProcessor#parseElement
private void parseElement(Field field, HsfProperties properties) {
HSFConsumer annotation = AnnotationUtils.getAnnotation(field, HSFConsumer.class);
// 首先判断bean上是否有HSFConsumer注解,如果没有,直接返回
if (annotation == null) {
return;
}
HsfConsumerBeanDefinitionBuilder beanDefinitionBuilder = new HsfConsumerBeanDefinitionBuilder(field.getType(),
annotation);
beanDefinitionBuilder.context(context).beanFactory(beanFactory).properties(properties);
// 这里生成了服务消费者的bean定义信息【关键】
BeanDefinition beanDefinition = beanDefinitionBuilder.build();
if (checkFieldNameDuplicate4FirstTime(field.getName(), beanDefinition)) {
logger.error("registered HSFConsumerBean with duplicate fieldName:{} in spring context.", field.getName());
}
beanDefinitions.put(field.getName(), beanDefinition);
}
com.alibaba.boot.hsf.consumer.HsfConsumerBeanDefinitionBuilder#buildbean定义的生成过程:
BeanDefinition build() {
// 这里定义了创建出的bean是HSFSpringConsumerBean【关键】
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(HSFSpringConsumerBean.class);
// 首先配置服务消费的基本信息
builder.setInitMethodName(Constants.INIT_METHOD);
builder.addPropertyValue("interfaceClass", interfaceClass);
builder.addPropertyValue("group", resolveGroup(annotation, interfaceClass, properties, beanFactory));
String version = resolveVersion(annotation, interfaceClass, properties, beanFactory);
builder.addPropertyValue("version", version);
Object clientTimeout = resolveTimeout(annotation, interfaceClass, properties, beanFactory);
if (clientTimeout != null) {
builder.addPropertyValue("clientTimeout", clientTimeout);
}
builder.addPropertyValue("maxWaitTimeForCsAddress", ConsumerPropertiesResolver
.resolveMaxWaitTimeForCsAddress(annotation, interfaceClass, properties, beanFactory));
builder.addPropertyValue("proxyStyle", annotation.proxyStyle());
builder.addPropertyValue("generic", annotation.generic());
builder.addPropertyValue("singleton", annotation.singleton());
if (annotation.includeFilters().length != 0) {
builder.addPropertyValue("includeFilters", Arrays.asList(annotation.includeFilters()));
}
if (annotation.includeRouters().length != 0) {
builder.addPropertyValue("includeRouters", Arrays.asList(annotation.includeRouters()));
}
if (annotation.configServerCenters().length != 0) {
builder.addPropertyValue("configserverCenter", Arrays.asList(annotation.configServerCenters()));
}
...
return builder.getBeanDefinition();
}
bean定义的创建过程主要维护了服务调用的基本信息,其中最终的是指明了创建出来的bean是HSFSpringConsumerBean。
小总结
回顾一下刚才所讲内容,其实主要在做的就是,将标注了HSFConsumer注解的类注入到Spring容器中,并且,由于我们只是指定了服务消费的接口,由HSF的后置处理器将这些需要创建的bean统一实例化为HSFSpringConsumerBean。
消费者初始化
在HSFSpringConsumerBean中定义了它的初始化方法。
com.taobao.hsf.app.api.util.HSFApiConsumerBean#init()中的代码很长,但是核心内容是亮点
-
配置初始化
-
维护代理
配置初始化
com.taobao.hsf.model.metadata.ServiceMetadata#init这边初始化操作在服务提供方的源码讲解中也提到过,这里就不过多赘述;不过与服务提供方不同的是,这里主要创建创建了同步调用器syncInvocationHandler,来用于处理调用。
维护代理
com.taobao.hsf.model.metadata.ServiceMetadata#setTarget就是具体执行调用的组件维护进ServiceMetadata中,具体看看com.taobao.hsf.app.api.util.HSFApiConsumerBean#consume的内容。
private Object consume(final ServiceMetadata metadata) {
//已经使用了 serviceMetaData 的唯一性做了控制
// if (applicationModel.getConsumedServiceModel(metadata.getUniqueName()) != null) {
// return applicationModel.getConsumedServiceModel(metadata.getUniqueName()).getProxyObject();
// }
// 生成调用远程HSF服务的代理
ProxyDecoratorGenerator proxyDecoratorGenerator = HSFServiceContainer.getInstance(
ProxyDecoratorGenerator.class);
Class[] decorateInterfaces = proxyDecoratorGenerator.getDecorateInterfaces(metadata);
//proxy
ProxyFactory proxyFactory = HSFServiceContainer.getInstance(ProxyFactory.class, metadata.getProxyStyle());
// 1. 根据代理工厂和需要代理的服务信息,生成对应的代理【关键】
Object proxy = proxyFactory.getProxy(metadata, decorateInterfaces);
Method[] methods = proxyFactory.getMethods(proxy);
//model
ConsumerServiceModel consumerServiceModel = new ConsumerServiceModel(metadata, proxy, methods);
metadata.setConsumerServiceModel(consumerServiceModel);
// 这里将consumerServiceModel维护到applicationModel容器中
//TODO 这里是client的注册,这里应该生成RPCClient
applicationModel.initConsumerService(metadata.getUniqueName(), consumerServiceModel);
// 2. 将元信息引用上服务,设置消费调用链信息【关键】
metadata.referSync();
return proxy;
}
这边有两种创建代理的方法,一种是java原生的,另一种是基于javassist的,默认使用java原生的。
com.taobao.hsf.proxy.JdkProxyFactory#getProxy
@Override
public Object getProxy(ServiceMetadata metadata, Class... interfacesArray) {
try {
// 代理对象,这里定义了实际的调用动作【关键】
JdkProxyInvocationHandler jdkProxyInvocationHandler = new JdkProxyInvocationHandler(metadata);
// 主要就是调用java原生的动态代理api,创建一个代理对象
Object instance = Proxy.newProxyInstance(metadata.getIfClazz().getClassLoader(), interfacesArray, jdkProxyInvocationHandler);
jdkProxyInvocationHandler.init(instance);
return instance;
} catch (Throwable t) {
throw new HSFException("failed to generate jdk proxy",t);
}
}
代理类的真实行为暂时按下不表,在消费者的调用中在详细介绍,目前先聚焦在消费者的创建。
上面的步骤2将消费者元信息关联上了调用链信息,下面单独讨论一下。
设置消费调用链
com.taobao.hsf.model.metadata.ServiceMetadata#referSync
public InvocationHandler referSync() {
Future future = getExportReferExecutorService().submit(new Callable() {
@Override
public InvocationHandler call() throws Exception {
return refer();
}
});
try {
return future.get();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
这里还是异步调用了com.taobao.hsf.model.metadata.ServiceMetadata#refer方法,具体如下:
public InvocationHandler refer() {
try {
// 通过责任链生成调用处理器
InvocationHandler protocolInvocationHandler = protocolFilterChain.refer(this);
consumerHandlerHitch.setInvocationHandler(protocolInvocationHandler);
return this.invocationHandler;
} catch (Throwable t) {
String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0060", "HSF",
"refer service: " + getUniqueName() + ", group=" + getGroup() + " got error");
LOGGER.error("HSF-0060", errorCodeStr, t);
throw new RuntimeException(t);
}
}
这里的调用链上也有多个组件
其中比较重要的有两个:
-
RegistryProtocolInterceptor
-
HSFProtocol
RegistryProtocolInterceptor
在InvocationHandler protocolInvocationHandler = protocolFilterChain.refer(this);环节获取到的protocolInvocationHandler也就是RegistryProtocolInterceptor。
其主要作用就是从注册中心获取服务的地址信息
com.taobao.hsf.registry.RegistryProtocolInterceptor#refer
@Override
public InvocationHandler refer(ServiceMetadata serviceMetadata) {
InvocationHandler invocationHandler = protocol.refer(serviceMetadata);
// 拿到一个用于访问注册中心的处理器
RegistryInvocationHandler registryHandler = serviceMetadata.getAttachment(REGISTRY_INVOCATION_HANDLER_KEY);
//TODO remove this null check
if (registryHandler == null) {
registryHandler = new RegistryInvocationHandler();
registryHandler.setServiceMetadata(serviceMetadata);
registryHandler.setProtocol(protocol);
registryHandler.start();
serviceMetadata.putAttachment(REGISTRY_INVOCATION_HANDLER_KEY, registryHandler);
}
// 在这里去访问注册中心【关键】
registryHandler.refer();
registryHandler.setInvocationHandler(invocationHandler);
return registryHandler;
}
com.taobao.hsf.registry.RegistryInvocationHandler#refer
/**
* 将当前元信息注册到对应的注册中心上
*
* @return 订阅的注册中心数量
*/
public int refer() {
int registryNum = 0;
for (Map.Entry entry : registryClusterStrategyMap.entrySet()) {
// 主要的工作在这个Registry的订阅方法中
entry.getKey().subscribe(protocol, serviceMetadata, entry.getValue());
registryNum++;
}
return registryNum;
}
这里主要看一下使用ConfigServer作为注册中心时的交互方式,com.taobao.hsf.registry.cs.ConfigServerRegistry#subscribe
@Override
public void subscribe(Protocol protocol, ServiceMetadata metadata, RawAddressListener listener) {
synchronized (subscriberLock) {
if (metadata2DataSource.containsKey(metadata)) {
LOGGER.info("ConfigServerRegistry", "service [" + metadata.getUniqueName() + "] has already been subscribed.");
} else {
List centers = (metadata.getConfigserverCenter() == null || metadata.getConfigserverCenter().isEmpty()) ? new ArrayList() : metadata.getConfigserverCenter();
if (centers.size() == 0) {
centers.add(DEFAULT);
}
// 创建了一个数据源,其中维护了服务提供者的ip信息
CSDataSource dataSource;
if (CSCommonUtils.isClusterEnabled(metadata) || centers.size() > 1) {
dataSource = new MultiClusterDataSource( this,protocol,metadata, listener, applicationModel);
} else {
dataSource = new SingleClusterDataSource(this, protocol, metadata, listener, applicationModel);
}
metadata2DataSource.put(metadata, dataSource);
List protocols = protocol.getSupportedProtocol();
for (String pro : protocols) {
//for now, cs accept hsf only
if (pro.equalsIgnoreCase("hsf")) {
for (String center : centers) {
if (center != null) {
dataSource.subscribe(pro, metadata, center);
}
}
}
}
}
}
}
这里代码看似很多,其实最主要的工作就是维护了一个dataSource对象,而这个对象就帮我们对接注册中心,维护了服务提供方的ip信息。
HSFProtocol
com.taobao.hsf.remoting.service.HSFProtocol#refer
@Override
public InvocationHandler refer(ServiceMetadata serviceMetadata) {
// 这里返回一个用于处理rpc调用的处理器
return HSFServiceContainer.getInstance(InvocationHandler.class, "HSF");
}
根据声明的名称HSF,找到了对应的实现类RemotingRPCProtocolComponent。
当发起调用的时候,最终就通过com.taobao.hsf.remoting.service.RemotingRPCProtocolComponent#invoke方法向服务提供方发起调用。
小总结
消费者的初始化阶段,初始化了配置信息,创建对应的代理对象,并进行了消费方启动的一系列初始化工作,包括监听注册中心、定义rpc调用方法等。
消费调用
上文在服务启动中讲到,对于我们声明创建的接口,HSF统一实例化为HSFSpringConsumerBean,而其中的具体逻辑交由HSFApiConsumerBean,其中的ServiceMetadata中,通过target对象维护了一个代理,真实定义了服务的具体调用方式。
而创建的代理是JdkProxyInvocationHandler,现在我们就来看看其中的具体行为。
调用方法定义在父类中com.taobao.hsf.proxy.ProxyInvocationHandler#invoke
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
ApplicationModelFactory.setCurrentApplication(serviceMetadata.getApplicationModel());
ConsumerMethodModel methodModel = serviceMetadata.getConsumerServiceModel().getMethodModel(method);
// 具体执行调用的地方【关键】
return InvocationUtil.invoke(methodModel, args);
}
com.taobao.hsf.InvocationUtil#invoke
public static Object invoke(ConsumerMethodModel methodModel, Object... args) throws Throwable {
Invocation invocation = InvocationUtil.buildInvocation(methodModel, args);
// 这里就根据元信息中的同步调用器,获取到返回结果【关键】
RPCResult rpcResult = methodModel.getMetadata().getSyncInvocationHandler().invoke(invocation);
HSFResponse hsfResp = rpcResult.getHsfResponse();
//exception handling
if (hsfResp.isError()) {
if (hsfResp.getAppResponse() instanceof Throwable) {
throw new HSFException(hsfResp.getErrorMsg(), (Throwable) hsfResp.getAppResponse());
} else {
throw new HSFException(hsfResp.getErrorMsg());
}
}
Object appResponse = hsfResp.getAppResponse();
// 检查对端返回的业务层对象: 如果返回的是异常对象,则重新抛出异常
if (appResponse instanceof Throwable) {
throw (Throwable) appResponse;
}
return hsfResp.getAppResponse();
}
这里就用到了在配置初始化阶段创建出来的syncInvocationHandler,这边构造了一个流水线,但其实其中的核心调用动作,还是交给了创建出来的invocationHandler。
而在invocationHandler中,最关键的发起调用的部分则交给了RemotingRPCProtocolComponent,下面具体看看调用方法。
com.taobao.hsf.remoting.service.RemotingRPCProtocolComponent#invoke
@Override
public ListenableFuture invoke(Invocation invocation) throws Throwable {
ConsumerMethodModel methodModel = invocation.getClientInvocationContext().getMethodModel();
ServiceMetadata serviceMetadata = methodModel.getMetadata();
String serviceUniqueName = methodModel.getUniqueName();
InvokerContext invokerContext = invocation.getInvokerContext();
ListenableFuture rpcFuture;
// 获取调用的地址
ServiceURL remoteURL = invocation.getTargetAddress();
// 如这个时候targetURL仍然为null,抛出异常
if (remoteURL == null) {
throw new HSFServiceAddressNotFoundException("", LoggerHelper.getErrorCodeStr(
"hsf",
"HSF-0001",
"Env",
"[HSF-Consumer] can not find target server address,serviceName:" + serviceUniqueName + " group:"
+ serviceMetadata.getGroup()));
}
// 调用的url信息
String remoteIp = remoteURL.getHost();
invokerContext.setUrl(remoteURL);
invokerContext.setRemoteIp(remoteIp);
// 设置了本次调用上下文的ip地址
ThreadLocalUtil.set(HSFConstants.TARGET_SERVER_IP, remoteIp);
// 发起调用【关键】
rpcFuture = invokeForOne(methodModel, invocation, remoteURL);
return rpcFuture;
}
其中拿到的ip实际上是在RegistryInvocationHandler这个前置处理器中维护的,这里不赘述。
com.taobao.hsf.remoting.service.RemotingRPCProtocolComponent#invokeForOne
private ListenableFuture invokeForOne(ConsumerMethodModel consumerMethodModel, Invocation invocation,
ServiceURL remotingURL) throws Throwable {
int timeout = InvocationUtil.getReadTimeout(invocation, remotingURL);
try {
// 1. 首先获取一个客户端
ClientStream clientStream = client.of(remotingURL);
if (clientStream == null) {
throw new HSFException("Target server [" + remotingURL + "] has become unreachable.");
}
//used by sync invoke
invocation.getInvokerContext().setTimeout(timeout);
invocation.getInvokerContext().setSerializeType(
serializeType(consumerMethodModel.getMetadata(), remotingURL, clientStream));
invocation.getInvokerContext().setProtocolType(remotingURL.getProtocolType());
invocation.getInvokerContext().setBiDirectionalInvocation(false);
clientStream.setProtocolType(remotingURL.getProtocolType());
// 2. 发送请求
return clientStream.write(invocation);
} catch (Exception e) {
throw new HSFException("error on submit request on future invoke:", e);
}
}
发起调用的过程主要分两个步骤:
-
获取客户端
-
用刚才获取到的客户端进行发送请求
-
处理响应
下面具体看看。
获取客户端
com.taobao.hsf.io.client.AbstractClient#of
@Override
public ClientStream of(ServiceURL serviceURL) {
if (serviceURL == null) {
return null;
} else {
int connTimeout = serviceURL.getParameter(HSFConstants.CONNECT_TIMEOUT_KEY, 1000);
long connectionId = AddressUtils.addressToLong(serviceURL.getHost(), serviceURL.getPort());
if (connectionId == -1) {
return null;
}
// 获取客户端【关键】
ClientStream clientStream = getClientStream(connectionId, connTimeout);
return clientStream;
}
}
com.taobao.hsf.io.client.AbstractClient#getClientStream(long, int)
private ClientStream getClientStream(final long connectionID, int timeout) {
ClientStream stream = clientStreams.get(connectionID);
if (stream != null) {
return stream;
}
Lock streamConnectLock = getStreamConnectLock(connectionID);
try {
if (streamConnectLock.tryLock(4, TimeUnit.SECONDS)) {
try {
stream = clientStreams.get(connectionID);
if (stream == null) {
// 这里真实创建了一个客户端【关键】
stream = connect(connectionID, timeout);
if (stream != null) {
clientStreams.put(connectionID, stream);
connectLocks.remove(connectionID, streamConnectLock);
}
}
} finally {
streamConnectLock.unlock();
}
} else {
LOGGER.warn("try to connect to " + connectionID + " failed, caused by get lock timeout");
}
} catch (InterruptedException e) {
LOGGER.error("HSF", "getStreamConnectLock " + connectionID + " Interrupted", e);
}
return stream;
}
这里代码看似复杂,其实核心在做的是维护了一个key是connectionID的客户端缓存信息。
为什么要对客户端进行池化?因为创建的开销不小。
具体看看创建过程。
com.taobao.hsf.io.netty.tcp.NettyClient#connect
@Override
public ClientStream connect(final long connectionID, int connectTimeout) {
Bootstrap bootstrap = new Bootstrap();
// 创建netty客户端
bootstrap.group(workerGroup)
.option(ChannelOption.TCP_NODELAY, true)//
.option(ChannelOption.SO_REUSEADDR, true)//
.option(ChannelOption.ALLOCATOR, Holder.byteBufAllocator)//
.option(ChannelOption.ALLOW_HALF_CLOSURE, Boolean.FALSE)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline()
.addLast("protocol", new NettyProtocolHandler())
.addLast("clientIdleHandler", new IdleStateHandler(getHbSentInterval(), 0, 0))
.addLast("clientHandler",
new NettyClientStreamHandler(NettyClient.this, connectionID,
clientStreamLifecycleListeners, clientStreamMessageListeners));
}
});
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
// 下面的代码暂时不重要
if (isWaterMarkEnabled()) {
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(getLowWaterMark(), getHighWaterMark()));
}
String targetIP = AddressUtils.longToHost(connectionID);
int targetPort = AddressUtils.longToPort(connectionID);
ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
try {
future.await(connectTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.error("HSF", "await connect Interrupted", e);
}
ClientStream result = null;
if (future.isSuccess()) {
if (StreamUtils.streamOfChannel(future.channel()) == null) {
NettyClientStream clientStream = new NettyClientStream(connectionID, future.channel());
clientStream.setClient(this);
StreamUtils.bindChannel(future.channel(), clientStream);
}
result = (ClientStream) StreamUtils.streamOfChannel(future.channel());
}
return result;
}
创建netty客户端的核心成果在NettyClientStreamHandler,其中定义了调用结果响应的方式。
发送请求
com.taobao.hsf.io.stream.AbstractStream#write(com.taobao.hsf.invocation.Invocation)
@Override
public ListenableFuture write(Invocation invocation) {
if (invocation.getInvokerContext().isBiDirectionalInvocation()) {
if (!this.supportBiDirection()) {
LOGGER.warn("", "Ignored request to stream:{} cause the client is not bi-directional", getRemoteAddress());
return null;
}
}
Executor executor = invocation.getExecutor();
if (executor == null) {
executor = threadPoolService.callbackExecutor();
} else if (executor instanceof UserThreadPreferedExecutor) {
((UserThreadPreferedExecutor) executor).activate();
}
SettableFuture ioFuture = Futures.createSettableFuture(executor);
ResponseMessageAnswerHandler handler = new ResponseMessageAnswerHandler(
invocation.getApplicationModel(), SerializePhase.BIZ, ioFuture, this);
ListenableFuture resultFuture = Futures.map(ioFuture, handler);
PacketFactory packetFactory = PacketFactorySelector.getInstance().select(
invocation.getInvokerContext().getProtocolType());
RequestPacket requestPacket;
StreamWriteRequest streamWriteRequest;
if (invocation.getInvocationType() == InvocationType.BIZ) {
if (invocation.getInvokerContext().getSerializeType()
== SerializeType.OPTIMIZED_HESSIAN2.getCode()) {
streamWriteRequest = new StreamWriteRequest(invocation, handler);
handler.setSerializePhase(SerializePhase.IO);
} else {
requestPacket = packetFactory.clientCreate(invocation, this);
streamWriteRequest = new StreamWriteRequest(requestPacket, handler);
}
} else {
requestPacket = packetFactory.clientCreateHeartbeatRequest();
streamWriteRequest = new StreamWriteRequest(requestPacket, handler);
}
// 发送请求数据
send(streamWriteRequest);
return resultFuture;
}
com.taobao.hsf.io.netty.tcp.NettyClientStream#send
@Override
public void send(Object packet) {
//TODO 区别responseType
if (client.isWaterMarkEnabled() && !isWritable()) {
String errorMsg = MessageFormat.format(
"write overflow, client gave up writing request to channel {0}, bytesBeforeWritable: {1}, watermark: [{2}-{3}] bytes",
channel, channel.bytesBeforeUnwritable(), client.getLowWaterMark(), client.getHighWaterMark());
LOGGER.error("HSF-0102", errorMsg);
//fail fast: don't let invoker wait for timeout exception
throw new HSFException(errorMsg);
} else {
// 真正的数据发送
channel.writeAndFlush(packet);
}
}
处理响应
com.taobao.hsf.io.netty.client.NettyClientStreamHandler#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClientStream stream = (ClientStream) StreamUtils.streamOfChannel(ctx.channel());
if (msg instanceof RequestPacket) {
RequestPacket requestPacket = (RequestPacket) msg;
ClientServiceHandler clientServiceHandler = ClientHandlerSelector.getInstance().select(requestPacket.protocolType());
if (clientServiceHandler != null) {
clientServiceHandler.process(requestPacket, stream);
} else {
log.warn("No client service found for request. protocolId:{},requestId:{},serializeType:{},messageType:{}", requestPacket.protocolType(),
requestPacket.requestId(), requestPacket.serializeType(), requestPacket.messageType());
}
} else if (msg instanceof ResponsePacket) {
// 处理返回结果【关键】
callConnectionReceivedListeners(client, stream, (ResponsePacket) msg);
} else {
log.warn("Received unrecognized msg:{}", msg);
}
}
com.taobao.hsf.io.netty.client.NettyClientStreamHandler#callConnectionReceivedListeners
private void callConnectionReceivedListeners(Client client, ClientStream stream, ResponsePacket message) {
for (int i = 0; i < clientStreamMessageListeners.length; i++) {
ClientStreamMessageListener listener = clientStreamMessageListeners[i];
try {
listener.received(client, stream, message);
} catch (Exception ex) {
String errorCodeStr = LoggerHelper.getErrorCodeStr("HSF", "HSF-0085", "HSF",
"invoke MessageListener#received" + listener.getClass() + " got exception");
log.error("HSF-0085", errorCodeStr, ex);
}
}
}
具体处理接受响应的是ReceiveResponse处理器,com.taobao.hsf.io.stream.support.client.ReceiveResponse#received
@Override
public void received(Client client, ClientStream stream, ResponsePacket message) {
MessageAnswerHandler answerHandler = stream.removeAnswerHandler(message.requestId());
if (answerHandler != null) {
if (answerHandler.getSerializePhase() == SerializePhase.IO) {
RPCResult rpcResult = answerHandler.call(message);
answerHandler.setAnswer(new RpcResultResponsePacketWrapper(rpcResult));
if (rpcResult.getHsfResponse().getStatus() == ResponseStatus.CLIENT_DESERIALIZE_ERROR) {
stream.attributeMap().put(STREAM_OPTIMIZED_HESSIAN_ENABLE_KEY, false);
}
} else {
answerHandler.setAnswer(message);
}
//成功收到服务端的业务响应,清除心跳失败的计数
stream.clearContinuousHbFailedTimes();
} else {
log.warn("Receive response which requestId has not been stored, maybe some problem happened on network." + stream);
}
}
总结
到这里,HSF的核心源码基本就梳理结束了。