分布式事物-全面详解(学习总结---从入门到深化)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核8GB 50GB
简介: 分布式事物-全面详解(学习总结---从入门到深化)



分布式事物处理_认识本地事物

什么是事物

事务就是针对数据库的一组操作,它可以由一条或多条SQL语句组 成,同一个事务的操作具备同步的特点,事务中的语句要么都执 行,要么都不执行。

举个栗子:

你去小卖铺买东西,一手交钱,一手交货就是一个事务的例子,交钱和交货必须全部成功,事务才算成功,任一个活动失败,事务将撤销所有已成功的活动。

什么是本地事物

在计算机系统中,更多的是通过关系型数据库来控制事务,这是利 用数据库本身的事务特性来实现的,因此叫数据库事务,由于应用 主要靠关系数据库来控制事务,而数据库通常和应用在同一个服务器,所以基于关系型数据库的事务又被称为本地事务。

数据库事务的四大特性ACID

总结

数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个 不可分割的执行单元,该执行单元中的所有操作要么都成功,要么 都失败,只要其中任一操作执行失败,都将导致整个事务的回滚。

关系型数据库事务基础_并发事务带来的问题

并发事务带来的问题

数据库一般会并发执行多个事务,而多个事务可能会并发地对相同 的数据进行增加、删除、修改和查询操作,进而导致并发事务问 题。

脏写

当两个或两个以上的事务选择数据库中的同一行数据,并基于最初 选定的值更新该行数据时,因为每个事务之间都无法感知彼此的存 在,所以会出现最后的更新操作覆盖之前由其他事务完成的更新操 作的情况。也就是说,对于同一行数据,一个事务对该行数据的更新操作覆盖了其他事务对该行数据的更新操作。

解决方案: 让每个事物按照顺序串行的方式执行,按照一定的顺序一次进行写操作。

脏读

一个事务正在对数据库中的一条记录进行修改操作,在这个事务完 成并提交之前,当有另一个事务来读取正在修改的这条数据记录 时,如果没有对这两个事务进行控制,则第二个事务就会读取到没 有被提交的脏数据,并根据这些脏数据做进一步的处理,此时就会 产生未提交的数据依赖关系。我们通常把这种现象称为脏读,也就是一个事务读取了另一个事务未提交的数据。

解决方案: 先写后读,也就是写完之后再读。

不可重复读

一个事务读取了某些数据,在一段时间后,这个事务再次读取之前 读过的数据,此时发现读取的数据发生了变化,或者其中的某些记录已经被删除,这种现象就叫作不可重复读。

解决方案: 先读后写,也就是读完之后再写。

幻读

一个事务按照相同的查询条件重新读取之前读过的数据,此时发现 其他事务插入了满足当前事务查询条件的新数据,这种现象叫作幻读。

解决方案: 先读后写,也就是读完之后再写。

关系型数据库事务基础_MySQL事务隔离级别

MySQL中的InnoDB储存引擎提供SQL标准所描述的4种事务隔离级 别,分别为

读未提交 (Read Uncommitted)

读已提交 (ReadCommitted)

可重复读(Repeatable Read)

串行化 (Serializable)。

1、读未提交(Read Uncommitted):事务可以读取未提交的数据,也称作脏读(Dirty Read)。一 般很少使用。

2、读已提交(Read Committed):是大都是 DBMS (如:Oracle, SQLServer)默认事务隔离。执行两次同意的查询却有不同的结果,也叫不可重复读。

3、可重复读(Repeable Read):是 MySQL 默认事务隔离级别。能确保同一事务多次读取同一数据 的结果是一致的。可以解决脏读的问题,但理论上无法解决幻读(Phantom Read)的问题。

4、可串行化(Serializable):是最高的隔离级别。强制事务串行执行,会在读取的每一行数据上加锁,这样虽然能避免幻读的问题,但也可能导致大量的超时和锁争用的问题。很少会应用到这种级别,只有在非常需要确保数据的一致性且可以接受没有并发的应用场景下才会考虑。

MySQL事务隔离级别_模拟异常发生之脏读

前置知识

# 查看 MySQL 版本
select version();
# 开启事务
start transaction;
# 提交事务
commit;
# 回滚事务
rollback;

查看连接的客户端详情

每个 MySQL 命令行窗口就是一个 MySQL 客户端,每个客户端都可 以单独设置(不同的)事务隔离级别,这也是演示 MySQL 并发事务的基础。

show processlist;

新建数据库和测试数据

-- 创建数据库
drop database if exists testdb;
create database testdb;
use testdb;
-- 创建表
create table userinfo(
 id int primary key auto_increment,
 name varchar(250) not null,
 balance decimal(10,2) not null default 0
);
-- 插入测试数据
insert into userinfo(id,name,balance)
values(1,'Java',100),(2,'MySQL',200);

查询事务的隔离级别

select
@@global.transaction_isolation,@@transaction_is
olation;

设置客户端的事务隔离级别

通过以下 SQL 可以设置当前客户端的事务隔离级别:

set session transaction isolation level 事务隔离 级别;

脏读

一个事务读到另外一个事务还没有提交的数据,称之为脏读。脏读 演示的执行流程如下:

脏读演示步骤1

设置窗口 2 的事务隔离级别为读未提交,设置命令如下:

set session transaction isolation level read
uncommitted;

注意: 事务隔离级别读未提交存在脏读的问题。

脏读演示步骤2

窗口2开启事务,查询用户表如下图所示:

start transaction;
select * from userinfo;

注意: 从上述结果可以看出,在窗口 2 中读取到了窗口 1 中事务未提 交的数据,这就是脏读。

脏读演示步骤3

在窗口 1 中开启一个事务,并给 Java 账户加 50 元,但不提交事 务,执行的 SQL 如下:

mysql> start transaction;
Query OK, 0 rows affected (0.00 sec)
mysql> update userinfo set balance=balance+50
where name="java";
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

脏读演示步骤4

在窗口 2 中再次查询用户列表,执行结果如下:

注意: 从上述结果可以看出,在窗口 2 中读取到了窗口 1 中事务未提交的数据,这就是脏读。

不可重复读

不可重复读是指一个事务先后执行同一条 SQL,但两次读取到的数据不同,就是不可重复读。不可重复读演示的执行流程如下:

不可重复读演示步骤1

设置窗口 2 的事务隔离级别为读已提交

set session transaction isolation level read
committed;

注意: 读已提交可以解决脏读的问题,但存在不可重复读的问题。

不可重复读演示步骤2

在窗口 2 中开启事务,并查询用户表,执行结果如下

不可重复读演示步骤3

在窗口 1 中开启事务,并给 Java 用户添加 20 元,但不提交事务, 再观察窗口 2 中有没有脏读的问题,具体执行结果如下图所示:

从上述结果可以看出,当把窗口的事务隔离级别设置为读已提交, 已经不存在脏读问题了。接下来在窗口 1 中提交事务,执行结果如下图所示:

不可重复读演示步骤4

切换到窗口 2 中再次查询用户列表,执行结果如下:

不可重复读和脏读的区别:

脏读可以读到其他事务中未提交的数据,而不可重复读是读取到了其他事务已经提交的数据,但前后两次读取的结果不同。

幻读

幻读名如其文,它就像发生了某种幻觉一样,在一个事务中明明没 有查到主键为 X 的数据,但主键为 X 的数据就是插入不进去,就像某种幻觉一样。幻读演示的执行流程如下:

幻读演示步骤1

在窗口1和窗口2修改事务隔离级别为可重复读。

set session transaction isolation level
repeatable read;

幻读演示步骤2

设置窗口 2 为可重复读,可重复有幻读的问题,查询编号为 3 的用户,具体执行 SQL 如下:

start transaction;
select * from userinfo where id=3;

注意:从上述结果可以看出,查询的结果中 id=3 的数据为空。

幻读演示步骤3

开启窗口 1 的事务,插入用户编号为 3 的数据,然后成功提交事务,执行 SQL 如下:

start transaction;
insert into userinfo(id,name,balance)
values(3,'Spring',100);
commit;

幻读演示步骤4

在窗口 2 中插入用户编号为 3 的数据,执行 SQL 如下:

insert into userinfo(id,name,balance)
values(3,'Spring',100);

注意: 添加用户数据失败,提示表中已经存在了编号为 3 的数据,且 此字段为主键,不能添加多个。

幻读演示步骤5

在窗口 2 中,重新执行查询:

select * from userinfo where id=3;

注意: 在此事务中查询明明没有编号为 3 的用户,但插入的时候却却 提示已经存在了,这就是幻读。

不可重复读和幻读的区别

二者描述的则重点不同,不可重复读描述的侧重点是修改操作,而 幻读描述的侧重点是添加和删除操作。

MySQL中锁的分类

从本质上讲,锁是一种协调多个进程或多个线程对某一资源的访问 的机制,MySQL使用锁和MVCC机制实现了事务隔离级别。

锁的分类

悲观锁和乐观锁

悲观锁

顾名思义,悲观锁对于数据库中数据的读写持悲观态度,即在整个数据处理的过程中,它会将相应的数据锁定。在数据库中,悲观锁的实现需要依赖数据库提供的锁机制,以保证对数据库加锁后,其他应用系统无法修改数据库中的数据。

注意:

在悲观锁机制下,读取数据库中的数据时需要加锁,此时不能对这些数据进行修改操作。修改数据库中的数据时也需要加锁,此时不能对这些数据进行读取操作。

乐观锁

悲观锁会极大地降低数据库的性能,特别是对长事务而言,性能的损耗往往是无法承受的。乐观锁则在一定程度上解决了这个问题。

注意:

实现乐观锁的一种常用做法是为数据增加一个版本标识,如果是通过数据库实现,往往会在数据表中增加一个类似version的版本号字段。

读锁和写锁

读锁

读锁又称为共享锁,共享锁就是多个事务对于同一数据可以共享一 把锁,都能访问到数据,但是只能读不能修改。

写锁

写锁又称为排他锁,排他锁就是不能与其他所并存,如一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁, 包括共享锁和排他锁,但是获取排他锁的事务是可以对数据就行读取和修改。

注意:

需要注意的是,对同一份数据,如果加了读锁,则可以继续为 其加读锁,且多个读锁之间互不影响,但此时不能为数据增加 写锁。一旦加了写锁,则不能再增加写锁和读锁。因为读锁具有共享性,而写锁具有排他性。

表锁、行锁和页面锁

表锁

表锁也称为表级锁,就是在整个数据表上对数据进行加锁和释放锁。典型特点是开销比较小,加锁速度快,一般不会出现死锁,锁定的粒度比较大,发生锁冲突的概率最高,并发度最低。

手动增加表锁

mysql> lock table userinfo read;
Query OK, 0 rows affected (0.00 sec)
mysql> lock table userinfo write;
Query OK, 0 rows affected (0.00 sec)

查看数据表上增加的锁

show open tables;

删除表锁

mysql> unlock tables;
Query OK, 0 rows affected (0.00 sec)

行锁

行锁也称为行级锁,就是在数据行上对数据进行加锁和释放锁。典 型特点是开销比较大,加锁速度慢,可能会出现死锁,锁定的粒度最小,发生锁冲突的概率最小,并发度最高。

页面锁

页面锁也称为页级锁,就是在页面级别对数据进行加锁和释放锁。 对数据的加锁开销介于表锁和行锁之间,可能会出现死锁,锁定的粒度大小介于表锁和行锁之间,并发度一般。

间隙锁和临键锁

间隙锁

在MySQL中使用范围查询时,如果请求共享锁或排他锁,InnoDB 会给符合条件的已有数据的索引项加锁。如果键值在条件范围内, 而这个范围内并不存在记录,则认为此时出现了“间隙(也就是 GAP)”。InnoDB存储引擎会对这个“间隙”加锁,而这种加锁机制就是间隙锁(GAP Lock)。

例如,userinfo数据表中存在如下数据。

解释:

此时,userinfo数据表中的间隙包括id为(3,15]、(15,20]、 (20,正无穷]的三个区间。如果执行如下命令,将符合条件的用 户的账户余额增加100元。 update userinfo set balance = balance + 100 where id > 5 and id <16; 则其他事务无法在(3,20]这个区间内插入或者修改任何数据。 这里需要注意的是,间隙锁只有在可重复读事务隔离级别下才 会生效。

临键锁

临键锁(Next-Key Lock)是行锁和间隙锁的组合,例如上面例子中 的区间(3,20]就可以称为临键锁。

MySQL中的死锁问题

什么是死锁

死锁是并发系统中常见的问题,同样也会出现在数据库MySQL的并 发读写请求场景中。当两个及以上的事务,双方都在等待对方释放 已经持有的锁或因为加锁顺序不一致造成循环等待锁资源,就会出 现“死锁”。

Deadlock found when trying to get lock...

