Redis从入门到精通之底层数据类型Stream详解和使用示例

本文涉及的产品
云原生网关 MSE Higress,422元/月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Redis Stream的底层数据结构还涉及到基数树和listpacks,这些结构使得Redis Stream具有高效的空间和时间复杂度,同时允许通过ID进行随机访问。基数树是一种高效的数据结构,用于存储有序数据集合。在Redis Stream中,消息ID有序集合就是基于基数树实现的。基数树可以快速定位一个元素在有序集合中的位置,以及在有序集合中查找一段范围内的元素。listpack是一种紧凑的、可扩展的、有序的、二进制数据结构,用于存储多个元素。在Redis Stream中,消息哈希表就是基于listpack实现的。listpack可以高效地存储多个元素,同时支持快速的元素访问插入。

0.前言

Redis的Stream是一种新的数据类型,于Redis 5.0版本中引入。它是一个有序、持久化、可重复读的消息流,可以用于实现消息队列、日志系统等应用场景。本文将从底层实现的角度对Redis的Stream进行详解。

1. Stream的基本概念

1.1. Stream的结构

image.png

Redis的Stream由多个消息构成,每个消息包含了一个唯一的ID和一个键值对数据。Stream中的消息是有序的,每个消息都有一个唯一的ID,ID是一个自增的64位整数。Stream中的每个消息都可以包含多个键值对数据,这些数据是以键值对的形式存储的。

1.2. Stream的持久化

Redis的Stream支持持久化,即可以将Stream中的消息保存到磁盘中,以便在Redis重启后能够恢复数据。

1.3. Stream的消费者组

Redis的Stream支持消费者组,多个消费者可以组成一个消费者组,每个消费者可以独立地消费Stream中的消息。当一个消费者组中的某个消费者消费了一个消息后,其他消费者将无法再消费该消息。
image.png

2.Stream的实现原理

image.png

2.1. Stream的数据结构

Redis的Stream数据结构由两个部分组成,一个是消息ID的有序集合,另一个是消息的哈希表。

Stream的消息ID有序集合中,每个元素都是一个消息的ID,按照递增的顺序排列。有序集合中的分值为消息的ID,成员为NULL。

Stream的消息哈希表中,每个消息都用一个哈希表表示,哈希表的键是消息的ID,值是一个由多个键值对数据组成的字典。

2.2. Stream的消息追加

当向Stream中追加新消息时,Redis会将新消息的ID插入到消息ID有序集合中,并将新消息的键值对数据插入到对应的哈希表中。如果新消息的ID已经存在于消息ID有序集合中,则插入操作会失败。
image.png

2.3. Stream的消费

当一个消费者要消费Stream中的消息时,它需要指定一个起始ID,Redis会将起始ID对应的消息之后的所有消息都返回给该消费者。消费者可以使用XRANGE命令获取消息。

当一个消费者消费了一个消息之后,Redis会将该消息的ID从消息ID有序集合中删除,并将该消息从消息哈希表中删除。如果该消息是该消费者组中的最后一个消息,则该消费者组的消费者将无法再消费该消息之前的消息。

2.4. Stream的消费者组

当一个消费者组中的某个消费者消费了一个消息之后,Redis会将该消息的ID从消费者组的"pending"列表中删除,并将该消息从消息哈希表中删除。如果该消费者是该消费者组中的最后一个消费者,则该消费者组的"pending"列表将被删除。

当某个消费者组中的所有消费者都没有消费一个特定的消息时,该消息的ID将存储在该消费者组的"pending"列表中。"pending"列表是一个有序集合,其中每个元素都是一个消息的ID,按照消息的时间戳排序。有序集合中的分值为消息的时间戳,成员为消息的ID。

2.5. Stream的持久化

Redis将Stream持久化到磁盘中的方式是将消息ID有序集合和消息哈希表分别保存到两个独立的RDB文件中。当Redis重启后,它会从这两个文件中重新加载Stream数据。

3.Redis Stream底层原理

Redis Stream的底层数据结构是基数树和listpack,这使得Redis Stream具有高效的空间和时间复杂度,同时允许通过ID进行随机访问。在Redis Stream中,每个条目都有一个唯一的ID,以实现排序。这使得可以按照时间戳或其他自定义标准对数据进行范围查询。每个Stream可以有多个消费者组,每个消费者组中可以有多个消费者,消费者组可以将Stream分配给不同的消费者,以均匀地分配负载。

3.1. 基数树(Radix Tree)

基数树(Radix Tree)是一种多叉树,用于存储有序数据集合。在Redis中,基数树被用于实现有序集合和Stream中的消息ID有序集合。

基数树的每个节点都包含一个字符和多个子节点。通过不断遍历基数树的子节点,可以找到一个字符串在基数树中的位置。例如,在Redis中,有序集合中的一个元素可以表示为一个字符串和一个分值,Redis会将这个元素的字符串按照字符拆分成多个节点,每个节点对应一个字符,最终将这个元素的分值存储在基数树的叶子节点上。

基数树的优点是支持快速的前缀匹配和范围查找。例如,在Redis中,有序集合的ZRANGEBYLEX命令就是基于基数树实现的,可以快速地按照字典序查找有序集合中的元素。

