【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

@[TOC]

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?

本文正式进入Seata最核心的全局事务执行流程。

二、全局事务执行的入口

【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务一文,我们知道了所谓的@GlobalTransactional注解开启全局事务,其实就是给类 或 类的方法上标注了@GlobalTransactional注解的类创建动态代理对象。但是动态代理对象是针对类的;

1、拦截器GlobalTransactionalInterceptor

当一个类中有多个方法并且类没有被@GlobalTransactional注解标注,但只有一个方法被@GlobalTransactional注解标注时,这里针对整个类生成了动态代理对象,当调用Bean时,拦截器GlobalTransactionalInterceptor会做进一步处理,保证只有加了@GlobalTransactional注解的方法才会开启全局事务。

GlobalTransactionalInterceptor类的继承图:

在这里插入图片描述

GlobalTransactionalInterceptor实现了MethodInterceptor接口,所以当每次执行添加了 GlobalTransactionalInterceptor拦截器的Bean的方法时,都会进入到GlobalTransactionalInterceptor类覆写MethodInterceptor接口的invoke()方法

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    // method invocation是一次方法调用,一定是针对某个对象的方法调用;
    // methodInvocation.getThis()就是拿到当前方法所属的对象;
    // AopUtils.getTargetClass()获取到当前实例对象所对应的Class
    Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;

    // 通过反射获取到被调用目标Class的method方法
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

    // 如果目标method不为空,并且方法的DeclaringClass不是Object
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        // 通过BridgeMethodResolver寻找method的桥接方法
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // 获取目标方法的@GlobalTransactional注解
        final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
        // 如果目标方法被@GlobalLock注解标注,获取到@GlobalLock注解内容
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        // 如果禁用了全局事务 或 开启了事务降级检查并且降级检查次数大于等于降级检查允许的次数
        // 则localDisable等价于全局事务被禁用了
        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);

        // 如果全局事务没有被禁用
        if (!localDisable) {
            // 全局事务注解不为空 或者 AOP切面全局事务核心配置不为空
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    // 构建一个AOP切面全局事务核心配置,配置的数据从全局事务注解中取
                    transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes());
                } else {
                    transactional = this.aspectTransactional;
                }
                // 真正处理全局事务的入口
                return handleGlobalTransaction(methodInvocation, transactional);
            } else if (globalLockAnnotation != null) {
                // 获取事务锁
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    // 直接运行目标方法
    return methodInvocation.proceed();
}

invoke()方法解析

1)方法入参--MethodInvocation

invoke()方法的入参为MethodInvocation,MethodInvocation是一次方法调用,并且是针对某个对象的方法调用;

  • methodInvocation.getThis()会拿到当前方法所属的对象;

在这里插入图片描述

在通过methodInvocation.getThis()会拿到当前方法所属的对象时,如果获取到的是null,则使用AopUtils.getTargetClass()获取到当前实例对象所对应的Class(如果被AOP代理,则是代理类,否则是普通类)。

2)判断目标方法是否需要开启全局事务

直接通过反射拿到目标Class的method方法;如果method不为空,并且method所属的类不是Object类;再判断如果method直接或间接被GlobalTransactional注解标注,并且没有禁用全局事务,则再进一步判断全局事务是否被禁用,如果没有被禁用则执行全局事务。

在这里插入图片描述

3)开始处理全局事务

handleGlobalTransaction()方法中真正开始进行全局事务的处理。方法具体内容见<三、全局事务执行>

2、不用开启全局事务的情况

1)全局事务被禁用

在判断完method直接或间接被@GlobalTransactional标注之后,会判断全局事务是否被禁用,如果被禁用则至今运行目标方法。
在这里插入图片描述
禁用全局事务有两种方式:

1> 显示的设置disable属性

  • 配置service.disableGlobalTransaction,默认为false,表示不禁用全局事务;

在这里插入图片描述

2> 开启了事务降级检查,并且降级检查次数大于等于降级检查允许的次数

  • 配置client.tm.degradeCheck,默认为false,表示不开启事务降级检查;
  • 配置client.tm.degradeCheckAllowTimes,只有当开启事务降级检查,这个配置才有意义;

