本文已收录在Github,关注我,紧跟本系列专栏文章,咱们下篇再续!
- 🚀 魔都架构师 | 全网30W技术追随者
- 🔧 大厂分布式系统/数据中台实战专家
- 🏆 主导交易系统百万级流量调优 & 车联网平台架构
- 🧠 AIGC应用开发先行者 | 区块链落地实践者
- 🌍 以技术驱动创新,我们的征途是改变世界!
- 👉 实战干货:编程严选网
0 前言
- 事务的传播机制
- 多数据源的切换问题
更深入理解Spring事务。
1 数据准备
用户注册完成后,需要给该用户登记一门PUA必修课,并更新该门课的登记用户数。为此,添加
1.1 两个表
① 课程表course
记录课程名称和注册的用户数。
CREATE TABLE `course` ( `id` int(11) NOT NULL AUTO_INCREMENT, `course_name` varchar(64) DEFAULT NULL, `number` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
② 用户选课表 user_course
记录用户表 user 和课程表 course 之间的多对多关联。
CREATE TABLE `student_course` ( `student_id` int(11) NOT NULL, `course_id` int(11) NOT NULL ) ENGINE = InnoDB DEFAULT CHARSET = utf8
为课程表初始化了一条课程信息:
insert into course values (1, 'PUA', 0);
1.2 用户相关操作
主要包括两部分:
① 新增用户选课记录
@Mapper public interface UserCourseMapper { @Insert("INSERT INTO `user_course`(`user_id`, `course_id`) VALUES (#{userId}, #{courseId})") void saveUserCourse(@Param("userId") Integer userId, @Param("courseId") Integer courseId); }
② 课程登记学生数 + 1
@Mapper public interface CourseMapper { @Update("update `course` set number = number + 1 where id = #{id}") void addCourseNumber(int courseId); }
新增业务类 CourseService实现相关业务逻辑,分别调用了上述方法保存用户与课程的关联关系,并给课程注册人数+1
@Service public class CourseService { @Autowired private CourseMapper courseMapper; @Autowired private UserCourseMapper userCourseMapper; @Transactional(rollbackFor = Exception.class) public void regCourse(int studentId) throws Exception { userCourseMapper.saveUserCourse(studentId, 1); courseMapper.addCourseNumber(1); } }
为避免注册课程的业务异常导致用户信息无法保存,这里 catch 注册课程方法中抛出的异常。希望当注册课程发生错误时,只回滚注册课程部分,保证用户信息依然正常。
@Service public class UserService { @Autowired private UserMapper userMapper; @Autowired private UserService userService; @Autowired private CourseService courseService; public void doSaveUser(User user) { userMapper.saveUser(user); if ("JavaEdge".equals(user.getName())) { throw new RuntimeException("该用户已存在"); } } @Transactional(rollbackFor = Exception.class) public void saveUser(String name) throws Exception { User user = new User(); user.setName(name); userService.doSaveUser(user); try { courseService.regCourse(user.getId()); } catch (Exception e) { e.printStackTrace(); } } }
为验证异常是否符合预期,regCourse()抛个注册失败异常:
@Service public class CourseService { @Autowired private CourseMapper courseMapper; @Autowired private UserCourseMapper userCourseMapper; @Transactional(rollbackFor = Exception.class) public void regCourse(int studentId) throws Exception { userCourseMapper.saveUserCourse(studentId, 1); courseMapper.addCourseNumber(1); throw new Exception("注册失败"); } }
执行代码:
public void regCourse(int userId) throws Exception { userCourseMapper.saveUserCourse(userId, courseId: 1); courseMapper.addCourseNumber(courseId: 1); throw new Exception("注册失败"); }
java.lang.Exception: 注册失败 at com.javaedge.spring.tx.v3.CourseService.regCourse(CourseService.java:24) at com.javaedge.spring.tx.v3.CourseService$$FastClassBySpringCGLIB$$1c626f94.invoke(<generated>) at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
注册失败部分的异常符合预期,但是后面又多了一个这样的错误提示:Transaction rolled back because it has been marked as rollback-only
Exception in thread "main" org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only at org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:871) at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:708) at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:631) at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:385) at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:99) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) at com.javaedge.spring.tx.v3.UserService$$EnhancerBySpringCGLIB$$ed0dd850.saveUser(<generated>) at com.javaedge.spring.tx.v3.BestPractiseApplication.main(BestPractiseApplication.java:26)
最后用户和选课的信息都被回滚了,不符预期。
期待结果是即便内部事务regCourse()发生异常,外部事务saveStudent()俘获该异常后,内部事务应自行回滚,不影响外部事务。why?
2 源码解析
伪代码梳理整个事务结构:
// 外层 saveUser() 的事务 @Transactional(rollbackFor = Exception.class) public void saveUser(String name) throws Exception { // ... userService.doSaveUser(user); try { // 嵌套的内层 regCourse() 事务 @Transactional(rollbackFor = Exception.class) public void regCourse(int userId) throws Exception { // ... } } catch (Exception e) { e.printStackTrace(); } }
Spring声明式事务中的propagation属性,表示对这些方法使用怎样的事务,即:一个带事务的方法调用了另一个带事务的方法,被调用的方法咋处理自己事务和调用方法事务之间的关系。
因为:
- 在 saveUser() 上声明了一个外部的事务,就已经存在一个事务了
- 在propagation值为默认REQUIRED时
regCourse() 就会加入到已有的事务中,两个方法共用一个事务。
Spring事务处理的核心:
2.1 invokeWithinTransaction()
TransactionAspectSupport类中:
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // 是否需创建一个事务 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // 调用具体业务方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } // 正常返回时提交事务 commitTransactionAfterReturning(txInfo); return retVal; } ... }
整个方法完成了事务的一整套处理逻辑:
- 检查是否需创建事务
- 调用具体业务方法来处理
- 提交事务
- 处理异常
当前案例是俩事务嵌套,每个事务都会调用该方法,即该方法会被调两次。
2.2 内层事务
当捕获异常,会调用
2.2.1 completeTransactionAfterThrowing()
进行异常处理:
/** * Handle a throwable, completing the transaction. * We may commit or roll back, depending on the configuration. * @param txInfo information about the current transaction * @param ex throwable encountered */ protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { // see!!! txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } } else { // We don't roll back on this exception. // Will still roll back if TransactionStatus.isRollbackOnly() is true. try { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; } } } }
对异常类型做一些检查,当符合声明中的定义后,执行rollback操作,就在AbstractPlatformTransactionManager类中的
2.2.2 rollback()
处理正参与到已有事务集的事务。委托执行Rollback和doSetRollbackOnly。
/** * This implementation of rollback handles participating in existing * transactions. Delegates to {@code doRollback} and * {@code doSetRollbackOnly}. * @see #doRollback * @see #doSetRollbackOnly */ @Override public final void rollback(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; // see!!! processRollback(defStatus, false); }
继续调用
2.2.3 processRollback()
private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; if (status.hasSavepoint()) { // 有保存点 status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { // 是否为一个新事务 doRollback(status); } else { // 处于一个更大的事务中 if (status.hasTransaction()) { // 分支1 if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { doSetRollbackOnly(status); } } if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } // ... if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { cleanupAfterCompletion(status); } }
该里区分三种场景:
- 是否有保存点
- 是否为一个新的事务
- 是否处于一个更大的事务中
因为默认传播类型REQUIRED,嵌套的事务并未开启一个新事务,所以属于当前事务,即处于一个更大事务中,走分支1。
如下的判断条件确定是否设置为仅回滚:
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure())
满足任一,都会执行 doSetRollbackOnly():
① isLocalRollbackOnly
private boolean rollbackOnly = false; /** * Determine the rollback-only flag via checking this TransactionStatus. * <p>Will only return "true" if the application called {@code setRollbackOnly} * on this TransactionStatus object. */ public boolean isLocalRollbackOnly() { return this.rollbackOnly; }
默认 false,当前场景为 false。
② isGlobalRollbackOnParticipationFailure()
private boolean globalRollbackOnParticipationFailure = true; /** * Return whether to globally mark an existing transaction as rollback-only * after a participating transaction failed. */ public final boolean isGlobalRollbackOnParticipationFailure() { return this.globalRollbackOnParticipationFailure; }
所以,就只由该方法来确定,默认为true,即是否回滚交由外层事务统一决定
条件得到满足,执行
DataSourceTransactionManager#doSetRollbackOnly
@Override protected void doSetRollbackOnly(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); if (status.isDebug()) { logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() + "] rollback-only"); } // see!!! txObject.setRollbackOnly(); }
最终调用
2.2.4 DataSourceTransactionObject#setRollbackOnly()
public void setRollbackOnly() { getConnectionHolder().setRollbackOnly(); }
内层事务操作完毕。
2.3 外层事务
外层事务中,业务代码就捕获了内层所抛异常,所以该异常不会继续往上抛,最后的事务会在invokeWithinTransaction()中的
2.3.1 commitTransactionAfterReturning()
/** * Execute after successful completion of call, but not after an exception was handled. * Do nothing if we didn't create a transaction. * @param txInfo information about the current transaction */ protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null) { // see!!! txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
该方法里执行commit:
AbstractPlatformTransactionManager#commit
package org.springframework.transaction.support; /** * 实现了 Spring 标准事务工作流的抽象基类, * 作为具体平台事务管理器(如 * {@link org.springframework.transaction.jta.JtaTransactionManager})的基础。 * * <p>该基类提供了以下工作流处理: * <ul> * <li>判断当前是否存在已有事务; * <li>应用合适的传播行为; * <li>在必要时挂起和恢复事务; * <li>在提交时检查回滚标记(rollback-only flag); * <li>在回滚时执行相应的操作 * (实际回滚或仅设置 rollback-only 状态); * <li>触发已注册的同步回调 * (如果事务同步处于激活状态)。 * </ul> * * <p>子类必须为事务的特定状态实现具体的模板方法, * 如:begin(开始)、suspend(挂起)、resume(恢复)、commit(提交)、rollback(回滚)。 * 其中最重要的方法是抽象的,必须由具体子类提供实现; * 其余方法则提供了默认实现,因此子类可选择性地重写。 * * <p>事务同步是一种通用机制,用于注册在事务完成时被调用的回调。 * 它主要被 JDBC、Hibernate、JPA 等数据访问支持类内部使用, * 特别是在 JTA 事务环境中运行时: * 这些类会注册在事务内打开的资源,以便在事务完成时关闭, * 从而允许例如在同一个事务内复用相同的 Hibernate Session。 * 应用程序也可以利用相同的机制来满足自定义的同步需求。 * * <p>本类的状态是可序列化的,以便在携带事务拦截器的代理对象被序列化时, * 能够一并序列化事务策略。 * 子类可根据需要决定是否使其自身状态也可序列化。 * 若需要,则应实现 {@code java.io.Serializable} 标记接口, * 并且如果需要恢复任何瞬态(transient)状态, * 可根据 Java 序列化规则提供一个私有的 {@code readObject()} 方法。 */ @SuppressWarnings("serial") public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable { /** * 此 commit 方法的实现用于处理参与现有事务的情况 * 以及编程式发起的回滚请求。 * 它会委托给 {@code isRollbackOnly}、{@code doCommit} * 和 {@code rollback} 方法进行具体处理。 */ @Override public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } // see!!! processRollback(defStatus, true); return; } processCommit(defStatus); }
当满足 !shouldCommitOnGlobalRollbackOnly() &&defStatus.isGlobalRollbackOnly(),就会回滚,否则继续提交事务:
shouldCommitOnGlobalRollbackOnly()
/** * 返回是否应在已被全局标记为 rollback-only(仅回滚)的事务上调用doCommit。 * * 此设置不适用于应用程序通过 TransactionStatus 本地将事务设为 rollback-only 的情况, * 而仅适用于事务协调器(transaction coordinator)本身将该事务标记为 rollback-only 的情形。 * * 默认返回 "false":本地事务策略通常不会在事务对象内部保存 rollback-only 标记, * 因此它们无法在事务提交过程中处理 rollback-only 状态的事务。 * 在这种情况下,AbstractPlatformTransactionManager 将主动触发回滚, * 并随后抛 UnexpectedRollbackException。 * * 如果具体的事务管理器即使在事务被标记为 rollback-only 时也期望调用doCommit, * 从而在doCommit中进行特殊处理,则应重写此方法并返回 "true"。 * 如 JTA 就属于这种情况:UserTransaction.commit()自身会检查只读/回滚标记, * 并在必要时抛出相应的 RollbackException,其中可能包含具体原因(如事务超时)。 * * 如果此方法返回 "true",但doCommit的实现却没有抛出异常, * 那么该事务管理器自身仍会抛UnexpectedRollbackException。 * 这通常不应发生;该检查主要是为了防范行为异常的 JTA 提供商—— * 它们在调用代码并未请求回滚的情况下,却静默地执行了回滚操作。 */ protected boolean shouldCommitOnGlobalRollbackOnly() { return false; }
若发现事务被标记了全局回滚,且在发生全局回滚时,判断是否应该提交事务,这个方法的默认返回 false,这里无需关注
isGlobalRollbackOnly()
/** * 通过检查事务对象来确定 rollback-only 标志, * 前提是该事务对象实现了 {@link SmartTransactionObject} 接口。 * * <p>如果事务协调器(例如因事务超时等原因)已将全局事务本身标记为 rollback-only, * 则此方法将返回 {@code true}。 */ @Override public boolean isGlobalRollbackOnly() { return ((this.transaction instanceof SmartTransactionObject) && ((SmartTransactionObject) this.transaction).isRollbackOnly()); }
该方法最终进入
isRollbackOnly()
@SuppressWarnings("serial") public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { @Override public boolean isRollbackOnly() { return getConnectionHolder().isRollbackOnly(); }
之前内部事务处理最终调用到DataSourceTransactionObject#setRollbackOnly()
public void setRollbackOnly() { getConnectionHolder().setRollbackOnly(); }
- isRollbackOnly()
- setRollbackOnly()
两个方法本质都是对ConnectionHolder.rollbackOnly属性标志位的存取。但ConnectionHolder则存在于DefaultTransactionStatus#transaction属性。
综上:外层事务是否回滚关键,最终取决于DataSourceTransactionObject#isRollbackOnly(),该方法返回值正是在内层异常时设置的。所以最终外层事务也被回滚,从而在控制台中打印上述日志。
因此,Spring默认事务传播属性为REQUIRED,内外两层事务都处同一事务。在regCourse()抛异常,并触发回滚操作时,这个回滚会继续传播,从而把saveUser()也回滚,最终整个事务都被回滚!
3 修正
Spring事务默认传播属性 REQUIRED,在整个事务的调用链上,任一环节抛异常都会导致全局回滚。只需将传播属性改成 REQUIRES_NEW:
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW) public void regCourse(int userId) throws Exception { userCourseMapper.saveUserCourse(userId, courseId: 1); courseMapper.addCourseNumber(courseId: 1); throw new Exception("注册失败"); }
运行:
java.lang.Exception: 注册失败 at com.javaedge.spring.tx.v3.CourseService.regCourse(CourseService.java:25) at com.javaedge.spring.tx.v3.CourseService$$FastClassBySpringCGLIB$$1c626f94.invoke(<generated>) at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
异常正常抛出,注册课程部分的数据没有保存,但用户还是正常注册成功。这意味着此时Spring 只对注册课程这部分的数据进行了回滚,并没有传播到外层:
- 当子事务声明为Propagation.REQUIRES_NEW时,在
TransactionAspectSupport.invokeWithinTransaction()中调用createTransactionIfNecessary()就会创建一个新事务,独立于外层事务 - 在AbstractPlatformTransactionManager.processRollback() 进行 rollback 处理时,因为status.isNewTransaction()会因为它处于一个新事务而返回true,所以它走入另一分支,执行doRollback(),子事务单独回滚,不影响主事务