一、声明式全局事务
在Seata
示例工程中,能看到@GlobalTransactional
,如下方法示例:
@GlobalTransactional public boolean purchase(long accountId, long stockId, long quantity) { String xid = RootContext.getXID(); LOGGER.info("New Transaction Begins: " + xid); boolean stockResult = reduceAccount(accountId,stockId, quantity); if (!stockResult) { throw new RuntimeException("账号服务调用失败,事务回滚!"); } Long orderId = createOrder(accountId, stockId, quantity); if (orderId == null || orderId <= 0) { throw new RuntimeException("订单服务调用失败,事务回滚!"); } return true; } 复制代码
purchase
方法上加上此注解,即表示此方法内的reduceAccount
和createOrder
两个微服务调用也将加入到分布式事务中,即扣除账户余额与创建订单将具有分布式事务的数据一致性保障能力。
了解 Spring 注解事务实现的话,应该也能推测出,Seata 的事务能力也可能是基于 Spring 的 AOP 机制,给标注了@GlobalTransactional
的方法做 AOP 增加,织入额外的逻辑以完成分布式事务的能力,伪代码大致如下:
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); try { tx.begin(xxx); ... purchase(xxx)//给purchase增加全局事务处理能力 ... tx.commit(); } catch (Exception exx) { tx.rollback(); throw exx; } 复制代码
本篇就介绍Seata 如何使用 Spring AOP 来将注解变成分布式事务的代码。
二、源码
在上一篇[《【学习Seata1.6源码#01】全局事务注解@GlobalTransactional的识别]》(juejin.cn/post/717954… bean,并对这类 bean 添加GlobalTransactionalInterceptor
,进行 AOP 增强,加入分布式事务的能力。本篇延续这个话题继续,梳理 AOP 增强的逻辑。
通过下边的调用堆栈帮大家梳理出 在源码AbstractAutoProxyCreator#wrapIfNecessary
中有 createProxy
的调用。
createProxy:443, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) wrapIfNecessary:344, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) wrapIfNecessary:307, GlobalTransactionScanner (io.seata.spring.annotation) postProcessAfterInitialization:293, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy) applyBeanPostProcessorsAfterInitialization:455, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) initializeBean:1808, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) doCreateBean:620, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) createBean:542, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support) 复制代码
AbstractAutoProxyCreator#createProxy
其中new ProxyFactory()
则是 AOP 的关键。
protected Object createProxy(Class<?> beanClass, @Nullable String beanName, @Nullable Object[] specificInterceptors, TargetSource targetSource) { if (this.beanFactory instanceof ConfigurableListableBeanFactory) { // 为目标 Bean 的 BeanDefinition 对象设置一个属性 // org.springframework.aop.framework.autoproxy.AutoProxyUtils.originalTargetClass -> 目标 Bean 的 Class 对象 AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass); } // <1> 创建一个代理工厂 ProxyFactory proxyFactory = new ProxyFactory(); // <2> 复制当前 ProxyConfig 的一些属性(例如 proxyTargetClass、exposeProxy) proxyFactory.copyFrom(this); /** * <3> 判断是否类代理,也就是是否开启 CGLIB 代理 * 默认配置下为 `false`,参考 {@link org.springframework.context.annotation.EnableAspectJAutoProxy} */ if (!proxyFactory.isProxyTargetClass()) { /* * <3.1> 如果这个 Bean 配置了进行类代理,则设置为 `proxyTargetClass` 为 `true` */ if (shouldProxyTargetClass(beanClass, beanName)) { proxyFactory.setProxyTargetClass(true); } else { /* * <3.2> 检测当前 Bean 实现的接口是否包含可代理的接口 * 如没有实现,则将 `proxyTargetClass` 设为 `true`,表示需要进行 CGLIB 提升 */ evaluateProxyInterfaces(beanClass, proxyFactory); } } /* * <4> 对入参的 Advisor 进一步处理,因为其中可能还存在 Advice 类型,需要将他们包装成 DefaultPointcutAdvisor 对象 * 如果配置了 `interceptorNames` 拦截器,也会添加进来 */ Advisor[] advisors = buildAdvisors(beanName, specificInterceptors); // <5> 代理工厂添加 Advisor 数组 proxyFactory.addAdvisors(advisors); // <6> 代理工厂设置 TargetSource 对象 proxyFactory.setTargetSource(targetSource); // <7> 对 ProxyFactory 进行加工处理,抽象方法,目前没有子类实现 customizeProxyFactory(proxyFactory); proxyFactory.setFrozen(this.freezeProxy); // <8> 是否这个 AdvisedSupport 配置管理器已经过滤过目标类(默认为 false) if (advisorsPreFiltered()) { // 设置 `preFiltered` 为 `true` // 这样 Advisor 们就不会根据 ClassFilter 进行过滤了,而直接通过 MethodMatcher 判断是否处理被拦截方法 proxyFactory.setPreFiltered(true); } // Use original ClassLoader if bean class not locally loaded in overriding class loader ClassLoader classLoader = getProxyClassLoader(); if (classLoader instanceof SmartClassLoader && classLoader != beanClass.getClassLoader()) { classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader(); } // <9> 通过 ProxyFactory 代理工厂创建代理对象 return proxyFactory.getProxy(getProxyClassLoader()); } 复制代码
上边源码读起来很生硬,对于我们使用来梳理核心源码流程来说,留意 AOP 实现的几个关键要素即可:
- 配置target,被代理者(类或方法中有标注@GlobalTransactional的bean),最终还是要调用他么的方法
- 配置接口,即代理要具备的功能
- 配置额外的切面 addAdvisors,这里是指定
GlobalTransactionalInterceptor
- 根据ClassLoader 类加载器创建代理
由此我们可以推测中分布式事务的逻辑是在 GlobalTransactionalInterceptor
中,核心逻辑的实现应该就是invoke
中,我们从GlobalTransactionalInterceptor#invoke
源码中理一理:
@Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null; Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); // 获取方法上的@GlobalTransactional注解中的内容 final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); // 获取方法上的@GlobalLock注解中的内容 final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); //判断是否禁用或者降级状态 boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes); if (!localDisable) { if (globalTransactionalAnnotation != null || this.aspectTransactional != null) { AspectTransactional transactional; if (globalTransactionalAnnotation != null) { //构建事务描述信息,这些基础配置信息很重要 transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.rollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes(), globalTransactionalAnnotation.lockStrategyMode()); } else { transactional = this.aspectTransactional; } //若是@GlobalTransactional return handleGlobalTransaction(methodInvocation, transactional); } else if (globalLockAnnotation != null) { //若是@GlobalLock return handleGlobalLock(methodInvocation, globalLockAnnotation); } } } return methodInvocation.proceed(); } 复制代码
handleGlobalTransaction
中开始了重点,transactionalTemplate
从其名字可知,这是模板方法模式,new TransactionalExecutor()
中 getTransactionInfo
是在构建事务的一些基础信息,execute()
中则是指定了事务目标方法(如purchase
方法),
Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable { boolean succeed = true; try { return transactionalTemplate.execute(new TransactionalExecutor() { @Override public Object execute() throws Throwable { return methodInvocation.proceed(); } ... @Override public TransactionInfo getTransactionInfo() { // reset the value of timeout int timeout = aspectTransactional.getTimeoutMills(); if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) { timeout = defaultGlobalTransactionTimeout; } TransactionInfo transactionInfo = new TransactionInfo(); transactionInfo.setTimeOut(timeout); transactionInfo.setName(name()); ... return transactionInfo; } }); } catch (TransactionalExecutor.ExecutionException e) { ... } } finally { if (ATOMIC_DEGRADE_CHECK.get()) { EVENT_BUS.post(new DegradeCheckEvent(succeed)); } } } 复制代码
execute
方法中的内容是重点
public Object execute(TransactionalExecutor business) throws Throwable { // 1. Get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. GlobalTransaction tx = GlobalTransactionContext.getCurrent(); // 1.2 Handle the transaction propagation. Propagation propagation = txInfo.getPropagation(); SuspendedResourcesHolder suspendedResourcesHolder = null; try { //... // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'. //若全局事务上下文未就绪则new DefaultGlobalTransaction(); if (tx == null) { tx = GlobalTransactionContext.createNew(); } // set current tx config to holder GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); try { // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, // else do nothing. Of course, the hooks will still be triggered. //2. 开启全局事务, // 2.1 triggerBeforeBegin() // 2.2 会跟TC通信获取全局事务ID:xid, // 2.3 RootContext.bind(xid); // 2.4 triggerAfterBegin()的事件通知调用 beginTransaction(txInfo, tx); Object rs; try { // 执行我们的事务方法如`purchase`方法 rs = business.execute(); } catch (Throwable ex) { // 3. 遇到 business exception 则回滚 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } // 4. 提交事务,触发事件回调 // 4.1 triggerBeforeCommit(); // 4.2 tx.commit();与TC通信提交事务,内部默认是有5次重试机会 // 4.3 triggerAfterCommit(); commitTransaction(tx, txInfo); //返回结果 return rs; } finally { //5. clear resumeGlobalLockConfig(previousConfig); //结束后的回调 triggerAfterCompletion(); cleanUp(); } } finally { // If the transaction is suspended, resume it. if (suspendedResourcesHolder != null) { tx.resume(suspendedResourcesHolder); } } } 复制代码
三、小结:
本篇梳理了引入seata-spring-boot-starter
模块后,其内部会通过的自动装配机制会在SeataAutoConfiguration
类中,扫描具有@GlobalTransactional
全局事务注解的类和方法的 bean,并通过ProxyFactory
机制对这类 bean 进行AOP代理, 添加GlobalTransactionalInterceptor
,在其内部invoke
中通过transactionalTemplate
加入分布式事务的能力:
- 开启事务与 TC 进行通信,获取 xid ,注入事务上下文
- 调用目标方法
- 之后根据结果是否正常执行二阶段的提交或回滚
但这里仅仅是 TM 的能力,仍未到RM的职能边界。
四、最后说一句
我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。