【学习Seata1.6源码#02】通过Spring AOP 实现声明式事务机制

简介: 【学习Seata1.6源码#02】通过Spring AOP 实现声明式事务机制

一、声明式全局事务

Seata示例工程中,能看到@GlobalTransactional,如下方法示例:

@GlobalTransactional
public boolean purchase(long accountId, long stockId, long quantity) {
    String xid = RootContext.getXID();
    LOGGER.info("New Transaction Begins: " + xid);
    boolean stockResult = reduceAccount(accountId,stockId, quantity);
    if (!stockResult) {
        throw new RuntimeException("账号服务调用失败,事务回滚!");
    }
    Long orderId = createOrder(accountId, stockId, quantity);
    if (orderId == null || orderId <= 0) {
        throw new RuntimeException("订单服务调用失败,事务回滚!");
    }
    return true;
}
复制代码

purchase方法上加上此注解,即表示此方法内的reduceAccountcreateOrder两个微服务调用也将加入到分布式事务中,即扣除账户余额与创建订单将具有分布式事务的数据一致性保障能力。

了解 Spring 注解事务实现的话,应该也能推测出,Seata 的事务能力也可能是基于 Spring 的 AOP 机制,给标注了@GlobalTransactional 的方法做 AOP 增加,织入额外的逻辑以完成分布式事务的能力,伪代码大致如下:

GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
    tx.begin(xxx);
    ...
    purchase(xxx)//给purchase增加全局事务处理能力
    ...
    tx.commit();
} catch (Exception exx) {
    tx.rollback();
    throw exx;
}
复制代码

本篇就介绍Seata 如何使用 Spring AOP 来将注解变成分布式事务的代码。

二、源码

在上一篇[《【学习Seata1.6源码#01】全局事务注解@GlobalTransactional的识别]》(juejin.cn/post/717954… bean,并对这类 bean 添加GlobalTransactionalInterceptor,进行 AOP 增强,加入分布式事务的能力。本篇延续这个话题继续,梳理 AOP 增强的逻辑。

通过下边的调用堆栈帮大家梳理出 在源码AbstractAutoProxyCreator#wrapIfNecessary中有 createProxy的调用。

createProxy:443, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy)
wrapIfNecessary:344, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy)
wrapIfNecessary:307, GlobalTransactionScanner (io.seata.spring.annotation)
postProcessAfterInitialization:293, AbstractAutoProxyCreator (org.springframework.aop.framework.autoproxy)
applyBeanPostProcessorsAfterInitialization:455, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
initializeBean:1808, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
doCreateBean:620, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
createBean:542, AbstractAutowireCapableBeanFactory (org.springframework.beans.factory.support)
复制代码

AbstractAutoProxyCreator#createProxy其中new ProxyFactory()则是 AOP 的关键。

protected Object createProxy(Class<?> beanClass, @Nullable String beanName,
        @Nullable Object[] specificInterceptors, TargetSource targetSource) {
    if (this.beanFactory instanceof ConfigurableListableBeanFactory) {
        // 为目标 Bean 的 BeanDefinition 对象设置一个属性
        // org.springframework.aop.framework.autoproxy.AutoProxyUtils.originalTargetClass -> 目标 Bean 的 Class 对象
        AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass);
    }
    // <1> 创建一个代理工厂
    ProxyFactory proxyFactory = new ProxyFactory();
    // <2> 复制当前 ProxyConfig 的一些属性(例如 proxyTargetClass、exposeProxy)
    proxyFactory.copyFrom(this);
    /**
     * <3> 判断是否类代理,也就是是否开启 CGLIB 代理
     * 默认配置下为 `false`,参考 {@link org.springframework.context.annotation.EnableAspectJAutoProxy}
     */
    if (!proxyFactory.isProxyTargetClass()) {
        /*
         * <3.1> 如果这个 Bean 配置了进行类代理,则设置为 `proxyTargetClass` 为 `true`
         */
        if (shouldProxyTargetClass(beanClass, beanName)) {
            proxyFactory.setProxyTargetClass(true);
        }
        else {
            /*
             * <3.2> 检测当前 Bean 实现的接口是否包含可代理的接口
             * 如没有实现,则将 `proxyTargetClass` 设为 `true`,表示需要进行 CGLIB 提升
             */
            evaluateProxyInterfaces(beanClass, proxyFactory);
        }
    }
    /*
     * <4> 对入参的 Advisor 进一步处理,因为其中可能还存在 Advice 类型,需要将他们包装成 DefaultPointcutAdvisor 对象
     * 如果配置了 `interceptorNames` 拦截器,也会添加进来
     */
    Advisor[] advisors = buildAdvisors(beanName, specificInterceptors);
    // <5> 代理工厂添加 Advisor 数组
    proxyFactory.addAdvisors(advisors);
    // <6> 代理工厂设置 TargetSource 对象
    proxyFactory.setTargetSource(targetSource);
    // <7> 对 ProxyFactory 进行加工处理,抽象方法,目前没有子类实现
    customizeProxyFactory(proxyFactory);
    proxyFactory.setFrozen(this.freezeProxy);
    // <8> 是否这个 AdvisedSupport 配置管理器已经过滤过目标类(默认为 false)
    if (advisorsPreFiltered()) {
        // 设置 `preFiltered` 为 `true`
        // 这样 Advisor 们就不会根据 ClassFilter 进行过滤了,而直接通过 MethodMatcher 判断是否处理被拦截方法
        proxyFactory.setPreFiltered(true);
    }
    // Use original ClassLoader if bean class not locally loaded in overriding class loader
    ClassLoader classLoader = getProxyClassLoader();
    if (classLoader instanceof SmartClassLoader && classLoader != beanClass.getClassLoader()) {
       classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
    }
    // <9> 通过 ProxyFactory 代理工厂创建代理对象
    return proxyFactory.getProxy(getProxyClassLoader());
}
复制代码

