@[TOC]
一、前言
至此,seata系列的内容包括:
- can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
- Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
- Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
- 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
- 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
- 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
- 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么【云原生】
本文着重聊一聊Seata Client启动时都做了什么
PS:前文中搭建的Seata案例,seata的版本为1.3.0,而本文开始的源码分析将基于当前(2022年8月)最新的版本1.5.2进行源码解析。
二、Seata-Client依赖调整
SpringBoot、SpringCloud、SpringCloudAlibaba的版本对应关系参考博文:SpringBoot、SpringCloud、SpringCloudAlibaba的版本对应关系,从Spring-cloud-alibaba的github官网来看当前最新的SpringCloud、SpringCloudAlibaba版本依赖关系如下:
可以看到Spring Cloud Alibaba最新版本中依赖的Seata版本是1.5.1,而我们看的源码是最新的1.5.2;所以需要做如下操作:
(1)调整SpringCloud版本:
在根pom下调整Spring Cloud、Spring Cloud Alibaba的全局依赖版本:
<properties>
<spring-boot.version>2.3.12.RELEASE</spring-boot.version>
<spring-cloud.version>Hoxton.SR12</spring-cloud.version>
<spring-cloud-alibaba.version>2.2.8.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--整合spring cloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--整合spring cloud alibaba-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
(2)在seata client项目下调整seata version
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.5.2</version>
</dependency>
至此,seata client 的运行/DEBUG环境准备就绪;
三、从SpringBoot自动装配来看SeataClient加载的内容
在Seata client引入spring-cloud-starter-alibaba-seata
依赖之后,seata client的External Libraries
中会多出五个seata相关的jar包:
我们在SpringBoot专栏(精通Spring Boot)中聊过:SpringCloud集成其他组件时,就看自动装配类(XXXAutoConfiguration、XXXConfiguration),感兴趣的自行前往阅读SpringBoot源码解析相关文章;
下面我们就按顺序一个一个的看各个jar包的META-INF/spring.factories
文件;
1、spring-cloud-starter-alibaba-seata-2.2.8.RELEASE.jar
spring-cloud-starter-alibaba-seata-2.2.8.RELEASE.jar
包的META-IINF/spring.factories文件内容如下:
从类的命名大致可以推测出其中会对seata client之间调用时做一些处理;
因为Seata Client之间通信可以有很多种方式:RestTemplate、SpringMVC、OpenFeign、集成Hystrix限流,所以此处有四个类:SeataRestTemplateAutoConfiguration、SeataHandlerInterceptorConfiguration、SeataFeignClientAutoConfiguration、SeataHystrixAutoConfiguration;
下面我们逐个看一下;
1)SeataRestTemplateAutoConfiguration
如果你是看着博主的文章一路走过来的,你会发现这里和Ribbon对RestTemplate做负载均衡的入口很像:对RestTemplate添加一个ClientHttpRequestInterceptor
拦截器,每次调用RestTemplate时,都会先走进拦截器;
此处的拦截器为SeataRestTemplateInterceptor
:
public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
// 1、对httpRequest做一个包装
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
// 2、从本地事务上下文(ThreadLocal)中获取全局事务xid; todo 什么时候将xid放到ThreadLocal中见下一篇博文
String xid = RootContext.getXID();
// 如果可以从本地事务上下文中获取到全局事务xid,则将其添加到httpRequest的请求头上;
if (!StringUtils.isEmpty(xid)) {
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
// 执行HTTP请求
return clientHttpRequestExecution.execute(requestWrapper, bytes);
}
}
方法中主要就做一件事:从本地事务上下文(线程本地变量ThreadLocal)中获取全局事务xid,如果可以获取到,则将其添加到请求中的请求头中,传递到下一个seata client中。
那么xid是什么时候生成的?什么时候放到ThreadLocal中的呢?点个关注,敬请期待下一篇博文。
2)SeataHandlerInterceptorConfiguration
SeataHandlerInterceptorConfiguration中是对SpringMVC调用时做的拦截,拦截针对的路径为:/**
,拦截器为SeataHandlerInterceptor
:
public class SeataHandlerInterceptor implements HandlerInterceptor {
private static final Logger log = LoggerFactory
.getLogger(SeataHandlerInterceptor.class);
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) {
String xid = RootContext.getXID();
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (log.isDebugEnabled()) {
log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
}
if (StringUtils.isBlank(xid) && rpcXid != null) {
RootContext.bind(rpcXid);
if (log.isDebugEnabled()) {
log.debug("bind {} to RootContext", rpcXid);
}
}
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception e) {
if (StringUtils.isNotBlank(RootContext.getXID())) {
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (StringUtils.isEmpty(rpcXid)) {
return;
}
String unbindXid = RootContext.unbind();
if (log.isDebugEnabled()) {
log.debug("unbind {} from RootContext", unbindXid);
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
log.warn("bind {} back to RootContext", unbindXid);
}
}
}
}
}
SeataHandlerInterceptor做了两件事:
- 执行请求之前,从本地事务上下文(线程本地变量ThreadLocal)中获取全局事务xid;从请求头中获取全局事务xid(rpcXid);如果xid不存在,但是rpcXid存在,则将rpcXid作为xid绑定到全局事务上下文(线程本地变量ThreadLocal)上;
- 请求执行完毕之后;如果全局事务上下文中xid存在,但是请求头中rpcXid不存在,则不对xid进行处理,
但是如果全局事务上下文中xid存在 并且 请求头中包括rpcXid,则将xid从全局事务上下文中移除,如果移除的xid和rpcXid不相等,则将rpcXid作为xid绑定到全局事务上下文中。
聪明的你肯定发现,这里只是将xid保存到事务上下文RootContext中,并没有xid在seata client之间传递的迹象!!
而xid在seata client之间传递体现在远程调用的工具(OpenFeign、RestTemplate)中
3)SeataFeignClientAutoConfiguration
其实也和上面两个差不多;
无论FeignClient是否集成Hystrix、Sentinel,在构建请求的Feign.Builder
中,Client都是SeataFeignClient
;关于Feign的源码解析请看博主的Feign系列文章。
看看SeataFeignClient
的逻辑:
public class SeataFeignClient implements Client {
private final Client delegate;
private final BeanFactory beanFactory;
private static final int MAP_SIZE = 16;
SeataFeignClient(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
this.delegate = new Client.Default(null, null);
}
SeataFeignClient(BeanFactory beanFactory, Client delegate) {
this.delegate = delegate;
this.beanFactory = beanFactory;
}
@Override
public Response execute(Request request, Request.Options options) throws IOException {
Request modifiedRequest = getModifyRequest(request);
return this.delegate.execute(modifiedRequest, options);
}
private Request getModifyRequest(Request request) {
String xid = RootContext.getXID();
if (StringUtils.isEmpty(xid)) {
return request;
}
Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
headers.putAll(request.headers());
List<String> seataXid = new ArrayList<>();
seataXid.add(xid);
headers.put(RootContext.KEY_XID, seataXid);
return Request.create(request.method(), request.url(), headers, request.body(),
request.charset());
}
}
在通过OpenFeign指定请求之前,也是会从事务上下文中获取xid,然后将其放到请求头中;
4)SeataHystrixAutoConfiguration
SeataHystrixConcurrencyStrategy
类关键内容如下:
Hystrix会将请求包装成Command命令执行,然后再将Command通过线程交给SpringMVC、RestTemplate等HTTP框架执行,这里只是将xid绑定到RootContext中;
2、seata-all-1.5.2.jar
seata-all-1.5.2.jar中不存在META-INf/spring.factories
文件,所以此处并没有SpringBoot自动装配特性的应用,但是请注意io.seata.integration.http
路径下的AbstractHttpExecutor
类,其内容有如下逻辑从事务上下文中获取xid,然后将其设置到请求的请求头中(做seata client之间传递全局事务xid用):
全局搜索这个类的使用,我们可以惊奇的发现,它的使用居然只体现在TEST测试类中,没有加载到Spring容器中!
其底层使用的是HTTPClient,如果我们想在项目中使用HTTPClient做远程调用,可以使用如下方式:
HttpResponse response = DefaultHttpExecutor.getInstance().executePost("http://127.0.0.1:8082", "/product/xxk",params, HttpResponse.class);
3、seata-spring-autoconfigure-client:1.5.1.jar
seata-spring-autoconfigure-client:1.5.1.jar下的META-INF/spring.facotries
文件中有两个自动装配类:SeataTCCFenceAutoConfiguration、SeataClientEnvironmentPostProcessor;其中:
- SeataTCCFenceAutoConfiguration负责加载TCC的一些配置;
- SeataClientEnvironmentPostProcessor负责设置一些seata.client的配置信息
4、seata-spring-autoconfigure-core:1.5.1.jar
seata-spring-autoconfigure-core:1.5.1.jar下的META-INF/spring.facotries
文件中有两个自动装配类:SeataCoreAutoConfiguration、SeataCoreEnvironmentPostProcessor;其中:
- SeataCoreAutoConfiguration中实例化了一个
SpringApplicationContextProvider
,其继承了ApplicationContextAware,可以用来从ApplicationContext中获取对象; - SeataCoreEnvironmentPostProcessor负责设置一些seata.core的配置信息
5、seata-spring-boot-starter:1.5.1.jar
seata-spring-boot-starter:1.5.1.jar下的META-INF/spring.facotries
文件内容如下:
1)SeataDataSourceAutoConfiguration
SeataDataSourceAutoConfiguration
中仅负责实例化一个Bean(SeataAutoDataSourceProxyCreator),SeataAutoDataSourceProxyCreator
是AT模式下自动代理数据库资源的切面,其继承自AbstractAutoProxyCreator
类,使用CGLIB对DataSource做动态代理,后续对DataSource的访问都会进入到它内部,即它可以看做是一个拦截器;
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
private final Set<String> excludes;
private final String dataSourceProxyMode;
private final Object[] advisors;
public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
setProxyTargetClass(!useJdkProxy);
this.excludes = new HashSet<>(Arrays.asList(excludes));
this.dataSourceProxyMode = dataSourceProxyMode;
this.advisors = buildAdvisors(dataSourceProxyMode);
}
private Object[] buildAdvisors(String dataSourceProxyMode) {
Advice advice = new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode);
return new Object[]{new DefaultIntroductionAdvisor(advice)};
}
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) {
return advisors;
}
@Override
protected boolean shouldSkip(Class<?> beanClass, String beanName) {
if (excludes.contains(beanClass.getName())) {
return true;
}
return SeataProxy.class.isAssignableFrom(beanClass);
}
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// we only care DataSource bean
if (!(bean instanceof DataSource)) {
return bean;
}
// when this bean is just a simple DataSource, not SeataDataSourceProxy
if (!(bean instanceof SeataDataSourceProxy)) {
Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
// this mean this bean is either excluded by user or had been proxy before
if (bean == enhancer) {
return bean;
}
// else, build proxy, put <origin, proxy> to holder and return enhancer
DataSource origin = (DataSource) bean;
SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
DataSourceProxyHolder.put(origin, proxy);
return enhancer;
}
/*
* things get dangerous when you try to register SeataDataSourceProxy bean by yourself!
* if you insist on doing so, you must make sure your method return type is DataSource,
* because this processor will never return any subclass of SeataDataSourceProxy
*/
LOGGER.warn("Manually register SeataDataSourceProxy(or its subclass) bean is discouraged! bean name: {}", beanName);
SeataDataSourceProxy proxy = (SeataDataSourceProxy) bean;
DataSource origin = proxy.getTargetDataSource();
Object originEnhancer = super.wrapIfNecessary(origin, beanName, cacheKey);
// this mean origin is either excluded by user or had been proxy before
if (origin == originEnhancer) {
return origin;
}
// else, put <origin, proxy> to holder and return originEnhancer
DataSourceProxyHolder.put(origin, proxy);
return originEnhancer;
}
SeataDataSourceProxy buildProxy(DataSource origin, String proxyMode) {
if (BranchType.AT.name().equalsIgnoreCase(proxyMode)) {
return new DataSourceProxy(origin);
}
if (BranchType.XA.name().equalsIgnoreCase(proxyMode)) {
return new DataSourceProxyXA(origin);
}
throw new IllegalArgumentException("Unknown dataSourceProxyMode: " + proxyMode);
}
}
2)SeataAutoConfiguration
SeataAutoConfiguration是一个特别特别重要的自动装配类,其中仅实例化了两个类到Spring容器中,一个FailureHandler
、一个GlobalTransactionScanner
;
1> FailureHandler
FailureHandler是事务执行失败的处理器,默认为DefaultFailureHandlerImpl
,内容如下:
public class DefaultFailureHandlerImpl implements FailureHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFailureHandlerImpl.class);
/**
* Retry 1 hours by default
*/
private static final int RETRY_MAX_TIMES = 6 * 60;
private static final long SCHEDULE_INTERVAL_SECONDS = 10;
private static final long TICK_DURATION = 1;
private static final int TICKS_PER_WHEEL = 8;
private HashedWheelTimer timer = new HashedWheelTimer(
new NamedThreadFactory("failedTransactionRetry", 1),
TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);
@Override
public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
LOGGER.warn("Failed to begin transaction. ", cause);
}
@Override
public void onCommitFailure(GlobalTransaction tx, Throwable cause) {
LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause);
timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Committed), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
@Override
public void onRollbackFailure(GlobalTransaction tx, Throwable originalException) {
LOGGER.warn("Failed to rollback transaction[" + tx.getXid() + "]", originalException);
timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Rollbacked), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
@Override
public void onRollbackRetrying(GlobalTransaction tx, Throwable originalException) {
StackTraceLogger.warn(LOGGER, originalException, "Retrying to rollback transaction[{}]", new String[] {tx.getXid()});
timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.RollbackRetrying), SCHEDULE_INTERVAL_SECONDS,
TimeUnit.SECONDS);
}
protected class CheckTimerTask implements TimerTask {
private final GlobalTransaction tx;
private final GlobalStatus required;
private int count = 0;
private boolean isStopped = false;
protected CheckTimerTask(final GlobalTransaction tx, GlobalStatus required) {
this.tx = tx;
this.required = required;
}
@Override
public void run(Timeout timeout) throws Exception {
if (!isStopped) {
if (++count > RETRY_MAX_TIMES) {
LOGGER.error("transaction [{}] retry fetch status times exceed the limit [{} times]", tx.getXid(), RETRY_MAX_TIMES);
return;
}
isStopped = shouldStop(tx, required);
timer.newTimeout(this, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
}
}
private boolean shouldStop(final GlobalTransaction tx, GlobalStatus required) {
try {
GlobalStatus status = tx.getStatus();
LOGGER.info("transaction [{}] current status is [{}]", tx.getXid(), status);
if (status == required || status == GlobalStatus.Finished) {
return true;
}
} catch (TransactionException e) {
LOGGER.error("fetch GlobalTransaction status error", e);
}
return false;
}
}
2> GlobalTransactionScanner
GlobalTransactionScanner是Seata的核心所在,在类图如下:
部分类/接口作用如下:
AbstractAutoProxyCreator: Spring框架内动态代理创建组件
- ConfigurationChangeListener: 关注配置变更事件监听器
- InitializingBean: Bean初始化回调
- ApplicationContextAware: 感知到SPring容器
- DisposableBean: 支持可抛弃Bean
伴随着Spring容器初始化完毕,会调用GlobalTransactionScanner的初始化逻辑(即:afterPropertiesSet()
方法),进而调用initClient()
方法初始化seata client;
- 初始化Seata Client时,TM和RM的逻辑不同,TM会直接和Seata Server建立长连接;
- 而RM在AT模式下不会直接和Seata Server建立长连接。真正建立长连接的地方时实例化DataSourceProxy时。
seata client和seata server见下一篇文章!下一篇重点对GlobalTransactionScanner类进行解析。
3)HttpAutoConfiguration
HttpAutoConfiguration
继承SpringMVC的WebMvcConfigurerAdapter
,而 WebMvcConfigurerAdapter
又实现了WebMvcConfigurer
接口;
而HttpAutoConfiguration
的作用其实和spring-cloud-starter-alibaba-seata-2.2.8.RELEASE.jar
中的SeataHandlerInterceptorConfiguration
类似;一样是对RootContext进行处理,给SpringMVC添加一个拦截器;
就SpringMVC链路传递xid而言,使用spring-cloud-starter-alibaba-seata
依赖 或 seata-spring-boot-starter
依赖可以实现一样的效果;不过当需要用到OpenFeign、RestTemplate时需要使用spring-cloud-starter-alibaba-seata
依赖来实现xid在seata client间传递的效果。
4)SeataSagaAutoConfiguration
SeataSagaAutoConfiguration主要为SAGA模式服务,具体细节聊到SAGA模式时再说,此处mock处理。
四、总结和后续
本文以SpringBoot的自动装配特性为基调出发,通过对每一个自动装配类内容的分析,可以知道xid是如何在seata client之间传递的、seata client的初始化逻辑、seata client和seata server建立长连接的入口、AT模式下RM如何RC建立长连接;