博主介绍: ✌博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家✌
Java知识图谱点击链接:体系化学习Java(Java面试专题)
💕💕 感兴趣的同学可以收藏关注下 ,不然下次找不到哟💕💕
1、什么是 Stream
Stream 是 Redis 5.0 版本中新增的一种数据结构,它是一个高性能、持久化的消息队列,可以用于实现消息的发布和订阅。Stream 可以看作是一个有序的消息队列,每个消息都有一个唯一的 ID,可以根据 ID 进行消息的查找、删除和确认。在 Stream 中,消息以键值对的形式存储,可以存储任意类型的数据。Stream 还支持多个消费者组,每个消费者组可以独立消费消息,避免消息重复消费。Stream 的引入使得 Redis 在消息队列领域更具竞争力,同时也为开发者提供了一种高效、可靠的消息处理方式。
2、为什么要设计 Stream
Redis 设计 Stream 的原因主要是为了满足大规模实时数据处理的需求。在传统的消息队列中,消息的消费者只能消费最新的消息,而无法消费过去的消息。而在实时数据处理中,往往需要对历史数据进行分析和处理,因此需要一种能够存储大量历史数据并支持快速查询和消费的数据结构。Stream 的引入解决了这个问题,它支持持久化存储和快速查询,可以存储大量历史数据,并且支持多个消费者组独立消费消息,从而满足了大规模实时数据处理的需求。此外,Stream 还支持消息的延迟和重试等功能,使得 Redis 在消息队列领域更具竞争力。
3、Stream 命令详解
Stream 是 Redis 5.0 版本新增的一种数据结构,支持高性能、持久化的消息队列。下面是 Stream 命令的详细介绍:
- XADD key ID field1 value1 [field2 value2 ...]:向指定的 Stream 中添加一条消息,消息的 ID 由用户指定,消息的字段和值由用户指定。
127.0.0.1:6379> XADD mystream 1000 name John age 30
"1000-0"
- XRANGE key start end [COUNT count]:返回指定 Stream 中指定范围内的消息,范围由 start 和 end 指定,可以使用 "-" 表示最大或最小 ID,COUNT 参数表示返回消息的数量。
127.0.0.1:6379> XRANGE mystream 1000-0 1000-1
1) 1) "1000-0"
2) 1) "name"
2) "John"
3) "age"
4) "30"
- XREVRANGE key end start [COUNT count]:同 XRANGE 命令,但是返回的消息是逆序的。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREVRANGE mystream 1003-0 1001-0
1) 1) "1003-0"
2) 1) "name"
2) "Jack"
3) "age"
4) "30"
2) 1) "1002-0"
2) 1) "name"
2) "Mary"
3) "age"
4) "28"
3) 1) "1001-0"
2) 1) "name"
2) "Tom"
3) "age"
4) "25"
- XLEN key:返回指定 Stream 中消息的数量。
127.0.0.1:6379> XLEN mystream
1
- XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定 Stream 中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XREAD COUNT 2 BLOCK 1000 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1001-0"
2) 1) "name"
2) "Tom"
3) "age"
4) "25"
2) 1) "1002-0"
2) 1) "name"
2) "Mary"
3) "age"
4) "28"
- XACK key group ID [ID ...]:确认指定消费者组已经处理了指定 ID 的消息。
127.0.0.1:6379> XACK mystream mygroup 1000-0
(integer) 1
- XGROUP CREATE key groupname ID [MKSTREAM]:创建一个新的消费者组,可以指定组名和起始 ID,如果指定 MKSTREAM 参数,则会自动创建 Stream。
127.0.0.1:6379> XGROUP CREATE mystream mygroup 1000-0
OK
- XGROUP SETID key groupname ID:设置消费者组的起始 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP SETID mystream mygroup 1002-0
OK
- XGROUP DESTROY key groupname:销毁指定的消费者组。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XGROUP DESTROY mystream mygroup
(integer) 1
- XGROUP DELCONSUMER key groupname consumername:从指定消费者组中删除指定的消费者。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1001-0"
2) 1) "name"
2) "Tom"
3) "age"
4) "25"
127.0.0.1:6379> XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
- XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] STREAMS key [ID]:从指定消费者组中读取消息,可以指定读取的消息数量和阻塞时间,如果没有新的消息,则等待指定时间后返回空结果。
127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1000-0"
2) 1) "name"
2) "John"
3) "age"
4) "30"
- XCLAIM key groupname consumername min-idle-time ID [ID ...] [IDLE milliseconds] [TIME milliseconds] [RETRYCOUNT count] [FORCE]:从指定消费者组中获取一条未被确认的消息,并将其标记为正在处理,可以指定最小空闲时间、最大空闲时间、重试次数等参数。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XGROUP CREATE mystream mygroup $
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup myconsumer COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1001-0"
2) 1) "name"
2) "Tom"
3) "age"
4) "25"
127.0.0.1:6379> XCLAIM mystream mygroup myconsumer 0 1001-0
1) 1) "mystream"
2) 1) 1) "1001-0"
2) 1) "name"
2) "Tom"
3) "age"
4) "25"
- XDEL key ID [ID ...]:从指定 Stream 中删除指定 ID 的消息。
127.0.0.1:6379> XDEL mystream 1000-0
(integer) 1
- XTRIM key MAXLEN [~] count:删除 Stream 中多余的消息,可以指定删除的数量或者删除到指定的 ID。
127.0.0.1:6379> XADD mystream 1001 name Tom age 25
"1001-0"
127.0.0.1:6379> XADD mystream 1002 name Mary age 28
"1002-0"
127.0.0.1:6379> XADD mystream 1003 name Jack age 30
"1003-0"
127.0.0.1:6379> XADD mystream 1004 name Lucy age 27
"1004-0"
127.0.0.1:6379> XADD mystream 1005 name Bob age 32
"1005-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 3
(integer) 2
4、java 写一点 Stream 的 demo
我这边 redis 的版本是 Redis-x64-5.0.14.1,windows 上玩的,绿色版的,下载地址https://github.com/tporadowski/redis/releases
一下是 demo 的代码:
package com.pany.camp.redis;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamConsumersInfo;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;
import java.util.*;
public class RedisStreamDemo {
private static final Logger logger = LoggerFactory.getLogger(RedisStreamDemo.class);
private static final int MESSAGE_READ_COUNT = 1;
private static final long MESSAGE_READ_TIMEOUT = 120000L;
public static void main(String[] args) {
// 创建 Jedis 实例
try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
// 定义 Stream 名称和消费者组名称
String streamName = "mystream";
String groupName = "mygroup5";
// 创建消费者组
try {
jedis.xgroupCreate(streamName, groupName, new StreamEntryID(), true);
} catch (JedisDataException e) {
// 如果 Stream 已经存在,则忽略异常
if (!e.getMessage().contains("BUSYGROUP")) {
throw e;
}
}
logger.info("消费者组已创建");
// 添加消息到 Stream 中
Map<String, String> fields = new HashMap<>();
fields.put("field1", "value1");
fields.put("field2", "value2");
StreamEntryID messageId = jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, fields);
logger.info("消息已添加到 Stream 中,消息内容为:{}", JSONObject.toJSONString(fields));
// 读取消息
Map.Entry<String, StreamEntryID> streams = new AbstractMap.SimpleImmutableEntry<>(streamName,
new StreamEntryID().UNRECEIVED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(groupName, "c1", MESSAGE_READ_COUNT,
MESSAGE_READ_TIMEOUT, true, streams);
logger.info("从 Stream 中读取了 {} 条消息", messages.size());
for (Map.Entry<String, List<StreamEntry>> entry : messages) {
String sn = entry.getKey();
List<StreamEntry> streamMessages = entry.getValue();
for (StreamEntry message : streamMessages) {
logger.info("Stream 名称:{}", sn);
logger.info("Message ID:{}", message.getID());
logger.info("Message fields:{}", message.getFields());
}
}
// 确认消息已经被消费
jedis.xack(streamName, groupName, messageId);
logger.info("消息已确认消费");
// 删除消息
jedis.xdel(streamName, messageId);
logger.info("消息已删除");
} catch (Exception e) {
logger.error("执行 Redis 操作出错", e);
}
}
}
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
5、Stream 的应用场景
Redis Stream 的常用的应用场景:
消息队列:Stream 可以作为一个高性能的消息队列使用,支持多个消费者对同一 Stream 进行消费,且支持消费者组的管理、消息确认和消息持久化等功能。
日志收集:Stream 可以作为一个分布式的日志收集系统使用,支持多个客户端将日志写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。
实时数据处理:Stream 可以作为一个实时数据处理系统使用,支持多个客户端将实时数据写入到同一 Stream 中,且支持按照时间戳和 ID 进行查询和过滤。
事件驱动架构:Stream 可以作为一个事件驱动架构的基础设施使用,支持多个事件源将事件写入到同一 Stream 中,且支持按照事件类型和时间戳进行查询和过滤。
💕💕 本文由激流丶原创,原创不易,感谢支持!
💕💕喜欢的话记得点赞收藏啊!