Redis Stream 数据结构实现原理真的很强

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis Stream 数据结构实现原理真的很强

你好,我是码哥,一个拥抱硬核技术和对象,面向人民币编程的男人,设置星标不迷路。

我在【Redis 使用 List 实现消息队列的利与弊】说过使用 List 实现消息队列有很多局限性。

  • 没有 ACK 机制。
  • 没有类似 Kafka 的 ConsumerGroup 消费组概念。
  • 消息堆积。
  • List 是线性结构,查询指定数据需要遍历整个列表。

1. 是什么

Stream 是 Redis 5.0 版本专门为消息队列设计的数据类型,借鉴了 Kafka 的 Consume Group 设计思路,提供了消费组概念

同时提供了消息的持久化和主从复制机制,客户端可以访问任何时刻的数据,并且能记住每一个客户端的访问位置,从而保证消息不丢失。

以下几个是 Stream 类型的主要特性。

  • 使用 Radix Tree 和 listpack 结构来存储消息。
  • 消息 ID 序列化生成。
  • 借鉴 Kafka Consume Group 的概念,多个消费者划分到不同的 Consume Group 中,消费同一个 Streams,同一个 Consume Group 的多个消费者可以一起并行但不重复消费,提升消费能力。
  • 支持多播(多对多),阻塞和非阻塞读取。
  • ACK 确认机制,保证了消息至少被消费一次。
  • 可设置消息保存上限阈值,我会把历史消息丢弃,防止内存占用过大。

需要注意的是,Redis Stream 是一种超轻量级的 MQ,并没有完全实现消息队列的所有设计要点,所以它的使用场景需要考虑业务的数据量和对性能、可靠性的需求。

适合系统消息量不大,容忍数据丢失,使用 Redis Stream 作为消息队列就能享受高性能快速读写消息的优势。

2. 修炼心法

每个 Stream 都有一个唯一的名称,作为 Stream 在 Redis 的 key,在首次使用 xadd 指令添加消息的时候会自动创建。

可以看到 Stream 在一个 Redix Tree 树上,树上存储的是消息 ID,每个消息 ID 对应的消息通过一个指针指向 listpack。

Stream 流就像是一个仅追加内容的消息链表,把消息一个个串起来,每个消息都有一个唯一的 ID 和消息内容,消息内容则由多个 field/value 键值对组成。底层使用 Radix Tree 和 listpack 数据结构存储数据。

为了便于理解,我画了一张图,并对 Radix Tree 的存储数据做了下变形,使用列表来体现 Stream 中消息的逻辑有序性。

图 2-31

图 2-31

这张图涉及很多概念,但是你不要慌。我一步步拆开说,最后你再回头看就懂了。

先带你屡下全局思路。

  • Consumer Group:消费组,每个消费组可以有一个或者多个消费者,消费者之间是竞争关系。不同消费组的消费者之间无任何关系。
  • *pel,全称是 Pending Entries List,记录了当前被客户端读取但是还没有 ack(Acknowledge character 确认字符)的消息。如果客户端没有 ack,这个变量的消息 ID 会越来越多。这是一个核心数据结构,用来确保客户端至少消费消息一次。

Stream 结构

Streams 结构的源码定义在 stream.h 源码中的 stream 结构体中。

typedef struct stream {
    rax *rax;
    uint64_t length;
    streamID last_id;
    streamID first_id;
    streamID max_deleted_entry_id;
    uint64_t entries_added;
    rax *cgroups;
} stream;
typedef struct streamID {
    uint64_t ms;
    uint64_t seq;
} streamID;
  • *rax,是一个 rax 的指针,指向一个 Radix Tree,key 存储消息 ID,value 实际上指向一个 listpack 数据结构,存储了多条消息,每条消息的 ID 都大于等于 这个 key 的消息 ID。
  • length,该 Stream 的消息条数。
  • streamID结构体,消息 ID 抽象,一共占 128 位,内部维护了毫秒时间戳(字段 ms);一个毫秒内的自增序号(字段 seq),用于区分同一毫秒内插入多条消息
  • last_id,当前 Stream 最后一条消息的 ID。
  • first_id,当前 Stream 第一条消息的 ID。
  • max_deleted_entry_id,当前 Stream 被删除的最大的消息 ID。
  • entries_added,总共有多少条消息添加到 Stream 中,entries_added = 已删除消息条数 + 未删除消息条数
  • *cgroups,rax 指针,也指向一个 Radix Tree ,记录当前 Stream 的所有 Consume Group,每个 Consume Group 的名称都是唯一标识,作为 Radix Tree 的 key,Consumer Group 实例作为 value。

