「跨数据库、微服务」 FreeSql 分布式事务 TCC/Saga 编排重要性

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: FreeSql 支持MySql/SqlServer/PostgreSQL/Oracle/Sqlite/Firebird/达梦/Gbase/神通/人大金仓/翰高/Clickhouse/MsAccess Ado.net 数据库,以及 Odbc 的专门实现包。FreeSql.Cloud 为 FreeSql 提供跨数据库访问,分布式事务TCC、SAGA解决方案,支持 .NET Core 2.1+, .NET Framework 4.0+.

前言

FreeSql 支持
MySql/SqlServer/PostgreSQL/Oracle/Sqlite/Firebird/达梦/Gbase/神通/人大金仓/翰高/Clickhouse/MsAccess Ado.net 数据库,以及 Odbc 的专门实现包。

FreeSql.Cloud 为 FreeSql 提供跨数据库访问,分布式事务TCC、SAGA解决方案,支持 .NET Core 2.1+, .NET Framework 4.0+.

本文主要讲解从跨数据库访问,到分布式事务落地,再升级到微服务服务编排探讨。写下本文更多的成份是带有疑问号,希望有微服务落地经验的朋友指教一下。

TCC 事务特点:

  • Try 用于资源冻结/预扣;
  • Try 全部环节通过,代表业务一定能完成,进入 Confirm 环节;
  • Try 任何环节失败,代表业务失败,进入 Cancel 环节;
  • Confirm 失败会进行重试N次,直到交付成功,或者人工干预;
  • Cancel 失败会进行重试N次,直到取消成功,或者人工干预;

SAGA 事务特点:

  • Commit 用于业务提交;
  • Commit 全部环节通过,代表业务交付成功;
  • Commit 任何环节失败,代表业务失败,进入 Cancel 环节;
  • Cancel 失败会进行重试N次,直到取消成功,或者人工干预;

由于 TCC/Saga 两种流程有相似之处,因此本文主要以 Saga 为例讲解应用代码。本文讲解的落地场景如下:

第一步:去 数据库db1 扣除 user.Point - 10
第二步:去 数据库db2 扣除 goods.Stock - 1
第三步:去 数据库db3 创建订单

第二步库存不足时,整个流程怎么执行?


⚡ 快速开始

dotnet add package FreeSql.Cloud

or

Install-Package FreeSql.Cloud

public enum DbEnum { db1, db2, db3 }
var fsql = new FreeSqlCloud<DbEnum>("app001"); //提示:泛型可以传入 string
fsql.DistributeTrace = log => Console.WriteLine(log.Split('\n')[0].Trim());
fsql.Register(DbEnum.db1, () => new FreeSqlBuilder()
    .UseConnectionString(DataType.SqlServer, @"Data Source=...")
    .Build());
fsql.Register(DbEnum.db2, () => new FreeSqlBuilder()
    .UseConnectionString(DataType.MySql, @"Data Source=...")
    .Build());
fsql.Register(DbEnum.db3, () => new FreeSqlBuilder()
    .UseConnectionString(DataType.Oracle, @"Data Source=...")
    .Build());
services.AddSingleton<IFreeSql>(fsql);
services.AddSingleton(fsql);
//注入两个类型,稳

FreeSqlCloud 必须定义成单例模式


关于分布式事务

FreeSqlCloud 提供 TCC/SAGA 分布式事务调度、失败重试、持久化重启后重新唤醒事务单元、等管理功能。

// 测试数据
fsql.Use(DbEnum.db1).Insert(new User { Id = 1, Name = "testuser01", Point = 10 }).ExecuteAffrows();
fsql.Use(DbEnum.db2).Insert(new Goods { Id = 1, Title = "testgoods01", Stock = 0 }).ExecuteAffrows();
var orderId = Guid.NewGuid();
await DB.Cloud.StartSaga(orderId.ToString(), "支付购买SAGA事务",
    new SagaOptions
    {
        MaxRetryCount = 10, //重试次数
        RetryInterval = TimeSpan.FromSeconds(10) //重试间隔
    })
    .Then<Saga1>(DbEnum.db1, new SagaBuyState { UserId = 1, Point = 10, GoodsId = 1, OrderId = orderId })
    .Then<Saga2>(DbEnum.db2, new SagaBuyState { UserId = 1, Point = 10, GoodsId = 1, OrderId = orderId })
    .Then<Saga3>(DbEnum.db3, new SagaBuyState { UserId = 1, Point = 10, GoodsId = 1, OrderId = orderId })
    .ExecuteAsync();