举例来说 A 事务持有 X1 锁 ,申请 X2 锁,B事务持有 X2 锁,申请 X1 锁。A 和 B 事务持有锁并且申请对方持有的锁进入循环等待,就造成了死锁。

第一步

打开终端A,登录MySQL,将事务隔离级别设置为可重复读,开启 事务后为userinfo数据表中id为1的数据添加排他锁,如下所示。

mysql> set session transaction isolation level
repeatable read;
Query OK, 0 rows affected (0.00 sec)
mysql> start transaction;
Query OK, 0 rows affected (0.00 sec)
mysql> select * from userinfo;
+----+-------+---------+
| id | name  | balance |
+----+-------+---------+
|  1 | Java  |  100.00 |
|  2 | MySQL |  200.00 |
+----+-------+---------+
2 rows in set (0.00 sec)

第二步

打开终端B,登录MySQL,将事务隔离级别设置为可重复读,开启事务后为userfinfo数据表中id为2的数据添加排他锁,如下所示。

mysql> set session transaction isolation level
repeatable read;
Query OK, 0 rows affected (0.00 sec)
mysql> start transaction;
Query OK, 0 rows affected (0.00 sec)
mysql> select * from userinfo where id = 2;
+----+-------+---------+
| id | name  | balance |
+----+-------+---------+
|  2 | MySQL |  200.00 |
+----+-------+---------+
1 row in set (0.00 sec)

第三步

在终端A为userinfo数据表中id为2的数据添加排他锁,如下所示。

mysql> select * from userinfo where id =2 for
update;

注意: 此时,线程会一直卡住,因为在等待终端B中id为2的数据释放排他锁。

第四步

在终端B中为userinfo数据表中id为1的数据添加排他锁,如下所示。

mysql> select * from userinfo where id =1 for
update;
ERROR 1213 (40001): Deadlock found when trying
to get lock; try restarting transaction

通过如下命令可以查看死锁的日志信息。

show engine innodb status\G

注意:

通过命令行查看LATEST DETECTED DEADLOCK选项相关的信 息,可以发现死锁的相关信息,或者通过配置 innodb_print_all_deadlocks(MySQL 5.6.2版本开始提供)参数为ON,将死锁相关信息打印到MySQL错误日志中。

如何避免死锁

MySQL事务的实现原理_什么是redo log

MySQL的事务实现离不开Redo Log和Undo Log。从某种程度上 说,事务的隔离性是由锁和MVCC机制实现的,原子性和持久性是 由Redo Log实现的,一致性是由Undo Log实现的。

什么是redo log

redo log叫做重做日志,是用来实现事务的持久性。该日志文件由 两部分组成:重做日志缓冲(redo log buffer)以及重做日志文件 (redo log),前者是在内存中,后者在磁盘中。当事务提交之后会 把所有修改信息都会存到该日志中。

注意:

先写日志,再写磁盘的技术就是 MySQL 里经常说到的 WAL(Write-Ahead Logging) 技术。

Redo Log刷盘规则

在计算机操作系统中,用户空间( user space )下的缓冲区数据一般情况 下是无法直接写入磁盘的,中间必须经过操作系统内核空间( kernel space )缓冲区( OS Buffer )。因此, redo logbuffer 写入 redo log file 实际上是先 写入 OS Buffer ,然后再通过系统调用 fsync() 将其刷到 redo log file 中。

mysql 支持三种将 redo log buffer 写入 redo log file 的时机。 innodb_flush_log_at_trx_commit。

Redo Log刷盘最佳实践

不同的Redo Log刷盘规则,对MySQL数据库性能的影响也不同。

创建测试数据库

create database if not exists test;
create table flush_disk_test(
 id int not null auto_increment,
 name varchar(20),
primary key(id)
)engine=InnoDB;

编写存储过程

为了测试方便,这里创建一个名为insert_data的存储过程,接收一 个int类型的参数。这个参数表示向flush_disk_test数据表中插入的记录行数。

drop procedure if exists insert_data;
-- 该段命令是否已经结束了,mysql是否可以执行了。
delimiter $$
create procedure insert_data(i int)
begin
-- 声明变量 s
declare s int default 1;
-- 声明变量 c
declare c varchar(50) default 'binghe';
-- while循环
while s<=i do
-- 开启事务
start transaction;
-- 添加数据
insert into flush_disk_test (name) values(c);
-- 提交事务
commit;
-- s变量累加
set s=s+1;
-- 循环结束
end while;
end$$
-- 该段命令是否已经结束了,mysql是否可以执行了。
delimiter ;

查看刷盘规则

show variables like
'innodb_flush_log_at_trx_commit';

第一步

将innodb_flush_log_at_trx_commit变量的值设置为0。

set global innodb_flush_log_at_trx_commit=0;

调用insert_data向flush_disk_test数据表中插入10万条数据,如下 所示。

mysql> call insert_data (100000);
Query OK, 0 rows affected (2.18 sec)

注意: 可以看到,当innodb_flush_log_at_trx_commit变量的值设置为0时,向表中插入10万条数据耗时2.18s。

第二步

将innodb_flush_log_at_trx_commit变量的值设置为1。

set global innodb_flush_log_at_trx_commit=1;

调用insert_data向flush_disk_test数据表中插入10万条数据,如下所示。

mysql> call insert_data (100000);
Query OK, 0 rows affected (16.18 sec)

注意:

可以看到,当innodb_flush_log_at_trx_commit变量的值设置为1时,向表中插入10万条数据耗时16.18s。

第三步

将innodb_flush_log_at_trx_commit变量的值设置为2。

set global innodb_flush_log_at_trx_commit=2;

调用insert_data向flush_disk_test数据表中插入10万条数据,如下所示。

mysql> call insert_data (100000);
Query OK, 0 rows affected (3.05 sec)

注意:

可以看到,当innodb_flush_log_at_trx_commit变量的值设置为2时,向表中插入10万条数据耗时3.05s。

结论

MySQL事务的实现原理_什么是undo log

undo log的概念

undo log是mysql中比较重要的事务日志之一,顾名思义,undo log是一种用于撤销回退的日志,在事务没提交之前,MySQL会先 记录更新前的数据到 undo log日志文件里面,当事务回滚时或者数据库崩溃时,可以利用 undo log来进行回退。

undo log的作用

在MySQL中,undo log日志的作用主要有两个:

1、提供回滚操作

---修改之前name = 张三
update user set name = "李四" where id = 1;  
----此时undo log会记录一条相反的update语句,如下:
update user set name = "张三" where id = 1;

注意: 如果这个修改出现异常,可以使用undo log日志来实现回滚操作,以保证事务的一致性。

2、提供多版本控制(MVCC)

MVCC,即多版本控制。在MySQL数据库InnoDB存储引擎中,用 undo Log来实现多版本并发控制(MVCC)。当读取的某一行被其他事务锁定时,它可以从undo log中分析出该行记录以前的数据版本是怎样的,从而让用户能够读取到当前事务操作之前的数据【快照读】。

快照读:

SQL读取的数据是快照版本【可见版本】,也就是历史版本,不用加锁,普通的SELECT就是快照读。

当前读:

SQL读取的数据是最新版本。通过锁机制来保证读取的数据无法 通过其他事务进行修改UPDATE、DELETE、INSERT、SELECT … LOCK IN SHARE MODE、SELECT … FOR UPDATE都是当前 读。

undo log的存储机制

undo log的存储由InnoDB存储引擎实现,数据保存在InnoDB的数据文件中。在InnoDB存储引擎中,undo log是采用分段(segment) 的方式进行存储的。

undo log的工作原理

在更新数据之前,MySQL会提前生成undo log日志,当事务提交的时候,并不会立即删除undo log,因为后面可能需要进行回滚操 作,要执行回滚(rollback)操作时,从缓存中读取数据。undo log日志的删除是通过通过后台purge线程进行回收处理的。

总结

undo log是用来回滚数据的用于保障未提交事务的原子性。

分布式事物处理_认识分布式事物

前言

随着互联网的快速发展,软件系统由原来的单体应用转变为分布式 应用,下图描述了单体应用向微服务的演变。

注意:

分布式系统会把一个应用系统拆分为可独立部署的多个服务, 因此需要服务与服务之间远程协作才能完成事务操作,这种分 布式系统环境下由不同的服务之间通过网络远程协作完成事务 称之为分布式事务,例如用户注册送积分事务、创建订单减库存事务,银行转账事务等都是分布式事务。

假如没有分布式事务

解释:

上图中包含了库存和订单两个独立的微服务,每个微服务维护了自己的数据库。在交易系统的业务逻辑中,一个商品在下单 之前需要先调用库存服务,进行扣除库存,再调用订单服务, 创建订单记录。

正常情况下,两个数据库各自更新成功,两边数据维持着一致性。

但是,在非正常情况下,有可能库存的扣减完成了,随后的订单记 录却因为某些原因插入失败。这个时候,两边数据就失去了应有的一致性。

问题: 这种时候需要要保证数据的一致性,单数据源的一致性靠单机 事物来保证,多数据源的一致性就要靠分布式事物保证。

什么是分布式事务

指一次大的操作由不同的小操作组成的,这些小的操作分布在不同 的服务器上,分布式事务需要保证这些小操作要么全部成功,要么全部失败。从本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

分布式架构的理论知识_CAP理论

前言

分布式系统正变得越来越重要,大型网站几乎都是分布式的。分布式系统的最大难点,就是各个节点的状态如何同步。CAP 定理是这方面的基本定理,也是理解分布式系统的起点。

1998年,加州大学的计算机科学家Eric Brewer 提出,分布式系统有三个指标。

它们的第一个字母分别是 C、A、P。这三个指标不可能同时做到。 这个结论就叫做CAP 定理。

分区容错性

大多数分布式系统都分布在多个子网络。每个子网络就叫做一个 区。分区容错的意思是,区间通信可能失败。比如,一台服务器放 在中国,另一台服务器放在美国,这就是两个区,它们之间可能无法通信。

结论:

分区容错无法避免,因此可以认为 CAP 的 P 总是成立。CAP 定理告诉我们,剩下的 C 和 A 无法同时做到。

一致性

Consistency 中文叫做"一致性"。意思是,写操作之后的读操作,必须返回该值。举例来说,某条记录是 v0,用户向 G1 发起一个写操 作,将其改为 v1。

接下来,用户的读操作就会得到 v1。这就叫一致性。

问题是,用户有可能向 G2 发起读操作,由于 G2 的值没有发生变化,因此返回的是 v0。G1 和 G2 读操作的结果不一致,这就不满足一致性了。

为了让 G2 也能变为 v1,就要在 G1 写操作的时候,让 G1 向 G2 发 送一条消息,要求 G2 也改成 v1。

这样的话,用户向 G2 发起读操作,也能得到 v1。

可用性

只要收到用户的请求,服务器就必须给出回应。

一致性和可用性的矛盾

解释:

如果保证 G2 的一致性,那么 G1 必须在写操作时,锁定 G2 的 读操作和写操作。只有数据同步后,才能重新开放读写。锁定 期间,G2 不能读写,没有可用性。如果保证 G2 的可用性,那么势必不能锁定 G2,所以一致性不成立。

一致性和可用性如何选择

分布式事物处理_分布式事务产生的场景

跨JVM进程

当我们将单体项目拆分为分布式、微服务项目之后,各个服务之间通过远程REST或者RPC调用来协同完成业务操作。

典型的场景:

商城系统中的订单微服务和库存微服务,用户在下单时会访问 订单微服务,订单微服务在生成订单记录时,会调用库存微服务来扣减库存。各个微服务是部署在不同的JVM进程中的,此时,就会产生因跨JVM进程而导致的分布式事务问题。

跨数据库实例

单体系统访问多个数据库实例,也就是跨数据源访问时会产生分布式事务。

典型的场景:

例如,我们的系统中的订单数据库和交易数据库是放在不同的 数据库实例中,当用户发起退款时,会同时操作用户的订单数 据库和交易数据库,在交易数据库中执行退款操作,在订单数 据库中将订单的状态变更为已退款。由于数据分布在不同的数 据库实例,需要通过不同的数据库连接会话来操作数据库中的 数据,此时,就产生了分布式事务。

多个服务数据库

多个微服务访问同一个数据库。

分布式事物解决方案_强一致性分布式事务之2PC模型

两阶段提交又称2PC,2PC是一个非常经典的强一致、中心化的原子提交协议。这里所说的中心化是指协议中有两类节点:一个是中心化 协调者节点N个参与者节点

生活中的2PC

A组织B、C和D三个人去爬山:如果所有人都同意去爬山,那么活动将举行;如果有一人不同意去爬山,那么活动将取消。

首先A将成为该活动的协调者,B、C和D将成为该活动的参与者。

