.Net Core下使用RabbitMQ比较完备的两种方案(虽然代码有点惨淡,不过我会完善)

简介: .Net Core下使用RabbitMQ比较完备的两种方案

一、前言


   上篇说给大家来写C#和Java的方案,最近工作也比较忙,迟到了一些,我先给大家补上C#的方案,另外如果没看我上篇博客的人最好看一下,否则你可能看的云里雾里的,这里我就不进行具体的方案画图了;传送门


二、使用的插件


   HangFire

   一个开源的.NET任务调度框架,最大特点在于内置提供集成化的控制台,方便后台查看及监控,支持多种存储方式;在方案中主要使用定时任务做补偿机制,后期可能会封装一些,能通过页面的形式直接添加任务;

  NLog

  日志记录框架,方案中使用记录日志,后期可能回集成多个日志框架;

  Autofac

  依赖注入的框架,应该不用做过多介绍;

 SqlSugar

 ORM框架,这个从刚开始我就在使用了,在现在公司没有推行起来,不过在上两家公司都留下的遗产,据说还用的可以,当然我还是最佩服作者;

 Polly

 容错服务框架,类似于Java下的Hystrix,主要是为了解决分布式系统中,系统之间相互依赖,可能会因为多种因素导致服务不可用的而产生的一套框架,支持服务的超时重试、限流、熔断器等等;

 RabbitMQ.Client

 官方提供的C#连接RabbitMQ的SDK;


三、方案


 模拟一个简单订单下单的场景,没有进行具体的实现。同时建议下游服务不要写在web端,最好以服务的形式奔跑,代码中是Web端实现的,大家不要这么搞。整体上还是实现了之前提到的两种方案:一是入库打标,二是延时队列(这块没有进行很好的测试,但是估计也没有很大的问题);当然也是有一些特点:RabbitMQ宕机情况下无需重启服务,网络异常的情况下也可以进行断线重连。接下来聊下代码和各方插件在系统中的具体应用:

 项目结构:

 1005447-20190523183424937-482663427.png

 RabbitMQExtensions:

 采用Autofac按照单例的形式注入,采用Polly进行断线重连,也开启了自身断线重连和心跳检测机制,配置方面采用最简单的URI规范进行配置,有兴趣参考下官方,整体上这块代码还相对比较规范,以后可能也不会有太多调整;

/// <summary>
    /// rabbitmq持久化连接
    /// </summary>
    public interface IRabbitMQPersistentConnection
    {
        bool IsConnected { get; }
        bool TryConnect();
        IModel CreateModel();
    }
     /// <summary>
    /// rabbitmq持久化连接具体实现
    /// </summary>
    public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection
    {
        private readonly IConnectionFactory connectionFactory;
        private readonly ILogger<DefaultRabbitMQPersistentConnection> logger;
        private IConnection connection;
        private const int RETTRYCOUNT = 6;
        private static readonly object lockObj = new object();
        public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger)
        {
            this.connectionFactory = connectionFactory;
            this.logger = logger;
        }
        public bool IsConnected
        {
            get
            {
                return connection != null && connection.IsOpen;
            }
        }
        public void Cleanup()
        {
            try
            {
                connection.Dispose();
                connection.Close();
                connection = null;
            }
            catch (IOException ex)
            {
                logger.LogCritical(ex.ToString());
            }
        }
        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                connection.Close();
                throw new InvalidOperationException("连接不到rabbitmq");
            }
            return connection.CreateModel();
        }
        public bool TryConnect()
        {
            logger.LogInformation("RabbitMQ客户端尝试连接");
            lock (lockObj)
            {
                if (connection == null)
                {
                    var policy = RetryPolicy.Handle<SocketException>()
                        .Or<BrokerUnreachableException>()
                        .WaitAndRetry(RETTRYCOUNT, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>    
                        {
                            logger.LogWarning(ex.ToString());
                        });
                    policy.Execute(() =>
                    {
                        connection = connectionFactory.CreateConnection();
                    });
                }
                if (IsConnected)
                {
                    connection.ConnectionShutdown += OnConnectionShutdown;
                    connection.CallbackException += OnCallbackException;
                    connection.ConnectionBlocked += OnConnectionBlocked;
                    logger.LogInformation($"RabbitMQ{connection.Endpoint.HostName}获取了连接");
                    return true;
                }
                else
                {
                    logger.LogCritical("无法创建和打开RabbitMQ连接");
                    return false;
                }
            }
        }
        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            logger.LogWarning("RabbitMQ连接异常,尝试重连...");
            Cleanup();
            TryConnect();
        }
        private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {
            logger.LogWarning("RabbitMQ连接异常,尝试重连...");
            Cleanup();
            TryConnect();
        }
        private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        {
            logger.LogWarning("RabbitMQ连接异常,尝试重连...");
            Cleanup();
            TryConnect();
        }
    }

 OrderDal

 SqlSugar的一些简单封装,有些小特点:大家可以可以通过配置来实现读写分离,采用仓储设计。如果不太喜欢这么写,也可以参考杰哥的做法;

