楔子
前面我们介绍了如何通过 List 和 Pub/Sub 来实现一个消息队列,但很明显它们都有很严重的缺陷,作为消息队列是不合格的。
而 Redis 作者也注意到了这一点,于是开发了 disque,目的是成为一个基于内存的分布式消息中间件。但该项目没什么人关注,于是在 5.0 的时候将 disque 的功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream。
而前面我们说过,一个专业的消息队列应该支持以下功能:
- 支持阻塞等待拉取消息;
- 支持发布 / 订阅模式;
- 消费者下线之后重新上线,仍能消费下线期间生产者发送的消息;
- 消费者消费失败,可重新消费,也就是支持消息被同一个消费者消费多次;
- 实例宕机,消息不丢失,数据可持久化;
- 即使消息大量堆积,也不会丢数据;
那么这些功能 Stream 是不是都支持呢?以及 Stream 类型采用的底层数据结构又是什么呢?带着这些问题,我们来开始 Stream 的学习,首先是它的使用。
剧透:Stream 采用了 Radix Tree 和 listpack 两种数据结构来保存消息,后面会介绍 Radix Tree。
Stream 的使用
首先,Stream 作为消息队列,它保存的消息通常具有以下两个特征:
- 一条消息由一个或多个键值对组成;
- 每插入一条消息,这条消息都会对应一个消息 ID;
关于消息 ID,我们一般会让 Redis 自动生成,并且 ID 是递增的。消息 ID 由时间戳和序号组成,时间戳是消息插入时,以毫秒为单位的服务器当前时间;但光有时间戳还不够,因为同一毫秒内,可能会插入多条数据,所以还要有序号。
而 Stream 支持的 API 如下:
- xadd:添加消息;
- xread:读取消息;
- xlen:查询消息的长度;
- xdel:根据消息 ID 删除消息;
- xrange:读取某个区间的消息;
- del:删除整个 Stream,当然 del 可以删除任意 key;
我们实际操作一波。
添加消息
命令:xadd key ID field1 string1 field2 string2···
> xadd girl * name satori age 17 "1663918086746-0"
添加消息之后会返回消息的 ID,由时间戳+序号组成,并且 ID 我们指定的是 *,表示让 Redis 自动生成 ID,当然我们也可以手动指定。
再插入两条消息,此时 girl 这个 Stream 里面就有了三条消息。
> xadd girl * name koishi age 16 "1663918266484-0" > xadd girl * name marisa age 16 "1663918276271-0"
所以一条消息会包含一组键值对,并且从插入数据和返回结果能够看出,对于 Stream 类型来说,它要保存的数据有以下两个特征。
- 连续插入的消息 ID,其前缀有较多部分是相同的,因为它们的插入时间非常接近;
- 连续插入的消息,它们对应的键值对中的键通常是相同的(也可以不同),比如这里都是 name 和 age,因为理论上发往同一个 Stream 里面的消息应该具备相同的特征;
那么针对 Stream 的这两个数据特征,应该使用什么样的数据结构来保存这些消息数据呢?
毫无疑问,我们首先想到的就是哈希表,一个消息 ID 对应哈希表中的一个 key,消息内容对应这个 key 的 value。
但是就像刚才说的一样,消息 ID 和消息中的键经常会有重复的部分。如果使用哈希表,就会导致有不少冗余数据,这会浪费 Redis 宝贵的内存空间。
因此,为了充分节省内存空间,Stream 使用了两种内存友好的数据结构:listpack 和 Radix Tree。其中消息 ID 使用 String 保存,作为 Radix Tree 中的 key,而消息具体数据是使用 listpack 保存,作为 Radix Tree 的 value。
关于 Radix Tree 我们一会再说,先回到 Stream 的 API 上面来。
查询消息长度
命令:xlen key
# 显然当前有 3 条消息 > xlen girl (integer) 3 # 再插入一条 > xadd girl * name scarlet age 400 "1663919561887-0" # 长度变为 4 127.0.0.1:6379> xlen girl (integer) 4 # 如果 key 不存在,则长度为 0 > xlen not_exists (integer) 0
删除消息
命令:xdel key 消息ID·····,可以同时删除多个
> xlen girl (integer) 4 # 根据消息 ID 删除消息,可同时删除多个 # 并返回删除的消息数量 > xdel girl 1663918276271-0 (integer) 1 # 还剩下 3 个 > xlen girl (integer) 3
删除 Stream
直接使用 del,它可以删除任意多个任意的 key。
> del girl (integer) 1
查询区间消息
命令:xrange key start end count n,这里的 start 和 end 指的是消息ID。
# 添加消息 > xadd girl * name satori age 17 "1663921038755-0" > xadd girl * name koishi age 16 "1663921047878-0" > xadd girl * name marisa age 16 "1663921054277-0" > xadd girl * name scarlet age 400 "1663921064383-0" # 查询 > xrange girl 1663921038755-0 1663921064383-0 1) 1) "1663921038755-0" 2) 1) "name" 2) "satori" 3) "age" 4) "17" 2) 1) "1663921047878-0" 2) 1) "name" 2) "koishi" 3) "age" 4) "16" 3) 1) "1663921054277-0" 2) 1) "name" 2) "marisa" 3) "age" 4) "16" 4) 1) "1663921064383-0" 2) 1) "name" 2) "scarlet" 3) "age" 4) "400"
还是比较简单的,把整个过程想象成数组的截取即可,只不过数组用的是索引,Stream 用的是消息 ID。另外这里我们指定的是第一条和最后一条的消息 ID,所以全部返回了,而返回全量消息还有一种做法:xrange girl - +。
- - 代表第一条消息;
- + 代表最后一条消息;
并且在返回的时候,还可以指定 count 来限制数量。
# 总共 4 条,但只查询 2 条 > xrange girl - + count 2 1) 1) "1663921038755-0" 2) 1) "name" 2) "satori" 3) "age" 4) "17" 2) 1) "1663921047878-0" 2) 1) "name" 2) "koishi" 3) "age" 4) "16"
注意:这个过程并不是先全量查询,然后只返回前 count 条;而是当查询的条数达到 count 时,直接返回。另外即使数量达不到 count 也是可以的,有多少返回多少,比如这里的消息总量是 4,但 count 指定为 10,那么就只会返回 4 条。
最后,虽然这里查询用的是消息ID,但是也要像索引一样注意先后关系。start 对应的消息要在 end 对应的消息之前,类似于索引。
读取某条消息之后的 n 条消息
命令:xread count n streams xxx MESSAGE_ID
从名为 xxx 的 Stream 中,读取消息 ID 为 MESSAGE_ID 之后的 n 条消息。
# 读取 '1663921047878-0' 之后的两条消息 > xread count 2 streams girl 1663921047878-0 1) 1) "girl" 2) 1) 1) "1663921054277-0" 2) 1) "name" 2) "marisa" 3) "age" 4) "16" 2) 1) "1663921064383-0" 2) 1) "name" 2) "scarlet" 3) "age" 4) "400" # 1663921054277-0 后面只剩下一条消息了 # 所以即便 count 为 2,也只返回了一条 > xread count 2 streams girl 1663921054277-0 1) 1) "girl" 2) 1) 1) "1663921064383-0" 2) 1) "name" 2) "scarlet" 3) "age" 4) "400"
并且该命令还提供了一个可以阻塞读取的参数 block,我们可以使用它读取某条数据之后的新增数据。
xread count 1 block 0 streams girl MESSAGE_ID
不使用 block 的话,如果该消息 ID 后面没有消息了,那么会直接返回空。但通过 block 超时时间 可以在没有消息的时候让程序处于阻塞状态,如果超时时间为 0,那么会一直等待,直到队列里面有数据再返回。
一般来说,如果使用 block,那么 MESSAGE_ID 一定是最后一条消息的 ID。如果不是最后一条消息的 ID,那么有没有 block 没什么区别。所以 Redis 为了使用方便,还支持我们使用 $,它代表的就是最后一条消息的 ID。
我们看到程序阻塞在这里了,因为 $ 代表最后一条消息,它后面已经没有消息了。所以该命令会阻塞,直到别的客户端往 girl 这个 Stream 里插入一条消息之后才会返回。
> xadd girl * name sakura age 20 "1663923328212-0"
新开一个终端,往里面写入一条消息,然后查看第一个终端。
发现第一个终端读取到消息,并解除阻塞,而且输出也告诉我们整个过程阻塞了 232.43 秒。当超时时间为 0 时,代表没有上限,如果大于 0,代表阻塞指定的毫秒数。
最后,我们这里的 count 指定的是 1,但不管指定的是多少,只要有消息过来,都会解除阻塞。
所以从这里我们看到,消息队列的第一个特性:支持阻塞式拉取消息,Stream 是满足的。
消费者组
Stream 也支持消费者组,我们来看一下。
创建消费者组
命令:xgroup create <stream_key> <group_key> <ID>
> xgroup create girl group1 0-0 OK
- girl:stream key 的名称;
- group1:group key 的名称
- 0-0:消息 ID,0-0 表示从第一条开始向后读取;
如果要从最后一条消息开始向后读取的话,那么使用 $ 即可。
> xgroup create girl group2 $ OK
以上两个消费者组就创建完毕了。
读取消息
命令:xreadgroup group group_key consumer_key [count n] streams stream_key
- group_key:创建的分组名;
- consumer_key:消费者名,随便指定即可;
- count n:每次读取的数量,可选,不指定全部返回;
- stream_key:消息队列;
# 从组 'group1' 里面指定一个消费者 'c1' # 然后消费队列 girl 里面的消息,并且消费 1 条 # 注意结尾有一个 >,表示后续从下一条消息开始消费 6379> xreadgroup group group1 c1 count 1 streams girl > 1) 1) "girl" 2) 1) 1) "1663921038755-0" 2) 1) "name" 2) "satori" 3) "age" 4) "17"
然后还可以再启动一个消费者:
# 从组 'group' 里面指定一个消费者 c2,继续消费 # 消费者名字可以随便起 6379> xreadgroup group group1 c2 count 2 streams girl > 1) 1) "girl" 2) 1) 1) "1663921047878-0" 2) 1) "name" 2) "koishi" 3) "age" 4) "16" 2) 1) "1663921054277-0" 2) 1) "name" 2) "marisa" 3) "age" 4) "16"
这里消费了两条,如果不指定 count,那么默认全部消费。当消息消费完毕之后,会返回空。
# 此时已全部消费完毕了,如果再消费就会返回空 6379> xreadgroup group group1 c3 streams girl > 1) 1) "girl" 2) 1) 1) "1663921064383-0" 2) 1) "name" 2) "scarlet" 3) "age" 4) "400" 2) 1) "1663923328212-0" 2) 1) "name" 2) "sakura" 3) "age" 4) "20"
注意:消费者的数量是不受限制的。这个有点类似 kafka,一个组里面可以有任意个消费者,它们共同消费一个队列里的数据,实现并行消费。但一条消息最多只能被组里面的一个消费者消费,如果一条消息同时被两个消费者消费,那么这两个消费者应该隶属于不同的消费者组。
此外,xreadgroup 也支持阻塞式拉取消息。
6379> xreadgroup group group1 c4 block 0 streams girl > # 客户端陷入阻塞
如果我们此时另一个客户端往 girl 里面写入一条消息,那么此处就会解除阻塞,并返回新写入的消息。这里我们不写了,直接停掉,然后创建一个新的消费者组,看看能不能从头开始消费。
# 已经消费到头了,再消费的话,则返回空 > xreadgroup group group1 c4 streams girl > (nil) # 新建一个消费者组 > xgroup create girl group1_1 0-0 OK # 从头开始消费 > xreadgroup group group1_1 c4 count 1 streams girl > 1) 1) "girl" 2) 1) 1) "1663921038755-0" 2) 1) "name" 2) "satori" 3) "age" 4) "17"
所以这里我们又可以得出一个结论:Stream 满足消息队列的第二个特点,支持发布 / 订阅模式,就是让多组消费者消费同一批数组。
从图中可以看到,两组消费者获取同一批数据,这样一来就达到了多组消费者「订阅」消费的目的。
消息消费确认
一般消息接收完了,我们会回复一个确认信息,告知已经消费完毕,命令:xack stream-key group-key ID···
> xack girl group1_1 1663921038755-0 (integer) 1
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成。
所以除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到了消息 ID。当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。
如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息,待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。
并且即使是生成者在消费者下线期间生产的消息,消费者上线之后也是可以收到的。因此消息队列的第三和第四个特性,Stream 也是支持的。
查询未确认的消息
# 未确认的消息有 6 条 > xpending girl group1 1) (integer) 6 2) "1663921038755-0" 3) "1663923499682-0" 4) 1) 1) "c1" 2) "1" 2) 1) "c2" 2) "2" 3) 1) "c3" 2) "3" # 确认两条 > xack girl group1 1663921038755-0 1663921047878-0 (integer) 2 # 还剩四条 > xpending girl group1 1) (integer) 4 2) "1663921054277-0" 3) "1663923499682-0" 4) 1) 1) "c2" 2) "1" 2) 1) "c3" 2) "3"
xinfo 信息查询
xinfo stream stream_key:查询 Stream 的相关信息;
> xinfo stream girl 1) "length" 2) (integer) 6 # 队列中有6个消息 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1663923499682-0" 9) "groups" # 5 个消费分组,我中间又创建了几个 10) (integer) 5 11) "first-entry" # 第一条消息 12) 1) "1663921038755-0" 2) 1) "name" 2) "satori" 3) "age" 4) "17" 13) "last-entry" # 最后一条消息 14) 1) "1663923499682-0" 2) 1) "name" 2) "sakura" 3) "age" 4) "20"
xinfo groups stream_key:查询 Stream 消费者组信息;
> xinfo groups girl 1) 1) "name" 2) "group1" # 消费者组名称 3) "consumers" 4) (integer) 4 # 组里面有 3 个消费者 5) "pending" 6) (integer) 3 # 3 个未确认的消息 7) "last-delivered-id" 8) "1663923499682-0" 2) 1) "name" 2) "group1_1" 3) "consumers" 4) (integer) 1 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "1663921038755-0" 3) ... ...
xinfo consumers stream_key group_key:查询某个消费组的成员信息
> xinfo consumers girl group1 1) 1) "name" 2) "c1" 3) "pending" 4) (integer) 0 5) "idle" 6) (integer) 4667346 2) 1) "name" 2) "c2" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 4405245 3) 1) "name" 2) "c3" 3) "pending" 4) (integer) 2 5) "idle" 6) (integer) 4259811 4) 1) "name" 2) "c4" 3) "pending" 4) (integer) 0 5) "idle" 6) (integer) 1879248
xgroup delconsumer stream-key group-key consumer-key:删除组里面的某个消费者
> xgroup delconsumer girl group1 c2 (integer) 1
xgroup destroy stream-key group-key:删除消费者组
> xgroup destroy girl group1 (integer) 1
关于 Stream 的命令我们就介绍完了,并且在过程中,我们知道消息队列的前 4 个特性,Stream 都是满足的。那么问题来了,后两个特性是否也满足呢?
首先倒数第二个特性:实例宕机,消息不丢失,数据可持久化,显然 Stream 是满足的。因为 Stream 是新增的数据类型,与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。我们只需要配置好持久化策略,这样就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。
最后一个特性:即使消息大量堆积,也不会丢数据,这个 Stream 是否支持呢?一般来说,当消息队列发生消息堆积时,一般只有 2 个解决方案:
- 生产者限流:避免消费者处理不及时,导致持续积压;
- 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息;
而 Redis 在实现 Stream 时,采用了第 2 个方案,在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。
# 指定队列长度最大 10000 > XADD queue MAXLEN 10000 * name satori "1638518032447-0"
当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。
经过以上分析,我们发现 Redis 的 Stream 几乎覆盖了消息队列的各种场景,那这是不是意味着,Stream 可以作为专业的消息队列中间件来使用呢?其实还不够,就算 Redis 能做到以上这些,也只是「趋近于」专业的消息队列。原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。
下面就来将 Redis 与专业的队列中间件做个对比,看看 Redis 作队列时,还有哪些欠缺?
Redis 和专业消息队列的差异
首先使用消息队列,会涉及三个部分:生产者、中间件本身、消费者。
因此消息是否会丢失,需要考虑以下三种情况:
- 生产者会不会丢消息;
- 消费者会不会丢消息;
- 队列中间件会不会丢消息;
生产者会不会丢消息
当生产者在发布消息时,可能发生以下异常情况:
- 消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败;
- 不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了;
如果是第一种,消息根本没发出去,那么重新发一次就好了。如果是第二种,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止。
生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。
也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理。但这也意味着消息可能会重复发送,因为消息可能发送成功了,但消费者不知道而已。所以消费者这边,就需要多做一些逻辑了,对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。
从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。所以无论是 Redis 还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的。
消费者会不会丢消息
这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了,那消费者还能否重新消费失败的消息?
要解决这个问题,就必须要有一个机制,只要当消费者在处理完消息、并告知中间件之后,中间件才能把消息标记已处理,否则仍会把这些数据发给消费者。
这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ, Kafka,其实都是这么做的。所以从这个角度来看,Redis 也是合格的。
中间件本身会不会丢消息
前面两个问题都比较好处理,只要客户端和服务端配合好,就能保证不丢消息。但如果队列中间件本身就不可靠呢?毕竟生产者和消费者都依赖它,如果它不可靠,那么无论生产者和消费者怎么做,都无法保证数据不丢。
在这个方面,Redis 其实没有达到要求,Redis 在以下两个场景下,都会导致数据丢失。
- AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能;
- 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库);
基于以上原因我们可以看到,Redis 本身无法保证严格的数据完整性,所以如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。
而像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时一般都是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。也正因为如此,它们在设计时也更复杂,毕竟是专门用作消息队列的。
但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。最后,我们来看消息积压怎么办?
消息积压怎么办
因为 Redis 的数据都存储在内存中,这意味着一旦发生消息积压,就会导致 Redis 的内存持续增长,如果超过机器内存上限,则面临被 OOM 的风险。所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。
但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多。当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。综上我们可以看到,把 Redis 当作队列来使用时,始终面临的两个问题:
- Redis 本身可能会丢数据;
- 面对消息积压,Redis 内存资源紧张;
到这里,关于 Redis 是否可以用作队列,结论已经很清晰了。如果你的业务场景不复杂,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。而且 Redis 相比于 Kafka 和 RabbitMQ,部署和运维也更加轻量。
关于 Redis 用作消息队列,我们再总结一下:
该部分内容引用自:水滴与银弹《把Redis当作队列来用,真的合适吗?》
Stream 类型相关的内容我们就介绍完了,再来看看 Stream 是怎么实现的?
Stream 底层结构
Stream 结构如下:
typedef struct stream { //保存消息的Radix Tree rax *rax; //消息流中的消息个数 uint64_t length; //当前消息流中最后插入的消息的ID streamID last_id; //当前消息流的消费组信息,也是用Radix Tree保存 rax *cgroups; } stream;
所以重点是 Radix Tree,它是前缀树的一种,前缀树也被称为字典树,英文是 Trie。刷 LeetCode 的话,应该会遇到相关的问题。
前缀树的特点是,每个 key 会被拆分成单字符,然后逐一保存在树上的节点中。前缀树的根节点不保存任何字符,而除了根节点以外的其他节点,每个节点只保存一个字符。当我们把从根节点到当前节点的路径上的字符拼接在一起时,就可以得到相应 key 的值了。
前缀树在查找指定字符串的时候,时间复杂度是 O(K),换句话说它只和要查找的字符串的长度有关。并且 pen 和 pencil 都具有相同的前缀,如果采用哈希表,那么 pen 三个字符会被保存两遍。
但话虽如此,前缀树不可能保证每个相同的字符,都能被共享,举个例子:
对于当前这个例子,我们就无法保证每个字符都能被共享。