ASP.NET Core微服务之开源事件总线CAP的初步使用

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Tip: 此篇已加入.NET Core微服务基础系列文章索引一、CAP简介下面的文字来自CAP的Wiki文档:https://github.com/dotnetcore/CAP/wiki  CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。

Tip: 此篇已加入.NET Core微服务基础系列文章索引

一、CAP简介

CAP

下面的文字来自CAP的Wiki文档:https://github.com/dotnetcore/CAP/wiki

  CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。我们可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。

  CAP 的应用场景主要有以下两个:

  • 分布式事务中的最终一致性(异步确保)的方案
  • 具有高可用性的 EventBus

  CAP 同时支持使用 RabbitMQ 或 Kafka 进行底层之间的消息发送,我们不需要具备 RabbitMQ 或者 Kafka 的使用经验,仍然可以轻松的将CAP集成到项目中。

  CAP 目前支持使用 Sql Server,MySql,PostgreSql 数据库的项目;

  CAP 同时支持使用 EntityFrameworkCore 和 Dapper 的项目,可以根据需要选择不同的配置方式;

  CAP的作者为园友savorboard(杨晓东),成都地区的.NET社区领导者,棒棒哒!

二、案例结构

  此次试验仍然和上一篇基于MassTransit的案例一样(其实是我懒得再改,直接拿来复用),共有四个MicroService应用程序,当用户下订单时会通过CAP作为事件总线发布消息,作为订阅者的库存和配送服务会接收到消息并消费消息。此次试验会采用RabbitMQ作为消息队列,采用MSSQL作为关系型数据库(同时CAP也是支持MSSQL的)。

  准备工作:为所有服务通过NuGet安装CAP及其相关包

PM> Install-Package DotNetCore.CAP
 下面是RabbitMQ的支持包
PM> Install-Package DotNetCore.CAP.RabbitMQ
 下面是MSSQL的支持包
PM> Install-Package DotNetCore.CAP.SqlServer

三、具体实现

