手把手教你Spring Cloud集成Seata XA模式

简介: 手把手教你Spring Cloud集成Seata XA模式

前言

我们在前面的文章中已经教大家分别集成了Seata AT模式以及Seata TCC模式,这篇文章就教大家如何在自己的Spring Cloud项目中集成Seata XA模式。

同样我们还是以购物车下单的业务场景作为本次案例,当前Seata版本为1.5.2:


image.png

1.用户请求从Business业务入口进来,在业务入口中,我们会根据业务需求,先执行RPC扣款操作,然后再调用订单创建的功能;

2.在订单创建过程中,会先RPC执行扣减库存,成功后才会在订单表中插入订单数据;

3.在上述所有逻辑处理成功后,购物车下单逻辑就完成了,中途任何RM服务出现异常,将触发分布式事务回滚,导致下单失败;

注意:Seata XA模式不能与AT模式共存,两种模式在同一个服务中只能存在其一

创建数据表

在Seata XA模式中,除业务表外,不需要创建其他额外的数据表,该分布式事务完全依赖与XA协议;

  • Account服务:
-- ----------------------------
-- Table structure for wallet_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `wallet_xa_tbl`;
CREATE TABLE `wallet_xa_tbl` (
  `id` int NOT NULL COMMENT '主键',
  `user_id` varchar(255) COLLATE utf8mb4_bin NOT NULL,
  `money` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Records of wallet_xa_tbl
-- ----------------------------
BEGIN;
INSERT INTO `wallet_xa_tbl` (`id`, `user_id`, `money`) VALUES (1, '123456', 100000);
COMMIT;
复制代码
  • Order服务
-- ----------------------------
-- Table structure for order_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `order_xa_tbl`;
CREATE TABLE `order_xa_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '用户ID',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '数量',
  `unit_price` int NOT NULL COMMENT '单价',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
复制代码
  • Storage服务
-- ----------------------------
-- Table structure for stock_xa_tbl
-- ----------------------------
DROP TABLE IF EXISTS `stock_xa_tbl`;
CREATE TABLE `stock_xa_tbl` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `commodity_code` varchar(255) COLLATE utf8mb4_bin NOT NULL COMMENT '商品编码',
  `count` int NOT NULL COMMENT '库存数',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Records of stock_xa_tbl
-- ----------------------------
BEGIN;
INSERT INTO `stock_xa_tbl` (`id`, `commodity_code`, `count`) VALUES (1, 'CC-54321', 500);
COMMIT;
复制代码

创建RM服务

注意:XA模式在RM服务对应的配置文件中一定要单独指定,否则Seata模式是AT模式

# 分布式事务配置
seata:
  #  开启seata
  enabled: true
  # 一定要单独指定,默认是AT模式
  data-source-proxy-mode: XA
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: 用户名
      password: 密码
      server-addr: "ip:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
复制代码
  • Account服务:
public interface IWalletService {
  /**
   * 扣钱
   *
   * @param userId
   * @param amount
   * @return
   */
  Boolean deductMoney4XA(String userId, long amount);
}
import com.example.awesomeaccount.service.IWalletService;
import com.example.awesomeaccount.dao.mapper.WalletXAEnhanceMapper;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className WalletServiceImpl
 * @date: 2022/9/27 21:20
 * @description:
 */
@Service
public class WalletServiceImpl implements IWalletService {
  @Resource
  private WalletXAEnhanceMapper walletXAEnhanceMapper;
  /**
   * 扣款(XA模式)
   *
   * @param userId
   * @param amount
   * @return
   */
  @Transactional
  @Override
  public Boolean deductMoney4XA(String userId, long amount) {
    // 相关sql:update wallet_xa_tbl set money = money - #{amount,jdbcType=INTEGER} where user_id = #{userId,jdbcType=VARCHAR} and money <![CDATA[ >= ]]>#{amount,jdbcType=INTEGER}
    return walletXAEnhanceMapper.deductMoney(userId, amount) > 0;
  }
}
// 对TM暴露接口
import com.example.accountapi.model.AmountInfo;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * 供生产者和消费者共同使用
 *
 * @author zouwei
 * @className AccountApi
 * @date: 2022/9/18 14:31
 * @description:
 */
public interface WalletApi {
  /**
   * 扣款
   *
   * @param amountInfo
   * @return
   */
  @PostMapping("/deductMoney4XA")
  Boolean deductMoney4XA(@RequestBody AmountInfo amountInfo);
}
import com.example.accountapi.api.WalletApi;
import com.example.accountapi.model.AmountInfo;
import com.example.awesomeaccount.service.IWalletService;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * WalletApi接口在account-api模块中
 *
 * @author zouwei
 * @className WalletController
 * @date: 2022/9/18 14:58
 * @description:
 */
@Slf4j
@RestController
@RequestMapping("/wallet")
public class WalletController implements WalletApi {
  @Autowired
  private IWalletService walletService;
  @Override
  public Boolean deductMoney4XA(AmountInfo amountInfo) {
    String userId = amountInfo.getUserId();
    long amount = amountInfo.getAmount();
    // 打印分布式事务XID
    log.warn("XID:{}", RootContext.getXID());
    // 扣款
    return walletService.deductMoney4XA(userId, amount);
  }
}
复制代码

1.Account服务的扣款逻辑和平常的crud并没有任何区别,依然是调用mybatis的Mapper进行sql操作;

2.Account服务对外暴露给TM一个restful的扣款接口;

  • Storage服务:

Storage服务同样需要保证配置文件中的Seata.data-source-proxy-mode=XA,否则Seata将以AT模式启动服务;

public interface IStockService {
  // 扣减库存
  Boolean deductStock4XA(String commodityCode, int count);
}
import com.example.awesomestorage.dao.mapper.StockXAEnhanceMapper;
import com.example.awesomestorage.service.IStockService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className StockServiceImpl
 * @date: 2022/9/28 00:06
 * @description:
 */
@Service
public class StockServiceImpl implements IStockService {
  @Resource
  private StockXAEnhanceMapper stockXAEnhanceMapper;
  /**
   * 扣减库存(XA模式)
   *
   * @param commodityCode
   * @param count
   * @return
   */
  @Transactional
  @Override
  public Boolean deductStock4XA(String commodityCode, int count) {
    // 扣减库存,判断影响行数是否大于0
    // sql如下: update stock_tbl set count = count - #{count,jdbcType=INTEGER} where commodity_code = #{commodityCode,jdbcType=VARCHAR} and count <![CDATA[ >= ]]> #{count,jdbcType=INTEGER}
    return stockXAEnhanceMapper.deductStock(commodityCode, count) > 0;
  }
}
// 对外暴露接口
import com.example.storageapi.model.OrderInfo;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * @author zouwei
 * @className StockApi
 * @date: 2022/9/28 00:26
 * @description:
 */
public interface StockApi {
  @PostMapping("/deduct4XA")
  Boolean deductStock4XA(@RequestBody OrderInfo orderInfo);
}
import com.example.awesomestorage.service.IStockService;
import com.example.storageapi.api.StockApi;
import com.example.storageapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className StockController
 * @date: 2022/9/28 00:23
 * @description:
 */
@RestController
@RequestMapping("/stock")
public class StockController implements StockApi {
  @Autowired
  private IStockService stockService;
  @Override
  public Boolean deductStock4XA(OrderInfo orderInfo) {
    String commodityCode = orderInfo.getCommodityCode();
    int count = orderInfo.getCount();
    return stockService.deductStock4XA(commodityCode, count);
  }
}
复制代码

同样的,我们的Storage服务也和Account服务一样,暴露给TM一个扣减库存的接口;

  • Order服务

和上面两个服务一样,我们先配置Seata.data-source-proxy-mode=XA,保证我们能够正确地开启XA模式

public interface IOrderService {
  Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice);
}
import com.example.awesomeorder.api.StockApiClient;
import com.example.awesomeorder.dao.entity.Order;
import com.example.awesomeorder.dao.mapper.OrderMapper;
import com.example.awesomeorder.service.IOrderService;
import com.example.awesomeorder.tcc.IOrderTccAction;
import com.example.storageapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
 * @author zouwei
 * @className OrderServiceImpl
 * @date: 2022/9/27 22:47
 * @description:
 */
@Service
public class OrderServiceImpl implements IOrderService {
  @Resource
  private OrderXAMapper orderXAMapper;
  @Resource
  private StockApiClient stockApiClient;
  /**
   * 创建订单(XA模式)
   *
   * @param userId
   * @param commodityCode
   * @param count
   * @param unitPrice
   * @return
   */
  @Transactional
  @Override
  public Boolean createOrder4XA(String userId, String commodityCode, int count, long unitPrice) {
    // 构建待扣减的库存信息
    OrderInfo orderInfo = new OrderInfo();
    // 设置商品编码
    orderInfo.setCommodityCode(commodityCode);
    // 设置需要扣减的数量
    orderInfo.setCount(count);
    // 先构建库存
    if (stockApiClient.deductStock4XA(orderInfo)) {
      // 扣减库存成功后,准备创建订单
      OrderXA order = new OrderXA();
      // 创建时间
      order.setCreateTime(LocalDateTime.now());
      // 用户ID
      order.setUserId(userId);
      // 数量
      order.setCount(count);
      // 商品编码
      order.setCommodityCode(commodityCode);
      // 单价
      order.setUnitPrice(unitPrice);
      // 创建订单
      return orderXAMapper.insert(order) > 0;
    }
    // 扣减库存失败,订单创建失败
    return Boolean.FALSE;
  }
}
// 对外暴露接口
import com.example.orderapi.model.OrderInfo;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * @author zouwei
 * @className OrderApi
 * @date: 2022/9/27 22:57
 * @description:
 */
public interface OrderApi {
  @PutMapping("/create4At")
  Boolean createOrder4XA(@RequestBody OrderInfo orderInfo);
}
import com.example.awesomeorder.service.IOrderService;
import com.example.orderapi.api.OrderApi;
import com.example.orderapi.model.OrderInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className OrderController
 * @date: 2022/9/27 22:55
 * @description:
 */
@RestController
@RequestMapping("/order")
public class OrderController implements OrderApi {
  @Autowired
  private IOrderService orderService;
  @Override
  public Boolean createOrder4XA(OrderInfo orderInfo) {
    String commodityCode = orderInfo.getCommodityCode();
    int count = orderInfo.getCount();
    long unitPrice = orderInfo.getUnitPrice();
    String userId = orderInfo.getUserId();
    return orderService.createOrder4XA(userId, commodityCode, count, unitPrice);
  }
}
复制代码

至此,我们所有的RM服务全部创建完毕,下面我们开始准备构建TM;

创建TM服务

在Seata TM服务中,我们不需要配置数据源,因为TM不需要跟数据源打交道,我们只需要与TC服务进行通讯,在业务调用上,通过openFeign与RM服务进行通讯;

在TM服务中,就不需要额外指定seata.data-source-proxy-mode=XA,因为TM不和数据源打交道:

seata:
  #  开启seata
  enabled: true
  #  注册中心找TC服务
  registry:
    type: nacos
    nacos:
      cluster: "default"
      username: "用户名"
      password: "密码"
      server-addr: "ip:端口"
      group: SEATA_GROUP
      namespace: seata-server
      application: seata-server
  application-id: ${spring.application.name}
  #  事务分组
  tx-service-group: shanghai
  service:
    vgroup-mapping:
      # 该分组对应的TC集群名称
      shanghai: default
复制代码

另外的话,还需要配置上OpenFeign。有疑问的小伙伴可查看文章OpenFeign的集成与优化

import io.seata.core.exception.GlobalTransactionException;
/**
 * @author zouwei
 * @className ShoppingCartService
 * @date: 2022/9/18 14:01
 * @description:
 */
public interface ShoppingCartService {
  // 下单
  String placeOrder4XA() throws GlobalTransactionException;
}
import com.example.accountapi.model.AmountInfo;
import com.example.awesomebusiness.api.OrderApiClient;
import com.example.awesomebusiness.api.WalletApiClient;
import com.example.awesomebusiness.service.ShoppingCartService;
import com.example.orderapi.model.OrderInfo;
import io.seata.core.exception.GlobalTransactionException;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * @author zouwei
 * @className ShoppingCartServiceImpl
 * @date: 2022/9/18 14:01
 * @description:
 */
@Service
public class ShoppingCartServiceImpl implements ShoppingCartService {
  // 钱包服务
  @Resource
  private WalletApiClient walletApiClient;
  // 订单服务
  @Resource
  private OrderApiClient orderApiClient;
  // 别忘记了这个注解,这是开启分布式事务的标记
  @GlobalTransactional
  @Override
  public String placeOrder4XA() throws GlobalTransactionException {
    // 模拟用户ID 123456,对应数据库初始化的用户ID
    String userId = "123456";
    // 构建订单数据
    OrderInfo orderInfo = new OrderInfo();
    // 数量15个
    orderInfo.setCount(15);
    // 商品编码,对应库存数据表的初始化数据
    orderInfo.setCommodityCode("CC-54321");
    // 单价299,默认是long类型,单位分;避免double精度丢失
    orderInfo.setUnitPrice(299);
    // 订单归属
    orderInfo.setUserId(userId);
    // 计算扣款金额,数量*单价
    long amount = orderInfo.getCount() * orderInfo.getUnitPrice();
    // 构建扣款数据
    AmountInfo amountInfo = new AmountInfo();
    // 设置扣款金额
    amountInfo.setAmount(amount);
    // 设置扣款主体
    amountInfo.setUserId(userId);
    // 先扣款,扣款成功就创建订单,扣减库存在创建订单的逻辑里面
    if (walletApiClient.deductMoney4XA(amountInfo) && orderApiClient.createOrder4XA(orderInfo)) {
      return "下单成功!";
    }
    // 1.扣款失败,抛异常,分布式事务回滚
    // 2.创建订单失败,抛异常,分布式事务回滚
    throw new GlobalTransactionException("下单失败!");
  }
}
import com.example.awesomebusiness.service.ShoppingCartService;
import io.seata.core.exception.GlobalTransactionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author zouwei
 * @className ShoppingCartController
 * @date: 2022/9/18 13:55
 * @description:
 */
@RestController
@RequestMapping(value = "/shoppingCart")
public class ShoppingCartController {
  @Autowired
  private ShoppingCartService shoppingCartService;
  @PostMapping("/placeOrder")
  public String placeOrder() throws GlobalTransactionException {
    return shoppingCartService.placeOrder4XA();
  }
}
复制代码

以上代码便是Spring Cloud集成Seata XA模式的核心代码,感兴趣的小伙伴可以访问awesome-seats下载源码。


相关文章
|
3月前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
192 1
|
3月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
5天前
|
监控 Java Nacos
使用Spring Boot集成Nacos
通过上述步骤,Spring Boot应用可以成功集成Nacos,利用Nacos的服务发现和配置管理功能来提升微服务架构的灵活性和可维护性。通过这种集成,开发者可以更高效地管理和部署微服务。
62 17
|
7天前
|
缓存 安全 Java
Spring Boot 3 集成 Spring Security + JWT
本文详细介绍了如何使用Spring Boot 3和Spring Security集成JWT,实现前后端分离的安全认证概述了从入门到引入数据库,再到使用JWT的完整流程。列举了项目中用到的关键依赖,如MyBatis-Plus、Hutool等。简要提及了系统配置表、部门表、字典表等表结构。使用Hutool-jwt工具类进行JWT校验。配置忽略路径、禁用CSRF、添加JWT校验过滤器等。实现登录接口,返回token等信息。
121 12
|
5天前
|
人工智能 安全 Dubbo
Spring AI 智能体通过 MCP 集成本地文件数据
MCP 作为一款开放协议,直接规范了应用程序如何向 LLM 提供上下文。MCP 就像是面向 AI 应用程序的 USB-C 端口,正如 USB-C 提供了一种将设备连接到各种外围设备和配件的标准化方式一样,MCP 提供了一个将 AI 模型连接到不同数据源和工具的标准化方法。
|
13天前
|
存储 安全 Java
Spring Boot 3 集成Spring AOP实现系统日志记录
本文介绍了如何在Spring Boot 3中集成Spring AOP实现系统日志记录功能。通过定义`SysLog`注解和配置相应的AOP切面,可以在方法执行前后自动记录日志信息,包括操作的开始时间、结束时间、请求参数、返回结果、异常信息等,并将这些信息保存到数据库中。此外,还使用了`ThreadLocal`变量来存储每个线程独立的日志数据,确保线程安全。文中还展示了项目实战中的部分代码片段,以及基于Spring Boot 3 + Vue 3构建的快速开发框架的简介与内置功能列表。此框架结合了当前主流技术栈,提供了用户管理、权限控制、接口文档自动生成等多项实用特性。
55 8
|
1月前
|
数据库 微服务
SEATA模式
Seata 是一款开源的分布式事务解决方案,支持多种事务模式以适应不同的应用场景。其主要模式包括:AT(TCC)模式,事务分三阶段执行;TCC 模式,提供更灵活的事务控制;SAGA 模式,基于状态机实现跨服务的事务一致性;XA 模式,采用传统两阶段提交协议确保数据一致性。
49 5
|
1月前
|
XML Java API
Spring Boot集成MinIO
本文介绍了如何在Spring Boot项目中集成MinIO,一个高性能的分布式对象存储服务。主要步骤包括:引入MinIO依赖、配置MinIO属性、创建MinIO配置类和服务类、使用服务类实现文件上传和下载功能,以及运行应用进行测试。通过这些步骤,可以轻松地在项目中使用MinIO的对象存储功能。
|
1月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
42 6
|
1月前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
67 5

热门文章

最新文章