Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析

简介: Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析

在工作中,我们有时候会遇到这样的场景,比如下单之后超过30分钟未支付自动取消订单,还有就比如过期/生效通知等等,这些场景一般有两种方法解决:
第一种可以通过定时任务扫描符合条件的去执行,第二种就是提前通过消息队列发送延迟消息到期自动消费。
本文我要介绍的就是通过第二种方式来实现这种业务逻辑,只不过这次不是使用MQ而是直接使用的是Redission提供的RDelayedQueue延迟队列。

Tip以下是本人经过多年的工作经验集成的JavaWeb脚手架,封装了各种通用的starter可开箱即用,同时列举了互联网各种高性能场景的使用示例。

// Git代码
https://gitee.com/yeeevip/yeee-memo
https://github.com/yeeevip/yeee-memo

1 延迟队列RDelayedQueue的简单用法

  • 生产者端

1 通过redissonClient的getBlockingDeque方法指定队列名称获得RBlockingDeque对象

2 然后再通过redissonClient的getDelayedQueue方法传入RBlockingDeque对象获得RDelayedQueue对象

3 最后调用RDelayedQueue对象的offer方法就可以将消息指定延迟时间发送到延迟队列了

@Component
public class DelayQueueKit {
   
   

    // 注入RedissonClient实例
    @Resource
    private RedissonClient redissonClient;

    /**
     * 添加消息到延迟队列
     *
     * @param queueCode 队列唯一KEY
     * @param msg       消息
     * @param delay     延迟时间
     * @param timeUnit  时间单位
     */
    public <T> void addDelayQueue(String queueCode, T msg, long delay, TimeUnit timeUnit) {
   
   
        RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        // 这一步通过offer插入到队列
        delayedQueue.offer(msg, delay, timeUnit);
    }
}
  • 消费者端

1 通过redissonClient获取RBlockingDeque对象

2 通过RBlockingDeque对象获取RDelayedQueue

3 之后RBlockingDeque再通过自旋调用take方法获取到期的消息,没有消息时会阻塞的。

Tip 一般情况下我们在程序刚启动时异步开一个线程去自旋消费队列消息的

@Component
public class DelayQueueKit {
   
   

    // 注入RedissonClient实例
    @Resource
    private RedissonClient redissonClient;

    public <T> void consumeQueueMsg(String queueCode) {
   
   
        RBlockingDeque<T> delayQueue = redissonClient.getBlockingDeque(queueCode);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        log.info("【队列-{}】- 监听队列成功", queueCode);
        while (true) {
   
   
            T message = null;
            try {
   
   
                message = delayQueue.take();
                // 处理自己的业务
                handleMessage(message);
                log.info("【队列-{}】- 处理元素成功 - ele = {}", queueCode, ele);
            } catch (Exception e) {
   
   
                log.error("【队列-{}】- 处理元素失败 - ele = {}", queueCode, ele, e);
            }
        }
    }
}

Tip以下是我工作中使用并封装的DelayQueueKit的完整工具类代码,有兴趣的可以参考一下

// Git代码
https://gitee.com/yeeevip/yeee-memo/blob/master/memo-parent/memo-common/common-kit/common-redisson-kit/src/main/java/vip/yeee/memo/common/redisson/kit/DelayQueueKit.java

2 数据结构设计

Redission实现延迟队列消息用到了四个数据结构:

20240229-01.png

redisson_delay_queue_timeout:{queue_name} 定期队列,ZSET结构(value为消息,score为过期时间),这样就可以知道当前过期的消息。

redisson_delay_queue:{queue_name} 顺序队列,LIST结构,按照消息添加顺序存储,移除消息时可以按照添加顺序删除。

redisson_delay_queue_channel:{queue_name} 发布订阅channel主题,用于通知客户端定时器从定期队列转移到期的消息到目标队列。

{queue_name} 目标队列,LIST结构,存储实际到期可以被消费的消息供消费者拉取消费。

3 消息生产源码分析

  1. 通过redissonClient.getDelayedQueue获取RDelayedQueue对象

  2. 然后delayedQueue调用offer方法去保存消息

  3. 最后真正的保存逻辑是由RedissonDelayedQueue执行offerAsync方法调用的lua脚本