Consumer Group

Consumer Group 由 streamCG 结构体定义,每个 Stream 可以有多个 Consumer Group,一个消费组可以有多个消费者同时对组内消息进行消费。

/* Consumer group. */
typedef struct streamCG {
    streamID last_id;
    long long entries_read;
    rax *pel;
    rax *consumers;
} streamCG;
  • last_id,表示该消费组的消费者已经读取但还未 ACK 的最后一条消息 ID。
  • *pel,是 pending entries list 简写,指向一个 Radix Tree 的指针,保存着 Consumer group 中所有消费者读取但还未 ACK 确认的消息,就是这玩意实现了 ACK 机制。该树的 key 是消息 ID,value 关联一个 streamNACK 实例。
  • *consumers, Radix Tree 指针,表示消费组中的所有消费者,key 是消费者名称,value 指向一个 streamConsumer 实例。
streamNACK

streamCG -> *pel 对应的 value 是一个 streamNACK 实例,用于抽象消费者已经读取,但是未 ACK 的消息 ID 相关信息。

/* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNACK {
    mstime_t delivery_time;
    uint64_t delivery_count;
    streamConsumer *consumer;
} streamNACK;
  • delivery_time,该消息最后一次推送给 Consumer 的时间戳。
  • delivery_count,消息被推送次数。
  • *consumer,消息推送的 Consumer 客户端。
streamConsumer

Consumer Group 中对 Consumer 的抽象。

/* A specific consumer in a consumer group.  */
typedef struct streamConsumer {
    mstime_t seen_time;
    sds name;
    rax *pel;
} streamConsumer;
  • seen_time,消费者最近一次被激活的时间戳。
  • name,消费者名称。
  • *pel, Radix Tree 指针,对于同一个消息而言,``streamCG -> pelstreamConsumer -> pelstreamNACK` 实例是同一个。

最后来一张图,便于你理解。

图 2-32

图 2-32

肖材积:“Redis 你好,Stream 如何结合 Radix Tree 和 listpack 结构来存储消息?为什么不使用散列表来存储,消息 ID 作为散列表的 key,散列表的 value 存储消息键值对内容。’”

在回答之前,先插入几条消息到 Stream,让你对 Stream 消息的存储格式有个大体认知。

该命令的语法如下。

XADD key id field value [field value ...]

Stream 中的每个消息可以包含不同数量的多个键值对,写入消息成功后,我会把消息的 ID 返回给客户端。

执行如下指令把用户购买书籍的下单消息存放到 hotlist:books队列,消息内容主要由 payerID、amount 和 orderID。

> XADD hotlist:books * payerID 1 amount 69.00 orderID 9
1679218539571-0
> XADD hotlist:books * payerID 1 amount 36.00 orderID 15
1679218572182-0
> XADD hotlist:books * payerID 2 amount 99.00 orderID 88
1679218588426-0
> XADD hotlist:books * payerID 3 amount 68.00 orderID 80
1679218604492-0

hotlist:books 是 Stream 的名称,后面的 “*” 表示让 Redis 为插入的消息自动生成一个唯一 ID,你也可以自定义。

消息 ID 由两部分组成。

  • 当前毫秒内的时间戳;
  • 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令。

肖材积:“如何理解 Stream 是一种只执行追加操作(append only)的数据结构?”

通过将元素 ID 与时间进行关联,并强制要求新元素的 ID 必须大于旧元素的 ID, Redis 从逻辑上将 Stream 变成了一种只执行追加操作(append only)的数据结构

用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的。

肖材积:“插入的消息 ID 大部分相同,比如这四条消息的 ID 都是 1679218 前缀。另外,每条消息键值对的键通常都是一样的,比如这四条消息的键都是 payerID、amount 和 orderID。使用散列表存储的话会很多冗余数据,你这么抠门,所以不使用散列表对不对?”

没毛病,小老弟很聪明。为了节省内存,我使用了 Radix Tree 和 listpack。Radix Tree 的 key 存储消息 ID,value 使用 listpack 数据结构存储多个消息, listapck 中的消息 ID 都大于等于 key 存储的消息 ID。

我在前面已经讲过 listpack,这是一个紧凑型列表,非常节省内存。而 Radix Tree 数据结构的最大特点是适合保存具有相同前缀的数据,从而达到节省内存。

到底 Radix Tree 是怎样的数据结构,继续往下看。

Radix Tree

Radix Tree,也被称为 Radix Trie,或者 Compact Prefix Tree),用于高效地存储和查找字符串集合。它将字符串按照前缀拆分成一个个字符,并将每个字符作为一个节点存储在树中

当插入一个键值对时,Redis 会将键按照字符拆分成一个个字符,并根据字符在 Radix tree 中的位置找到合适的节点,如果该节点不存在,则创建新节点并添加到 Radix tree 中。

当所有字符都添加完毕后,将值对象指针保存到最后一个节点中。当查询一个键时,Redis 按照字符顺序遍历 Radix tree,如果发现某个字符不存在于树中,则键不存在;否则,如果最后一个节点表示一个完整的键,则返回对应的值对象。

如下图展示一个简单的前缀树,将根节点到叶子节点的路径对应字符拼接起来,就得到了两个 key(“他说碉堡了”、“他说碉炸了”)。

图 2-33

图 2-33

你应该发现了,这两个 key 拥有公共前缀(他说碉),前缀树实现了共享使用,这样就可以避免相同字符串重复存储。如果采用散列表的保存方式,那个 key 的相同前缀就会被多次存储,导致内存浪费。

Radix Tree 改进

每个节点只保存一个字符,一是会浪费内存空间,二是在进行查询时,还需要逐一匹配每个节点表示的字符,对查询性能也会造成影响。

所以,Redis 并没有直接使用标准前缀树,而是做了一次变种——Compact Prefix Tree(压缩前缀树)。通俗来说,当多个 key 具有相同的前缀时,那就将相同前缀的字符串合并在一个共享节点中,从而减少存储空间

如下几个 key(test、toaster、toasting、slow、slowly)在 Radix Tree 上的布局。

图 2-34

由于 Compact Prefix Tree 可以共享相同前缀的节点,所以在存储一组具有相同前缀的键时,Redis 的 Radix tree 比其他数据结构(如哈希表)具有更低的空间消耗和更快的查询速度。

Radix Tree 节点的数据结构由 rax.h文件中的 raxNode 定义。

typedef struct raxNode {
    uint32_t iskey:1;
    uint32_t isnull:1;
    uint32_t iscompr:1;
    uint32_t size:29;
    unsigned char data[];
} raxNode;
  • iskey:从 Radix Tree 根节点到当前节点组成的字符串是否是一个完整的 key。是的话 iskey 的值为 1。
  • isnull:当前节点是否为空节点,如果当前节点是空节点的话,就不需要为该节点分配指向 value 的指针内存。
  • iscompr,是否为压缩节点。
  • size,当前节点的大小,具体指会根据节点类型而改变。如果是压缩节点,该值表示压缩数据的长度;如果是非压缩节点,该值表示节点的子节点个数。
  • data[],实际存储的数据,根据节点类型不同而有所不同。
  • 压缩节点,data 数据包括子节点对应的字符、指向子节点的指针,节点为最终 key 对应的 value 指针。
  • 压缩节点,data 数据包含子节点对应的合并字符串、指向子节点的指针,以及节点为最终 key 的 value 指针。
  • value 指针指向一个 listpack 实例,里面保存了消息实际内容

Radix Tree 最大的特点就是适合保存具有相同前缀的数据,实现节省内存的目标,以及支持范围查找。而这个就是 Stream 采用 Radix Tree 作为底层数据结构的原因。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
存储 消息中间件 监控
Redis Stream:实时数据流的处理与存储
通过上述分析和具体操作示例,您可以更好地理解和应用 Redis Stream,满足各种实时数据处理需求。
93 14
|
2月前
|
存储 消息中间件 缓存
Redis 5 种基础数据结构?
Redis的五种基础数据结构——字符串、哈希、列表、集合和有序集合——提供了丰富的功能来满足各种应用需求。理解并灵活运用这些数据结构,可以极大地提高应用程序的性能和可扩展性。
54 2
|
3月前
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
64 5
|
3月前
|
存储 NoSQL 关系型数据库
Redis的ZSet底层数据结构,ZSet类型全面解析
Redis的ZSet底层数据结构,ZSet类型全面解析;应用场景、底层结构、常用命令;压缩列表ZipList、跳表SkipList;B+树与跳表对比,MySQL为什么使用B+树;ZSet为什么用跳表,而不是B+树、红黑树、二叉树
|
3月前
|
存储 NoSQL Redis
Redis常见面试题:ZSet底层数据结构,SDS、压缩列表ZipList、跳表SkipList
String类型底层数据结构,List类型全面解析,ZSet底层数据结构;简单动态字符串SDS、压缩列表ZipList、哈希表、跳表SkipList、整数数组IntSet
|
3月前
|
C语言
【数据结构】栈和队列(c语言实现)(附源码)
本文介绍了栈和队列两种数据结构。栈是一种只能在一端进行插入和删除操作的线性表,遵循“先进后出”原则;队列则在一端插入、另一端删除,遵循“先进先出”原则。文章详细讲解了栈和队列的结构定义、方法声明及实现,并提供了完整的代码示例。栈和队列在实际应用中非常广泛,如二叉树的层序遍历和快速排序的非递归实现等。
315 9
|
3月前
|
存储 算法
非递归实现后序遍历时,如何避免栈溢出?
后序遍历的递归实现和非递归实现各有优缺点,在实际应用中需要根据具体的问题需求、二叉树的特点以及性能和空间的限制等因素来选择合适的实现方式。
50 1
|
30天前
|
存储 C语言 C++
【C++数据结构——栈与队列】顺序栈的基本运算(头歌实践教学平台习题)【合集】
本关任务:编写一个程序实现顺序栈的基本运算。开始你的任务吧,祝你成功!​ 相关知识 初始化栈 销毁栈 判断栈是否为空 进栈 出栈 取栈顶元素 1.初始化栈 概念:初始化栈是为栈的使用做准备,包括分配内存空间(如果是动态分配)和设置栈的初始状态。栈有顺序栈和链式栈两种常见形式。对于顺序栈,通常需要定义一个数组来存储栈元素,并设置一个变量来记录栈顶位置;对于链式栈,需要定义节点结构,包含数据域和指针域,同时初始化栈顶指针。 示例(顺序栈): 以下是一个简单的顺序栈初始化示例,假设用C语言实现,栈中存储
138 77
|
30天前
|
存储 C++ 索引
【C++数据结构——栈与队列】环形队列的基本运算(头歌实践教学平台习题)【合集】
【数据结构——栈与队列】环形队列的基本运算(头歌实践教学平台习题)【合集】初始化队列、销毁队列、判断队列是否为空、进队列、出队列等。本关任务:编写一个程序实现环形队列的基本运算。(6)出队列序列:yzopq2*(5)依次进队列元素:opq2*(6)出队列序列:bcdef。(2)依次进队列元素:abc。(5)依次进队列元素:def。(2)依次进队列元素:xyz。开始你的任务吧,祝你成功!(4)出队一个元素a。(4)出队一个元素x。
42 13
【C++数据结构——栈与队列】环形队列的基本运算(头歌实践教学平台习题)【合集】
|
30天前
|
存储 C语言 C++
【C++数据结构——栈与队列】链栈的基本运算(头歌实践教学平台习题)【合集】
本关任务:编写一个程序实现链栈的基本运算。开始你的任务吧,祝你成功!​ 相关知识 初始化栈 销毁栈 判断栈是否为空 进栈 出栈 取栈顶元素 初始化栈 概念:初始化栈是为栈的使用做准备,包括分配内存空间(如果是动态分配)和设置栈的初始状态。栈有顺序栈和链式栈两种常见形式。对于顺序栈,通常需要定义一个数组来存储栈元素,并设置一个变量来记录栈顶位置;对于链式栈,需要定义节点结构,包含数据域和指针域,同时初始化栈顶指针。 示例(顺序栈): 以下是一个简单的顺序栈初始化示例,假设用C语言实现,栈中存储整数,最大
45 9