Redis之Stream

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis之Stream

场景

Redis使用list可以实现队列功能,但无法做到广播模式,使用PubSub可以做到广播模式,但是PubSub并不能保证数据的持久化。

Stream就是为了满足上面的需求在Redis 5.0中发布的,它的内部结构是一个链表,将消息都串起来,每个消息拥有一个自己的ID和内容,该结构是持久化保存的。

基本概念

每个Stream都以key作为自己的名字,相当于消息队列中的topic,生产者和消费者对这个key进行发布订阅即可,Stream在第一次使用 xadd 指令时被创建。

消费组

每个Stream可以有多个消费组(Consumer Group),每个消费组会有个游标(last_delivered_id)在Stream链表上往前移动,表示该消费组消费到了哪条消息。

消费组需要通过 xgroup create 进行创建,需要指定从Stream某个消息ID开始消费,该ID就是游标(last_delivered_id)的初始值。

每个消费者之间是独立的,A消费了数据1,不会影响到B消费数据1,即一份数据可以被多个消费组消费。

消费者

一个消费组可以有多个消费者(Consumer),这下消费者之间是竞争关系,任意一个消费者消费数据后,都会导致游标(last_delivered_id)向前移动,每个消费者都有一个组内唯一名称。

消费者内部有一个状态变量: pending_ids,该变量记录已经被客户端读取,但是没有ACK的消息,客户端没有ACK的消息越多,这里保存的消息id就越多,ACK后将相应减少,

官方称 pending_ids 为 PEL(Pending Entires List),用来确保客户端至少消费了一次,防止数据丢失。

image.png

消息ID与内容

消息ID格式为:timestamplnMillis-sequence,前者是消息产生的毫秒时间戳,后者是在该毫秒产生的第几条消息,例如 1527846880572-5 ,代表是 1527846880572 毫秒时的第 5 条消息,

消息ID可以由服务器自动生成,也可以是客户端自己指定,但是形式必须是 整数-整数,后来的消息ID必须大于前面的消息ID。

消息内容是一个类似hash结构的键值对。

基本命令

  1. xadd:向Stream发一条消息
# stream-1 表示stream的名字,* 则是系统该自动生成id,后面的则是我们的数据键值对

127.0.0.1:6379> xadd stream-1 * name zhangsan age 10

"1641454184593-0"

127.0.0.1:6379> xadd stream-1 * name lisi age 10

"1641454195771-0"

127.0.0.1:6379> xadd stream-1 * name wangwu age 10

"1641454208016-0"
  1. xdel:从Stream中删除一条消息
127.0.0.1:6379> xdel stream-1 1641454195771-0

(integer) 0
  1. xrange:获取Stream中的消息列表
127.0.0.1:6379> xrange stream-1 - +

1) 1) "1641454639427-0"

   2) 1) "name"

      2) "zhangsan"

      3) "age"

      4) "10"

2) 1) "1641454646977-0"

   2) 1) "name"

      2) "lisi"

      3) "age"

      4) "10"

3) 1) "1641454651750-0"

   2) 1) "name"

      2) "wangwu"

      3) "age"

      4) "10"
  1. xlen:获取Stream消息长度
127.0.0.1:6379> xlen stream-1

(integer) 3
  1. del:删除整个Stream消息列表
127.0.0.1:6379> del stream-1

(integer) 1

127.0.0.1:6379> xrange stream-1 - +

(empty array)

消费模式

消费模式有两种,独立消费与消费组,前者就是一个消息只能被全局消费一次,后者则是多个消费组可以对同一个消费者消费一次。

独立消费

Stream 提供了 一个 xread 指令,该指令在消费消息的时候会忽略消费组的存在。

  1. 使用 xread 从头部读取两条消息

127.0.0.1:6379> xread count 2 streams stream-1 0-0

1) 1) "stream-1"

   2) 1) 1) "1641455116178-0"

         2) 1) "name"

            2) "zhangsan"

            3) "age"

            4) "10"

      2) 1) "1641455118418-0"

         2) 1) "name"

            2) "zhangsan1"

            3) "age"

            4) "10"
  1. 从尾部接收最新消息,之前的消息全部会被忽略

这种接收最新消息一般配合阻塞使用,如果不使用阻塞大概率读的是空