public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {
   
   
    @Override
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
   
   
        if (delay < 0) {
   
   
            throw new IllegalArgumentException("Delay can't be negative");
        }
        long delayInMs = timeUnit.toMillis(delay);
        // 消息过期时间 = 当前时间 + 延迟时间
        long timeout = System.currentTimeMillis() + delayInMs;
        // 生成随机id,应该是为了允许插入到zset重复的消息
        long randomId = ThreadLocalRandom.current().nextLong();
        // 执行脚本
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
            // 将消息打包成二进制的, 打包的消息 = 随机数 + 消息,有了随机数意味着消息就可以重复
            "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
            // 将 打包的消息和过期时间 插入redisson_delay_queue_timeout队列
            + "redis.call('zadd', KEYS[2], ARGV[1], value);"
            // 顺序插入redisson_delay_queue队列
            + "redis.call('rpush', KEYS[3], value);"
            // 如果刚插入的消息就是timeout队列的最前面,即刚插入的消息最近要到期
            + "local v = redis.call('zrange', KEYS[2], 0, 0); "
            + "if v[1] == value then "
            // 发布消息通知客户端消息到期时间,让它定期执行转移操作
            + "redis.call('publish', KEYS[4], ARGV[1]); "
            + "end;",
            Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),
            // 三个参数:1-过期时间 2-随机数 3-消息
            timeout, randomId, encode(e));
    }
}

4 定时器转移消息源码分析

大家如果仅仅使用而没有看过源码的可能不太容易知道redission究竟哪里执行的定时器去定时转移到期消息的,我也是最近看源码才知道,
其实就是在调用redissonClient.getDelayedQueue获取RDelayedQueue对象时创建的:

  1. 通过redissonClient.getDelayedQueue获取RDelayedQueue对象

  2. 然后会执行RedissonDelayedQueue的构造函数方法

  3. 在这个构造方法里就会新建QueueTransferTask这个对象去执行转移操作

public class Redisson implements RedissonClient {
   
   
    @Override
    public <V> RDelayedQueue<V> getDelayedQueue(RQueue<V> destinationQueue) {
   
   
        if (destinationQueue == null) {
   
   
            throw new NullPointerException();
        }
        // 执行RedissonDelayedQueue构造方法
        return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), connectionManager.getCommandExecutor(), destinationQueue.getName());
    }
}
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {
   
   
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
   
   
        ...
        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
   
   
            @Override
            protected RFuture<Long> pushTaskAsync() {
   
   
                return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                    // 从redisson_delay_queue_timeout队列获取100个到期的消息
                    "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                    + "if #expiredValues > 0 then "
                    + "for i, v in ipairs(expiredValues) do "
                    // 将包装的消息执行解包操作,随机数 + 原消息        
                    + "local randomId, value = struct.unpack('dLc0', v);"
                    // 将原消息插入到{queue_name}队列,就可以被消费了        
                    + "redis.call('rpush', KEYS[1], value);"
                    + "redis.call('lrem', KEYS[3], 1, v);"
                    + "end; "
                    // 转移后redisson_delay_queue_timeout队列也移除这些消息        
                    + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                    + "end; "
                    // 从定时队列获取最近到期时间然后供定时器到时间再执行
                    + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                    + "if v[1] ~= nil then "
                    + "return v[2]; "
                    + "end "
                    + "return nil;",
                    Arrays.<Object>asList(getName(), timeoutSetName, queueName),
                    System.currentTimeMillis(), 100);
            }
            // 主题redisson_delay_queue_channel:{queue_name}注册发布/订命令执行阅监听器
            @Override
            protected RTopic getTopic() {
   
   
                return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        // 将定时器命令执行逻辑注册到发布/订阅主题,这样就可以在收到订阅时执行转移操作了
        queueTransferService.schedule(queueName, task);
        ...
    }
}

5 消息消费源码分析

消息消费的逻辑就比较简单了,从RBlockingDeque使用take方法获取消息时,直接调用的就是redis中List的BLPOP命令。