网络异常,图片无法展示
|
View Code

 OrderCommon

 定义全局异常的中间件,还有包含一些用到的实体等等,这部分代码还可优化拆分一下;

 OrderService

 生产者和消费者的具体实现,这块我还想在改造一番,将消费和业务分割开,现在写的很凌乱,不建议这么写,先把代码放出来,看看大家赞同不赞同我的这些用法,可以讨论,也欢迎争论,虽然这块代码写的不好,但是其实里面涉及一些RabbitMQ回调函数的用法,也是比较重要的,没有这些函数也就实现不了我上面说那两个特点

//RabbitMQ宕机以后回调
//客户端这块大家不要采用递归调用恢复链接
//具体为什么大家可以测试下,这里留点小疑问哈哈
connection.ConnectionShutdown += OnConnectionShutdown;
//消费端异常以后回调
consumerchannel.CallbackException += OnOnConsumerMessageAndWriteMessageLogException

Order

 具体的调用者,大家应该根据方法名字就能区分出我上面提到的两种方案的设计,整体的设计思路都是最终一致,延时队列发送消息这块最终也是可以通过定时任务来实现最终一致,实现方式有很多种,简单来说下可以通过入库时生成的缓存机制,通过定时任务来进行补偿实现,这块我没有进行具体实现,有兴趣我们可以探讨下这个方案;

