Redis实现消息队列与延时消息队列

简介: Redis实现消息队列与延时消息队列

前序

提到redis,更多的可能想到用作缓存的用途,其实redis也可以实现一些简单的消息队列用途,我们可以使用 list 数据结构实现队列。

list的几个命令

lpush (left push)

由队列的左边存放进去

rpush (right push)

由队列的右边存放进去

lpop (left pop)

由队列的左边取出来

rpop (right pop)

由队列的右边取出来

以上的四个命令,可以让 list 帮我们实现队列 或者 栈,队列的特性是先进先出,栈的特性是先进后出,

所以队列的实现可以使用 lpush + rpop 或者 rpush + lpop,

栈的实现则是lpush + lpop 或者 rpush + rpop。

image.png

使用命令演示队列

生产者发布消息

首先我们使用 rpush 对一个叫做notify-queue的队列,增加五个元素,即 1 2 3 4 5,也就是作为生产者发布消息啦

image.png

消费者消费消息

既然生产者使用的是rpush,那么消费者就要用lpop,可以看下下图,我们不停对notify-queue进行消息消费,并且是按照顺序的,从1一直到5,按顺序读出,最终队列中没有消息了,弹出的则一直是空

image.png

空轮询问题

在上面使用lpop消费消息时,可以看到,消息消费完后,我们每次再去pop时,读到的都是一个空的信息,

上面是手动执行命令,但是如果是写好的代码程序不停的去pop数据(拉取数据)的话,会造成空轮询(无用的读取),

既拉高了客户端的CPU消耗,又拉高了redis的QPS,并且还是无用操作,这些无用操作可能会造成其他客户端对redis的访问变得响应缓慢。

解决方案A (休眠)

既然空轮询会让客户端和redis的资源消耗都会变得较高,那么我们可以让客户端在收到空数据的时候,进行1s的休眠,1s后再进行数据拉取,这样可以降低消耗

Thread.sleep(1000)

这个方案也是存在瑕疵的,即消息消费延迟性增大了,如果只有一个消费者的话,延迟就是1s,即空轮询后,正好休眠了,但是这时候刚好有消息过来了,还是要等到1s醒来后才能消费,

如果有多个消费者的话,由于每个消费者的睡眠时间是岔开的,会降低一些延迟性,但是有没有办法更好的方法,可以做到几乎 0 延迟?

解决方案B (阻塞读)

redis中关于队列取数据其实还有两个命令,即阻塞读取,

blpop (blocking left pop)

brpop (blocking right pop)

阻塞读在队列没有数据的时候,会进入休眠状态,一旦有消息来了以后,则立刻做出反应,读取数据,因此使用 blpop/brpop 替换 lpop/rpop 则可以解决消息延迟性的问题,

继续入队3个属性,6、7、8

image.png

使用 blpop 进行队列的读取,最后一个参数是阻塞读的等待时间,如果超过这个时间还没有消息,将会返回nil,此时可以继续重复blpop操作,

image.png

阻塞读的空闲连接自动断开问题

客户端使用阻塞读时,如果阻塞的时间过长,服务一般会当成空闲连接,从而对其进行主动断开,减少无用的连接占用资源,这个时候客户端会抛出异常,

所以请注意,在客户端使用阻塞读的时候,要进行异常的捕获,从而做出相应的处理,例如重试。

java客户端实现消息队列

思路和上述一样,只是由命令行客户端redis-cli变成了java语言,一个线程或多个线程进行 rpush 的发布,

另外一个或多个线程进行 blpop 消费,完成的代码在:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-queue

发布者

image.png

订阅者

image.png

延时队列的实现思路

延时队列指的是,消息发送的一段时间后,再由消费者进行消费,而不是发送过去后,消费者就能立即读取到,

zset的可以帮我们做到这个事情,首先zset可以通过score进行排序,score可以存一个时间戳,所以我们每次发布消息的时候,用当前时间戳加上延时的时间戳,