Redis Blpop 命令移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> {
   
   
    @Override
    public RFuture<V> takeAsync() {
   
   
        // 执行redis中List的BLPOP命令,从{queue_name}队列阻塞取出元素
        return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
    }
}

最后

到此为止,Redission延迟队列的使用方式及原理我基本分享到这里了,大家如果有不懂的地方可以评论区留言或者直接私信我哦,同时有细节分析不到位的欢迎大家指出来,来一起学习嘛~

Tip以下是本人经过多年的工作经验集成的JavaWeb脚手架,封装了各种通用的starter可开箱即用,同时列举了互联网各种高性能场景的使用示例。

// Git代码
https://gitee.com/yeeevip/yeee-memo
https://github.com/yeeevip/yeee-memo
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
64 6
|
8月前
|
NoSQL Java API
Redisson源码(一)RedissonLock加锁与解锁过程原理分析
RedissonLock加锁与解锁过程原理分析
1588 12
Redisson源码(一)RedissonLock加锁与解锁过程原理分析
|
8月前
|
存储 缓存 安全
Java线程池ThreadPoolExcutor源码解读详解05-阻塞队列之DelayQueue原理及扩容机制详解
DelayQueue` 是 Java 中的一个线程安全的阻塞队列,它用于存储实现了 `Delayed` 接口的元素,这些元素都有一个延迟时间。当元素的延迟时间过去之后,它们才能被从队列中取出。以下是摘要: 1. **核心特性**: - 基于 `PriorityQueue` 实现,元素按延迟时间排序,优先级高的先出队。 - 使用 `ReentrantLock` 和条件变量 `available` 控制并发。 - 只有延迟时间小于0的元素才能被取出。 - 不允许插入 `null` 元素。 2. **构造器**: - 默认构造器创建无初始元素的队列。 - 可以
126 5
|
8月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解
1. **继承实现图关系**: - `PriorityBlockingQueue`实现了`BlockingQueue`接口,提供了线程安全的队列操作。 - 内部基于优先级堆(小顶堆或大顶堆)的数据结构实现,可以保证元素按照优先级顺序出队。 2. **底层数据存储结构**: - 默认容量是11,存储数据的数组会在需要时动态扩容。 - 数组长度总是2的幂,以满足堆的性质。 3. **构造器**: - 无参构造器创建一个默认容量的队列,元素需要实现`Comparable`接口。 - 指定容量构造器允许设置初始容量,但不指定排序规则。 - 可指定容量和比较
267 2
|
8月前
|
运维 NoSQL Java
【Redis】6、Redisson 分布式锁的简单使用(可重入、重试机制...)
【Redis】6、Redisson 分布式锁的简单使用(可重入、重试机制...)
499 1
|
消息中间件 缓存 NoSQL
Redisson实现简单消息队列:优雅解决缓存清理冲突
在项目中,缓存是提高应用性能和响应速度的关键手段之一。然而,当多个模块在短时间内发布工单并且需要清理同一个接口的缓存时,容易引发缓存清理冲突,导致缓存失效的问题。为了解决这一难题,我们采用Redisson的消息队列功能,实现了一个简单而高效的消息队列,优雅地解决了缓存清理冲突问题。本文将为您详细介绍Redisson实现简单消息队列的方案,以及如何在项目中使用它来优化缓存清理。
465 0
Redisson实现简单消息队列:优雅解决缓存清理冲突
|
NoSQL Java Redis
Spring Boot + Redis 实现延时队列,写得太好了!
首先我们分析下这个流程 用户提交任务。首先将任务推送至延迟队列中。
Spring Boot + Redis 实现延时队列,写得太好了!
|
NoSQL Java Redis
源码分析:Redisson分布式锁过程分析
本文基于Redisson3.12.2版本源码,对Redisson分布式锁过程进行了分析。从获取锁、释放锁的过程,可以大概了解Redisson的主要设计思想。此外,还对基于Jedis实现的一个分布式锁示例与Redisson进行对比,来看基于Redis的分布式锁的两种不同实现方式。
396 0
|
消息中间件 Java
springboot整合延迟队列
springboot整合延迟队列
|
存储
什么是队列,如何实现?
什么是队列,如何实现?
119 0
什么是队列,如何实现?