具体流程:

    阶段1:   

    ①A发邮件给B、C和D,提出下周三去爬山,问是否同意。 那么此时A需要等待B、C和D的邮件。

 ②B、C和D分别查看自己的日程安排表。B、C发现自己在 当日没有活动安排,则发邮件告诉A它们同意下周三去爬山。由 于某种原因, D白天没有查看邮 件。那么此时A、B和C均需要 等待。到晚上的时候,D发现了A的邮件,然后查看日程安排, 发现周三当天已经有别的安排,那么D回复A说活动取消吧。

   

      阶段2:   

    ①此时A收到了所有活动参与者的邮件,并且A发现D下周 三不能去爬山。那么A将发邮件通知B、C和D,下周三爬山活动 取消。   

   ②此时B、C回复A“太可惜了”,D回复A“不好意思”。至此该 事务终止。

2PC阶段处理流程

举例订单服务A,需要调用支付服务B去支付,支付成功则处理购物 订单为待发货状态,否则就需要将购物订单处理为失败状态。

第一阶段:投票阶段

分布式事物解决方案_XA方案

什么是DTP

2PC的传统方案是在数据库层面实现的,如Oracle、MySQL都支持 2PC协议,为了统一标准减少行业内不必要的对接成本,需要制定 标准化的处理模型及接口标准,国际开放标准组织Open Group定 义分布式事务处理模型DTP(Distributed Transaction Processing Reference Model)。

分布式事物解决方案_Seata实现

Seata是什么

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简 单易用的分布式事务服务。Seata 为用户提供了 AT、TCC、SAGAXA 事务模式,为用户打造一站式的分布式解决方案。

Seata整体框架

全局事务与分支事务的关系图

与传统2PC的模型类似,Seata定义了三个组件来协议分布式事务的处理过程

还拿新用户注册送积分举例Seata的分布式事务过程

执行流程 :

Seata实现2PC与传统2PC的差别

Seata提供XA模式实现分布式事务_业务说明

业务说明

本实例通过Seata中间件实现分布式事务,模拟两个账户的转账交易 过程。两个账户在两个不同的银行(张三在bank1、李四在 bank2),bank1和bank2是两个微服务。交易过程中,张三给李四 转账制定金额。上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。

工程环境

创建数据库

bank1库,包含张三账户

CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1`
/*!40100 DEFAULT CHARACTER SET utf8 */;
USE `bank1`;
/*Table structure for table `account_info` */
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) COLLATE utf8_bin
DEFAULT NULL COMMENT '户主姓名'
,
  `account_no` varchar(100) COLLATE utf8_bin
DEFAULT NULL COMMENT '银行卡号'
,
  `account_password` varchar(100) COLLATE
utf8_bin DEFAULT NULL COMMENT '帐户密码'
,`account_balance` double DEFAULT NULL COMMENT
'帐户余额'
,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT
CHARSET=utf8 COLLATE=utf8_bin
ROW_FORMAT=DYNAMIC;
/*Data for the table `account_info` */
insert  into
`account_info`(`id`,`account_name`,`account_no`
,`account_password`,`account_balance`) values(2,'张三','1',NULL,1000);

bank2库,包含李四账户

CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank2`
/*!40100 DEFAULT CHARACTER SET utf8 */;
USE `bank2`;
/*Table structure for table `account_info` */
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) COLLATE utf8_bin
DEFAULT NULL COMMENT '户主姓名'
,
  `account_no` varchar(100) COLLATE utf8_bin
DEFAULT NULL COMMENT '银行卡号'
,
  `account_password` varchar(100) COLLATE
utf8_bin DEFAULT NULL COMMENT '帐户密码'
,`account_balance` double DEFAULT NULL COMMENT
'帐户余额'
,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT
CHARSET=utf8 COLLATE=utf8_bin
ROW_FORMAT=DYNAMIC;
/*Data for the table `account_info` */
insert  into
`account_info`(`id`,`account_name`,`account_no`
,`account_password`,`account_balance`) values(3,'李四','2',NULL,0);

Seata提供XA模式实现分布式事务_下载启动Seata服务

下载seata服务器

下载地址 :https://github.com/seata/seata/releases

解压并启动

tar -zxvf seata-server-1.4.2.tar.gz -C
/usr/local/
#后台运行
nohup sh seata-server.sh -p 9999 -h
192.168.66.100 -m file &> seata.log &

注意:

其中9999为服务端口号;file为启动模式,这里指seata服务将采用文件的方式存储信息。

测试

查看启动日志

cat seata.log

Seata提供XA模式实现分布式事务_搭建聚合父工程构建

创建工程distribute-transaction

字符编码

注解生效激活

Java编译版本选择

<!-- 指定JDK版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compilerplugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

File Type过滤

pom配置版本

<properties>
        <spring-boot.version>2.6.3</spring-boot.version>
        <spring.cloud.version>2021.0.1</spring.cloud.version>
        <spring.cloud.alibaba.version>2021.0.1.0</spring.cloud.alibaba.version>
      <lombok.version>1.18.22</lombok.version>
</properties>
    <dependencyManagement>
        <dependencies>  <!--spring boot 2.6.3-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-bootstarter-parent</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--       SpringCloud -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-clouddependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--     SpringCloud Aliabab       -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloudalibaba-dependencies</artifactId>
                <version>${spring.cloud.alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

IDEA开启Dashboard

普通的Run面板

Run Dashboard面板

修改配置文件

在.idea/workspace.xml 文件中找到

添加配置

<component name="RunDashboard">
  <option name="ruleStates">
    <list>
      <RuleState>
        <option name="name" value="ConfigurationTypeDashboardGroupingRule"/>
      </RuleState>
      <RuleState>
        <option name="name" value="StatusDashboardGroupingRule" />
      </RuleState>
    </list>
  </option>
  <option name="configurationTypes">
  <set>
    <option value="SpringBootApplicationConfigurationType"/>
  </set>
</option>
</component>

Seata提供XA模式实现分布式事务_转账功能实现上

实现如下功能

李四账户增加金额。

创建bank2

pom引入依赖

<dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>

编写主启动类

//添加对mapper包扫描 Mybatis-plus
@MapperScan("com.itbaizhan.mapper")
@SpringBootApplication
@Slf4j
//开启发现注册
@EnableDiscoveryClient
public class SeataBank2Main6002 {
    public static void main(String[] args) {
       SpringApplication.run(SeataBank1Main6002.class,args);
        log.info("************** SeataBank1Main6002 *************");
   }
}

编写YML配置文件

server:
 port: 6002
spring:
 application:
   name: seata-bank2
cloud:
   nacos:
     discovery:
        # Nacos server地址
       server-addr: 192.168.66.101:8848
 datasource:
   url: jdbc:mysql://localhost:3306/bank2?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
   username: root
   password01: 123456
   driver-class-name: com.mysql.jdbc.Driver

代码生成

引入Mybatis Plus代码生成依赖

<dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plusgenerator</artifactId>
            <version>3.5.2</version>
        </dependency>
        <!-- 模板引擎 -->
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity-enginecore</artifactId>
            <version>2.0</version>
        </dependency>

生成代码

package com.itbaizhan.utils;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
import java.util.Arrays;
import java.util.List;
public class CodeGenerator {
    public static void main(String[] args) {
      FastAutoGenerator.create("jdbc:mysql://192.168.66.100:3306/bank2", "root", "123456")
               .globalConfig(builder -> {
                    builder.author("itxiaotong")// 设置作者
                           .commentDate("MM-dd") // 注释日期格式
                           .outputDir(System.getProperty("user.dir")+"/xa-seata/bank2"+ "/src/main/java/") // 指定输出目录
                           .fileOverride(); //覆盖文件
               })
                // 包配置
               .packageConfig(builder -> { builder.parent("com.itbaizhan") // 包名前缀
                           .entity("entity")//实体类包名
                           .mapper("mapper")//mapper接口包名
                           .service("service"); //service包名
               })
               .strategyConfig(builder -> {
                    // 设置需要生成的表名
                    List<String> strings = Arrays.asList("account_info");
                    builder.addInclude(strings)
                            // 开始实体类配置
                           .entityBuilder()
                            // 开启lombok模型
                           .enableLombok()
                            //表名下划线转驼峰
                           .naming(NamingStrategy.underline_to_camel)
                            //列名下划线转驼峰
                           .columnNaming(NamingStrategy.underline_to_camel);
               })
               .execute();
   }
}

编写转账接口

public interface IAccountInfoService {
    //李四增加金额
    void updateAccountBalance(String accountNo, Double amount);
}

编写转账接口实现类

@Service
@Slf4j
public class AccountInfoServiceImpl implements IAccountInfoService {
    @Autowired
    AccountMapper accountMapper;
    @Override
    public void updateAccountBalance(String accountNo, Double amount) {
        // 1. 获取用户信息
        AccountInfo accountInfo = accountMapper.selectById(accountNo);
        accountInfo.setAccountBalance(accountInfo.getAccountBalance() + amount);
        accountMapper.updateById(accountInfo);
   }
}

编写控制层

@RestController
@RequestMapping("/bank2")
public class Bank2Controller {
    @Autowired
    IAccountInfoService accountInfoService;
    //接收张三的转账
    @GetMapping("/transfer")
    public String transfer(Double amount){
        //李四增加金额
        accountInfoService.updateAccountBalance("3",amount);
        return "bank2"+amount;
   }
}

Seata提供XA模式实现分布式事务_转账功能实现下

pom引入依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- openfeign -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>

编写主启动类

//添加对mapper包扫描 Mybatis-plus
@MapperScan("com.itbaizhan.mapper")
//开启OpenFiegn
@EnableFeignClients
@SpringBootApplication
@Slf4j
//开启发现注册
@EnableDiscoveryClient
public class SeataBank1Main6001 {
    public static void main(String[] args) {
      SpringApplication.run(SeataBank1Main6001.class,args);
        log.info("**************SeataBank1Main6001 *************");
   }
}

编写YML配置文件

server:
 port: 6001
spring:
 application:
   name: seata-bank1
 cloud:
   nacos:
     discovery:
        # Nacos server地址
       server-addr: 192.168.66.101:8848
 datasource:
   url: jdbc:mysql://localhost:3306/bank1?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
   username: root
   password01: 123456
   driver-class-name: com.mysql.jdbc.Driver

创建实体类

@AllArgsConstructor
@NoArgsConstructor
@Builder
@TableName("account_info")
@Data
public class AccountInfo {
    //id
    @TableId
    private Long id;
    //户主姓名
    @TableField("account_name")
    private String accountName;
    //银行卡号
    @TableField("account_no")
    private String accountNo;
    //账户密码
    @TableField("account_password")
    private String accountPassword;
    //账户余额
    @TableField("account_balance")
    private Double accountBalance;
}

编写持久层

@Component
@Mapper
public interface AccountMapper extends BaseMapper<AccountInfo> { }

编写转账接口

public interface IAccountInfoService {
    //张三扣减金额
    public void updateAccountBalance(String accountNo, Double amount);
}

编写远程调用接口

@Component
@FeignClient(value="seata-bank2")
public interface Bank2Client {
    //远程调用李四的微服务
    @GetMapping("/bank2/transfer")
    String transfer(@RequestParam("amount") Double amount);
}

编写转账接口实现类

@Service
@Slf4j
public class AccountInfoServiceImpl implements
IAccountInfoService {
    @Autowired
    AccountMapper accountMapper;
    @Autowired
    Bank2Client bank2Client;
    @Override
    public void updateAccountBalance(String accountNo, Double amount) {
        // 1. 获取用户信息
        AccountInfo accountInfo = accountMapper.selectById(2);
        // 2. 判断张三账户余额是否有钱
        if (accountInfo.getAccountBalance() > amount){
        //扣减张三的金额
          accountInfo.setAccountBalance(accountInfo.getAccountBalance()-amount);
            int result = accountMapper.updateById(accountInfo);
            if (result!=0){
                //调用李四微服务,转账
                bank2Client.transfer(amount);
           }
       }
   }
}

编写控制层

@RestController
public class Bank1Controller {
    @Autowired
    IAccountInfoService IAccountInfoService;
    //张三转账
    @GetMapping("/transfer")
    public String transfer(Double amount){
        IAccountInfoService.updateAccountBalance("1",amount);
        return "bank1"+amount;
   }
}

Seata提供XA模式实现分布式事务_没有引入分布式事物问题演示

初始数据库数据

正常情况

发送请求http://localhost:6001/transfer?amount=2

制造异常

在bank2微服务制造异常

异常后测试

发送请求http://localhost:6001/transfer?amount=2

Seata提供XA模式实现分布式事务_项目引入Seata

创建 UNDO_LOG 表

SEATA XA 模式需要 UNDO_LOG

-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT
CHARSET=utf8;

添加依赖

<dependency>
     <groupId>com.alibaba.cloud</groupId>
     <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

修改配置文件YML

seata:
  # 注册中心
 registry:
   type: file
 service:
    # seata服务端的地址和端口信息,多个使用英文分号分隔
   grouplist:
     default: 192.168.66.100:9999
 tx-service-group: my_test_tx_group

bank1微服务开启全局事物

@Transactional

@GlobalTransactional  //开启全局事务

bank2开启事物

测试分布式事物

发送请求http://localhost:6001/transfer?amount=2

总结

传统2PC(基于数据库XA协议)和Seata实现2PC的两种2PC方案, 由于Seata的零入侵并且解决了传统2PC长期锁资源的问题,所以推荐采用Seata实现2PC。

XA强一致性分布式事务实战_Atomikos介绍

简单介绍

产品分两个版本:

1、TransactionEssentials:开源的免费产品;

2、ExtremeTransactions:上商业版,需要收费。

这两个产品的关系如下图所示:

什么是JTA

Java事务API(JTA:Java Transaction API)和它的同胞Java事务服 务(JTS:Java Transaction Service),为J2EE平台提供了分布式事务服务(distributed transaction)的能力。

注意:

要想使用用 JTA 事务,那么就需要有一个实现 javax.sql.XADataSource 、 javax.sql.XAConnection 和 javax.sql.XAResource 接口的 JDBC 驱动程序。一个实现了这些 接口的驱动程序将可以参与 JTA 事务。一个 XADataSource 对象就是一个 XAConnection 对象的工厂。XAConnection 是参与 JTA 事务的JDBC 连接。

XA强一致性分布式事务实战_业务说明

场景介绍

本案例使用Atomikos框架实现XA强一致性分布式事务,模拟跨库转账的业务场景。不同账户之间的转账操作通过同一个项目程序完成。

说明:

转账服务不会直接连接数据库进行转账操作,而是通过 Atomikos框架对数据库连接进行封装,通过Atomikos框架操作 不同的数据库。由于Atomikos框架内部实现了XA分布式事务协 议,因此转账服务的逻辑处理不用关心分布式事务是如何实现的,只需要关注具体的业务逻辑。

框架选型

user_account数据表结构

设计完数据表后,在192.168.66.100服务器创建2个数据库,分别 为tx-xa-01和tx-xa-02,分别在2个数据库中创建转出金额数据库。

DROP TABLE IF EXISTS `user_account`;
CREATE TABLE `user_account`  (
  `account_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '账户编号'
,
  `account_name` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户
名称'
,
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额'
,
  PRIMARY KEY (`account_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = Dynamic;

添加数据

tx-xa-01库中添加数据。

INSERT INTO `user_account` VALUES ('1001','张三', 10000.00);

tx-xa-02库中添加数据。

INSERT INTO `user_account` VALUES ('1002','李四', 10000.00);

XA强一致性分布式事务实战_项目搭建

创建atomikos-xa项目

创建依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
        <!-- druid连接池依赖组件-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.22</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

编写配置文件

server:
 port: 6003
spring:
 autoconfigure:
    #停用druid连接池的自动配置
   exclude:
       com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
 datasource:
    #选用druid的XADataSource数据源,因为这个数据源支持分布式事务管理
   type: com.alibaba.druid.pool.xa.DruidXADataSource
    #以下是自定义字段
   dynamic:
     primary: master
     datasource:
       master:
         url:
jdbc:mysql://192.168.66.102:3306/tx-xa-01?
useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false&zeroDateT
imeBehavior=convertToNull&serverTimezone=Asia/S
hanghai&autoReconnect=true
         username: root
         password01: 123456
         driver-class-name: com.mysql.jdbc.Driver
       slave:
         url:
jdbc:mysql://192.168.66.102:3306/tx-xa-02?
useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false&zeroDateT
imeBehavior=convertToNull&serverTimezone=Asia/S
hanghai&autoReconnect=true
         username: root
         password01: 123456
         driver-class-name: com.mysql.jdbc.Driver
       validation-query: SELCET 1

编写主启动类

@Slf4j
@SpringBootApplication
@EnableTransactionManagement(proxyTargetClass = true)
public class TxXaStarter {
    public static void main(String[] args){
        SpringApplication.run(TxXaStarter.class,args);
        log.info("*************** TxXaStarter*********");
   }
}

XA强一致性分布式事务实战_多数据源实现

创建第一个数据源的配置类DBConfig1

@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.master")
public class DBConfig1 {
    private String url;
    private String username;
    private String password;
    private String dataSourceClassName;
}

创建第二个数据源的配置类DBConfig2

@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave")
public class DBConfig2 {
    private String url;
    private String username;
    private String password;
    private String dataSourceClassName;
}

创建持久层接口

分别在com.itbaizhan.mapper1包和com.itbaizhan.mapper2包下创建UserAccount1Mapper接口和UserAccount2Mapper接口。

public interface UserAccount1Mapper extends BaseMapper<UserAccount> {}
public interface UserAccount2Mapper extends BaseMapper<UserAccount> {}

创建MyBatisConfig1类

MyBatisConfig1类的作用是整合Atomikos框架,读取DBConfig1类 中的信息,实现数据库连接池,最终通过Atomikos框架的数据库连接池连接数据库并操作。

@Configuration
@MapperScan(basePackages = "com.itbaizhan.mapper1", sqlSessionTemplateRef = "masterSqlSessionTemplate")
public class MyBatisConfig1 {
    @Bean(name = "masterDataSource")
    public DataSource masterDataSource(DBConfig1 dbConfig1) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setUniqueResourceName("masterDataSource");
        sourceBean.setXaDataSourceClassName(dbConfig1.getDataSourceClassName());
        sourceBean.setTestQuery("select 1");
        sourceBean.setBorrowConnectionTimeout(3);
        MysqlXADataSource dataSource = new MysqlXADataSource();
        dataSource.setUser(dbConfig1.getUsername());
        dataSource.setPassword(dbConfig1.getPassword());
        dataSource.setUrl(dbConfig1.getUrl());
        sourceBean.setXaDataSource(dataSource);
        return sourceBean;
   }
    @Bean(name = "masterSqlSessionFactory")
 public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception
{
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        return sqlSessionFactoryBean.getObject();
   }
    @Bean(name = "masterSqlSessionTemplate")
    public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory
sqlSessionFactory){
        return new SqlSessionTemplate(sqlSessionFactory);
   }
}