3.1 OrderService

  (1)启动配置:这里主要需要给CAP指定数据库(它会在这个数据库中创建本地消息表Published和Received)以及使用到的消息队列(这里是RabbitMQ)

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        // Repository
        services.AddScoped<IOrderRepository, OrderRepository>();

        // EF DbContext
        services.AddDbContext<OrderDbContext>();

        // Dapper-ConnString
        services.AddSingleton(Configuration["DB:OrderDB"]);

        // CAP
        services.AddCap(x =>
        {
            x.UseEntityFramework<OrderDbContext>(); // EF

            x.UseSqlServer(Configuration["DB:OrderDB"]); // SQL Server

            x.UseRabbitMQ(cfg =>
            {
                cfg.HostName = Configuration["MQ:Host"];
                cfg.VirtualHost = Configuration["MQ:VirtualHost"];
                cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]);
                cfg.UserName = Configuration["MQ:UserName"];
                cfg.Password = Configuration["MQ:Password"]; 
            }); // RabbitMQ

            // Below settings is just for demo
            x.FailedRetryCount = 2;
            x.FailedRetryInterval = 5;
        });

        ......
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        app.UseMvc();

        // CAP
        app.UseCap();

        ......
    }

  (2)Controller:这里会调用Repository去实现业务逻辑和发送消息

    [Route("api/Order")]
    public class OrderController : Controller
    {
        public IOrderRepository OrderRepository { get; }

        public OrderController(IOrderRepository OrderRepository)
        {
            this.OrderRepository = OrderRepository;
        }

        [HttpPost]
        public string Post([FromBody]OrderDTO orderDTO)
        {
            var result = OrderRepository.CreateOrderByDapper(orderDTO).GetAwaiter().GetResult();

            return result ? "Post Order Success" : "Post Order Failed";
        }
    }

  (3)Repository:这里实现了两种方式:EF和Dapper(基于ADO.NET),其中EF方式中不需要传transaction(当CAP检测到 Publish 是在EF事务区域内的时候,将使用当前的事务上下文进行消息的存储),而基于ADO.NET方式中需要传transaction(由于不能获取到事务上下文,所以需要用户手动的传递事务上下文到CAP中)。

    public class OrderRepository : IOrderRepository
    {
        public OrderDbContext DbContext { get; }
        public ICapPublisher CapPublisher { get; }
        public string ConnStr { get; } // For Dapper use

        public OrderRepository(OrderDbContext DbContext, ICapPublisher CapPublisher, string ConnStr)
        {
            this.DbContext = DbContext;
            this.CapPublisher = CapPublisher;
            this.ConnStr = ConnStr;
        }

        public async Task<bool> CreateOrderByEF(IOrder order)
        {
            using (var trans = DbContext.Database.BeginTransaction())
            {
                var orderEntity = new Order()
                {
                    ID = GenerateOrderID(),
                    OrderUserID = order.OrderUserID,
                    OrderTime = order.OrderTime,
                    OrderItems = null,
                    ProductID = order.ProductID // For demo use
                };

                DbContext.Orders.Add(orderEntity);
                await DbContext.SaveChangesAsync();

                // When using EF, no need to pass transaction
                var orderMessage = new OrderMessage()
                {
                    ID = orderEntity.ID,
                    OrderUserID = orderEntity.OrderUserID,
                    OrderTime = orderEntity.OrderTime,
                    OrderItems = null,
                    ProductID = orderEntity.ProductID // For demo use
                };
                
                await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage);

                trans.Commit();
            }

            return true;
        }

        public async Task<bool> CreateOrderByDapper(IOrder order)
        {
            using (var conn = new SqlConnection(ConnStr))
            {
                conn.Open();
                using (var trans = conn.BeginTransaction())
                {
                    // business code here
                    string sqlCommand = @"INSERT INTO [dbo].[Orders](OrderID, OrderTime, OrderUserID, ProductID)
                                                                VALUES(@OrderID, @OrderTime, @OrderUserID, @ProductID)";

                    order.ID = GenerateOrderID();
                    await conn.ExecuteAsync(sqlCommand, param: new
                    {
                        OrderID = order.ID,
                        OrderTime = DateTime.Now,
                        OrderUserID = order.OrderUserID,
                        ProductID = order.ProductID
                    }, transaction: trans);

                    // For Dapper/ADO.NET, need to pass transaction
                    var orderMessage = new OrderMessage()
                    {
                        ID = order.ID,
                        OrderUserID = order.OrderUserID,
                        OrderTime = order.OrderTime,
                        OrderItems = null,
                        MessageTime = DateTime.Now,
                        ProductID = order.ProductID // For demo use
                    };

                    await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage, trans);

                    trans.Commit();
                }
            }

            return true;
        }

        private string GenerateOrderID()
        {
            // TODO: Some business logic to generate Order ID
            return Guid.NewGuid().ToString();
        }

        private string GenerateEventID()
        {
            // TODO: Some business logic to generate Order ID
            return Guid.NewGuid().ToString();
        }
    }

  这里摘抄一段CAP wiki中关于事务的一段介绍:

  事务在 CAP 具有重要作用,它是保证消息可靠性的一个基石。 在发送一条消息到消息队列的过程中,如果不使用事务,我们是没有办法保证我们的业务代码在执行成功后消息已经成功的发送到了消息队列,或者是消息成功的发送到了消息队列,但是业务代码确执行失败。

  这里的失败原因可能是多种多样的,比如连接异常,网络故障等等。

只有业务代码和CAP的Publish代码必须在同一个事务中,才能够保证业务代码和消息代码同时成功或者失败___。_

换句话说,CAP会确保我们这段逻辑中业务代码和消息代码都成功了,才会真正让事务commit。