由于商品库存不足,测试结果如下:

2022-08-17 05:24:00 【app001】db1 注册成功, 并存储 TCC/SAGA 事务相关数据
2022-08-17 05:24:00 【app001】成功加载历史未完成 TCC 事务 0 个
2022-08-17 05:24:00 【app001】成功加载历史未完成 SAGA 事务 0 个
2022-08-17 05:24:00 【app001】SAGA(85a95966-d5b0-4371-b54b-07d079d9fd78, 支付购买SAGA事务) Created successful, retry count: 10, interval: 10S
2022-08-17 05:24:00 【app001】SAGA(85a95966-d5b0-4371-b54b-07d079d9fd78, 支付购买SAGA事务) Unit1(第1步:数据库db1 扣除用户积分) COMMIT successful
2022-08-17 05:24:00 【app001】数据库使用[Use] db2
2022-08-17 05:24:00 【app001】SAGA(85a95966-d5b0-4371-b54b-07d079d9fd78, 支付购买SAGA事务) Unit2(第2步:数据库db2 扣除库存) COMMIT failed, ready to CANCEL, -ERR 扣除库存失败
2022-08-17 05:24:00 【app001】SAGA(85a95966-d5b0-4371-b54b-07d079d9fd78, 支付购买SAGA事务) Unit1(第1步:数据库db1 扣除用户积分) CANCEL successful
2022-08-17 05:24:00 【app001】SAGA(85a95966-d5b0-4371-b54b-07d079d9fd78, 支付购买SAGA事务) Completed, all units CANCEL successfully
  • Commit 用于业务提交;
  • Commit 全部环节通过,代表业务交付成功;
  • Commit 任何环节失败,代表业务失败,进入 Cancel 环节;
  • Cancel 失败会进行重试N次,直到取消成功,或者人工干预;

Saga1、Saga2、Saga3 的实现代码如下:

[Description("第1步:数据库db1 扣除用户积分")]
class Saga1 : SagaUnit<SagaBuyState>
{
    public override async Task Commit()
    {
        var affrows = await Orm.Update<User>().Set(a => a.Point - State.Point)
            .Where(a => a.Id == State.UserId && a.Point >= State.Point)
            .ExecuteAffrowsAsync();
        if (affrows <= 0) throw new Exception("扣除积分失败");
        //记录积分变动日志?
    }
    public override async Task Cancel()
    {
        await Orm.Update<User>().Set(a => a.Point + State.Point)
            .Where(a => a.Id == State.UserId)
            .ExecuteAffrowsAsync(); //退还积分
        //记录积分变动日志?
    }
}
[Description("第2步:数据库db2 扣除库存")]
class Saga2 : SagaUnit<SagaBuyState>
{
    public override async Task Commit()
    {
        var affrows = await Orm.Update<Goods>().Set(a => a.Stock - 1)
            .Where(a => a.Id == State.GoodsId && a.Stock >= 1)
            .ExecuteAffrowsAsync();
        if (affrows <= 0) throw new Exception("扣除库存失败");
    }
    public override async Task Cancel()
    {
        await Orm.Update<Goods>().Set(a => a.Stock + 1)
            .Where(a => a.Id == State.GoodsId)
            .ExecuteAffrowsAsync(); //退还库存
    }
}
[Description("第3步:数据库db3 创建订单")]
class Saga3 : SagaUnit<SagaBuyState>
{
    public override async Task Commit()
    {
        await Orm.Insert(new Order { Id = State.OrderId, Status = Order.OrderStatus.Success })
            .ExecuteAffrowsAsync();
    }
    public override Task Cancel()
    {
        return Task.CompletedTask;
    }
}
class BuySagaState
{
    public int UserId { get; set; }
    public int Point { get; set; }
    public Guid BuyLogId { get; set; }
    public int GoodsId { get; set; }
    public Guid OrderId { get; set; }
}

关于微服务

最近几天在整理 FreeSql.Cloud 代码及相关示例,发现 TCC/Saga 事务单元内不是只能 CRUD 操作,它还可以调用远程 webapi 甚至 gRPC 服务。

事务单元内调用远程 webapi,同样可以获取失败重试、持久化等特点。请看以下代码示例:

// HTTP 服务编排??
var orderId = Guid.NewGuid();
await DB.Cloud.StartSaga(orderId.ToString(), "支付购买webapi(saga)",
    new SagaOptions
    {
        MaxRetryCount = 10,
        RetryInterval = TimeSpan.FromSeconds(10)
    })
    .Then<HttpSaga>(default, new HttpUnitState
    {
        Url = "https://192.168.1.100/saga/UserPoint",
        Data = "UserId=1&Point=10&GoodsId=1&OrderId=" + orderId
    })
    .Then<HttpSaga>(default, new HttpUnitState
    {
        Url = "https://192.168.1.100/saga/GoodsStock",
        Data = "UserId=1&Point=10&GoodsId=1&OrderId=" + orderId
    })
    .Then<HttpSaga>(default, new HttpUnitState
    {
        Url = "https://192.168.1.100/saga/OrderNew",
        Data = "UserId=1&Point=10&GoodsId=1&OrderId=" + orderId
    })
    .ExecuteAsync();
class HttpSaga : SagaUnit<HttpUnitState>
{
    public override Task Commit()
    {
        //Console.WriteLine("请求 webapi:" + State.Url + "/Commit" + State.Data);
        return Task.CompletedTask;
    }
    public override Task Cancel()
    {
        //Console.WriteLine("请求 webapi:" + State.Url + "/Cancel" + State.Data);
        return Task.CompletedTask;
    }
}
class HttpUnitState
{
    public string Url { get; set; }
    public string Data { get; set; }
}
2022-08-17 06:11:05 【app001】db1 注册成功, 并存储 TCC/SAGA 事务相关数据
2022-08-17 06:11:05 【app001】成功加载历史未完成 TCC 事务 0 个
2022-08-17 06:11:05 【app001】成功加载历史未完成 SAGA 事务 0 个
2022-08-17 06:11:06 【app001】SAGA(2cad53d3-6d7e-481e-ad9a-15d4773a5397, 支付购买webapi(saga)) Created successful, retry count: 10, interval: 10S
2022-08-17 06:11:06 【app001】SAGA(2cad53d3-6d7e-481e-ad9a-15d4773a5397, 支付购买webapi(saga)) Unit1 COMMIT successful
2022-08-17 06:11:06 【app001】SAGA(2cad53d3-6d7e-481e-ad9a-15d4773a5397, 支付购买webapi(saga)) Unit2 COMMIT successful
2022-08-17 06:11:06 【app001】SAGA(2cad53d3-6d7e-481e-ad9a-15d4773a5397, 支付购买webapi(saga)) Unit3 COMMIT successful
2022-08-17 06:11:06 【app001】SAGA(2cad53d3-6d7e-481e-ad9a-15d4773a5397, 支付购买webapi(saga)) Completed, all units COMMIT successfully

这段代码是突然想出来的,由于没正式落地过微服务项目,故携带代码及类似的场景在 Natasha 技术大牛群里提出来讨论。

讨论原文:

微服务这些业务编排的,比如支付购买业务,用微服务怎么做。

第一步:去 server1 扣除 user.Point - 10
第二步:去 server2 扣除 goods.Stock - 1
第三步:去 server3 创建订单

第二步扣库存失败,怎么办?

很多人会回复消息队列,业务复杂了,不编排很难维护消息队列的。编排后的代码,让维护者更加直观。

感谢 dongfo 提供的参考方案:
https://dtm.pub/app/order.html

DTM 解决方案也是使用的 saga 业务流程,看来 FreeSql.Cloud 没有走偏,做跨数据库事务可行,用来做 webapi 编排也不错。

我仍然好奇,很多 .net 微服务文章介绍 服务编排 的少之又少,希望有微服务落地经验的朋友多多指教。


问:是不是缺少了条件链路呢?A条件走A,B条件走B。

答:这种应该整个判断,在分支做条件会复杂很多,直观性会变差。

if (场景A)
   StartSaga(...) 流程1
if (场景B)
   StartSaga(...) 流程2

⛳ 结束语

FreeSql 支持很多数据库,功能强大、稳定性好,有好的想法可以一起讨论。

希望这篇文章能帮助大家轻松理解并熟练掌握 TCC/Saga 事务,为企业的项目研发贡献力量。