127.0.0.1:6379> xread count 1 streams stream-1 $

(nil)

阻塞读取,当有xadd往该stream-1添加消息时,将立刻返回,block以s为单位,阻塞s表后没有数据将返回,block 0 就是永远阻塞,直到有消息来临。

127.0.0.1:6379> xread block 0 count 1 streams stream-1 $

1) 1) "stream-1"

   2) 1) 1) "1641455579648-0"

         2) 1) "name"

            2) "wangliu"

            3) "age"

            4) "20"

(38.07s)

消费组

通过 xgroup create 可以创建一个消费组,并提供一个消息起始id初始化 last_delivered_id 变量。

  1. 为 stream-1 添加一个名为 cgroup-1 的消费组,从第一条消息开始消费。
xgroup create stream-1 cgroup-1 0-0
  1. 为 stream-1 添加一个名为 cgroup-2 的消费组,该消费组只监听新消息
xgroup create stream-1 cgroup-2 $
  1. 查看 stream-1 的详细信息
127.0.0.1:6379> xinfo stream stream-1

 1) "length"

 2) (integer) 4 # 共4个消息

 3) "radix-tree-keys"

 4) (integer) 1

 5) "radix-tree-nodes"

 6) (integer) 2

 7) "last-generated-id"

 8) "1641455579648-0"

 9) "groups"

10) (integer) 2 # 两个消费组

11) "first-entry" # 第一个消息

12) 1) "1641455116178-0"

    2) 1) "name"

       2) "zhangsan"

       3) "age"

       4) "10"

13) "last-entry" # 最后一个消息

14) 1) "1641455579648-0"

    2) 1) "name"

       2) "wangliu"

       3) "age"

       4) "20"

image.png

消费

xreadgroup 用于消费组进行消费消息,需要提供消费组名称、消费者名称、起始消息ID,与 xread 一样,也可以进行阻塞等待新消息,

读到新消息时,该消息ID将存入消费者的PEL中,处理完毕后客户端使用 xack 指令通知服务器,该消息ID将被从PEL中移除。

  1. 消费消息

使用消费组 cgroup-1 中的 c1 消费者读取 stream-1 中的 一条消息,> 表示从当前消费组的 last_delivered_id 后面开始读,没读一条消息,该值就会往前进一个。

127.0.0.1:6379> xreadgroup GROUP cgroup-1 c1 count 1 streams stream-1 >

1) 1) "stream-1"

   2) 1) 1) "1641455579648-0"

         2) 1) "name"

            2) "wangliu"

            3) "age"

            4) "20"

127.0.0.1:6379> xreadgroup GROUP cgroup-1 c1 count 1 streams stream-1 >

(nil) 读完了
  1. 阻塞等待读取
xreadgroup GROUP cgroup-1 c1 block 0 count 1 streams stream-1 >

用另一个客户端发送消息

127.0.0.1:6379> xadd stream-1 * name mn age 30

"1641457442683-0"

读到新消息返回

127.0.0.1:6379> xreadgroup GROUP cgroup-1 c1 block 0 count 1 streams stream-1 >

1) 1) "stream-1"

   2) 1) 1) "1641457442683-0"

         2) 1) "name"

            2) "mn"

            3) "age"

            4) "30"

(45.20s)
  1. 观察消费组信息
127.0.0.1:6379> xinfo groups stream-1

1) 1) "name"

   2) "cgroup-1"

   3) "consumers"

   4) (integer) 1 # 一个消费者
 
   5) "pending"

   6) (integer) 5 # 5 条消息还没有ack

   7) "last-delivered-id"

   8) "1641457442683-0"

2) 1) "name"

   2) "cgroup-2"

   3) "consumers"

   4) (integer) 0

   5) "pending"

   6) (integer) 0

   7) "last-delivered-id"

   8) "1641455579648-0"
  1. 观察消费者信息

# cgroup-1下的消费者

127.0.0.1:6379> xinfo consumers stream-1 cgroup-1 

1) 1) "name"

   2) "c1" # c1 消费者

   3) "pending"

   4) (integer) 5 # 5条消息还没有ack

   5) "idle"

   6) (integer) 504275 # 空闲了多久 ms 没有读取消息

  1. ack消息
# ack一条消息
xack stream-1 cgroup-1 1641455116178-0
# ack多条消息