[Route("api/[controller]/[action]")]
    [ApiController]
    public class OrderController : ControllerBase
    {
        private readonly IBaseDal<OrderMessageLogEntity> orderBaseDal;
        private readonly IMessageService<OrderMessageLogEntity> messageService;
        private readonly IConsumerMessageService consumerMessageService;
        private const string EXCHANGENAME = "order";
        private const string QUEUENAME = "order";
        private const string ROUTINGKEY = "order";
        public OrderController(IBaseDal<OrderMessageLogEntity> orderBaseDal, IMessageService<OrderMessageLogEntity> messageService,IConsumerMessageService consumerMessageService)
        {
            this.orderBaseDal = orderBaseDal;
            this.messageService = messageService;
            this.consumerMessageService = consumerMessageService;
        }
        /// <summary>
        /// 创建订单
        /// </summary>
        /// <returns></returns>
        public ActionResult<bool> CreateOrder(long userId)
        {
            //创建订单成功
            OrderEntity orderEntity = new OrderEntity();
            Random random= new Random();
            orderEntity.OrderId = random.Next();
            orderEntity.OrderNo = random.Next();
            orderEntity.UserId = userId;
            orderEntity.OrderInfo = random.Next() + "详情";
            //bool isCreateOrderSuccress = orderService.CreateOrder(orderId);
            //if (!isCreateOrderSuccress)
            //{
            //    throw new Exception("创建订单失败");
            //}
            //创建订单成功以后开始入消息记录库
            //消息建议设计的冗余一些方便以后好查询
            //千万级以后连表太困难
            //建议冗余的信息有用户信息、订单信息、方便以后按照这个核对信息
            //消息表的建议是按照不同的业务进行分表存储
            Random messageRandom = new Random();
            OrderMessageLogEntity orderMessageLog = new OrderMessageLogEntity();
            orderMessageLog.MessageId = messageRandom.Next();
            orderMessageLog.MessageInfo = orderEntity.OrderId+"订单信息";
            orderMessageLog.Status = (int)MessageStatusEnum.SENDING;
            orderMessageLog.OrderId = orderEntity.OrderId;
            orderMessageLog.UserId = orderEntity.UserId;
            orderMessageLog.CreateTime = DateTime.Now;
            orderMessageLog.UpdateTime = DateTime.Now;
            orderMessageLog.TryCount = 0;
            orderMessageLog.NextRetryTime = DateTime.Now.AddMinutes(5);
            //必须保证消息先落库
            bool isCreateOrderMessageLosSuccess = orderBaseDal.Insert(orderMessageLog);
            if (!isCreateOrderMessageLosSuccess)
                throw new Exception("消息入库异常");
            Message message = new Message();
            message.ExchangeName = EXCHANGENAME;
            message.QueueName = QUEUENAME;
            message.MessageId = orderMessageLog.MessageId;
            message.RoutingKey = ROUTINGKEY;
            message.Body = Encoding.UTF8.GetBytes(orderMessageLog.MessageInfo);
            //落库成功以后开始发送消息到MQ
            //这个地方采用最终一致而不去使用分布式事物最终一致
            messageService.SendMessage(message, orderMessageLog);
            return true;
        }
        /// <summary>
        /// 消费订单
        /// </summary>
        /// <returns></returns>
        public ActionResult<bool> ConsumerOrder()
        {
            Message message = new Message();
            message.ExchangeName = EXCHANGENAME;
            message.QueueName = QUEUENAME;
            message.RoutingKey = ROUTINGKEY;
            consumerMessageService.ConsumerMessage();
            return true;
        }
        /// <summary>
        /// 通过延时队列发送消息
        /// </summary>
        /// <param name="userId"></param>
        /// <returns></returns>
        public ActionResult<bool> CreateDelayCreateOrder(long userId)
        {
            //创建订单成功
            OrderEntity orderEntity = new OrderEntity();
            Random random = new Random();
            orderEntity.OrderId = random.Next();
            orderEntity.OrderNo = random.Next();
            orderEntity.UserId = userId;
            orderEntity.OrderInfo = random.Next() + "详情";
            //bool isCreateOrderSuccress = orderService.CreateOrder(orderId);
            //if (!isCreateOrderSuccress)
            //{
            //    throw new Exception("创建订单失败");
            //}
            //创建订单成功以后开始入消息记录库
            //消息建议设计的冗余一些方便以后好查询
            //千万级以后连表太困难
            //建议冗余的信息有用户信息、订单信息、方便以后按照这个核对信息
            //消息表的建议是按照不同的业务进行分表存储
            Random messageRandom = new Random();
            OrderMessageLogEntity orderMessageLog = new OrderMessageLogEntity();
            orderMessageLog.MessageId = messageRandom.Next();
            orderMessageLog.MessageInfo = orderEntity.OrderId + "订单信息";
            orderMessageLog.Status = (int)MessageStatusEnum.SENDING;
            orderMessageLog.OrderId = orderEntity.OrderId;
            orderMessageLog.UserId = orderEntity.UserId;
            orderMessageLog.CreateTime = DateTime.Now;
            orderMessageLog.UpdateTime = DateTime.Now;
            orderMessageLog.TryCount = 0;
            orderMessageLog.NextRetryTime = DateTime.Now.AddMinutes(5);
            ////必须保证消息先落库
            //bool isCreateOrderMessageLosSuccess = orderBaseDal.Insert(orderMessageLog);
            //if (!isCreateOrderMessageLosSuccess)
            //    throw new Exception("消息入库异常");
            Message message = new Message();
            message.ExchangeName = EXCHANGENAME;
            message.QueueName = QUEUENAME;
            message.MessageId = orderMessageLog.MessageId;
            message.RoutingKey = ROUTINGKEY;
            message.Body = Encoding.UTF8.GetBytes(orderMessageLog.MessageInfo);
            //这里的设计是不进行落库
            //假如两条消息都失败必须借助定时任务去对比消息库和订单库的消息id然后进行再补发
            //剩下的只要有一条发送成功其实就能保证下游必然会消费调这条消息,排除下游消费异常的情况 这个地方我不在进行实现自己可脑补一下
            //开始发送消息到MQ
            messageService.SendMessage(message, orderMessageLog);
            //发送延时消息
            messageService.SendDelayMessage(message, orderMessageLog);
            return true;
        }
        /// <summary>
        /// 消费消息以后并入库
        /// </summary>
        /// <returns></returns>
        public ActionResult<bool> ConsumerOrderAndWirteMessageLog()
        {
            consumerMessageService.ConsumerMessageAndWriteMessageLog();
            return true;
        }
        /// <summary>
        /// 消费延时消息
        /// 进行二次检查核对
        /// </summary>
        /// <returns></returns>
        public ActionResult<bool> ConsumerDelayOrder()
        {
            consumerMessageService.ConsumerDelayMessage();
            return true;
        }
    }


 HangfireExtensions

 Hangfire定时框架,采用Mysql作为持久层的存储,写的也比较清晰,后期就是针对这些进行扩展,实现在界面就能添加定时任务;


