了解Spring R2DBC的声明式事务实现机制

简介: # Spring非反应式事务实现原理 Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。 # 基于反应式的Spring事务有何不同 Spring的反应式实现是基于Reactor框架,该框架

Spring非反应式事务实现原理

Spring基于注解和AOP的声明式事务(@Transactional)已经是业务开发的常用工具,默认是采用同步的方式基于ThreadLocal(保存连接信息和会话信息等)实现,在具体数据库操作时就使用同一个数据库连接,并手动提交事务,保证数据正确性。

基于反应式的Spring事务有何不同

Spring的反应式实现是基于Reactor框架,该框架对异步编程做了高度的抽象化,主动的线程切换只能通过publishOn/subscribeOn跟换线程池,导致在同步场景表现出色的ThreadLocal无法满足全异步化的事务信息存储需求。Reactor 3提供了一种叫做Context的数据结构,用来替代Threadlocal。

Context的传播机制

整体上Context类非常类似一个不可变的Map\<Object, Object>,采用CopyOnWrite策略,绑定在每一个订阅者上。但是,context传播具体是怎么实现的呢?有一个简单的例子:

Flux.just(1, 2, 3)
    .flatMap(x -> Mono.subscriberContext()
        .map(context -> String.format("%s%d", context.get("msg"), x))
    ).subscriberContext(context -> context.put("msg", "no."))
    .subscribe(System.out::println);

从代码可以看到是通过subscriberContext方法直接put数据,但是这个Context对象是什么时候创建的呢?查看subscriberContext方法的源码发现方法会创建一个FluxContextStart对象,该对象是InternalFluxOperator的子类,实现了subscribeOrReturn(被订阅时调用),在其中将上下文的操作应用到订阅者已有的上下文,而大多数订阅者初始上下文都是Context.empty()。

final class FluxContextStart<T> extends InternalFluxOperator<T, T> implements Fuseable {
....
    @Override
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
        Context c = doOnContext.apply(actual.currentContext());
        return new ContextStartSubscriber<>(actual, c);
    }
....
}

从FluxContextStart的实现可见,Context是由CoreSubscriber的实例所持有,因此Context的传播实际是订阅者实例的传播。而为了避免因为不同操作导致的并发问题,对订阅者的操作都是采用装饰者模式包装一个新的实例,类似Spark RDD的形式。

public final void subscribe(Subscriber<? super T> actual) {
    CorePublisher publisher = Operators.onLastAssembly(this);
    CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

    try {
        ...
        publisher.subscribe(subscriber);
    }
    catch (Throwable e) {
        Operators.reportThrowInSubscribe(subscriber, e);
        return;
    }
}

基于Flux的subscribe方法可见,每次订阅时都会向上游发布者传递订阅者实例,因此Context是自底向上传播。

Spring R2DBC的事务实现

基于对常规声明式事务的认识,找到TransactionAspectSupport#invokeWithinTransaction方法,这个方法定义了声明的事务具体要路由到哪个事务管理器执行。自Spring 5.2 M2之后,Spring开始支持反应式事务,在invokeWithinTransaction方法内可以看到如下代码:

if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
    // 从缓存中获取已经加载的反应式事务管理器
    ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
            throw new TransactionUsageException(
                    "Unsupported annotated transaction on suspending function detected: " + method +
                    ". Use TransactionalOperator.transactional extensions instead.");
        }
        // 根据返回值类型获取适配器
        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
        if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                    method.getReturnType());
        }
        return new ReactiveTransactionSupport(adapter);
    });
    // 执行事务
    return txSupport.invokeWithinTransaction(
            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}

再继续跟进到反应式的txSupport.invokeWithinTransaction里面,会将相关的事务管理器和事务设置等信息放入上文说到的Context中。具体的对于R2DBC的数据库反应式事务而言,其主要调用的是TransactionalOperatorImpl#transactional方法:

@Override
public <T> Mono<T> transactional(Mono<T> mono) {
    return TransactionContextManager.currentContext().flatMap(context -> {
        Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        // Need re-wrapping of ReactiveTransaction until we get hold of the exception
        // through usingWhen.
        return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
                this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::commit)
                .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
    })
    .subscriberContext(TransactionContextManager.getOrCreateContext())
    .subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}    

从代码可以看见,这里也是首先从上下文获取事务信息,保证整个反应式处理的各个操作符都会用到同样的数据库连接,并最终实现声明式事务功能。

总结

Spring实现反应式事务本质上基于Reactor的Context传播机制,结合原有事务机制改造出来的,所以总结下来就两个核心点:1、Reactor的Context是一种类似不可变Map,绑定在每个订阅者自底向上传播;2、Spring反应式事务通过Reactor的Context在不同线程池共享。

参考资料