3.2. listpacks

listpacks是一种紧凑的、可扩展的、有序的、二进制数据结构,用于存储多个元素。在Redis中,listpacks被用于实现Stream中的消息哈希表。

listpacks的一个特点是支持高效的元素访问、插入和删除操作。listpacks会将多个元素紧密地排列在一起,每个元素占用的空间大小是可变的,因此可以根据实际需求灵活地分配空间。同时,listpacks可以按照元素索引快速地定位某个元素,也可以在任意位置快速地插入或删除元素。

在Redis Stream中,消息哈希表中的每个元素都是一个键值对数据,这些数据会被序列化成二进制格式,然后存储在一个listpack中。当需要访问某个键值对数据时,Redis会根据该数据在listpack中的位置,快速地定位并反序列化出该数据。

基数树和listpacks是Redis中两个重要的底层数据结构,它们为Redis提供了高效、可扩展、有序、紧凑的数据存储能力。深入理解这些数据结构的实现原理,可以帮助开发者更好地理解Redis的底层实现,从而优化应用程序的性能和可扩展性。

4.Redis Streams命令和操作示例

4.1 Streams命令

  1. XADD:向指定的Stream中添加一条消息。
  2. XLEN:获取指定Stream中的消息数量。
  3. XRANGE:按照ID范围查询Stream中的消息。
  4. XREVRANGE:按照ID范围反向查询Stream中的消息。
  5. XREAD:从多个Stream中连续读取多条消息。
  6. XACK:确认接收并处理一条或多条消息。
  7. XDEL:删除指定Stream中的一条或多条消息。
  8. XGROUP:管理Stream的消费者组。
  9. XREADGROUP:从指定的Stream中连续读取多条消息,并将消息分配给指定的消费者组。

这些命令一起构成了Redis Streams的基本操作集合,可以对Stream中的消息进行读写、查询、删除、确认处理等各种操作。通过这些命令,开发者可以轻松地构建出高效、可扩展、高可用的实时应用程序。

4.操作示例

我们分别用jedis 和 SpringBoot 实现

4.2. Jedis工程

首先创建一个Maven项目,在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.6.3</version>
</dependency>

然后创建一个名为JedisDemo的Java类,实现Redis Streams的基本操作:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class JedisDemo {

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");

        // 添加消息
        String response = jedis.xadd("stream1", "*", "name", "John", "age", "30");
        System.out.println(response);

        // 获取消息数量
        Long len = jedis.xlen("stream1");
        System.out.println(len);

        // 查询消息
        List<StreamEntry> entries = jedis.xrange("stream1", "1000-2000");
        for (StreamEntry entry : entries) {
            System.out.println(entry);
        }

        // 反向查询消息
        List<StreamEntry> entries2 = jedis.xrevrange("stream1", "+", "-", 10);
        for (StreamEntry entry : entries2) {
            System.out.println(entry);
        }

        // 连续读取消息
        List<Map.Entry<String, List<StreamEntry>>> results = jedis.xread(10, 5000, new AbstractMap.SimpleEntry<>("stream1", "0"), new AbstractMap.SimpleEntry<>("stream2", "0"));
        for (Map.Entry<String, List<StreamEntry>> result : results) {
            System.out.println(result.getKey() + ": " + result.getValue());
        }

        // 确认处理消息
        Long count = jedis.xack("stream1", "consumer1", "1001", "1002");
        System.out.println(count);

        // 删除消息
        Long count2 = jedis.xdel("stream1", "1001", "1002");
        System.out.println(count2);

        // 管理消费者组
        String result = jedis.xgroupCreate("stream1", "consumer1", "0", true);
        System.out.println(result);

        // 读取并分配消息
        List<Map.Entry<String, List<StreamEntry>>> results2 = jedis.xreadGroup("consumer1", "consumer1-1", 10, 5000, true, new AbstractMap.SimpleEntry<>("stream1", ">"));
        for (Map.Entry<String, List<StreamEntry>> result2 : results2) {
            System.out.println(result2.getKey() + ": " + result2.getValue());
        }

        jedis.close();
    }
}

运行JedisDemo类,可以看到输出了Redis Streams的基本操作结果。

  1. RedisTemplate工程

首先创建一个Maven项目,在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.5.0</version>
</dependency>

然后创建一个名为RedisTemplateDemo的Java类,实现Redis Streams的基本操作:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.CollectionUtils;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@SpringBootApplication
public class RedisTemplateDemo {

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(RedisTemplateDemo.class, args);

        RedisTemplate<String, String> redisTemplate = context.getBean(StringRedisTemplate.class);

        // 添加消息
        redisTemplate.opsForStream().add("stream1", Collections.singletonMap("name", "John"), Collections.singletonMap("age", "30"));

        // 获取消息数量
        Long len = redisTemplate.opsForStream().size("stream1");
        System.out.println(len);

        // 查询消息
        List<MapRecord<String, String, String>> records= redisTemplate.opsForStream().range("stream1", Range.unbounded());
        for (MapRecord<String, String, String> record : records) {
            System.out.println(record);
        }