上边源码读起来很生硬,对于我们使用来梳理核心源码流程来说,留意 AOP 实现的几个关键要素即可:

  1. 配置target,被代理者(类或方法中有标注@GlobalTransactional的bean),最终还是要调用他么的方法
  2. 配置接口,即代理要具备的功能
  3. 配置额外的切面 addAdvisors,这里是指定GlobalTransactionalInterceptor
  4. 根据ClassLoader 类加载器创建代理

由此我们可以推测中分布式事务的逻辑是在 GlobalTransactionalInterceptor 中,核心逻辑的实现应该就是invoke中,我们从GlobalTransactionalInterceptor#invoke源码中理一理:

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // 获取方法上的@GlobalTransactional注解中的内容
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        // 获取方法上的@GlobalLock注解中的内容
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        //判断是否禁用或者降级状态
        boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    //构建事务描述信息,这些基础配置信息很重要
                    transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                        globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                        globalTransactionalAnnotation.rollbackForClassName(),
                        globalTransactionalAnnotation.noRollbackFor(),
                        globalTransactionalAnnotation.noRollbackForClassName(),
                        globalTransactionalAnnotation.propagation(),
                        globalTransactionalAnnotation.lockRetryInterval(),
                        globalTransactionalAnnotation.lockRetryTimes(),
                        globalTransactionalAnnotation.lockStrategyMode());
                } else {
                    transactional = this.aspectTransactional;
                }
                //若是@GlobalTransactional
                return handleGlobalTransaction(methodInvocation, transactional);
            } else if (globalLockAnnotation != null) {
                //若是@GlobalLock
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed();
}
复制代码

handleGlobalTransaction中开始了重点,transactionalTemplate从其名字可知,这是模板方法模式,new TransactionalExecutor()getTransactionInfo是在构建事务的一些基础信息,execute()中则是指定了事务目标方法(如purchase方法),

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    final AspectTransactional aspectTransactional) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }
            ...
            @Override
            public TransactionInfo getTransactionInfo() {
                // reset the value of timeout
                int timeout = aspectTransactional.getTimeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                ...
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        ...
        }
    } finally {
        if (ATOMIC_DEGRADE_CHECK.get()) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}
复制代码