创建MyBatisConfig2类

MyBatisConfig2类的作用与MyBatisConfig1类的作用相似,只不过 MyBatisConfig2类读取的是DBConfig2类中的信息,封装的是整合 了Atomikos框架的另一个数据源的数据库连接池,通过连接池连接数据库并操作。

@Configuration
@MapperScan(basePackages = "com.itbaizhan.mapper2", sqlSessionTemplateRef = "slaveSqlSessionTemplate")
public class MyBatisConfig2 {
    @Bean(name = "slaveDataSource")
    public DataSource slaveDataSource(DBConfig2 dbConfig2) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setUniqueResourceName("slaveDataSource");
        sourceBean.setXaDataSourceClassName(dbConfig2.getDataSourceClassName());
        sourceBean.setTestQuery("select 1");
        sourceBean.setBorrowConnectionTimeout(3);
        MysqlXADataSource dataSource = new MysqlXADataSource();
        dataSource.setUser(dbConfig2.getUsername());
        dataSource.setPassword(dbConfig2.getPassword());
        dataSource.setUrl(dbConfig2.getUrl());
        sourceBean.setXaDataSource(dataSource);
        return sourceBean;
   }
    @Bean(name = "slaveSqlSessionFactory")
  public SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception {
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        return sqlSessionFactoryBean.getObject();
   }
    @Bean(name = "slaveSqlSessionTemplate")
    public SqlSessionTemplate slaveSqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        return new SqlSessionTemplate(sqlSessionFactory);
   }
}

XA强一致性分布式事务实战_业务层实现

项目的业务逻辑层主要实现具体的跨库转账的业务逻辑,由于具体 的XA跨库分布式事务是由Atomikos框架内部实现的,因此在业务逻 辑层处理跨库转账的逻辑时,就像操作本地数据库一样简单。

创建UserAccount类

@Data
@TableName("user_account")
@AllArgsConstructor
@NoArgsConstructor
public class UserAccount implements
Serializable {
    private static final long serialVersionUID = 6909533252826367496L;
    /**
     * 账户编号
     */
    @TableId
    private String accountNo;
    /**
     * 账户名称
     */
    private String accountName;
    /**
     * 账户余额
     */
    private BigDecimal accountBalance;
}

创建UserAccountService接口

public interface UserAccountService {
      /**
     * 跨库转账
     * @param sourceAccountNo 转出账户
     * @param targetSourceNo 转入账户
     * @param bigDecimal 金额
     */
   void transferAccounts(String sourceAccountNo, String targetSourceNo,BigDecimal transferAmount);
}

实现UserAccountService接口

package com.itbaizhan.service.impl;
import com.itbaizhan.entity.UserAccount;
import com.itbaizhan.mapper1.UserAccountMapper1;
import com.itbaizhan.mapper2.UserAccountMapper2;
import com.itbaizhan.service.IUserAccountService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-13
*/
@Service
public class UserAccountServiceImpl  implements IUserAccountService {
    @Autowired
    private UserAccountMapper1 userAccountMapper1;
    @Autowired
    private UserAccountMapper2 userAccountMapper2;
    /**
     * 跨库转账
     * @param sourceAccountNo 源账户
     * @param targetSourceNo 目标账户
     * @param bigDecimal 金额
     */
    @Transactional
    @Override
  public void transofer(String sourceAccountNo, String targetSourceNo, BigDecimal bigDecimal) {
        // 1. 查询原账户
        UserAccount sourceUserAccount = userAccountMapper1.selectById(sourceAccountNo);
        // 2. 查询目标账户
        UserAccount targetUserAccount = userAccountMapper2.selectById(targetSourceNo);
        // 3. 判断转入账户和转出账户是否为空
        if (sourceAccountNo != null && targetUserAccount != null){
            // 4. 判断转出账户是否余额不足
            if (sourceUserAccount.getAccountBalance().compareTo(bigDecimal) < 0){
                throw  new RuntimeException("余额不足");
           }
            // 5.更新金额
          sourceUserAccount.setAccountBalance(sourceUserAccount.getAccountBalance().subtract(bigDecimal));
            // 6.张三账户减金额
          userAccountMapper1.updateById(sourceUserAccount);
            System.out.println(10/0);
            // 7.更新金额
          targetUserAccount.setAccountBalance(targetUserAccount.getAccountBalance().add(bigDecimal));
            // 8.张三账户减金额
          userAccountMapper2.updateById(targetUserAccount);
       }
   }
}

分布式架构的理论知识_BASE理论

为什么会出现BASE理论

CAP 理论表明,对于一个分布式系统而言,它是无法同时满足 Consistency(强一致性)、Availability(可用性) 和 Partition tolerance(分区容忍性) 这三个条件的,最多只能满足其中两个。

简介

BASE 理论起源于 2008 年, 由 eBay 的架构师 Dan Pritchett 在 ACM 上发表。

什么是BASE理论

BASE 是 Basically Available(基本可用) 、Soft-state(软状态) 和 Eventually Consistent(最终一致性) 三个短语的缩写。

核心思想:

既是无法做到强一致性(Strong consistency),但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性(Eventual consistency)。

BASE 理论三要素

基本可用(Basically Available)

基本可用是指分布式系统在出现故障的时候,允许损失部分可用性,即保证核心可用。允许损失部分可用性。但是,这绝不等价于 系统不可用。

软状态(Soft State)

软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。即允许系统在多个不同节点的数据副本存在数据延时。

注意:

用户在商城下单时,因网络超时等因素,订单处于“支付中”的状 态,待数据最终一致后状态将变更为“关闭”或“成功”状态。

最终一致性(Eventual Consistency)

最终一致性是指系统中的所有数据副本经过一定时间后,最终能够 达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。

总结

ACID 是数据库事务完整性的理论,CAP 是分布式系统设计理论, BASE是 CAP 理论中 AP 方案的延伸。符合Base理论的事务可以称为柔性事务。

分布式事务解决方案_最终一致性分布式事务

什么是最终一致性事务

强一致性分布式事务解决方案要求参与事务的各个节点的数据时刻 保持一致,查询任意节点的数据都能得到最新的数据结果。这就导 致在分布式场景,尤其是高并发场景下,系统的性能受到影响。而 最终一致性分布式事务解决方案并不要求参与事务的各节点数据时刻保持一致,允许其存在中间状态,只要一段时间后,能够达到数据的最终一致状态即可。

典型方案

为了解决分布式、高并发场景下系统的性能问题,业界基于Base理论提出了最终一致性分布式事务解决方案。

适用场景

优缺点