127.0.0.1:6379> xack stream-1 cgroup-1 1641455118418-0 1641455120461-0 1641455579648-0 1641457442683-0

(integer) 4

5条消息全部ack后,待ack的数量变为0了。

127.0.0.1:6379> xinfo consumers stream-1 cgroup-1 

1) 1) "name"

   2) "c1"

   3) "pending"

   4) (integer) 0

   5) "idle"

   6) (integer) 684848

其他场景

消息定长

如果需要将Stream中的消息保持一定的长度,可以在xadd时,增加一个maxlen参数,该参数可以把老的消息剔除,保证消息长度不超过maxlen。

先增加5条数据

127.0.0.1:6379> xadd stream-1 * name n1

"1641458449368-0"

127.0.0.1:6379> xadd stream-1 * name n2

"1641458451353-0"

127.0.0.1:6379> xadd stream-1 * name n3

"1641458452737-0"

127.0.0.1:6379> xadd stream-1 * name n4

"1641458454740-0"

127.0.0.1:6379> xadd stream-1 * name n5

"1641458456357-0"

使用定长参数,设置长度为3,可以看到,当使用定长参数后,链表中的消息只有3个了。

127.0.0.1:6379> xadd stream-1 maxlen 3 * name n5

"1641458535493-0"

127.0.0.1:6379> xinfo stream stream-1

 1) "length"

 2) (integer) 3

 3) "radix-tree-keys"

 4) (integer) 1

 5) "radix-tree-nodes"

 6) (integer) 2

 7) "last-generated-id"

 8) "1641458535493-0"

 9) "groups"

10) (integer) 2

11) "first-entry"

12) 1) "1641458454740-0"

    2) 1) "name"

       2) "n4"

13) "last-entry"

14) 1) "1641458535493-0"

    2) 1) "name"

       2) "n5"

忘记ACK

消费者的PEL保存了没有ACK的消息ID,如果没有ACK的消息越来越多,那么这个结构就会占用空间越来越大,所以千万不要忘记ACK。

PEL防止消息丢失

PEL主要的作用是,如果客户端消费了一条消息,此时发生网络断开等原因,这个消息还没被客户端消费就丢失了,因此PEL可以保证,在没有ACK前,一直在PEL中保存该消息ID,等客户端重新连接上后,可以再次消费PEL中的消息ID,一般建议客户端将参数设置成 0-0,表示从laster_delivered_id后开始消费。

Stream高可用

Stream与其他数据结构并没有区别,它的高可用也是通过建立在主从复制上,在 Cluster 和 Sentinel 即可支持高可用,由于Redis指令复制是异步的,因此可能会发生丢失极小部分的数据丢失。

分区Partition

Stream没有提供分区的能力,如果需要实现的话,可以建立多个Stream结构,然后客户端通过对key进行hash的方式,路由到不同的Stream分区中。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8月前
|
消息中间件 存储 NoSQL
深入Redis消息队列:Pub/Sub和Stream的对决【redis第六部分】
深入Redis消息队列:Pub/Sub和Stream的对决【redis第六部分】
273 0
|
8月前
|
消息中间件 NoSQL Java
别再用 Redis List 实现消息队列了,Stream 专为队列而生
别再用 Redis List 实现消息队列了,Stream 专为队列而生
153 0
|
18天前
|
存储 消息中间件 监控
Redis Stream:实时数据流的处理与存储
通过上述分析和具体操作示例,您可以更好地理解和应用 Redis Stream,满足各种实时数据处理需求。
53 14
|
3月前
|
消息中间件 NoSQL Redis
Redis Stream
10月更文挑战第20天
45 2
|
7月前
|
消息中间件 负载均衡 NoSQL
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
84 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
35 2
|
5月前
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
92 0
|
8月前
|
消息中间件 缓存 NoSQL
Redis stream 用做消息队列完美吗
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。 这篇文章,我们聊聊 Redis Stream 基本用法 ,以及如何在 SpringBoot 项目中应用 Redis Stream 。
Redis stream 用做消息队列完美吗
|
NoSQL Redis
redis序列化问题:invalid stream header
redis序列化问题:invalid stream header
666 0
|
8月前
|
消息中间件 存储 负载均衡
Redis类型 Stream Bitfield
Redis类型 Stream Bitfield
49 0