SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务

前言


分布式事务是在微服务开发中经常会遇到的一个问题,之前的文章中我们已经实现了利用Seata来实现强一致性事务,其实还有一种广为人知的方案就是利用消息队列来实现分布式事务,保证数据的最终一致性,也就是我们常说的柔性事务。


消息队列实现分布式事务原理

首先让我们来看一下基于消息队列实现分布式事务的原理方案。

发送消息的服务有个OUTBOX数据表,在进行INSERT、UPDATE、DELETE 业务操作时也会给OUTBOX数据表INSERT一条消息记录,这样可以保证原子性,因为这是基于本地的ACID事务。

OUTBOX表充当临时消息队列,然后我们在引入一个消息中继(MessageRelay)的服务,由他从OUTBOX表中读取数据并发布消息到消息组件。

消息中继的实现可以很简单,只需要通过定时任务定期从OUTBOX表中拉取最新未发布的数据,获取到数据后将数据发送给消息组件,最后将完成发送的消息从OUTBOX表中删除即可,对于失败的消息可以根据业务规则进行重试。

RocketMQ的事务消息

RocketMQ本身已经支持事务消息,如果你们项目使用了RocketMQ,可以直接借助RocketMQ的事务消息实现分布式事务,我们先看一下RocketMQ事务消息的原理然后再借助RocketMQ来实现分布式事务。

RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

RocketMQ实现事务消息主要分为两个阶段:正常事务的发送及提交、事务信息的补偿流程

整体流程 为:

  • 正常事务发送与提交阶段
    1、生产者发送一个半消息给MQServer(半消息是指消费者暂时不能消费的消息)
    2、服务端响应消息写入结果,半消息发送成功
    3、开始执行本地事务
    4、根据本地事务的执行状态执行Commit或者Rollback操作
  • 事务信息的补偿流程
    1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求
    2、生产者收到确认回查请求后,检查本地事务的执行状态
    3、根据检查后的结果执行Commit或者Rollback操作
    补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。

RocketMQ事务流程关键

  1. 事务消息在一阶段对用户不可见
    事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的,也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列,然后把主题改成RMQ_SYS_TRANS_HALF_TOPIC ,这样由于消费者没有订阅这个主题,所以不会被消费。
  2. 如何处理第二阶段的失败消息?
    在本地事务执行完成后会向MQServer发送Commit或Rollback操作,此时如果在发送消息的时候生产者出故障了,那么要保证这条消息最终被消费,MQServer会像服务端发送回查请求,确认本地事务的执行状态。当然了rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。
  3. 消息状态
    事务消息有三种状态:
    TransactionStatus.CommitTransaction:提交事务消息,消费者可以消费此消息TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
    TransactionStatus.Unknown :中间状态,它代表需要检查消息队列来确定状态。

代码实现


业务需求:用户请求订单微服务 order-service 接口删除订单(退货),删除订单时需要调用 account-service的方法给账户增加余额,一个典型的分布式事务问题。


基础配置

在开始代码之前首先需要搭建好的RocketMQ环境,可参考下面这篇文章:

http://javadaily.cn/articles/2020/04/07/1586248405351.html

  • 在Order-Service和Account-Service中引入Rocket消息组件
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>


  • 在配置中心添加RocketMQ的相关配置
rocketmq:
  name-server: xxx.xx.x.xx:9876
  producer:
    group: cloud-group


  • 在OrderService服务中建立一张事务日志表rocketmq_transaction_log(作用稍后说)


发送半消息

Order-Service作为分布式事务开始的入口,在Service层我们给RocketMQ发送一条半消息

  • OrderController入口
/**
 * 根据订单号删除订单
 * @param orderNo 订单编号
 */
@PostMapping("/order/delete")
public ResultData<String> delete(@RequestParam String orderNo){
 log.info("delete order id is {}",orderNo);
 orderService.delete(orderNo);
 return ResultData.success("订单删除成功");
}

直接调用orderService的delete方法

  • OrderServiceImpl业务逻辑
@Override
public void delete(String orderNo) {
 Order order = orderMapper.selectByNo(orderNo);
 //如果订单存在且状态为有效,进行业务处理
 if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
  String transactionId = UUID.randomUUID().toString();
  //如果可以删除订单则发送消息给rocketmq,让用户中心消费消息
  rocketMQTemplate.sendMessageInTransaction("add-amount",
    MessageBuilder.withPayload(
      UserAddMoneyDTO.builder()
        .userCode(order.getAccountCode())
        .amount(order.getAmount())
        .build()
    )
    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
    .setHeader("order_id",order.getId())
    .build()
    ,order
  );
 }
}

首先校验一下订单状态,然后使用rocketMQTemplate.sendMessageInTransaction()发送事务消息。

sendMessageInTransaction方法有三个参数:

  • destination:目的地(主题),这里发送给add-amount 这个topic
  • message:发送给消费者的消息体,需要使用 MessageBuilder.withPayload() 来构建消息
  • arg:参数

注意,这里我们生成了一个transactionId,并放在header中跟消息一起发送(这里实际也可以构造成一个对象,放在arg里进行发送),作用后面再讲!

  • 消息封装实体UserAddMoneyDTO
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class UserAddMoneyDTO {
    /**
     * 用户编码
     */
    private String userCode;
    /**
     * 金额
     */
    private BigDecimal amount;
}