四、结束


 生产端和消费端这段代码写的凌乱,希望大家不要介意这一点,是有原因的,这里我就不说了。希望大家看到闪光点,不要在一点上纠结;下次会加入Elasticsearch和监控部分的时候我会把这块代码改掉,还大家一片整洁的世界;

 Github地址:https://github.com/wangtongzhou520/rabbitmq.git  有什么问题大家可以问我

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
26天前
|
API
【Azure 媒体服务】Media Service的编码示例 -- 创建缩略图子画面的.NET代码调试问题
【Azure 媒体服务】Media Service的编码示例 -- 创建缩略图子画面的.NET代码调试问题
|
13天前
|
开发框架 NoSQL .NET
利用分布式锁在ASP.NET Core中实现防抖
【9月更文挑战第5天】在 ASP.NET Core 中,可通过分布式锁实现防抖功能,仅处理连续相同请求中的首个请求,其余请求返回 204 No Content,直至锁释放。具体步骤包括:安装分布式锁库如 `StackExchange.Redis`;创建分布式锁服务接口及其实现;构建防抖中间件;并在 `Startup.cs` 中注册相关服务和中间件。这一机制有效避免了短时间内重复操作的问题。
|
23天前
|
Kubernetes 监控 Devops
【独家揭秘】.NET项目中的DevOps实践:从代码提交到生产部署,你不知道的那些事!
【8月更文挑战第28天】.NET 项目中的 DevOps 实践贯穿代码提交到生产部署全流程,涵盖健壮的源代码管理、GitFlow 工作流、持续集成与部署、容器化及监控日志记录。通过 Git、CI/CD 工具、Kubernetes 及日志框架的最佳实践应用,显著提升软件开发效率与质量。本文通过具体示例,助力开发者构建高效可靠的 DevOps 流程,确保项目成功交付。
43 0
|
23天前
|
XML 开发框架 .NET
.NET框架:软件开发领域的瑞士军刀,如何让初学者变身代码艺术家——从基础架构到独特优势,一篇不可错过的深度解读。
【8月更文挑战第28天】.NET框架是由微软推出的统一开发平台,支持多种编程语言,简化应用程序的开发与部署。其核心组件包括公共语言运行库(CLR)和类库(FCL)。CLR负责内存管理、线程管理和异常处理等任务,确保代码稳定运行;FCL则提供了丰富的类和接口,涵盖网络、数据访问、安全性等多个领域,提高开发效率。此外,.NET框架还支持跨语言互操作,允许开发者使用C#、VB.NET等语言编写代码并无缝集成。这一框架凭借其强大的功能和广泛的社区支持,已成为软件开发领域的重要工具,适合初学者深入学习以奠定职业生涯基础。
80 1
|
25天前
|
API
【Azure Key Vault】.NET 代码如何访问中国区的Key Vault中的机密信息(Get/Set Secret)
【Azure Key Vault】.NET 代码如何访问中国区的Key Vault中的机密信息(Get/Set Secret)
|
23天前
|
开发框架 监控 .NET
开发者的革新利器:ASP.NET Core实战指南,构建未来Web应用的高效之道
【8月更文挑战第28天】本文探讨了如何利用ASP.NET Core构建高效、可扩展的Web应用。ASP.NET Core是一个开源、跨平台的框架,具有依赖注入、配置管理等特性。文章详细介绍了项目结构规划、依赖注入配置、中间件使用及性能优化方法,并讨论了安全性、可扩展性以及容器化的重要性。通过这些技术要点,开发者能够快速构建出符合现代Web应用需求的应用程序。
31 0
|
23天前
|
缓存 数据库连接 API
Entity Framework Core——.NET 领域的 ORM 利器,深度剖析其最佳实践之路
【8月更文挑战第28天】在软件开发领域,高效的数据访问与管理至关重要。Entity Framework Core(EF Core)作为一款强大的对象关系映射(ORM)工具,在 .NET 开发中扮演着重要角色。本文通过在线书店应用案例,展示了 EF Core 的核心特性和优势。我们定义了 `Book` 实体类及其属性,并通过 `BookStoreContext` 数据库上下文配置了数据库连接。EF Core 提供了简洁的 API,支持数据的查询、插入、更新和删除操作。
38 0
|
25天前
|
消息中间件 开发工具
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
|
25天前
|
存储 Linux 网络安全
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Linux/Linux Container)
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Linux/Linux Container)
|
25天前
|
网络安全 API 数据安全/隐私保护
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Windows)
【Azure App Service】.NET代码实验App Service应用中获取TLS/SSL 证书 (App Service Windows)