3.2 StorageService

  (1)启动配置:这里主要是指定Subscriber

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        // EF DbContext
        services.AddDbContext<StorageDbContext>();

        // Dapper-ConnString
        services.AddSingleton(Configuration["DB:StorageDB"]);

        // Subscriber
        services.AddTransient<IOrderSubscriberService, OrderSubscriberService>();

        // CAP
        services.AddCap(x =>
        {
            x.UseEntityFramework<StorageDbContext>(); // EF

            x.UseSqlServer(Configuration["DB:StorageDB"]); // SQL Server

            x.UseRabbitMQ(cfg =>
            {
                cfg.HostName = Configuration["MQ:Host"];
                cfg.VirtualHost = Configuration["MQ:VirtualHost"];
                cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]);
                cfg.UserName = Configuration["MQ:UserName"];
                cfg.Password = Configuration["MQ:Password"];
            }); // RabbitMQ

            // Below settings is just for demo
            x.FailedRetryCount = 2;
            x.FailedRetryInterval = 5;
        });

        ......
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IServiceProvider serviceProvider, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        app.UseMvc();

        // CAP
        app.UseCap();

        ......
    }

  (2)实现Subscriber

  首先定义一个接口,建议放到公共类库中

    public interface IOrderSubscriberService
    {
        Task ConsumeOrderMessage(OrderMessage message);
    }

  然后实现这个接口,记得让其实现ICapSubscribe接口,然后我们就可以使用 CapSubscribeAttribute 来订阅 CAP 发布出来的消息。

    public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe
    {
        private readonly string _connStr;
        
        public OrderSubscriberService(string connStr)
        {
            _connStr = connStr;
        }

        [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)]
        public async Task ConsumeOrderMessage(OrderMessage message)
        {
            await Console.Out.WriteLineAsync($"[StorageService] Received message : {JsonHelper.SerializeObject(message)}");
            await UpdateStorageNumberAsync(message);
        }

        private async Task<bool> UpdateStorageNumberAsync(OrderMessage order)
        {
            //throw new Exception("test"); // just for demo use
            using (var conn = new SqlConnection(_connStr))
            {
                string sqlCommand = @"UPDATE [dbo].[Storages] SET StorageNumber = StorageNumber - 1
                                                                WHERE StorageID = @ProductID";

                int count = await conn.ExecuteAsync(sqlCommand, param: new
                {
                    ProductID = order.ProductID
                });

                return count > 0;
            }
        }
    }

*.CAP约定消息端在方法实现的过程中需要实现幂等性,所谓幂等性就是指用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。这里我没有考虑,实际中需要首先进行验证,避免二次更新

3.3 DeliveryService

  (1)启动配置:与StorageService高度类似,只是使用的不是同一个数据库

  (2)实现Subscriber

    public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe
    {
        private readonly string _connStr;

        public OrderSubscriberService(string connStr)
        {
            _connStr = connStr;
        }

        [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)]
        public async Task ConsumeOrderMessage(OrderMessage message)
        {
            await Console.Out.WriteLineAsync($"[DeliveryService] Received message : {JsonHelper.SerializeObject(message)}");
            await AddDeliveryRecordAsync(message);
        }

        private async Task<bool> AddDeliveryRecordAsync(OrderMessage order)
        {
            //throw new Exception("test"); // just for demo use
            using (var conn = new SqlConnection(_connStr))
            {
                string sqlCommand = @"INSERT INTO [dbo].[Deliveries](DeliveryID, OrderID, ProductID, OrderUserID, CreatedTime)
                                                            VALUES (@DeliveryID, @OrderID, @ProductID, @OrderUserID, @CreatedTime)";

                int count = await conn.ExecuteAsync(sqlCommand, param: new
                {
                    DeliveryID = Guid.NewGuid().ToString(),
                    OrderID = order.ID,
                    OrderUserID = order.OrderUserID,
                    ProductID = order.ProductID,
                    CreatedTime = DateTime.Now
                });

                return count > 0;
            }
        }
    }

3.4 快速测试

  (1)启动3个微服务,Check 数据库表状态

  首先会看到在各个数据库中均创建了本地消息表,这两个表的含义如下:

  Cap.Published:这个表主要是用来存储 CAP 发送到MQ(Message Queue)的客户端消息,也就是说你使用 ICapPublisher 接口 Publish 的消息内容。

  Cap.Received:这个表主要是用来存储 CAP 接收到 MQ(Message Queue) 的客户端订阅的消息,也就是使用 CapSubscribe[] 订阅的那些消息。

  然后看看各个表的数据,目前只有库存表有数据,因为我们要做的只是更新。

  (2)通过Postman发一个Post请求

  (3)Check控制台输出的日志信息

  (4)Check数据库中的业务表和消息表数据:可以看到发送者和接收者都执行成功了,如果其中任何一个参与者发生了异常或者连接不上,CAP会有默认的重试机制(默认是50次最大重试次数,每次重试间隔60s),当失败总次数达到默认失败总次数后,就不会进行重试了,我们可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。

  另外,由于CAP会在数据库中创建消息表,因此难免会考虑到其性能。CAP提供了一个数据清理的机制,默认情况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt (字段名)不为空并且小于当前时间的数据。