最终一致性分布式事务解决方案的优点:

最终一致性分布式事务解决方案的缺点:

最终一致性分布式事务解决方案_TCC是什么

概念

TCC(Try-Confirm-Cancel)又称补偿事务。

TCC核心思想

TCC分布式事务最核心的思想就是在应用层将一个完整的事务操作分为三个阶段。在某种程度上讲,TCC是一种资源,实现了Try、 Confirm、Cancel三个操作接口。

Try阶段

Try阶段是准备执行业务的阶段,在这个阶段尝试执行业务。

Confirm阶段

Confirm阶段是确认执行业务的阶段,在这个阶段确认执行的业务。

Cancel阶段

Cancel阶段取消执行业务。

TCC核心组成

Hmily实现TCC分布式事务实战_认识Hmily-TCC

概述

Hmily是一款高性能,零侵入,金融级分布式事务解决方案,目前 主要提供柔性事务的支持,包含 TCC , TAC (自动生成回滚SQL) 方案, 未来还会支持 XA 等方案。

Hmily实现TCC分布式事务实战_业务场景介绍

案例程序分为3个部分

项目公共模块、转出银行微服务和转入银行微服务。转出银行微服 务和转入银行微服务引用项目的公共模块,转出银行微服务作为 TCC分布式事务中的事务发起方,转入银行微服务作为TCC分布式事 务中的事务被动方。

框架选择

数据库表设计

在模拟跨行转账的业务场景中,核心服务包括转出银行微服务和转入银行微服务,对应的数据库包括转出银行数据库和转入银行数据库。

user_account账户数据表

字段名称 字段类型 字段名称
account_no varchar(64) 账户编号
account_name varchar(64) 账户名称
account_balance decimal(10,2) 账户余额
fransfer_amount decimal(10,2) 转账金额,用于锁定资源

try_log记录表

confirm_log记录表

cancel_log记录表

接下来,在192.168.66.100服务器的MySQL命令行执行如下命令创建转出银行数据库和数据表。

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for cancel_log
-- ----------------------------
DROP TABLE IF EXISTS `cancel_log`;
CREATE TABLE `cancel_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
`create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Cancel
阶段执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of cancel_log
-- ----------------------------
-- ----------------------------
-- Table structure for confirm_log
-- ----------------------------
DROP TABLE IF EXISTS `confirm_log`;
CREATE TABLE `confirm_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
  `create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Confirm
阶段执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of confirm_log
-- ----------------------------
-- ----------------------------
-- Table structure for try_log
-- ----------------------------
DROP TABLE IF EXISTS `try_log`;
CREATE TABLE `try_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
  `create_time` datetime(0) NULL DEFAULT
CURRENT_TIMESTAMP(0) COMMENT '创建时间'
,
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Try阶段
执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of try_log
-- ----------------------------
-- ----------------------------
-- Table structure for user_account
-- ----------------------------
DROP TABLE IF EXISTS `user_account`;
CREATE TABLE `user_account`  (
  `account_no` varchar(64) CHARACTER SET
utf8mb4 COLLATE utf8mb4_general_ci NOT NULL
COMMENT '账户编号',
  `account_name` varchar(50) CHARACTER SET
utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT
NULL COMMENT '账户名称',
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
  `transfer_amount` decimal(10, 2) NULL DEFAULT
NULL COMMENT '转账金额',
PRIMARY KEY (`account_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = '账户信
息' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of user_account
-- ----------------------------
INSERT INTO `user_account` VALUES ('1001',
'张三', 10000.00, 0.00);
SET FOREIGN_KEY_CHECKS = 1;

在192.168.66.100服务器的MySQL命令行执行如下命令创建转入银行数据库和数据表。

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for cancel_log
-- ----------------------------
DROP TABLE IF EXISTS `cancel_log`;
CREATE TABLE `cancel_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4
COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
  `create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Cancel阶段执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of cancel_log
-- ----------------------------
-- ----------------------------
-- Table structure for confirm_log
-- ----------------------------
DROP TABLE IF EXISTS `confirm_log`;
CREATE TABLE `confirm_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '全局事务编号',
  `create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Confirm
阶段执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of confirm_log
-- ----------------------------
-- ----------------------------
-- Table structure for try_log
-- ----------------------------
DROP TABLE IF EXISTS `try_log`;
CREATE TABLE `try_log`  (
`tx_no` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '全
局事务编号',
  `create_time` datetime(0) NULL DEFAULT NULL
COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = 'Try阶段
执行的日志记录' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of try_log
-- ----------------------------
-- ----------------------------
-- Table structure for user_account
-- ----------------------------
DROP TABLE IF EXISTS `user_account`;
CREATE TABLE `user_account`  (
  `account_no` varchar(64) CHARACTER SET
utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '账户编号',
  `account_name` varchar(50) CHARACTER SET
utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT
NULL COMMENT '账户名称',
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
  `transfer_amount` decimal(10, 2) NULL DEFAULT
NULL COMMENT '转账金额',
  PRIMARY KEY (`account_no`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8mb4
COLLATE = utf8mb4_general_ci COMMENT = '账户信息' ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of user_account
-- ----------------------------
INSERT INTO `user_account` VALUES ('1002','李四', 10000.00, 0.00);
SET FOREIGN_KEY_CHECKS = 1;

Hmily实现TCC分布式事务_项目搭建

创建父工程tx-tcc

设置逻辑工程

<packaging>pom</packaging>

创建公共模块

创建转出银行微服务

创建传入银行微服务

公共模块引入依赖

<dependencies>
        <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

Hmily实现TCC分布式事务实战_公共模块

持久层的实现

项目公共模块的持久层是转出银行微服务和转入银行微服务共用 的,在逻辑上既实现了转出金额的处理,又实现了转入金额的处理,同时还实现了TCC分布式事务每个阶段执行记录的保存和查询 操作。

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserAccountDto implements Serializable {
    private static final long serialVersionUID = 3361105512695088121L;
    /**
     * 自定义事务编号
     */
    private String txNo;
    /**
     * 转出账户
     */
    private String sourceAccountNo;
    /**
     * 转入账户
     */
    private String targetAccountNo;
    /**
     * 金额
     */
    private BigDecimal amount;
}

Dubbo接口的定义

在整个项目的实现过程中,转出银行微服务和转入银行微服务之间 是通过Dubbo实现远程接口调用。因为项目中定义的Dubbo接口需要被转出银行微服务和转入银行微服务同时引用,所以需要将 Dubbo接口放在项目的公共模块。

public interface UserAccountBank02Service {
    /**
     * 转账
     */
    void transferAmountToBank2(UserAccountDto userAccountDto);
}

Hmily实现TCC分布式事务_集成Dubbo框架

转入银行微服务对外提供了转入账户的Dubbo接口,当转出银行微 服务调用转入银行微服务的Dubbo接口时,转入银行微服务会执行增加账户余额的操作。

引入Dubbo依赖

<!-- dubbo依赖 -->
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>2.7.15</version>
        </dependency>
        <!--ZooKeeper客户端-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.1</version>
        </dependency>
        <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
            <version>5.2.1</version>
        </dependency>

转入银行微服务编写application.yml

server:
 port: 6005
spring:
 datasource:
   driver-class-name: com.mysql.cj.jdbc.Driver
   url: jdbc:mysql://192.168.66.100:3306/tx-tcc-bank02?
useUnicode=true&characterEncoding=utf8
   username: root
   password01: 123456
dubbo:
 scan:
   base-packages: com.itbaizhan.service
 application:
   name: tx-tcc-bank02
 registry:
   address: zookeeper://localhost:2181
 protocol:
   name: dubbo
   port: 12345

转出银行微服务编写application.yml

server:
 port: 6004
spring:
 datasource:
   driver-class-name: com.mysql.cj.jdbc.Driver
   url: jdbc:mysql://192.168.66.100:3306/tx-tcc-bank02?
useUnicode=true&characterEncoding=utf8
   username: root
   password01: 123456
dubbo:
 scan:
   base-packages: com.itbaizhan.service
 application:
   name: tx-tcc-bank01
 registry:
   address: zookeeper://localhost:2181

编写转入微服务主启动类

@MapperScan("com.itbaizhan.mapper")
@SpringBootApplication
@Slf4j
public class Bank2Main6005 {
    public static void main(String[] args) {
       SpringApplication.run(Bank2Main6005.class,args);
        log.info("************ Bank2Main6005 启动成功 **********");
   }
}

编写转出微服务主启动类

@MapperScan("com.itbaizhan.mapper")
@SpringBootApplication
@Slf4j
public class Bank1Main6004 {
    public static void main(String[] args) {
        SpringApplication.run(Bank1Main6004.class,args);
        log.info("*********** Bank1Main6004*******");
   }
}

业务逻辑层的实现

转出银行微服务的业务逻辑层主要是实现本地账户的金额扣减操作,通过Dubbo框架实现转入银行微服务对应账户余额的增加操作。

转入微服务实现转账功能

@DubboService(version = "1.0.0")
public class UserAccountServicelmpl implements UserAccountBank02Service {
    @Autowired
    private UserAccountMapper userAccountMapper;
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        // 1. 根据账户编号查询账户信息
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO);
        // 2. 判断账户是否存在
        if (userAccount != null ){
            userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
         // 3. 更新账户
          userAccountMapper.updateById(userAccount);
       }
   }
}

转出微服务实现转账功能

编写转账接口

/**
     * 跨库转账
     * @param userAccountDTO
     */
    void transferAmountToBank02(UserAccountDTO userAccountDTO);

转出微服务转账接口实现

@Service
public class UserAccountServiceImpl  implements IUserAccountService {
    @DubboReference(version = "1.0.0")
    private UserAccountBank02Service userAccountBank02Service;
    @Autowired
     private UserAccountMapper userAccountMapper;
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        // 1. 根据账户编号查询账户信息
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO);
        // 2. 判断账户是否存在
        if (userAccount != null && userAccount.getAccountBalance().compareTo(userAccountDTO.getBigDecimal()) > 0){
          userAccount.setAccountBalance(userAccount.getAccountBalance().subtract(userAccountDTO.getBigDecimal()));
            // 3. 更新账户
          userAccountMapper.updateById(userAccount);
       }
        // 4.远程调用转入微服务账户增加金额
        userAccountBank02Service.transferAmountToBank02(userAccountDTO);
   }
}

转出微服务编写控制层

@RestController
@RequestMapping("/userAccount")
public class UserAccountController {
    @Autowired
    private IUserAccountService iUserAccountService;
    /**
     * 转账
     * @return
     */
    @GetMapping("/transfer")
    public String transfer(UserAccountDTO userAccountDTO){
        iUserAccountService.transferAmountToBank02(userAccountDTO);
        return "转账成功";
   }
}

Hmily实现TCC事务_集成Hmily框架

转入和转出微服务引入依赖

<dependency>
            <groupId>org.dromara</groupId>
            <artifactId>hmily-spring-boot-starter-apache-dubbo</artifactId>
            <version>2.1.1</version>
            <exclusions>
               <exclusion>
                    <groupId>org.dromara</groupId>
                    <artifactId>hmily-repository-mongodb</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

转入微服务编写hmily配置文件

在项目的 resource 新建文件名为: hmily.yml 配置文件

hmily:
 server:
   configMode: local
   appName: user-account-bank02-dubbo
  # 如果server.configMode eq local 的时候才会读取到这里的配置信息.
 config:
   appName: user-account-bank01-dubbo
   serializer: kryo
   contextTransmittalMode: threadLocal
   scheduledThreadMax: 16
   scheduledRecoveryDelay: 60
   scheduledCleanDelay: 60
   scheduledPhyDeletedDelay: 600
   scheduledInitDelay: 30
   recoverDelayTime: 60
   cleanDelayTime: 180
   limit: 200
   retryMax: 10
   bufferSize: 8192
   consumerThreads: 16
   asyncRepository: true
   autoSql: true
   phyDeleted: true
   storeDays: 3
   repository: mysql
repository:
 database:
   driverClassName: com.mysql.cj.jdbc.Driver
   url :
jdbc:mysql://192.168.66.100:3306/hmily?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false&serv
erTimezone=UTC
   username: root
   password01: 123456
   maxActive: 20
   minIdle: 10
   connectionTimeout: 30000
   idleTimeout: 600000
   maxLifetime: 1800000

转出微服务编写hmily配置文件

在项目的 resource 新建文件名为: hmily.yml 配置文件

hmily:
 server:
   configMode: local
   appName: user-account-bank01-dubbo
  # 如果server.configMode eq local 的时候才会读取到这里的配置信息.
 config:
   appName: user-account-bank01-dubbo
   serializer: kryo
   contextTransmittalMode: threadLocal
   scheduledThreadMax: 16
   scheduledRecoveryDelay: 60
   scheduledCleanDelay: 60
   scheduledPhyDeletedDelay: 600
   scheduledInitDelay: 30
   recoverDelayTime: 60
   cleanDelayTime: 180
   limit: 200
   retryMax: 10
   bufferSize: 8192
   consumerThreads: 16
   asyncRepository: true
   autoSql: true
   phyDeleted: true
   storeDays: 3
   repository: mysql