随后消费者取消息的时候,通过截取zset的数据,取到已经满足当前时间的消息(即取score小于等于当前时间戳的数据,score小于等于当前时间戳代表消息已经到时间了,如果大于的话,说明还得等一会才能消费)。

关键命令 zadd (发布者),zrangebyscore(订阅者),zrem (订阅者消费完数据后删除)

命令实现

我们使用zadd添加了4个数据,分别是1、2、3秒(伪说法,这里其实只是个score)后才能消费的数据,还有一个10秒后才能消费的kafka,

image.png

假如现在已经到了第三秒,我们取zset中大于等于1秒的和小于等于3秒的数据,因为这个区间的数据正好是我们可以消费的,可以看到,我们取出了符合条件的3条数据,

image.png

如果每次只能消费一个数据的话,可以加一个limit限制条件,可以看下图取出了第一个可以消费的数据,redis

image.png

同时注意,和list的lpop/和blpop不同(它们弹出即自动删除原始队列里的该数据),

虽然获取到了数据,但是如果不使用zrem进行删除的话,这条数据还会被其他人读到,因为他还一直存在zset中,

不过zrem可能会发生已经被别人抢先一步删除(消费)的情况,所以代码中还需要根据zrem的返回值是否大于0判断,本次消息我们是否抢占成功,成功后再进行正确消费。

代码实现

发布者

image.png

订阅者

image.png

测试延迟效果

image.png

完整代码地址:https://github.com/qiaomengnan16/redis-demo/tree/main/redis-delayed-queue

优化, 使用lua实现

上面实现的延迟队列中,有一个问题,就是使用zrem判断是否抢到这个数据时,很有可能没有抢到,这样继续进行读取,可能几轮都抢不到,资源白白浪费了,所以可以通过lua脚本,来进行优化,

让zrangebyscore 和 zrem成为一个原子化操作,这就可以避免多线程争抢,抢不到的资源浪费了。

image.png

image.png

结语

一些专业的队列中间件,应用起来会比较复杂并且增加运维成本,例如RabbitMQ,发消息前需要创建Exchange交换机,再创建Queue,随后Exchange和Queue要进行绑定,发消息的时候还要指定routing-key 才能和Exchange匹配,最终到达Queue中,

如果场景简单的话,可以使用redis实现一个队列,但是需要注意,redis没有专业队列的特性,没有ack的保证,也就是说消息是不可靠的,消费失败后,就没有了,如果需要百分百的可靠性,还是需要采用专业的队列中间件的ack等机制作为保障。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
85 6
|
4月前
|
canal 缓存 NoSQL
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
根据对一致性的要求程度,提出多种解决方案:同步删除、同步删除+可靠消息、延时双删、异步监听+可靠消息、多重保障方案
Redis缓存与数据库如何保证一致性?同步删除+延时双删+异步监听+多重保障方案
|
5月前
|
消息中间件 存储 负载均衡
现代消息队列与云存储问题之消息队列支持定时消息和延迟队列的问题如何解决
现代消息队列与云存储问题之消息队列支持定时消息和延迟队列的问题如何解决
|
6月前
|
消息中间件 存储 负载均衡
Redis使用ZSET实现消息队列使用总结二
Redis使用ZSET实现消息队列使用总结二
80 0
|
4月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
110 20
剖析 Redis List 消息队列的三种消费线程模型
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
35 2
|
3月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
55 0
|
5月前
|
消息中间件 监控 UED
【揭秘消息队列背后的秘密!】如何解决消息队列的延时及过期失效问题?深入剖析与实战指南!
【8月更文挑战第24天】本文以随笔形式探讨了消息队列在实际应用中面临的消息延时及过期失效问题。针对消息延时,文章提出了包括优化消息队列配置、提高消费者效率和利用优先级队列在内的解决方案;并通过示例代码展示了如何优化RabbitMQ中的消费者处理流程。对于消息过期失效问题,则建议设置消息TTL、采用死信队列并实施监控报警机制;同样提供了基于RabbitMQ设置消息TTL的具体实现。这些策略有助于提升消息队列的性能和系统的整体稳定性。
70 2
|
5月前
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
92 0
|
5月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决