在这里插入图片描述

2)某一个类被标注的注解,但Object超类下的所有方法仍都不会开启全局事务

在GlobalTransactionalInterceptor#invoke()方法中会判断如果目标类的方法是Object类下的方法,则不会执行全局事务;

在这里插入图片描述

3)某一个方法标注了事务注解,其余方法没标注,并且类没有被标注,其余方法都不会开启全局事务

假如我们调用TradeService类中没有标注@GlobalTransactional注解的test()方法(且 TradeService类也没有标注@GlobalTransaction注解);
在这里插入图片描述

invoke()方法中会再次判断 当前调用的bean的方法 或 方法所处的类上是否标注了@GlobalTransactional注解,如果没有标注,则执行运行目标方法;否则才会以全局事务的方式执行方法。

三、全局事务执行

在上面我们聊了GlobalTransactionalInterceptor#handleGlobalTransaction()方法会进行全局事务的处理;

在这里插入图片描述
全局事务的执行会交给全局事务执行业务逻辑的模板TransactionalTemplate,并将目标方法封装到TransactionalExecutor中作为全局事务中执行业务逻辑的回调。

全局事务执行模板TransactionalTemplate

全局事务的整体执行流程体现在TransactionalTemplate#execute()方法中:

在这里插入图片描述

具体代码 和 注释:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    // 获取当前事务,根据ThreadLocal,获取当前线程本地变量副本中的xid,进而判断是否存在一个全局事务
    // 刚开始一个全局事务时,肯定是没有全局事务的
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // 从全局事务的配置里 获取事务传播级别,默认是REQUIRED(如果存在则加入,否则开启一个新的)
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // 根据事务的隔离级别做不同的处理
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    // 事务存在,则挂起事务(默认将xid从RootContext中移除)
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        if (tx == null) {
            // 创建全局事务(角色为事务发起者),并关联全局事务管理器
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // 开启全局事务,如果事务角色是'GlobalTransactionRole.Launcher',发送开始事务请求到seata-server(TC)
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                // 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
                // 每个分支事务都会去运行
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // 如果全局事务执行发生了异常,则回滚;
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // 全局事务和分支事务运行无误,提交事务;
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            // 全局事务完成之后做一些清理工作
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            // 如果有挂起的全局事务,则恢复全局事务
            tx.resume(suspendedResourcesHolder);
        }
    }
}

整个全局事务的执行由八步组成:

  1. 从线程本地变量副本中获取到xid,进而判断是否存在一个全局事务;
  2. 根据事务的隔离级别,对已存在的全局事务做不同的处理,包括:挂起事务、新建一个事务.....
    最后如果事务为空,则创建一个新的全局事务(刚开始一个新的全局事务时,会走进这个逻辑)
  3. 开启一个全局事务;
  4. 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
  5. 如果全局事务执行发生了异常,则通知TC回滚全局事务和所有的分支事务;
  6. 如果全局事务和分支事务运行无误,提交事务;
  7. 无论全局事务是否运行成功,都需要清理占用的全局锁资源;
  8. 最后,如果存在被挂起的全局事务,则恢复全局事务。

下面我们针对每一步具体来看;

1、第一步:判断是否存在一个全局事务

因为执行分支事务时,分支事务的业务方法也有可能被@GlobalTransactional注解直接或间接修饰,进而导致分支事务和全局事务的执行入口是一样的;所以需要先判断是否存在一个全局事务(而当存在全局事务时,分支事务应该如何执行,我们下一篇文章讨论)。

在这里插入图片描述

刚开始执行一个全局事务时,当前线程本地变量副本中的xid为null,即不存在一个全局事务。

2、第二步:根据事务的隔离级别做不同的处理

在这里插入图片描述
默认事务的隔离级别为REQUIRED:即:如果当前存在一个事务,则加入事务;否者新建一个事务。

由于刚开始执行一个全局事务时,不存在事务,所以默认会新建一个全局事务。