这个类生产者和消费者都需要用到,所以我直接丢到common包中,大家根据项目实际情况决定放哪。


执行本地事务与回查

MQServer收到半消息后会告诉生产者order-service确认收到半消息,这时候order-service需要执行本地事务,执行完本地事务后再告诉MQServer本地事务的执行状态,确认此消息究竟是Commit还是Rollback。

RocketMQ提供了 RocketMQLocalTransactionListener 接口,本地事务监听器,这个接口类的实现如下:

第一个方法executeLocalTransaction 为执行本地事务;第二个方法checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。

我们需要实现RocketMQLocalTransactionListener接口,在executeLocalTransaction方法中执行本地事务,在执行checkLocalTransaction回查方法时告诉RocketMQ到底该提交还是回滚。

这里大家思考一个问题,本地事务已经执行完成了,怎么去回查本地事务的执行结果呢?

答案如下:我们可以在执行本地事务的时候同时生成一条事务日志,让本地事务与日志事务在同一个方法中,同时添加@Transactional 注解,保证两个操作事务是一个原子操作。这样如果事务日志表中有这个本地事务的信息,那就代表本地事务执行成功,需要Commit,相反如果没有对应的事务日志,则表示执行失败,需要Rollback

这就是为什么我们上面在OrderService中需要建立一张事务日志表的原因。

  • 实现RocketMQLocalTransactionListener接口,完成事务执行逻辑
/**
 * 监听事务消息
 * @author javadaily
 */
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("执行本地事务");
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer orderId = Integer.valueOf((String)headers.get("order_id"));
        log.info("transactionId is {}, orderId is {}",transactionId,orderId);
        try{
            //执行本地事务,并记录日志
            orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);
            //执行成功,可以提交事务
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * 本地事务的检查,检查本地事务是否成功
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("检查本地事务,事务ID:{}",transactionId);
        //根据事务id从日志表检索
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id",transactionId);
        RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        if(null != rocketmqTransactionLog){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}


  • 本地事务执行逻辑
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){
    orderMapper.changeStatus(id,status);
    rocketMqTransactionLogMapper.insert(
        RocketmqTransactionLog.builder()
        .transactionId(transactionId)
        .log("执行删除订单操作")
        .build()
    );
}

修改订单状态为删除状态,同时往事务日志表中插入一条事务日志,用@Transactional注解保证事务。


Account-Service消费消息

  • 监听消息并处理给用户增加余额逻辑
@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount",consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired) )
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
    private final AccountMapper accountMapper;
    /**
     * 收到消息的业务逻辑
     */
    @Override
    public void onMessage(UserAddMoneyDTO userAddMoneyDTO) {
        log.info("received message: {}",userAddMoneyDTO);
        accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());
        log.info("add money success");
    }
}


测试

  • 测试数据

订单表

用户表

事务日志表

如果事务消息成功消费最终用户表中jianzh5这个用户的amount应该变成300(100+200)

  • 测试准备

我们在执行本地事务成功并需要通知消息队列提交事务处打个断点,然后在执行到此处时手动模拟异常

  • 模拟异常

在准备提交事务时我们通过命令 taskkill /pid 10116 -t -f命令强制杀掉OrderService进程。(先通过jps获取OrderService进程ID)

  • 重启服务器,检查是否会执行回查方法

重启OrderService程序会自动执行回查方法,结合事务日志表判断是否提交事务。

  • 运行后的结果


小结


本篇文章我们介绍了使用消息队列实现柔性事务的方案,重点剖析了RocketMQ事务消息的原理,并通过Demo案例实现了分布式事务(柔性事务)。

最后请大家思考一个问题,这个代码如果需要在生产环境使用还需要做什么改造?欢迎留言评论!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
21天前
|
存储 JavaScript 开发工具
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
本次的.HarmonyOS Next ,ArkTS语言,HarmonyOS的元服务和DevEco Studio 开发工具,为开发者提供了构建现代化、轻量化、高性能应用的便捷方式。这些技术和工具将帮助开发者更好地适应未来的智能设备和服务提供方式。
55 8
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
|
26天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
17天前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
41 1
|
5月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
166 1
|
1月前
|
消息中间件 SQL 中间件
大厂都在用的分布式事务方案,Seata+RocketMQ带你打破10万QPS瓶颈
分布式事务涉及跨多个数据库或服务的操作,确保数据一致性。本地事务通过数据库直接支持ACID特性,而分布式事务则需解决跨服务协调难、高并发压力及性能与一致性权衡等问题。常见的解决方案包括两阶段提交(2PC)、Seata提供的AT和TCC模式、以及基于消息队列的最终一致性方案。这些方法各有优劣,适用于不同业务场景,选择合适的方案需综合考虑业务需求、系统规模和技术团队能力。
258 7
|
3月前
|
消息中间件 网络协议 C#
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
3月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
4月前
|
存储 NoSQL Redis
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
Redis持久化、RDB和AOF方案、Redis主从集群、哨兵、分片集群、散列插槽、自动手动故障转移
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
|
4月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
4月前
|
消息中间件 Java 对象存储
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
69 2