0.前言
Redis的Stream是一种新的数据类型,于Redis 5.0版本中引入。它是一个有序、持久化、可重复读的消息流,可以用于实现消息队列、日志系统等应用场景。本文将从底层实现的角度对Redis的Stream进行详解。
1. Stream的基本概念
1.1. Stream的结构
Redis的Stream由多个消息构成,每个消息包含了一个唯一的ID和一个键值对数据。Stream中的消息是有序的,每个消息都有一个唯一的ID,ID是一个自增的64位整数。Stream中的每个消息都可以包含多个键值对数据,这些数据是以键值对的形式存储的。
1.2. Stream的持久化
Redis的Stream支持持久化,即可以将Stream中的消息保存到磁盘中,以便在Redis重启后能够恢复数据。
1.3. Stream的消费者组
Redis的Stream支持消费者组,多个消费者可以组成一个消费者组,每个消费者可以独立地消费Stream中的消息。当一个消费者组中的某个消费者消费了一个消息后,其他消费者将无法再消费该消息。
2.Stream的实现原理
2.1. Stream的数据结构
Redis的Stream数据结构由两个部分组成,一个是消息ID的有序集合,另一个是消息的哈希表。
Stream的消息ID有序集合中,每个元素都是一个消息的ID,按照递增的顺序排列。有序集合中的分值为消息的ID,成员为NULL。
Stream的消息哈希表中,每个消息都用一个哈希表表示,哈希表的键是消息的ID,值是一个由多个键值对数据组成的字典。
2.2. Stream的消息追加
当向Stream中追加新消息时,Redis会将新消息的ID插入到消息ID有序集合中,并将新消息的键值对数据插入到对应的哈希表中。如果新消息的ID已经存在于消息ID有序集合中,则插入操作会失败。
2.3. Stream的消费
当一个消费者要消费Stream中的消息时,它需要指定一个起始ID,Redis会将起始ID对应的消息之后的所有消息都返回给该消费者。消费者可以使用XRANGE命令获取消息。
当一个消费者消费了一个消息之后,Redis会将该消息的ID从消息ID有序集合中删除,并将该消息从消息哈希表中删除。如果该消息是该消费者组中的最后一个消息,则该消费者组的消费者将无法再消费该消息之前的消息。
2.4. Stream的消费者组
当一个消费者组中的某个消费者消费了一个消息之后,Redis会将该消息的ID从消费者组的"pending"列表中删除,并将该消息从消息哈希表中删除。如果该消费者是该消费者组中的最后一个消费者,则该消费者组的"pending"列表将被删除。
当某个消费者组中的所有消费者都没有消费一个特定的消息时,该消息的ID将存储在该消费者组的"pending"列表中。"pending"列表是一个有序集合,其中每个元素都是一个消息的ID,按照消息的时间戳排序。有序集合中的分值为消息的时间戳,成员为消息的ID。
2.5. Stream的持久化
Redis将Stream持久化到磁盘中的方式是将消息ID有序集合和消息哈希表分别保存到两个独立的RDB文件中。当Redis重启后,它会从这两个文件中重新加载Stream数据。
3.Redis Stream底层原理
Redis Stream的底层数据结构是基数树和listpack,这使得Redis Stream具有高效的空间和时间复杂度,同时允许通过ID进行随机访问。在Redis Stream中,每个条目都有一个唯一的ID,以实现排序。这使得可以按照时间戳或其他自定义标准对数据进行范围查询。每个Stream可以有多个消费者组,每个消费者组中可以有多个消费者,消费者组可以将Stream分配给不同的消费者,以均匀地分配负载。
3.1. 基数树(Radix Tree)
基数树(Radix Tree)是一种多叉树,用于存储有序数据集合。在Redis中,基数树被用于实现有序集合和Stream中的消息ID有序集合。
基数树的每个节点都包含一个字符和多个子节点。通过不断遍历基数树的子节点,可以找到一个字符串在基数树中的位置。例如,在Redis中,有序集合中的一个元素可以表示为一个字符串和一个分值,Redis会将这个元素的字符串按照字符拆分成多个节点,每个节点对应一个字符,最终将这个元素的分值存储在基数树的叶子节点上。
基数树的优点是支持快速的前缀匹配和范围查找。例如,在Redis中,有序集合的ZRANGEBYLEX命令就是基于基数树实现的,可以快速地按照字典序查找有序集合中的元素。
3.2. listpacks
listpacks是一种紧凑的、可扩展的、有序的、二进制数据结构,用于存储多个元素。在Redis中,listpacks被用于实现Stream中的消息哈希表。
listpacks的一个特点是支持高效的元素访问、插入和删除操作。listpacks会将多个元素紧密地排列在一起,每个元素占用的空间大小是可变的,因此可以根据实际需求灵活地分配空间。同时,listpacks可以按照元素索引快速地定位某个元素,也可以在任意位置快速地插入或删除元素。
在Redis Stream中,消息哈希表中的每个元素都是一个键值对数据,这些数据会被序列化成二进制格式,然后存储在一个listpack中。当需要访问某个键值对数据时,Redis会根据该数据在listpack中的位置,快速地定位并反序列化出该数据。
基数树和listpacks是Redis中两个重要的底层数据结构,它们为Redis提供了高效、可扩展、有序、紧凑的数据存储能力。深入理解这些数据结构的实现原理,可以帮助开发者更好地理解Redis的底层实现,从而优化应用程序的性能和可扩展性。
4.Redis Streams命令和操作示例
4.1 Streams命令
- XADD:向指定的Stream中添加一条消息。
- XLEN:获取指定Stream中的消息数量。
- XRANGE:按照ID范围查询Stream中的消息。
- XREVRANGE:按照ID范围反向查询Stream中的消息。
- XREAD:从多个Stream中连续读取多条消息。
- XACK:确认接收并处理一条或多条消息。
- XDEL:删除指定Stream中的一条或多条消息。
- XGROUP:管理Stream的消费者组。
- XREADGROUP:从指定的Stream中连续读取多条消息,并将消息分配给指定的消费者组。
这些命令一起构成了Redis Streams的基本操作集合,可以对Stream中的消息进行读写、查询、删除、确认处理等各种操作。通过这些命令,开发者可以轻松地构建出高效、可扩展、高可用的实时应用程序。
4.操作示例
我们分别用jedis 和 SpringBoot 实现
4.2. Jedis工程
首先创建一个Maven项目,在pom.xml文件中添加以下依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.6.3</version>
</dependency>
然后创建一个名为JedisDemo的Java类,实现Redis Streams的基本操作:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class JedisDemo {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost");
// 添加消息
String response = jedis.xadd("stream1", "*", "name", "John", "age", "30");
System.out.println(response);
// 获取消息数量
Long len = jedis.xlen("stream1");
System.out.println(len);
// 查询消息
List<StreamEntry> entries = jedis.xrange("stream1", "1000-2000");
for (StreamEntry entry : entries) {
System.out.println(entry);
}
// 反向查询消息
List<StreamEntry> entries2 = jedis.xrevrange("stream1", "+", "-", 10);
for (StreamEntry entry : entries2) {
System.out.println(entry);
}
// 连续读取消息
List<Map.Entry<String, List<StreamEntry>>> results = jedis.xread(10, 5000, new AbstractMap.SimpleEntry<>("stream1", "0"), new AbstractMap.SimpleEntry<>("stream2", "0"));
for (Map.Entry<String, List<StreamEntry>> result : results) {
System.out.println(result.getKey() + ": " + result.getValue());
}
// 确认处理消息
Long count = jedis.xack("stream1", "consumer1", "1001", "1002");
System.out.println(count);
// 删除消息
Long count2 = jedis.xdel("stream1", "1001", "1002");
System.out.println(count2);
// 管理消费者组
String result = jedis.xgroupCreate("stream1", "consumer1", "0", true);
System.out.println(result);
// 读取并分配消息
List<Map.Entry<String, List<StreamEntry>>> results2 = jedis.xreadGroup("consumer1", "consumer1-1", 10, 5000, true, new AbstractMap.SimpleEntry<>("stream1", ">"));
for (Map.Entry<String, List<StreamEntry>> result2 : results2) {
System.out.println(result2.getKey() + ": " + result2.getValue());
}
jedis.close();
}
}
运行JedisDemo类,可以看到输出了Redis Streams的基本操作结果。
- RedisTemplate工程
首先创建一个Maven项目,在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.5.0</version>
</dependency>
然后创建一个名为RedisTemplateDemo的Java类,实现Redis Streams的基本操作:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@SpringBootApplication
public class RedisTemplateDemo {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(RedisTemplateDemo.class, args);
RedisTemplate<String, String> redisTemplate = context.getBean(StringRedisTemplate.class);
// 添加消息
redisTemplate.opsForStream().add("stream1", Collections.singletonMap("name", "John"), Collections.singletonMap("age", "30"));
// 获取消息数量
Long len = redisTemplate.opsForStream().size("stream1");
System.out.println(len);
// 查询消息
List<MapRecord<String, String, String>> records= redisTemplate.opsForStream().range("stream1", Range.unbounded());
for (MapRecord<String, String, String> record : records) {
System.out.println(record);
}
// 反向查询消息
List<MapRecord<String, String, String>> records2 = redisTemplate.opsForStream().reverseRange("stream1", Range.unbounded());
for (MapRecord<String, String, String> record : records2) {
System.out.println(record);
}
// 连续读取消息
Map<StreamOffset<String>, ReadOffset> streams = new HashMap<>();
streams.put(StreamOffset.create("stream1", ReadOffset.from("0")), ReadOffset.lastConsumed());
List<MapRecord<String, String, String>> results = redisTemplate.opsForStream().read(streams, Duration.ofMillis(5000));
for (MapRecord<String, String, String> result : results) {
System.out.println(result);
}
// 确认处理消息
redisTemplate.execute(new SessionCallback<>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
operations.watch("stream1");
RecordId id1 = RecordId.of("1001");
RecordId id2 = RecordId.of("1002");
String key = "stream1";
StreamOperations<String, String, String> ops = operations.opsForStream();
List<MapRecord<String, String, String>> records = ops.range(key, Range.closed(id1, id2));
if (!CollectionUtils.isEmpty(records)) {
RedisConnectionFactory factory = operations.getConnectionFactory();
RedisClientInfo info = factory.getConnection().getClientList().get(0);
ops.acknowledge(key, info.getAddress(), id1, id2);
}
return null;
}
});
// 删除消息
Long count = redisTemplate.opsForStream().delete("stream1", "1001", "1002");
System.out.println(count);
// 管理消费者组
redisTemplate.opsForStream().createGroup("stream1", "consumer1");
// 读取并分配消息
MapRecord<String, String, String> record = redisTemplate.opsForStream().read("consumer1", StreamOffset.lastConsumed("stream1"));
if (record != null) {
System.out.println(record);
redisTemplate.opsForStream().acknowledge("stream1", "consumer1", record.getId());
}
}
}
运行RedisTemplateDemo类,可以看到输出了Redis Streams的基本操作结果。
总结
Redis的Stream是一种有序、持久化、可重复读的消息流数据类型,非常适合用于实现消息队列、日志系统等应用场景。Stream的数据结构由消息ID有序集合和消息哈希表两部分组成,消息ID有序集合用于维护消息的顺序,消息哈希表用于存储消息的键值对数据。Stream支持消费者组,多个消费者可以组成一个消费者组,每个消费者可以独立地消费Stream中的消息。Redis将Stream持久化到磁盘中的方式是将消息ID有序集合和消息哈希表分别保存到两个独立的RDB文件中。当Redis重启后,它会从这两个文件中重新加载Stream数据。Stream的底层实现非常高效,可以支持非常大的消息流数据。