AspNetCore结合Redis实践消息队列

简介: 这是年中首发在博客园上的文章,个人觉得是AspNetCore结合Redis做的一次比较优秀的消息队列重构,其中对于点对点/发布-订阅的思路应该也是面试必考题。

引言


.Net TPL Dataflow是一个进程内数据流管道,应对高并发、低延迟的要求非常有效, 但在实际Docker部署的过程中, 有一个问题一直无法回避:


单体程序部署的瞬间(服务不可用)会有少量流量无法处理;


更糟糕的情况下,迭代部署的这个版本有问题,上线后无法工作, 导致更多流量没有处理。

 

背负神圣使命(巨大压力)的程序猿心生一计,为何不将单体程序改成分布式:


增加服务ReceiverApp,ReceiverApp只接受数据,WebApp只处理数据。


41af355283d1d767d9c90a0b032e9d7c.png


知识储备


消息队列和订阅发布作为老生常谈的两个知识点被反复提及,按照JMS的规范, 官方称为点对点(point to point, queue)和发布/订阅(publish/subscribe,channel)


240f2477fc4c9d8eec4e0357b07cb7ea.png


点对点


   生产者发送消息到Message Queue中,然后消费者从队列中取出消息并消费。


队列会保留消息,直到他们被消费或超时;

① MQ支持多消费者,但每个消息只能被一个消费者处理

② 发送者和消费者在时间上没有依赖性,当发送者发送消息之后,不管消费者有没有在运行(甚至不管有没有消费者),都不会影响到消息被发送到队列

③ 一般消费者在消费之后需要向队列应答成功

如果希望发送的每个消息都被成功处理,你应该使用p2p模型


发布/订阅


消息生产者将消息发布到Channel,在此之前已有多个消费者订阅该通道。


和点对点方式不同,发布到特定通道的消息会被通道订阅者实时接收。


通道没有队列机制,发布的消息只能被当前收听的订阅者接收到

① 每个消息可以有多个订阅者

② 发布者和消费者有时间上依赖性:某通道的订阅者,必须先创建该通道订阅,才能收到消息

发布消息至通道,不关注订阅者是谁;订阅者可收听自己感兴趣的多个通道(类似于topic),也不关注发布者是谁。

③ 故如果没有订阅者,发布的消息将得不到处理;


头脑风暴


Redis内置的List数据结构能形成轻量级消息队列的效果;Redis原生支持发布/订阅 模型

如上分析, Pub/Sub模型在订阅者宕机的时候,发布的消息得不到处理,故此模型不能用于强业务的数据接收和处理。


本次采用的消息队列模型:


  • 解耦业务:新建ReceiverApp作为生产者,专注于接收并发送到队列;原有的WebApp作为消费者专注数据处理。


  • 起到削峰填谷的作用,若缩放出多个WebApp消费者容器,还能形成负载均衡的效果。


需要关注Redis操作List结构的两个命令( 左进右出,右进左出同理):


  LPUSH  &  RPOP/BRPOP


Brpop中的B 表示"Block",是一个rpop命令的阻塞版本:若指定List没有新元素,在给定时间内,该命令会阻塞当前redis客户端连接,直到超时返回nil


AspNetCore编程实践


本次使用AspNetCore 完成RedisMQ的实践,引入Redis国产第三方开源库CSRedisCore


生产者ReceiverApp


生产者使用LPush命令向Redis List数据结构写入消息。


------------------截取自Startup.cs-------------------------
public void ConfigureServices(IServiceCollection services)
{    
// Redis客户端要定义成单例, 不然在大流量并发收数的时候, 会造成redis client来不及释放。另一方面也确认api控制器不是单例模式,    
var csredis = new CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");    
RedisHelper.Initialization(csredis);    
services.AddSingleton(csredis);    
services.AddMvc();
}
------------------截取自数据接收Controller-------------------[Route("batch")]
[HttpPost]
public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs){
    if (!ModelState.IsValid)  
    throw new ArgumentException("Http Body Payload Error.");  
    var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}";    
    eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);   
    if (eqidPairs != null && eqidPairs.Any())    
    RedisHelper.LPush(redisKey, eqidPairs.ToArray());    
    await Task.CompletedTask; 
    }


消费者WebApp


根据以上RedisMQ思路,事件消费方式是拉取pull,故需要轮询Redis  List数据结构,这里使用AspNetCore内置的BackgroundService后台服务类后台轮询消费:

关注后台Job中的循环接收方法。


    public class BackgroundJob : BackgroundService
    {    
        private readonly IEqidPairHandler _eqidPairHandler;    
        private readonly CSRedisClient[] _cSRedisClients;    
        private readonly I   Configuration _conf;    
        private readonly ILogger _logger;    
        public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory)    
        {        
        _eqidPairHandler = eqidPairHandler;        
        _cSRedisClients = csRedisClients;        
        _conf = conf;        
        _logger = loggerFactory.CreateLogger(nameof(BackgroundJob));    
      }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)    
        {        
        _logger.LogInformation("Service starting");        
        if (_cSRedisClients[0] == null)        
        {            
          _cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0);        
          }        
          RedisHelper.Initialization(_cSRedisClients[0]);
            while (!stoppingToken.IsCancellationRequested)        
            {           
            var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";           var eqidpair = RedisHelper.BRPop(5, key);           
            if (eqidpair != null)              
                await_ eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));           // 强烈建议无论如何休眠一段时间,防止突发大流量导致WebApp进程CPU满载,自行根据场景设置合理休眠时间           await Task.Delay(10, stoppingToken);        
                }       
                _logger.LogInformation("Service stopping");    
         }
     }


    迭代验证


    使用docker-compose单机部署Nginx,ReceiverApp,WebApp容器。


    docker-compose up指令默认只会重建[Service配置或Image变更]的容器


    If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation, docker-compose up picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the --no-recreate flag.


    做一次迭代验证,更新docke-compose.yml文件WebApp服务的镜像版本


    docker-compose up;


    下图显示仅 数据处理容器 WebApp被Recreate:


    c93e76a74477bdc21c01e8d2560a085c.png


    Nice,分布式改造完成,效果很明显,现在可以放心安全的迭代核心WebApp数据处理程序。

    相关文章
    |
    3月前
    |
    消息中间件 缓存 NoSQL
    Redis各类数据结构详细介绍及其在Go语言Gin框架下实践应用
    这只是利用Go语言和Gin框架与Redis交互最基础部分展示;根据具体业务需求可能需要更复杂查询、事务处理或订阅发布功能实现更多高级特性应用场景。
    306 86
    |
    3月前
    |
    存储 缓存 监控
    Redis分区的核心原理与应用实践
    Redis分区通过将数据分散存储于多个节点,提升系统处理高并发与大规模数据的能力。本文详解分区原理、策略及应用实践,涵盖哈希、范围、一致性哈希等分片方式,分析其适用场景与性能优势,并探讨电商秒杀、物联网等典型用例,为构建高性能、可扩展的Redis集群提供参考。
    199 0
    |
    11月前
    |
    消息中间件 存储 监控
    活动实践 | 快速体验云消息队列RocketMQ版
    本方案介绍如何使用阿里云消息队列RocketMQ版Serverless实例进行消息管理。主要步骤包括获取接入点、创建Topic和订阅组、收发消息、查看消息轨迹及仪表盘监控。通过这些操作,用户可以轻松实现消息的全生命周期管理,确保消息收发的高效与可靠。此外,还提供了消费验证、下载消息等功能,方便用户进行详细的消息处理与调试。
    |
    5月前
    |
    存储 缓存 NoSQL
    Redis 核心知识与项目实践解析
    本文围绕 Redis 展开,涵盖其在项目中的应用(热点数据缓存、存储业务数据、实现分布式锁)、基础数据类型(string 等 5 种)、持久化策略(RDB、AOF 及混合持久化)、过期策略(惰性 + 定期删除)、淘汰策略(8 种分类)。 还介绍了集群方案(主从复制、哨兵、Cluster 分片)及主从同步机制,分片集群数据存储的哈希槽算法。对比了 Redis 与 Memcached 的区别,说明了内存用完的情况及与 MySQL 数据一致性的保证方案。 此外,详解了缓存穿透、击穿、雪崩的概念及解决办法,如何保证 Redis 中是热点数据,Redis 分布式锁的实现及问题解决,以及项目中分布式锁
    151 1
    |
    10月前
    |
    消息中间件 监控 数据挖掘
    【有奖实践】轻量消息队列(原 MNS)订阅 OSS 事件实时处理文件变动
    当你需要对对象存储 OSS(Object Storage Service)中的文件变动进行实时处理、同步、监听、业务触发、日志记录等操作时,你可以通过设置 OSS 的事件通知规则,自定义关注的文件,并将 OSS 事件推送到轻量消息队列(原 MNS)的队列或主题中,开发者的服务即可及时收到相关通知,并通过消费消息进行后续的业务处理。
    234 92
    |
    9月前
    |
    缓存 NoSQL Java
    Redis应用—6.热key探测设计与实践
    热key问题在高并发系统中可能导致数据层和服务层的严重瓶颈,如Redis集群瘫痪和用户体验下降。为解决此问题,京东开发了JdHotkey热key探测框架,具备实时性、准确性、集群一致性和高性能等特点。该框架由etcd集群、Client端jar包、Worker端集群和Dashboard控制台组成,通过分布式计算快速识别热key并推送至应用内存,有效减轻数据层负载,提升服务性能。JdHotkey适用于多种场景,安装部署简便,支持毫秒级热key探测和集群一致性维护。
    465 61
    Redis应用—6.热key探测设计与实践
    |
    7月前
    |
    缓存 NoSQL Java
    Redis:现代服务端开发的缓存基石与电商实践-优雅草卓伊凡
    Redis:现代服务端开发的缓存基石与电商实践-优雅草卓伊凡
    193 5
    Redis:现代服务端开发的缓存基石与电商实践-优雅草卓伊凡
    |
    10月前
    |
    存储 缓存 NoSQL
    Redis哈希结构在提升数据检索速度中的实践应用
    本文详细介绍了 Redis 哈希结构的特点、常见使用场景以及如何在实际应用中利用哈希结构提升数据检索速度。通过合理使用 Redis 哈希结构,可以显著提高系统的性能和响应速度。在实际开发中,结合具体业务需求,灵活运用 Redis 提供的多种数据结构,构建高效的缓存和数据存储解决方案。希望本文能帮助您更好地理解和应用 Redis 哈希结构,提升数据检索速度。
    283 18
    |
    10月前
    |
    消息中间件 对象存储
    轻量消息队列(原 MNS)订阅 OSS 事件实践
    使用轻量消息队列订阅OSS事件,实时处理文件变动,赢取ins风U型枕(限量500个)。访问活动页面,完成实操并上传截图即可参与领奖。活动时间:即日起至2025年2月28日16:00。奖品数量有限,先到先得,快来报名吧!
    183 2