        // 反向查询消息
        List<MapRecord<String, String, String>> records2 = redisTemplate.opsForStream().reverseRange("stream1", Range.unbounded());
        for (MapRecord<String, String, String> record : records2) {
            System.out.println(record);
        }

        // 连续读取消息
        Map<StreamOffset<String>, ReadOffset> streams = new HashMap<>();
        streams.put(StreamOffset.create("stream1", ReadOffset.from("0")), ReadOffset.lastConsumed());
        List<MapRecord<String, String, String>> results = redisTemplate.opsForStream().read(streams, Duration.ofMillis(5000));
        for (MapRecord<String, String, String> result : results) {
            System.out.println(result);
        }

        // 确认处理消息
        redisTemplate.execute(new SessionCallback<>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
                operations.watch("stream1");
                RecordId id1 = RecordId.of("1001");
                RecordId id2 = RecordId.of("1002");
                String key = "stream1";
                StreamOperations<String, String, String> ops = operations.opsForStream();
                List<MapRecord<String, String, String>> records = ops.range(key, Range.closed(id1, id2));
                if (!CollectionUtils.isEmpty(records)) {
                    RedisConnectionFactory factory = operations.getConnectionFactory();
                    RedisClientInfo info = factory.getConnection().getClientList().get(0);
                    ops.acknowledge(key, info.getAddress(), id1, id2);
                }
                return null;
            }
        });

        // 删除消息
        Long count = redisTemplate.opsForStream().delete("stream1", "1001", "1002");
        System.out.println(count);

        // 管理消费者组
        redisTemplate.opsForStream().createGroup("stream1", "consumer1");

        // 读取并分配消息
        MapRecord<String, String, String> record = redisTemplate.opsForStream().read("consumer1", StreamOffset.lastConsumed("stream1"));
        if (record != null) {
            System.out.println(record);
            redisTemplate.opsForStream().acknowledge("stream1", "consumer1", record.getId());
        }
    }
}

运行RedisTemplateDemo类,可以看到输出了Redis Streams的基本操作结果。

总结

Redis的Stream是一种有序、持久化、可重复读的消息流数据类型,非常适合用于实现消息队列、日志系统等应用场景。Stream的数据结构由消息ID有序集合和消息哈希表两部分组成,消息ID有序集合用于维护消息的顺序,消息哈希表用于存储消息的键值对数据。Stream支持消费者组,多个消费者可以组成一个消费者组,每个消费者可以独立地消费Stream中的消息。Redis将Stream持久化到磁盘中的方式是将消息ID有序集合和消息哈希表分别保存到两个独立的RDB文件中。当Redis重启后,它会从这两个文件中重新加载Stream数据。Stream的底层实现非常高效,可以支持非常大的消息流数据。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
3月前
|
缓存 NoSQL Java
【Azure Redis 缓存】示例使用 redisson-spring-boot-starter 连接/使用 Azure Redis 服务
【Azure Redis 缓存】示例使用 redisson-spring-boot-starter 连接/使用 Azure Redis 服务
|
1月前
|
缓存 NoSQL Java
springboot的缓存和redis缓存,入门级别教程
本文介绍了Spring Boot中的缓存机制,包括使用默认的JVM缓存和集成Redis缓存,以及如何配置和使用缓存来提高应用程序性能。
85 1
springboot的缓存和redis缓存,入门级别教程
|
15天前
|
消息中间件 NoSQL Redis
Redis Stream
10月更文挑战第20天
21 2
|
21天前
|
存储 消息中间件 NoSQL
Redis 数据类型
10月更文挑战第15天
31 1
|
1月前
|
存储 消息中间件 NoSQL
Redis 入门 - C#.NET Core客户端库六种选择
Redis 入门 - C#.NET Core客户端库六种选择
53 8
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
25 2
|
2月前
|
存储 消息中间件 缓存
深入探析Redis常见数据类型及应用场景
深入探析Redis常见数据类型及应用场景
44 2
|
2月前
|
缓存 NoSQL PHP
使用PHP-redis实现键空间通知监听key失效事件的技术与代码示例
通过上述方法,你可以有效地在PHP中使用Redis来监听键空间通知,特别是针对键失效事件。这可以帮助你更好地管理缓存策略,及时响应键的变化。
92 3
|
3月前
|
NoSQL 安全 Java
Redis6入门到实战------ 三、常用五大数据类型(字符串 String)
这篇文章深入探讨了Redis中的String数据类型,包括键操作的命令、String类型的命令使用,以及String在Redis中的内部数据结构实现。
Redis6入门到实战------ 三、常用五大数据类型(字符串 String)
|
3月前
|
NoSQL 关系型数据库 Redis
Redis6入门到实战------ 九、10. Redis_事务_锁机制_秒杀
这篇文章深入探讨了Redis事务的概念、命令使用、错误处理机制以及乐观锁和悲观锁的应用,并通过WATCH/UNWATCH命令展示了事务中的锁机制。
Redis6入门到实战------ 九、10. Redis_事务_锁机制_秒杀

热门文章

最新文章

下一篇
无影云桌面