execute方法中的内容是重点

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();
    // 1.2 Handle the transaction propagation.
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        //...
        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        //若全局事务上下文未就绪则new DefaultGlobalTransaction();
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }
        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            //2. 开启全局事务,
            // 2.1 triggerBeforeBegin()
                // 2.2 会跟TC通信获取全局事务ID:xid,
                // 2.3 RootContext.bind(xid);
            // 2.4 triggerAfterBegin()的事件通知调用
            beginTransaction(txInfo, tx);
            Object rs;
            try {
                // 执行我们的事务方法如`purchase`方法
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. 遇到 business exception 则回滚
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }
            // 4. 提交事务,触发事件回调
            // 4.1 triggerBeforeCommit();
                // 4.2 tx.commit();与TC通信提交事务,内部默认是有5次重试机会
            // 4.3 triggerAfterCommit();
            commitTransaction(tx, txInfo);
            //返回结果
            return rs;
        } finally {
            //5. clear
            resumeGlobalLockConfig(previousConfig);
            //结束后的回调
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
复制代码

三、小结:

本篇梳理了引入seata-spring-boot-starter模块后,其内部会通过的自动装配机制会在SeataAutoConfiguration类中,扫描具有@GlobalTransactional全局事务注解的类和方法的 bean,并通过ProxyFactory机制对这类 bean 进行AOP代理, 添加GlobalTransactionalInterceptor,在其内部invoke中通过transactionalTemplate加入分布式事务的能力:

  1. 开启事务与 TC 进行通信,获取 xid ,注入事务上下文
  2. 调用目标方法
  3. 之后根据结果是否正常执行二阶段的提交或回滚

但这里仅仅是 TM 的能力,仍未到RM的职能边界。

四、最后说一句

我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。

欢迎点击链接扫马儿关注、交流。


相关文章
|
2天前
|
Java 开发者 Spring
Spring高手之路24——事务类型及传播行为实战指南
本篇文章深入探讨了Spring中的事务管理,特别是事务传播行为(如REQUIRES_NEW和NESTED)的应用与区别。通过详实的示例和优化的时序图,全面解析如何在实际项目中使用这些高级事务控制技巧,以提升开发者的Spring事务管理能力。
9 1
Spring高手之路24——事务类型及传播行为实战指南
|
20天前
|
存储 缓存 Java
Spring高手之路23——AOP触发机制与代理逻辑的执行
本篇文章深入解析了Spring AOP代理的触发机制和执行流程,从源码角度详细讲解了Bean如何被AOP代理,包括代理对象的创建、配置与执行逻辑,帮助读者全面掌握Spring AOP的核心技术。
28 3
Spring高手之路23——AOP触发机制与代理逻辑的执行
|
20天前
|
消息中间件 Java 数据库
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
这里 借助 Seata 集成 RocketMQ 事务消息的 新功能,介绍一下一个新遇到的面试题:如果如何实现 **强弱一致性 结合**的分布式事务?
新版 Seata 集成 RocketMQ事务消息,越来越 牛X 了!阿里的 Seata , yyds !
|
5天前
|
Java Spring
[Spring]aop的配置与使用
本文介绍了AOP(面向切面编程)的基本概念和核心思想。AOP是Spring框架的核心功能之一,通过动态代理在不修改原代码的情况下注入新功能。文章详细解释了连接点、切入点、通知、切面等关键概念,并列举了前置通知、后置通知、最终通知、异常通知和环绕通知五种通知类型。
14 1
|
2月前
|
Java 数据库连接 数据库
spring复习05,spring整合mybatis,声明式事务
这篇文章详细介绍了如何在Spring框架中整合MyBatis以及如何配置声明式事务。主要内容包括:在Maven项目中添加依赖、创建实体类和Mapper接口、配置MyBatis核心配置文件和映射文件、配置数据源、创建sqlSessionFactory和sqlSessionTemplate、实现Mapper接口、配置声明式事务以及测试使用。此外,还解释了声明式事务的传播行为、隔离级别、只读提示和事务超时期间等概念。
spring复习05,spring整合mybatis,声明式事务
|
2月前
|
设计模式 Java 测试技术
spring复习04,静态代理动态代理,AOP
这篇文章讲解了Java代理模式的相关知识,包括静态代理和动态代理(JDK动态代理和CGLIB),以及AOP(面向切面编程)的概念和在Spring框架中的应用。文章还提供了详细的示例代码,演示了如何使用Spring AOP进行方法增强和代理对象的创建。
spring复习04,静态代理动态代理,AOP
|
30天前
|
Java 编译器 Spring
Spring AOP 和 AspectJ 的区别
Spring AOP和AspectJ AOP都是面向切面编程(AOP)的实现,但它们在实现方式、灵活性、依赖性、性能和使用场景等方面存在显著区别。‌
49 2
|
1月前
|
Java Spring 容器
Spring IOC、AOP与事务管理底层原理及源码解析
【10月更文挑战第1天】Spring框架以其强大的控制反转(IOC)和面向切面编程(AOP)功能,成为Java企业级开发中的首选框架。本文将深入探讨Spring IOC和AOP的底层原理,并通过源码解析来揭示其实现机制。同时,我们还将探讨Spring事务管理的核心原理,并给出相应的源码示例。
117 9
|
1月前
|
Java 关系型数据库 MySQL
Spring事务失效,我总结了这7个主要原因
本文详细探讨了Spring事务在日常开发中常见的七个失效原因,包括数据库不支持事务、类不受Spring管理、事务方法非public、异常被捕获、`rollbackFor`属性配置错误、方法内部调用事务方法及事务传播属性使用不当。通过具体示例和源码分析,帮助开发者更好地理解和应用Spring事务机制,避免线上事故。适合所有使用Spring进行业务开发的工程师参考。
27 2
|
1月前
|
Java 程序员 Spring
Spring事务的1道面试题
每次聊起Spring事务,好像很熟悉,又好像很陌生。本篇通过一道面试题和一些实践,来拆解几个Spring事务的常见坑点。
Spring事务的1道面试题