相关文章
|
3月前
|
SQL Java 关系型数据库
Spring事务传播机制:7种姿势教你玩转"事务接力赛"
事务传播机制是Spring框架中用于管理事务行为的重要概念,它决定了在方法调用时事务如何传递与执行。通过7种传播行为,开发者可以灵活控制事务边界,适应不同业务场景。例如:REQUIRED默认加入或新建事务,REQUIRES_NEW独立开启新事务,NESTED支持嵌套回滚等。合理使用传播机制不仅能保障数据一致性,还能提升系统性能与健壮性。掌握这“七种人格”,才能在复杂业务中游刃有余。
|
6月前
|
人工智能 JSON 安全
Spring Boot实现无感刷新Token机制
本文深入解析在Spring Boot项目中实现JWT无感刷新Token的机制,涵盖双Token策略、Refresh Token安全性及具体示例代码,帮助开发者提升用户体验与系统安全性。
673 4
|
9月前
|
Java Spring
Spring中事务失效的场景
因为Spring事务是基于代理来实现的,所以某个加了@Transactional的⽅法只有是被代理对象调⽤时, 那么这个注解才会⽣效 , 如果使用的是被代理对象调用, 那么@Transactional会失效 同时如果某个⽅法是private的,那么@Transactional也会失效,因为底层cglib是基于⽗⼦类来实现 的,⼦类是不能重载⽗类的private⽅法的,所以⽆法很好的利⽤代理,也会导致@Transactianal失效 如果在业务中对异常进行了捕获处理 , 出现异常后Spring框架无法感知到异常, @Transactional也会失效
|
4月前
|
Java 关系型数据库 数据库
深度剖析【Spring】事务:万字详解,彻底掌握传播机制与事务原理
在Java开发中,Spring框架通过事务管理机制,帮我们轻松实现了这种“承诺”。它不仅封装了底层复杂的事务控制逻辑(比如手动开启、提交、回滚事务),还提供了灵活的配置方式,让开发者能专注于业务逻辑,而不用纠结于事务细节。
|
9月前
|
Java 关系型数据库 数据库
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——常见问题总结
本文总结了Spring Boot中使用事务的常见问题,虽然通过`@Transactional`注解可以轻松实现事务管理,但在实际项目中仍有许多潜在坑点。文章详细分析了三个典型问题:1) 异常未被捕获导致事务未回滚,需明确指定`rollbackFor`属性;2) 异常被try-catch“吃掉”,应避免在事务方法中直接处理异常;3) 事务范围与锁范围不一致引发并发问题,建议调整锁策略以覆盖事务范围。这些问题看似简单,但一旦发生,排查难度较大,因此开发时需格外留意。最后,文章提供了课程源代码下载地址,供读者实践参考。
246 0
|
9月前
|
Java 关系型数据库 数据库
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——Spring Boot 事务配置
本文介绍了 Spring Boot 中的事务配置与使用方法。首先需要导入 MySQL 依赖,Spring Boot 会自动注入 `DataSourceTransactionManager`,无需额外配置即可通过 `@Transactional` 注解实现事务管理。接着通过创建一个用户插入功能的示例,展示了如何在 Service 层手动抛出异常以测试事务回滚机制。测试结果表明,数据库中未新增记录,证明事务已成功回滚。此过程简单高效,适合日常开发需求。
1206 0
|
9月前
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——事务相关
本文介绍Spring Boot事务配置管理,阐述事务在企业应用开发中的重要性。事务确保数据操作可靠,任一异常均可回滚至初始状态,如转账、购票等场景需全流程执行成功才算完成。同时,事务管理在Spring Boot的service层广泛应用,但根据实际需求也可能存在无需事务的情况,例如独立数据插入操作。
255 0
|
5月前
|
JSON 前端开发 Java
Spring MVC 核心组件与请求处理机制详解
本文解析了 Spring MVC 的核心组件及请求流程,核心组件包括 DispatcherServlet(中央调度)、HandlerMapping(URL 匹配处理器)、HandlerAdapter(执行处理器)、Handler(业务方法)、ViewResolver(视图解析),其中仅 Handler 需开发者实现。 详细描述了请求执行的 7 步流程:请求到达 DispatcherServlet 后,经映射器、适配器找到并执行处理器,再通过视图解析器渲染视图(前后端分离下视图解析可省略)。 介绍了拦截器的使用(实现 HandlerInterceptor 接口 + 配置类)及与过滤器的区别
466 0
|
7月前
|
人工智能 Java 数据库连接
Spring事务失效场景
本文深入探讨了Spring框架中事务管理可能失效的几种常见场景及解决方案,包括事务方法访问级别不当、方法内部自调用、错误的异常处理、事务管理器或数据源配置错误、数据库不支持事务以及不合理的事务传播行为或隔离级别。通过合理配置和正确使用`@Transactional`注解,开发者可以有效避免这些问题,确保应用的数据一致性和完整性。
431 10
|
6月前
|
Java 关系型数据库 MySQL
【Spring】【事务】初学者直呼学会了的Spring事务入门
本文深入解析了Spring事务的核心概念与使用方法。Spring事务是一种数据库事务管理机制,通过确保操作的原子性、一致性、隔离性和持久性(ACID),维护数据完整性。文章详细讲解了声明式事务(@Transactional注解)和编程式事务(TransactionTemplate、PlatformTransactionManager)的区别与用法,并探讨了事务传播行为(如REQUIRED、REQUIRES_NEW等)及隔离级别(如READ_COMMITTED、REPEATABLE_READ)。
498 1

热门文章

最新文章