相关文章
|
2天前
|
NoSQL 关系型数据库 MySQL
微服务架构下的数据库选择:MySQL、PostgreSQL 还是 NoSQL?
在微服务架构中,数据库的选择至关重要。不同类型的数据库适用于不同的需求和场景。在本文章中,我们将深入探讨传统的关系型数据库(如 MySQL 和 PostgreSQL)与现代 NoSQL 数据库的优劣势,并分析在微服务架构下的最佳实践。
|
2天前
|
设计模式 缓存 关系型数据库
探索微服务架构中的数据库设计挑战
微服务架构因其模块化和高扩展性被广泛应用于现代软件开发。然而,这种架构模式也带来了数据库设计上的独特挑战。本文探讨了在微服务架构中实现数据库设计时面临的问题,如数据一致性、服务间的数据共享和分布式事务处理。通过分析实际案例和提出解决方案,旨在为开发人员提供有效的数据库设计策略,以应对微服务架构下的复杂性。
|
2天前
|
消息中间件 缓存 监控
优化微服务架构中的数据库访问:策略与最佳实践
在微服务架构中,数据库访问的效率直接影响到系统的性能和可扩展性。本文探讨了优化微服务架构中数据库访问的策略与最佳实践,包括数据分片、缓存策略、异步处理和服务间通信优化。通过具体的技术方案和实例分析,提供了一系列实用的建议,以帮助开发团队提升微服务系统的响应速度和稳定性。
|
2月前
|
SQL 数据库 微服务
微服务03,最简单的Demo,我们每个服务不能重复开发相同业务,微服务数据独立,不要访问其他微服务的数据库,微服务的特点之一是提供不能功能的数据库互相分割,微服务需要根据业务模块拆分,做到单一职责,
微服务03,最简单的Demo,我们每个服务不能重复开发相同业务,微服务数据独立,不要访问其他微服务的数据库,微服务的特点之一是提供不能功能的数据库互相分割,微服务需要根据业务模块拆分,做到单一职责,
|
3月前
|
分布式计算 Java Hadoop
杨校老师课堂之分布式数据库HBase的部署和基本操作
杨校老师课堂之分布式数据库HBase的部署和基本操作
44 0
|
2天前
|
消息中间件 缓存 监控
优化微服务架构中的数据库访问:策略与实践
随着微服务架构的普及,如何高效管理和优化数据库访问成为了关键挑战。本文探讨了在微服务环境中优化数据库访问的策略,包括数据库分片、缓存机制、异步处理等技术手段。通过深入分析实际案例和最佳实践,本文旨在为开发者提供实际可行的解决方案,以提升系统性能和可扩展性。
|
21天前
|
消息中间件 存储 运维
微服务架构下的数据库选择与挑战
【8月更文第29天】随着微服务架构的流行,如何为每个服务选择合适的数据库成为了一个重要的话题。微服务架构强调将大型应用程序分解为一组小型、独立的服务,这些服务通常各自拥有自己的数据库。这种架构模式带来了灵活性和可扩展性,但也带来了数据一致性、事务管理和跨服务数据访问等方面的挑战。
26 0
|
1月前
|
安全 Nacos 数据库
【技术安全大揭秘】Nacos暴露公网后被非法访问?!6大安全加固秘籍,手把手教你如何保护数据库免遭恶意篡改,打造坚不可摧的微服务注册与配置中心!从限制公网访问到启用访问控制,全方位解析如何构建安全防护体系,让您从此告别数据安全风险!
【8月更文挑战第15天】Nacos是一款广受好评的微服务注册与配置中心,但其公网暴露可能引发数据库被非法访问甚至篡改的安全隐患。本文剖析此问题并提供解决方案,包括限制公网访问、启用HTTPS、加强数据库安全、配置访问控制及监控等,帮助开发者确保服务安全稳定运行。
79 0
|
2月前
|
关系型数据库 分布式数据库 数据库
PolarDB,阿里云的开源分布式数据库,与微服务相结合,提供灵活扩展和高效管理解决方案。
【7月更文挑战第3天】PolarDB,阿里云的开源分布式数据库,与微服务相结合,提供灵活扩展和高效管理解决方案。通过数据分片和水平扩展支持微服务弹性,保证高可用性,且兼容MySQL协议,简化集成。示例展示了如何使用Spring Boot配置PolarDB,实现服务动态扩展。PolarDB缓解了微服务数据库挑战,加速了开发部署,为云原生应用奠定基础。
202 3
|
2月前
|
消息中间件 存储 数据库
微服务架构下的数据库设计策略
【6月更文挑战第30天】在分布式系统和微服务架构的浪潮中,传统的单一数据库模式已不再适应快速迭代和高并发的需求。本文将深入探讨在微服务环境下如何进行有效的数据库设计,包括数据一致性、可伸缩性以及安全性等方面的考量。通过分析不同的数据存储方案和同步策略,我们将为后端开发者提供一套实用且高效的数据库设计方案。
61 1