repository:
 database:
   driverClassName: com.mysql.cj.jdbc.Driver
   url :
jdbc:mysql://192.168.66.100:3306/hmily?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false&serv
erTimezone=UTC
   username: root
   password: 123456
   maxActive: 20
   minIdle: 10
   connectionTimeout: 30000
   idleTimeout: 600000
   maxLifetime: 1800000

实现接口上添加注解

TCC模式

Hmily实现TCC分布式事务_转入转出微服务实现Try阶段

转出微服务Try阶段

/**
     * 转账功能
     * @param userAccountDTO
     */
   @HmilyTCC(confirmMethod = "sayConfrim", cancelMethod = "sayCancel")
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank01 的 Try 方法 ,事务id={}",txNo);
        // 1、 幂等处理
        TryLog tryLog = tryLogMapper.selectById(txNo);
        if (tryLog != null){
            return ;
       }
        // 2、 悬挂处理
        if (confirmLogMapper.selectById(txNo) != null || cancelLogMapper.selectById(txNo) != null){
            return ;
       }
         // 3. 根据账户编号查询账户信息
        UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
        // 4. 判断账户是否存在
        if (userAccount != null){
        // 5. 账户金额更新
            LambdaUpdateWrapper<UserAccount> ulw = new LambdaUpdateWrapper<>();
        // 更新转账金额
          ulw.set(UserAccount::getTransferAmount,userAccount.getTransferAmount().add(userAccountDTO.getBigDecimal()));
            // 更新余额
          ulw.set(UserAccount::getAccountBalance,userAccount.getAccountBalance().subtract(userAccountDTO.getBigDecimal()));
          ulw.eq(UserAccount::getAccountNo,userAccountDTO.getSourceAccountNo());
            baseMapper.update(null,ulw);
       }
        // 7. 准备阶段记录
        TryLog tryLog1 = new TryLog();
        tryLog1.setTxNo(txNo);
        tryLog1.setCreateTime(LocalDateTime.now());
        tryLogMapper.insert(tryLog1);
         // 8. 远程调用 转入微服务 跨库转账的功能
         userAccountBank02Service.transferAmountToBank02(userAccountDTO);
   }

转入微服务Try阶段

/**
     * 跨库转账
     * @param userAccountDTO
     */
    @HmilyTCC(confirmMethod = "sayConfrim", cancelMethod = "sayCancel")
    @Override
    public void transferAmountToBank02(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank02 的 Try方法 ,事务id={}",txNo);
        // 1、 幂等处理
        TryLog tryLog = tryLogMapper.selectById(txNo);
        if (tryLog != null){
            return ;
       }
        // 2、 悬挂处理
        if (confirmLogMapper.selectById(txNo) != null || cancelLogMapper.selectById(txNo) != null){
            return ;
       }
        // 3. 根据账户编号查询账户信息
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo());
       // 4. 判断账户是否存在
        if (userAccount != null){
            // 5. 账户金额更新
            LambdaUpdateWrapper<UserAccount> ulw = new LambdaUpdateWrapper<>();
            // 更新转账金额
            ulw.set(UserAccount::getTransferAmount,userAccount.getTransferAmount().add(userAccountDTO.getBigDecimal()));
          ulw.eq(UserAccount::getAccountNo,userAccountDTO.getTargetAccountNo());
            userAccountMapper.update(null,ulw);
       }
        // 7. 准备阶段记录
        TryLog tryLog1 = new TryLog();
        tryLog1.setTxNo(txNo);
        tryLog1.setCreateTime(LocalDateTime.now());
        tryLogMapper.insert(tryLog1);
   }

Hmily实现TCC事务_转入转出微服务实现Confirm阶段

编写转出微服务Confirm阶段

/**
     * 确认阶段
     * @param userAccountDTO
     */
    public void sayConfrim(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank01 的 Confrim方法 ,事务id={}",txNo);
        // 1、幂等处理
        ConfirmLog confirmLog = confirmLogMapper.selectById(txNo);
        if (confirmLog != null){
            return ;
       }
        // 2、根据账户id查询账户
        UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
        userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
        baseMapper.updateById(userAccount);
        // 3、 确认日志记录
        ConfirmLog confirmLog1 = new ConfirmLog();
        confirmLog1.setTxNo(userAccountDTO.getTxNo());
        confirmLog1.setCreateTime(LocalDateTime.now());
        confirmLogMapper.insert(confirmLog1);
   }

编写转入微服务Confirm阶段

/**
     * 确认阶段
     * @param userAccountDTO
     */
    public void sayConfrim(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank02 的Confrim方法 ,事务id={}",txNo);
        // 1、幂等处理
        ConfirmLog confirmLog = confirmLogMapper.selectById(txNo);
        if (confirmLog != null) {
            return;
       }
        // 2、根据账户id查询账户
        UserAccount userAccount = userAccountMapper.selectById(userAccountDTO.getTargetAccountNo());
        userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
      userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
      userAccountMapper.updateById(userAccount);
        // 3、 确认日志记录
        ConfirmLog confirmLog1 = new ConfirmLog();
        confirmLog1.setTxNo(userAccountDTO.getTxNo());
        confirmLog1.setCreateTime(LocalDateTime.now());
        confirmLogMapper.insert(confirmLog1);
   }

Hmily实现TCC分布式事务_转入转出微服务实现Cancel阶段

转入微服务Cananl阶段

/**
     * 回滚
     * @param userAccountDto
     */
    @Transactional(rollbackFor = Exception.class)
    public void cancelMethod(UserAccountDto userAccountDto){
        String txNo = userAccountDto.getTxNo();
        log.info("执行bank02的cancel方法,事务id: {}, 参数为:{}",txNo,JSONObject.toJSONString(userAccountDto));
        CancelLog cancelLog = iCancelLogService.findByTxNo(txNo);
        if(cancelLog != null){
            log.info("bank02已经执行过Cancel方法,txNo:{}", txNo);
            return;
       }
        // 保存记录
       iCancelLogService.saveCancelLog(txNo);
       userAccountMapper.cancelUserAccountBalanceBank02(userAccountDto.getAmount(),
       userAccountDto.getTargetAccountNo());
   }

转出微服务Cancel阶段

/**
     * 取消阶段
     * @param userAccountDTO
     */
    public void sayCancel(UserAccountDTO userAccountDTO) {
        String txNo = userAccountDTO.getTxNo();
        log.info("**********   执行bank01 的 Cancel方法 ,事务id={}",txNo);
        // 1. 幂等处理
        CancelLog cancelLog = cancelLogMapper.selectById(txNo);
        if (cancelLog != null ){
            return;
       }
        // 2、根据账户id查询账户
        UserAccount userAccount = baseMapper.selectById(userAccountDTO.getSourceAccountNo());
        userAccount.setAccountBalance(userAccount.getAccountBalance().add(userAccountDTO.getBigDecimal()));
        userAccount.setTransferAmount(userAccount.getTransferAmount().subtract(userAccountDTO.getBigDecimal()));
        baseMapper.updateById(userAccount);
        // 3、记录回滚日志
        CancelLog cancelLog1 = new CancelLog();
        cancelLog1.setTxNo(txNo);
        cancelLog1.setCreateTime(LocalDateTime.now());
        cancelLogMapper.insert(cancelLog1);
   }

最终一致性分布式事务解决方案_什么是可靠消息最终一致性事务

可靠消息最终一致性的基本原理是事务发起方(消息发送者)执行 本地事务成功后发出一条消息,事务参与方(消息消费者)接收到 事务发起方发送过来的消息,并成功执行本地事务。事务发起方和事务参与方最终的数据能够达到一致的状态。

两种实现方式:

1、基于本地消息表

2、基于支持分布式事务的消息中间件,如RocketMQ等

基本原理

在使用可靠消息最终一致性方案解决分布式事务的问题时,需要确保消息发送和消息消费的一致性,从而确保消息的可靠性。

可靠消息最终一致性分布式事务实现_本地消息表

本地消息表模式的核心通过本地事务保证数据业务操作和消息的一 致性,然后通过定时任务发送给消费方或者中间加一层MQ的方 式,保障数据最终一致性。

库表设计

订单微服务中出库本地消息表:

基础功能

分析

Task微服务的任务

可靠消息最终一致性分布式事务实现_RocketMQ事务消息

RocketMQ是阿里巴巴开源的一款支持事务消息的消息中间件,于 2012年正式开源,2017年成为Apache基金会的顶级项目。

实现原理

RocketMQ 4.3版之后引入了完整的事务消息机制,其内部实现了完 整的本地消息表逻辑,使用RocketMQ实现可靠消息分布式事务就 不用用户再实现本地消息表的逻辑了,极大地减轻了开发工作量。

可靠消息最终一致性分布式事务实战_案列业务介绍

业务介绍

通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟 商城业务中的下单扣减库存场景。订单微服务和库存微服务分别独立开发和部署。

流程

架构选型

数据库表设计

orders订单数据表

orders数据表存储于tx-msg-orders订单数据库。