GlobalTransactionContext.createNew()负责新建一个全局事务:
在这里插入图片描述
在这里插入图片描述

6种事务隔离级别的具体逻辑

1> NOT_SUPPORTED

  • 不支持事务: 如果事务存在,则挂起事务(默认将xid从RootContext中移除,记录下挂起的事务资源)
    在这里插入图片描述
    在这里插入图片描述

2> REQUIRES_NEW

  • 新建一个事务:如果事务存在,则挂起事务,再新建一个事务。
    在这里插入图片描述

3> SUPPORTS

  • 支持事务:如果当前存在事务,则加入事务,不存在事务,则以非事务方式执行。
    在这里插入图片描述

4> REQUIRED(默认事务模式)

  • 必须有事务:如果当前存在一个事务,则加入事务;否者新建一个事务。
    在这里插入图片描述

5> NEVER

  • 不支持事务:如果当前存在事务,则报错;否则以非事务方式执行。
    在这里插入图片描述

6> MANDATORY

  • 强制使用事务:如果当前不存在事务,则报错;否则加入事务执行。
    在这里插入图片描述

3、第三步:开启全局事务

在这里插入图片描述

在这里插入图片描述

在开启全局事务前后会有钩子函数,默认开启全局事务前后的两个钩子中没有任何实现,如果有需要可以自己定制。这个业务执行前后的钩子函数在Spring体系中随处可见。

整个开启全局事务的逻辑如下:

  1. 开启全局事务时,会首先判断事务的角色是否Launcher,即全局事务;刚开始执行一个全局事务时,创建出来的DefaultGlobalTransaction,其role就是Launcher,也就是说事务角色为全局事务。

    • 如果事务的角色不是全局事务,则会断言xid不许为null,否者抛出异常IllegalStateException
  2. 当事务为全局事务时,首选断言xid为null,否者抛出异常IllegalStateException因为超时重试机制的缘故,会再次判断线程本地上下文中的xid是否为null,如果不为null,同样抛出异常IllegalStateException
  3. 请求TC(seata-server)开启全局事务,并获取到全局事务xid。
  4. 请求TC开启全局事务之后,设置事务的状态为开启,并将全局事务xid绑定到线程本地变量副本上。

下面着重看一下TM如何请求TC开启全局事务并获取到xid?

TM如何请求TC开启全局事务

在这里插入图片描述
全局事务发起者TM,会通过netty和TC进行网络通信;其中包括对seata-server集群的负载均衡,在获取到相应seata-server实例对应的channel之后,会进步处理请求的发送和相应结果的接收。

在这里插入图片描述

在写Channel之前,channelWritableCheck()方法会检查channel是否可写。

TM / RM 和TC的RPC通信均是异步进行的:

  • TM / RM 发送请求时,将封装了CompletableFuture的MessageFuture放到futures(ConcurrentHashMap<Integer, MessageFuture>)中;
  • TC处理完请求之后,会通过netty框架发送响应到TM / RM 的AbstractNettyRemoting中,其再将futures中的MessageFuture完成,发送请求的代码段中messageFuture.get()会获取到返回值,停止阻塞。

TM发送请求之后,TC如何接收请求,如何处理请求?

TC接收到TM的请求如何开启全局事务

【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信一文中,我们聊了Seata Client 如何和Seata Server建立连接、通信;

又在【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么一文中,我们知道了TC(Seata Server)启动之后,AbstractNettyRemotingServer的内部类ServerHandler负责接收并处理请求。

在这里插入图片描述
ServerHandler类上有个@ChannelHandler.Sharable注解,其表示所有的连接都会共用这一个ChannelHandler;所以当消息处理很慢时,会降低并发。

processMessage(ctx, (RpcMessage) msg)方法中会根据消息类型获取到 请求处理组件(消息的处理过程是典型的策略模式),如果消息对应的处理器设置了线程池,则放到线程池中执行;如果对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;所以在seata-server中大部分处理器都有对应的线程池。