四、小结

  本篇首先简单介绍了一下CAP这个开源项目,然后基于上一篇中的下订单的小案例来进行了基于CAP的改造,并通过一个实例的运行来看到了结果。当然,这个实例并不完美,很多点都没有考虑(比如消息端消费时的幂等性)和失败重试的场景实践等等等等。由于时间和精力的关系,目前只使用到这儿,以后有机会能够应用上会研究下CAP的源码,最后感谢杨晓东为.NET社区带来了一个优秀的开源项目!

示例代码

  Click Here => 点我点我

参考资料

  CAP - GitHub : https://github.com/dotnetcore/CAP

  CAP - Wiki : https://github.com/dotnetcore/CAP/wiki

  杨晓东,《BASE:一种ACID的替代方案

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
存储 开发框架 JSON
ASP.NET Core OData 9 正式发布
【10月更文挑战第8天】Microsoft 在 2024 年 8 月 30 日宣布推出 ASP.NET Core OData 9,此版本与 .NET 8 的 OData 库保持一致,改进了数据编码以符合 OData 规范,并放弃了对旧版 .NET Framework 的支持,仅支持 .NET 8 及更高版本。新版本引入了更快的 JSON 编写器 `System.Text.UTF8JsonWriter`,优化了内存使用和序列化速度。
|
14天前
|
关系型数据库 C# 数据库
.NET 8.0 开源在线考试系统(支持移动端)
【10月更文挑战第27天】以下是适用于 .NET 8.0 的开源在线考试系统(支持移动端)的简介: 1. **基于 .NET Core**:跨平台,支持多种数据库,前后端分离,适用于多操作系统。 2. **结合 Blazor**:使用 C# 开发 Web 应用,支持响应式设计,优化移动端体验。 3. **基于 .NET MAUI**:跨平台移动应用开发,一套代码多平台运行,提高开发效率。 开发时需关注界面设计、安全性与稳定性。
|
25天前
|
网络协议 Unix Linux
一个.NET开源、快速、低延迟的异步套接字服务器和客户端库
一个.NET开源、快速、低延迟的异步套接字服务器和客户端库
|
25天前
|
Linux C# Android开发
.NET开源跨平台桌面和移动应用的统一框架 - Eto.Forms
.NET开源跨平台桌面和移动应用的统一框架 - Eto.Forms
112 1
|
25天前
|
存储 数据可视化 开发工具
2款.NET开源且免费的Git可视化管理工具
2款.NET开源且免费的Git可视化管理工具
|
1月前
mcr.microsoft.com/dotnet/core/aspnet:2.1安装libgdiplus
mcr.microsoft.com/dotnet/core/aspnet:2.1安装libgdiplus
29 1
|
18天前
|
开发者 Windows
.NET 开源扁平化、美观的 C/S 控件库
【10月更文挑战第23天】介绍了三款适用于 .NET 平台的开源扁平化、美观的 C/S 控件库:MaterialSkin 采用 Google Material Design 风格,适合现代感界面;Krypton Toolkit 提供丰富控件,界面易于定制;Fluent Ribbon Control Suite 模仿 Office 界面,适合复杂功能应用。每款控件库均附有示例代码及 GitHub 链接。
|
25天前
|
前端开发 JavaScript C#
2款.NET开源且高效的代码格式化工具
2款.NET开源且高效的代码格式化工具
|
25天前
|
开发框架 JavaScript 前端开发
一个适用于 ASP.NET Core 的轻量级插件框架
一个适用于 ASP.NET Core 的轻量级插件框架
|
25天前
|
开发框架 缓存 算法
开源且实用的C#/.NET编程技巧练习宝库(学习,工作,实践干货)
开源且实用的C#/.NET编程技巧练习宝库(学习,工作,实践干货)