DROP TABLE IF EXISTS `orders`;
CREATE TABLE `order`  (
  `id` bigint(20) NOT NULL COMMENT '主键',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  `order_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '订单
编号',
  `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id',
  `pay_count` int(11) NULL DEFAULT NULL COMMENT '购买数量',
  PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
CREATE TABLE `tx_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

stock库存数据表

DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock`  (
  `id` bigint(20) NOT NULL COMMENT '主键id',
  `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id',
  `total_count` int(11) NULL DEFAULT NULL COMMENT '商品总库存',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
-- ----------------------------
-- Table structure for tx_log
-- ----------------------------
DROP TABLE IF EXISTS `tx_log`;
CREATE TABLE `tx_log`  (
  `tx_no` varchar(64) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;

tx_log事务记录表

可靠消息最终一致性分布式事务实战_Docker安装 RocketMQ

在安装RocketMQ之前,我们先了解一下RocketMQ的部署架构,了 解一下RocketMQ的组件,然后基于当前主流的Docker安装 RocketMQ,我们这里安装单台RocketMQ,但为了防止单节点故 障、保障高可用,生产环境建议安装RocketMQ集群。

安装NameServer

拉取镜像

docker pull rocketmqinc/rocketmq

创建数据存储目录

mkdir -p /docker/rocketmq/data/namesrv/logs
/docker/rocketmq/data/namesrv/store

启动NameServer

docker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v
/docker/rocketmq/data/namesrv/logs:/root/logs \
-v
/docker/rocketmq/data/namesrv/store:/root/store
\
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq \
sh mqnamesrv

安装Broker

border配置:创建 broker.conf 配置文件

vim /docker/rocketmq/conf/broker.conf

# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的
主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的
slave brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和
异步表示Master和Slave之间同步数据的机 制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷
盘和异步刷盘;SYNC_FLUSH消息写入磁盘后 才返回成功状
态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
brokerIP1 = 192.168.66.100
#剩余磁盘比例
diskMaxUsedSpaceRatio=99

启动broker

docker run -d --restart=always --name rmqbroker
--link rmqnamesrv:namesrv -p 10911:10911 -p
10909:10909 --privileged=true -v
/docker/rocketmq/data/broker/logs:/root/logs -v
/docker/rocketmq/data/broker/store:/root/store
-v
/docker/rocketmq/conf/broker.conf:/opt/rocketmq
-4.4.0/conf/broker.conf -e
"NAMESRV_ADDR=namesrv:9876" -e
"MAX_POSSIBLE_HEAP=200000000"
rocketmqinc/rocketmq sh mqbroker -c
/opt/rocketmq-4.4.0/conf/broker.conf

报错:

部署RocketMQ的管理工具

RocketMQ提供了UI管理工具,名为rocketmq-console,我们选择 docker安装

#创建并启动容器
docker run -d --restart=always --name rmqadmin
-e "JAVA_OPTS=-
Drocketmq.namesrv.addr=192.168.66.100:9876 -
Dcom.rocketmq.sendMessageWithVIPChannel=false"
-p 8080:8080 pangliang/rocketmq-console-ng

关闭防火墙(或者开放端口)

#关闭防火墙
systemctl stop firewalld.service
#禁止开机启动
systemctl disable firewalld.service

测试

访问:http://192.168.66.101:8080/#/ (可以切换中文)

可靠消息最终一致性分布式事务实战_实现订单微服务

创建父工程rocketmq-msg

创建订单微服务子工程

引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

编写配置文件

server:
 port: 9090
spring:
 application:
   name: tx-msg-stock
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txmsg-order?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876
 producer:
   group: order-group

编写主启动类

/**
 * 订单微服务启动成功
 */
@Slf4j
@MapperScan("com.itbaizhan.order.mapper")
@SpringBootApplication
public class OrderMain9090 {
    public static void main(String[] args) {
      SpringApplication.run(OrderMain9090.class,args);
        log.info("************* 订单微服务启动成功*******");
   }
}

代码生成

package com.itbaizhan.utils;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
import java.util.Arrays;
import java.util.List;
public class CodeGenerator {
    public static void main(String[] args) {
      FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-order", "root", "123456")
               .globalConfig(builder -> {
                    builder.author("itbaizhan")// 设置作者
                           .commentDate("MMdd") // 注释日期格式
                           .outputDir(System.getProperty("user.dir")+"/rocketmq-msg/orders"+ "/src/main/java/")
                           .fileOverride(); //覆盖文件
               })
                // 包配置
               .packageConfig(builder -> {
                  builder.parent("com.itbaizhan.orders") // 包名前缀
                           .entity("entity")//实体类包名
                           .mapper("mapper")//mapper接口包名
                           .service("service"); //service包名
               })
              .strategyConfig(builder -> {
                    // 设置需要生成的表名
                  builder.addInclude(Arrays.asList("orders","tx_log"))
                            // 开始实体类配置
                           .entityBuilder()
                            // 开启lombok模型
                           .enableLombok()
                            //表名下划线转驼峰
                           .naming(NamingStrategy.underline_to_camel)
                            //列名下划线转驼峰
                           .columnNaming(NamingStrategy.underline_to_camel);
               })
               .execute();
   }
}

创建TxMessage类

在项目的com.itbaizhan.orders.tx包下创建TxMessage类,主要用 来封装实现分布式事务时,在订单微服务、RocketMQ消息中间件 和库存微服务之间传递的全局事务消息,项目中会通过事务消息实现幂等。

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxMessage implements Serializable
{
    private static final long serialVersionUID = -4704980150056885074L;
    /**
     * 商品id
     */
    private Long productId;
    /**
     * 商品购买数量
     */
    private Integer payCount;
    /**
     * 全局事务编号
     */
    private String txNo;
}

可靠消息最终一致性分布式事务实战_订单微服务业务层实现

业务逻辑层主要实现了用户提交订单后的业务逻辑。

编写OrderService接口

/**
     * 添加订单
     * @param productId 商品id
     * @param payCount 购买数量
     */
    void save(Long productId,Integer payCount);
    /**
     * 提交订单同时保存事务信息
     */
    void submitOrderAndSaveTxNo(TxMessage txMessage);
    /**
     * 提交订单
     * @param productId 商品id
     * @param payCount 购买数量
     */
    void submitOrder(Long productId, Integer payCount);

编写OrderService接口实现

package com.itbaizhan.order.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.order.entity.Order;
import com.itbaizhan.order.entity.TxLog;
import com.itbaizhan.order.mapper.OrderMapper;
import com.itbaizhan.order.mapper.TxLogMapper;
import com.itbaizhan.order.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.itbaizhan.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.UUID;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-20
*/
@Slf4j
@Service
public class OrderServiceImpl extends
ServiceImpl<OrderMapper, Order> implements IOrderService {
    @Resource
    RocketMQTemplate rocketMQTemplate;
    @Resource
  private TxLogMapper txLogMapper;
    /**
     * 添加
     * @param productId 商品id
     * @param payCount 购买数量
     */
    @Override
    public void save(Long productId, Integer payCount) {
        Order order = new Order();
        // 订单创建时间
        order.setCreateTime(LocalDateTime.now());
        // 生产订单编号
        order.setOrderNo(UUID.randomUUID().toString().replace("-",""));
        // 商品id
        order.setProductId(productId);
        // 购买数量
        order.setPayCount(payCount);
        baseMapper.insert(order);
   }
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void submitOrderAndSaveTxNo(TxMessage txMessage) {
        TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
    if(txLog != null){
            log.info("订单微服务已经执行过事务,商品id为:{},事务编号为:{}",txMessage.getProductId(),txMessage.getTxNo());
            return;
       }
        //生成订单
      this.save(txMessage.getProductId(),txMessage.getPayCount());
        //生成订单
        txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(LocalDateTime.now());
        //添加事务日志
        txLogMapper.insert(txLog);
   }
    /**
     * 提交订单
     * @param productId 商品id
     * @param payCount 购买数量
     */
    @Override
    public void submitOrder(Long productId,Integer payCount) {
        //生成全局分布式序列号
        String txNo = UUID.randomUUID().toString();
        TxMessage txMessage = new TxMessage(productId, payCount, txNo);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("txMessage", txMessage);
        Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build();
        //发送事务消息   且该消息不允许消费   
        tx_order_group: 指定版事务消息组
      rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", message, null);
   }
}

可靠消息最终一致性分布式事务实战_订单微服务监听事务消息

执行本地的业务代码

package com.itbaizhan.order.message;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.order.entity.TxLog;
import com.itbaizhan.order.mapper.TxLogMapper;
import com.itbaizhan.order.service.IOrderService;
import com.itbaizhan.order.service.ITxLogService;
import com.itbaizhan.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author itbaizhan
* @version 1.0.0
* @description 监听事务消息
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements
RocketMQLocalTransactionListener {
    @Autowired
    private IOrderService orderService;
    @Resource
    private TxLogMapper txLogMapper;
    /**
     * RocketMQ的Producer本地事务:先执行本地的业务代码(使用Spring的事件管理),判断是否成功。
     * 成功返回: RocketMQLocalTransactionState.COMMIT,
     * 失败返回:RocketMQLocalTransactionState.ROLLBACK
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) 
      {
        try {
            log.info("订单微服务执行本地事务");
            TxMessage txMessage = this.getTxMessage(msg);
            //执行本地事务
            orderService.submitOrderAndSaveTxNo(txMessage);
            //提交事务
            log.info("订单微服务提交事务");
            // COMMIT:即生产者通知Rocket该消息可以消费
            return RocketMQLocalTransactionState.COMMIT;
       } catch (Exception e) {
            e.printStackTrace();
            //异常回滚事务
            log.info("订单微服务回滚事务");
            // ROLLBACK:即生产者通知Rocket将该消息删除
            return RocketMQLocalTransactionState.ROLLBACK;
       }
   }
    private TxMessage getTxMessage(Message msg)
     {
        String messageString = new String((byte[]) msg.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
   }
}

网络异常消息处理

/**
     * 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地 
     * 事务是否已经执行成功,
     * 成功返回: RocketMQLocalTransactionState.COMMIT,
     * 失败返回:RocketMQLocalTransactionState.ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("订单微服务查询本地事务");
        TxMessage txMessage = this.getTxMessage(msg);
        // 获取订单的消息
        Integer exists = txLogService.isExistsTx(txMessage.getTxNo());
        if (exists != null) {
            // COMMIT:即生产者通知Rocket该消息可以消费
            return RocketMQLocalTransactionState.COMMIT;
       }
        // UNKNOWN:即生产者通知Rocket继续查询该消息的状态
        return RocketMQLocalTransactionState.UNKNOWN;
}
   private TxMessage getTxMessage(Message msg)
     {
        String messageString = new String((byte[]) msg.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
     }

可靠消息最终一致性分布式事务实战_实现库存微服务

创建库存微服务tx-msg-stock

引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

编写配置文件

server:
 port: 6060
spring:
 application:
   name: tx-msg-stock
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txmsg-stock?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876

编写主启动类

package com.itbaizhan.stock;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author itbaizhan
* @version 1.0.0
* @description 库存微服务启动类
*/
@MapperScan("com.itbaizhan.stock.mapper")
@Slf4j
@SpringBootApplication
public class StockServerStarter {
    public static void main(String[] args) {
      SpringApplication.run(StockServerStarter.class, args);
        log.info("**************** 库存服务启动成功 ***********");
   }
}

代码生成

package com.itbaizhan.utils;
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;
import java.util.Arrays;
import java.util.List;
public class CodeGenerator {
    public static void main(String[] args) {
      FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-stock", "root", "123456")
               .globalConfig(builder -> { builder.author("itbaizhan")// 设置作者
                     .commentDate("MMdd") // 注释日期格式
                     .outputDir(System.getProperty("user.dir") +"/rocketmq-msg/stock"+ "/src/main/java/")
                      .fileOverride(); //覆盖文件
               })
                // 包配置
               .packageConfig(builder -> {
                  builder.parent("com.itbaizhan.stock") // 包名前缀
                           .entity("entity")//实体类包名
                           .mapper("mapper")//mapper接口包名
                           .service("service"); //service包名
               })
               .strategyConfig(builder -> {
                    // 设置需要生成的表名
                  builder.addInclude(Arrays.asList("stock","tx_log"))
                            // 开始实体类配置
                           .entityBuilder()
                            // 开启lombok模型
                           .enableLombok() //表名下划线转驼峰
                           .naming(NamingStrategy.underline_to_camel)
                            //列名下划线转驼峰
                           .columnNaming(NamingStrategy.underline_to_camel);
               })
               .execute();
   }
}

编写库存接口

public interface StockService {
    /**
     * 根据id查询库存
     * @param id
     * @return
     */
    Stock getStockById(Long id);
    /**
     * 扣减库存
     */
    void decreaseStock(TxMessage txMessage);
}

可靠消息最终一致性分布式事务实战_库存微服务业务层实现

库存微服务的业务逻辑层主要监听RocketMQ发送过来的事务消 息,并在本地事务中执行扣减库存的操作。

编写库存接口

/**
     * 扣减库存
     */
    void decreaseStock(TxMessage txMessage);

库存接口实现类

package com.itbaizhan.stock.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.itbaizhan.stock.entity.Stock;
import com.itbaizhan.stock.entity.TxLog;
import com.itbaizhan.stock.mapper.StockMapper;
import com.itbaizhan.stock.mapper.TxLogMapper;
import com.itbaizhan.stock.service.IStockService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.itbaizhan.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-20
*/
@Slf4j
@Service
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements
IStockService {
    @Resource
    private StockMapper stockMapper;
    @Resource
    private TxLogMapper txLogMapper;
    @Transactional
    @Override
    public void decreaseStock(TxMessage txMessage) {
        log.info("库存微服务执行本地事务,商品id:{},购买数量:{}", txMessage.getProductId(),
txMessage.getPayCount());
        //检查是否执行过事务
        TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
        if(txLog != null){
            log.info("库存微服务已经执行过事务,事务编号为:{}", txMessage.getTxNo());
       }
        // 根据商品id查询库存
        QueryWrapper<Stock> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("product_id",txMessage.getProductId());
        Stock stock = stockMapper.selectOne(queryWrapper);
        if(stock.getTotalCount() < txMessage.getPayCount()){
            throw  new RuntimeException("库存不足");
       }
        // 减库存
       stock.setTotalCount(stock.getTotalCount()- txMessage.getPayCount());
        stockMapper.updateById(stock);
        //生成订单
        txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(LocalDateTime.now());
        //添加事务日志
        txLogMapper.insert(txLog);
   }
}

库存微服务消费者实现

用于消费RocketMQ发送过来的事务消息,并且调用StockService中的decreaseStock(TxMessage)方法扣减库存。

库存事务消费者

package com.itbaizhan.stock.message;
import com.alibaba.fastjson.JSONObject;
import com.itbaizhan.stock.service.IStockService;
import com.itbaizhan.stock.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author binghe
* @version 1.0.0
* @description 库存事务消费者
*/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements
RocketMQListener<String> {
    @Autowired
    private IStockService stockService;
    @Override
    public void onMessage(String message) {
        log.info("库存微服务开始消费事务消息:{}", message);
        TxMessage txMessage = this.getTxMessage(message);
        stockService.decreaseStock(txMessage);
   }
    private TxMessage getTxMessage(String msg){
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr,TxMessage.class);
   }
}

可靠消息最终一致性分布式事务实战_测试程序

查询数据

正式测试之前,先来查询下tx-msg-orders数据库和tx-msg-stock数 据库各个数据表中的数据。

分别启动库存和订单微服务

编写控制层接口

@Autowired
    private IOrderService iOrderService;
    /**
     * 创建订单
     * @param productId 商品id
     * @param payCount 购买数量
     * @return
     */
    @GetMapping(value = "/submit_order")
    public String transfer(@RequestParam("productId")Long productId, @RequestParam("payCount") Integer payCount){ 
        iOrderService.submitOrder(productId, payCount);
        return "下单成功";
   }

分别启动库存微服务stock和订单微服务orders,并在浏览器中访问 http://localhost:9090/order/submit_order?productId=1001&pay Count=1

最终一致性分布式事务解决方案_什么是最大努力通知型分布式事务

最大努力通知型( Best-effort delivery)是最简单的一种柔性事务。

适用场景

最大努力通知型解决方案适用于最终一致性时间敏感度低的场景。 最典型的使用场景就是支付成功后,支付平台异步通知商户支付结 果。并且事务被动方的处理结果不会影响主动方的处理结果。 典型的使用场景:如银行通知、商户通知等。

流程图

最大努力通知型分布式事务_最大努力通知与可靠消息最终一致性的区别

最大努力通知型分布式事务解决方案

流程:

1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。

2、接收通知方监听 MQ。

3、接收通知方接收消息,业务处理完成回应ack。

4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、 30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用 rocketMq,在 broker中可进行配置),直到达到通知要求的时间窗口上限。

5、接收通知方可通过消息校对接口来校对消息的一致性。

最大努力通知型分布式事务_案例业务说明

设计完数据库后,创建tx-notifymsg-account库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for account_info
-- ----------------------------
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info`  (
  `id` int(11) NOT NULL COMMENT '主键id',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '账户',
`account_name` varchar(255) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'账户名',
  `account_balance` decimal(10, 2) NULL DEFAULT
NULL COMMENT '账户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of account_info
-- ----------------------------
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
  `pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;

设计完数据库后,创建tx-notifymsg-payment库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info`  (
  `tx_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NOT NULL COMMENT '充值记录流水
号',
  `account_no` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账
户',
  `pay_amount` decimal(10, 2) CHARACTER SET
utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'充值金额',
`pay_result` varchar(255) CHARACTER SET utf8
COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值
结果',
  `pay_time` datetime(0) NOT NULL COMMENT '充值
时间',
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE
= utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;

最大努力通知型分布式事务实战_实现充值微服务

主要实现功能

1、充值接口

2、查询充值结果接口

创建父项目rocketmq-notifymsg

创建子工程

引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- 引入nacos依赖 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>
    </dependencies>

编写主启动类

@EnableDiscoveryClient
@MapperScan("com.itbaizhan.payment.mapper")
@SpringBootApplication
@Slf4j
public class PayMain7071 {
    public static void main(String[] args) {
       SpringApplication.run(PayMain7071.class,args);
        log.info("*********** 充值服务启动成功*********");
   }
}

编写配置文件

server:
 port: 7071
spring:
 cloud:
   nacos:
     discovery:
       server-addr: 192.168.66.100:8848
 application:
   name: tx-notifymsg-pay
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-payment?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876
 producer:
   group: payment-group

最大努力通知型分布式事务_充值微服务之业务层实现

充值微服务的业务逻辑层主要完成充值的业务逻辑处理,当充值成功时,会向RocketMQ发送充值结果信息,同时提供业务逻辑层查询充值结果信息的接口。

编写充值接口

public interface IPayInfoService extends IService<PayInfo> {
    /**
     * 保存充值信息
     */
    PayInfo savePayInfo(PayInfo payInfo);
    /**
     * 查询指定的充值信息
     */
    PayInfo getPayInfoByTxNo(String txNo);
}

充值接口实现

@Slf4j
@Service
public class PayInfoServiceImpl extends
ServiceImpl<PayInfoMapper, PayInfo> implements IPayInfoService {
    @Resource
    private PayInfoMapper payInfoMapper;
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Override
    public PayInfo savePayInfo(PayInfo payInfo)
{
      payInfo.setTxNo(UUID.randomUUID().toString().replace("-",""));
        payInfo.setPayResult("success");
      payInfo.setPayTime(LocalDateTime.now());
        int count = payInfoMapper.insert(payInfo);
        //充值信息保存成功
        if(count > 0){
            log.info("充值微服务向账户微服务发送结果消息");
            //发送消息通知账户微服务
            rocketMQTemplate.convertAndSend("topic_nofitymsg",JSON.toJSONString(payInfo));
            return payInfo;
       }
        return null;
   }
    @Override
    public PayInfo getPayInfoByTxNo(String txNo) {
        return baseMapper.selectById(txNo);
   }
}

编写充值接口

@RestController
@RequestMapping("/payInfo")
public class PayInfoController {
    @Autowired
    private IPayInfoService payInfoService;
    /**
     * 充值
     * @param payInfo
     * @return
     */
    @GetMapping(value = "/pay_account")
    public PayInfo pay(PayInfo payInfo){
        //生成事务编号
        return payInfoService.savePayInfo(payInfo);
   }
    /**
     * 查询充值结果
     * @param txNo
     * @return
     */
    @GetMapping(value = "/query/payresult/{txNo}")
    public PayInfo payResult(@PathVariable("txNo") String txNo){
        return payInfoService.getPayInfoByTxNo(txNo);
   }
}

最大努力通知型分布式事务_实现账户微服务

创建子工程account

引入依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starterweb</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-bootstarter</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connectorjava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-bootstarter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!--   引入Nacos注册中心   -->
        <dependency>
           <groupId>com.alibaba.cloud</groupId>
           <artifactId>spring-cloud-starteralibaba-nacos-discovery</artifactId>
        </dependency>
        <!--   引入OpenFeign -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starteropenfeign</artifactId>
        </dependency>
        <!--   引入负载均衡器-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloudloadbalancer</artifactId>
        </dependency>
    </dependencies>

编写配置文件

server:
 port: 7070
spring:
 cloud:
   nacos:
     discovery:
       server-addr: 192.168.66.100:8848
 application:
   name: tx-notifymsg-account
 datasource:
   url: jdbc:mysql://192.168.66.100:3306/txnotifymsg-account?
useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnec
t=true&failOverReadOnly=false&useSSL=false
   username: root
   password01: 123456
   driver-class-name: com.mysql.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
 name-server: 192.168.66.100:9876

最大努力通知型分布式事务_账户微服务之业务层实现

RocketMQ消费充值信息

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_account", topic = "topic_nofitymsg")
public class NotifyMsgAccountListener implements RocketMQListener<String> {
    @Autowired
    private IAccountInfoService accountInfoService;
    @Override
    public void onMessage(String message) {
        log.info("账户微服务收到RocketMQ的消息: {}", JSONObject.toJSONString(message));
        //如果是充值成功,则修改账户余额
        PayInfo payInfo = JSON.parseObject(message, PayInfo.class);
      if("success".equals(payInfo.getPayResult())){
           accountInfoService.updateAccountBalance(payInfo);
       }
        log.info("更新账户余额完毕:{}", JSONObject.toJSONString(payInfo));
   }
}

编写账户操作接口

/**
     * 更新账户余额
     */
    void updateAccountBalance(PayInfo payInfo);

实现账户操作接口

@Slf4j
@Service
public class AccountInfoServiceImpl extends
ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
    @Resource
    private AccountInfoMapper accountInfoMapper;
    @Resource
    private PayInfoMapper payInfoMapper;
    /**
     *
     * @param payInfo
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void updateAccountBalance(PayInfo payInfo) {
        if(payInfoMapper.selectById(payInfo.getTxNo()) != null){
            log.info("账户微服务已经处理过当前事务...");
            return;
       }
        LambdaUpdateWrapper<AccountInfo> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
        lambdaUpdateWrapper.eq(AccountInfo::getAccountNo,payInfo.getAccountNo());
        //更新账户余额
        List<AccountInfo> accountInfos = baseMapper.selectList(lambdaUpdateWrapper);
        if (accountInfos != null && !accountInfos.isEmpty()){
            AccountInfo accountInfo = accountInfos.get(0);
            accountInfo.setAccountBalance(accountInfo.getAccountBalance().add(payInfo.getPayAmount()));
            accountInfoMapper.updateById(accountInfo);
       }
        //保存充值记录
        payInfoMapper.insert(payInfo);
   }
}

最大努力通知型分布式事务_账户微服务远程调用实现

主启动类加Feign注解

@EnableDiscoveryClient
@EnableFeignClients
@MapperScan("com.itbaizhan.account.mapper")
@SpringBootApplication
@Slf4j
public class AccountMain7070 {
    public static void main(String[] args) {
       SpringApplication.run(AccountMain7070.class,args);
        log.info("*********** AccountMain7070启动成功 *********");
   }
}

编写远程调用接口

@Service
@FeignClient("tx-notifymsg-pay")
public interface IPayFeignService {
    @GetMapping(value = "/payInfo/query/payresult/{txNo}")
    PayInfo payResult(@PathVariable("txNo") String txNo);
}

编写查询账户接口

/**
     * 查询充值结果
     */
    PayInfo queryPayResult(String txNo);

实现查询账户信息

/**
     * 查询结果
     * @param txNo
     * @return
     */
    @Override
    public PayInfo queryPayResult(String txNo)
      {
        try{
            return iPayFeignService.payResult(txNo);
       }catch (Exception e){
            log.error("查询充值结果异常:{}", e);
       }
        return null;
   }

编写查询充值结果接口

/**
     * 主动查询充值结果
     * @param txNo
     * @return
     */
  @GetMapping(value = "/query/payresult/{txNo}")
    public ResponseEntity result(@PathVariable("txNo") String txNo){
        return ResponseEntity.ok(accountInfoService.queryPayResult(txNo));
   }

最大努力通知型分布式事务_测试程序

查看account库和payment库数据

启动账户和充值微服务

调用充值微服务的接口http://localhost:7071/payInfo/pay_accoun t为账户编号为1001的账户充值1000元。

账户微服务的日志文件中输出如下信息

可以看到,充值微服务将充值结果信息成功发送到了RocketMQ, 并且账户微服务成功订阅了RocketMQ的消息并执行了本地事务。

查询充值结果

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
5月前
|
存储 SQL 分布式数据库
OceanBase 入门:分布式数据库的基础概念
【8月更文第31天】在当今的大数据时代,随着业务规模的不断扩大,传统的单机数据库已经难以满足高并发、大数据量的应用需求。分布式数据库应运而生,成为解决这一问题的有效方案之一。本文将介绍一款由阿里巴巴集团自主研发的分布式数据库——OceanBase,并通过一些基础概念和实际代码示例来帮助读者理解其工作原理。
515 0
|
7月前
|
机器学习/深度学习 分布式计算 算法
联邦学习是保障数据隐私的分布式机器学习方法
【6月更文挑战第13天】联邦学习是保障数据隐私的分布式机器学习方法,它在不暴露数据的情况下,通过在各设备上本地训练并由中心服务器协调,实现全局模型构建。联邦学习的优势在于保护隐私、提高训练效率和增强模型泛化。已应用于医疗、金融和物联网等领域。未来趋势包括更高效的数据隐私保护、提升可解释性和可靠性,以及与其他技术融合,有望在更多场景发挥潜力,推动机器学习发展。
141 4
|
7月前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
272 0
|
3月前
|
消息中间件 关系型数据库 Java
‘分布式事务‘ 圣经:从入门到精通,架构师尼恩最新、最全详解 (50+图文4万字全面总结 )
本文 是 基于尼恩之前写的一篇 分布式事务的文章 升级而来 , 尼恩之前写的 分布式事务的文章, 在全网阅读量 100万次以上 , 被很多培训机构 作为 顶级教程。 此文修改了 老版本的 一个大bug , 大家不要再看老版本啦。
|
4月前
|
Dubbo Java 应用服务中间件
分布式-dubbo的入门
分布式-dubbo的入门
|
4月前
|
机器学习/深度学习 算法 自动驾驶
深度学习之分布式智能体学习
基于深度学习的分布式智能体学习是一种针对多智能体系统的机器学习方法,旨在通过多个智能体协作、分布式决策和学习来解决复杂任务。这种方法特别适用于具有大规模数据、分散计算资源、或需要智能体彼此交互的应用场景。
242 4
|
5月前
|
机器学习/深度学习 并行计算 PyTorch
PyTorch与DistributedDataParallel:分布式训练入门指南
【8月更文第27天】随着深度学习模型变得越来越复杂,单一GPU已经无法满足训练大规模模型的需求。分布式训练成为了加速模型训练的关键技术之一。PyTorch 提供了多种工具来支持分布式训练,其中 DistributedDataParallel (DDP) 是一个非常受欢迎且易用的选择。本文将详细介绍如何使用 PyTorch 的 DDP 模块来进行分布式训练,并通过一个简单的示例来演示其使用方法。
672 2
|
8月前
|
并行计算 算法 物联网
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
LLM 大模型学习必知必会系列(七):掌握分布式训练与LoRA/LISA微调:打造高性能大模型的秘诀进阶实战指南
|
7月前
|
存储 搜索推荐 Java
微服务SpringCloud ES分布式全文搜索引擎简介 下载安装及简单操作入门
微服务SpringCloud ES分布式全文搜索引擎简介 下载安装及简单操作入门
99 2
|
7月前
|
负载均衡 NoSQL 关系型数据库
Redis分布式锁学习总结
Redis分布式锁学习总结
42 0