/**
 * Rpc message processing.
 *
 * @param ctx        Channel handler context.
 * @param rpcMessage rpc message.
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // 根据消息的类型获取到请求处理组件和请求处理线程池组成的Pair
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            // 如果消息对应的处理器设置了线程池,则放到线程池中执行
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // 线程池拒绝策略之一,抛出异常:RejectedExecutionException
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                // 对应的处理器没有设置线程池,则直接执行;如果某条消息处理特别慢,会严重影响并发;
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

Seata Serer接收到请求的执行链路为:
在这里插入图片描述

又由于TM发送开启事务请求时的RPCMessage的body为GlobalBeginRequest:
在这里插入图片描述

所以进入到:
在这里插入图片描述

又由于在DefaultCoordinator#onRequest()方法中,将DefaultCoordinator自身绑定到了AbstractTransactionRequestToTChandler属性中:

在这里插入图片描述

所以进入到:
在这里插入图片描述

AbstractExceptionHandler#exceptionHandleTemplate()方法只是运行方法的入参Callback,即接着会进入到:

在这里插入图片描述

DefaultCore执行开启全局事务的业务逻辑

DefaultCore#begin()方法负责开启全局事务的业务逻辑,方法的入参包括:开启全局事务的应用程序名称、事务服务分组名称、事务名称(开启全局事务的方法名以及方法的入参类型)、事务超时时间。

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 创建一个全局事务会话
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    // 通过MDC把XID放入线程本地变量ThreadLocal中(MDC是Slf4j提供的工具)
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    // 添加对全局事务会话生命周期的监听
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    // 开启全局事务会话
    session.begin();

    // transaction start event
    // 发布全局事务开启事件 做指标监控
    MetricsPublisher.postSessionDoingEvent(session, false);

    // 返回全局事务会话的xid
    return session.getXid();
}

seata-server开启全局事务的流程:

  • 创建一个全局会话GlobalSession;
  • 通过MDC把XID放入线程本地变量ThreadLocal中,并添加对全局事务会话生命周期的监听;
  • 开启全局事务会话;
  • 发布全局事务开启事件 做指标监控;
  • 返回全局事务会话的xid。

1> 第一步:创建全局会话GlobalSession

在这里插入图片描述

创建全局会话的最主要的点是根据雪花算法生成全局事务ID(transactionId)、XID(seata server的IP、Port和transactionId使用:拼接到一起)。

Seata如何使用雪花算法生成全局事务ID的见文章:【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?

2> 第二步:把XID放入线程本地变量副本,并添加对全局事务会话生命周期的监听

在这里插入图片描述

3> 第三步:开启全局事务会话

在这里插入图片描述
开启全局事务会话的逻辑主要在遍历所有的生成周期监听函数,执行begin事件;

根据我们启动Seata Server时选择的store.mode,会执行不同的SessionLifecycleListener
在这里插入图片描述

博主启动Seata Server时store.mode = db,所以我这里的SessionLifecycleListenerDataBaseSessionManager

在这里插入图片描述

DataBaseSessionManager执行begin事件的链路如下:

在这里插入图片描述
在这里插入图片描述

这里其实就是将全局事务会话信息持久化到DB中:

  • 首先将全局事务会话信息封装到GlobalTransactionDO模型中;
    在这里插入图片描述
  • 然后使用JDBC将全局事务会话信息持久化到表global_table中;
    在这里插入图片描述

所谓的开启全局事务会话,其实就是将全局事务会话信息持久化到Store.mode中。

4> 第四步:发布全局事务开启事件 做指标监控

在这里插入图片描述

这一块对了解seata事务的执行主流程没影响,不需要耗费特别大的精力关注,如果有指标监控的需求再重点看。

5> 返回全局事务会话的xid

在这里插入图片描述

4、第四步 --- 第八步:见下一篇博文

点个关注、订阅订阅专栏,下一篇系列文章更精彩。

执行业务方法(AT模式下)、全局事务回滚、全局事务提交、全局锁资源释放见下一篇博文。

四、总结

本文重点聊了Seata事务执行流程中TM、TC中如何开启全局事务;其中设计几个比较关键的类:

  • TransactionalExecutor --> 全局事务执行组件
  • TransactionalTemplate --> 全局事务生命周期模板管理组件,负责管理事务的生命周期;
  • TransactionManager --> 全局事务管理组件,负责执行事务的业务逻辑;
  • DefaultCore --> Seata Server端事务业务的执行逻辑,封装了AT、TCC、Saga、XA分布式事务模式的具体实现。
相关文章
|
7月前
|
SQL 关系型数据库 数据库
【微服务系列笔记】Seata
Seata是一种开源的分布式事务解决方案,旨在解决分布式事务管理的挑战。它提供了高性能和高可靠性的分布式事务服务,支持XA、TCC、AT等多种事务模式,并提供了全局唯一的事务ID,以确保事务的一致性和隔离性。Seata还提供了分布式事务的协调、事务日志、事务恢复等功能,帮助开发人员简化分布式事务的管理和实现。
122 1
|
6月前
|
Java
解析Java线程池:参数详解与执行流程
解析Java线程池:参数详解与执行流程
62 1
|
4月前
|
存储 SQL 关系型数据库
深入解析MySQL事务机制和锁机制
深入解析MySQL事务机制和锁机制
|
6月前
|
监控 数据管理 Java
智慧城管源码,基于微服务+java+springboot+vue+uniapp开发的城管综合执法系统源码
智慧城管执法系统利用微服务和Java技术提升城市管理水平,涵盖事件处理、投诉、处罚等功能,包含PC和APP源码。系统支持执法APP,便于领导随时随地审批,具备文书模板、地图定位、法规查询等功能。此外,执法办案系统通过监控视频分析事件,实现案件全程闭环管理,包括组织、案件、信用和执法队伍管理,以及法规库等基础支撑。系统旨在优化流程,提高数据管理和效率。
140 3
智慧城管源码,基于微服务+java+springboot+vue+uniapp开发的城管综合执法系统源码
|
6月前
|
Kubernetes 监控 负载均衡
Istio:微服务开发的终极利器,你还在为繁琐的通信和部署流程烦恼吗?
本文介绍了服务网格(Service Mesh)的概念及其在微服务架构中的重要性。微服务强调围绕业务构建团队和去中心化的数据管理,带来更高的灵活性和扩展性。然而,随着服务数量增加,网络通信成为挑战,包括服务发现、路由和安全等问题。 Service Mesh如Istio应运而生,通过边车代理解决服务间通信,提供服务发现、负载均衡、智能路由、安全和监控等功能。它与Kubernetes结合,增强了容器环境的服务管理能力。Istio的bookinfo示例展示了其在多语言微服务中的应用,简化了代码中的服务调用逻辑,使开发更专注于业务本身。
706 3
Istio:微服务开发的终极利器,你还在为繁琐的通信和部署流程烦恼吗?
|
6月前
|
Java 应用服务中间件 API
Tomcat处理一个HTTP请求的执行流程的详细解析
Tomcat处理一个HTTP请求的执行流程的详细解析
177 4
|
6月前
|
JavaScript Java 测试技术
基于springboot+vue.js+uniapp小程序的微服务的车联网位置信息管理附带文章源码部署视频讲解等
基于springboot+vue.js+uniapp小程序的微服务的车联网位置信息管理附带文章源码部署视频讲解等
47 1
|
5月前
|
Java Spring
解析Spring Boot中的事务管理机制
解析Spring Boot中的事务管理机制
|
6月前
|
存储 关系型数据库 MySQL
深入解析MySQL 8:事务数据字典的变革
深入解析MySQL 8:事务数据字典的变革
|
7月前
|
人工智能 监控 安全
Java+Spring Cloud +Vue+UniApp微服务智慧工地云平台源码
视频监控系统、人员实名制与分账制管理系统、车辆管理系统、环境监测系统、大型设备监测(龙门吊、塔吊、升降机、卸料平台等)、用电监测系统、基坑监测系统、AI算法分析(安全帽佩戴、火焰识别、周界报警、人员聚众报警、升降机超载报警)、安全培训、设备监测。
77 4