补习系列(13)-springboot redis 与发布订阅

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介:

[TOC]

一、订阅发布

订阅发布是一种常见的设计模式,常见于消息系统的场景。
如下面的图:

[图来自百科]
消息发布者是消息载体的生产者,其通过某些主题来向调度中心发送消息;
而消息订阅者会事先向调度中心订阅其"感兴趣"的主题,随后会获得新消息。
在这里,调度中心是一个负责消息控制中转的逻辑实体,可以是消息队列如ActiveMQ,也可以是Web服务等等。

常见应用

  • 微博,每个用户的粉丝都是该用户的订阅者,当用户发完微博,所有粉丝都将收到他的动态;
  • 新闻,资讯站点通常有多个频道,每个频道就是一个主题,用户可以通过主题来做订阅(如RSS),这样当新闻发布时,订阅者可以获得更新。

二、Redis 与订阅发布

Redis 支持 (pub/sub) 的订阅发布能力,客户端可以通过channel(频道)来实现消息的发布及接收。

  1. 客户端通过 SUBSCRIBE 命令订阅 channel;

  1. 客户端通过PUBLISH 命令向channel 发送消息;

而后,订阅 channel的客户端可实时收到消息。

除了简单的SUBSCRIBE/PUBLISH命令之外,Redis还支持订阅某一个模式的主题(正则表达式),
如下:

PSUBSCRIBE  /topic/cars/*

于是,我们可以利用这点实现相对复杂的订阅能力,比如:

  • 在电商平台中订阅多个品类的商品促销信息;
  • 智能家居场景,APP可以订阅所有房间的设备消息。
    ...

尽管如此,Redis pub/sub 机制存在一些缺点:

  • 消息无法持久化,存在丢失风险;
  • 没有类似 RabbitMQ的ACK机制;
  • 由于是广播机制,无法通过添加worker 提升消费能力;

因此,Redis 的订阅发布建议用于实时且可靠性要求不高的场景。

三、SpringBoot 与订阅发布

接下来,看一下SpringBoot 怎么实现订阅发布的功能。

spring-boot-starter-data-redis 帮我们实现了Jedis的引入,pom 依赖如下:

 <!-- redis -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
   <version>${spring-boot.version}</version>
  </dependency>

application.properties 中指定配置

# redis 连接配置
spring.redis.database=0 
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.ssl=false

# 连接池最大数
spring.redis.pool.max-active=10 
# 空闲连接最大数
spring.redis.pool.max-idle=10
# 获取连接最大等待时间(s)
spring.redis.pool.max-wait=600000

A. 消息模型

消息模型描述了订阅发布的数据对象,这要求生产者与消费者都能理解
以下面的POJO为例:

    public static class SimpleMessage {

        private String publisher;
        private String content;
        private Date createTime;

在SimpleMessage类中,我们声明了几个字段:

字段名 说明
publisher 发布者
content 文本内容
createTime 创建时间

B. 序列化

如下的代码采用了JSON 作为序列化方式:

@Configuration
public class RedisConfig {

    private static final Logger logger = LoggerFactory.getLogger(RedisConfig.class);

    /**
     * 序列化定制
     * 
     * @return
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() {
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
                Object.class);

        // 初始化objectmapper
        ObjectMapper mapper = new ObjectMapper();
        mapper.setSerializationInclusion(Include.NON_NULL);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(mapper);
        return jackson2JsonRedisSerializer;
    }

    /**
     * 操作模板
     * 
     * @param connectionFactory
     * @param jackson2JsonRedisSerializer
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory connectionFactory,
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {

        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        template.setConnectionFactory(connectionFactory);

        // 设置key/hashkey序列化
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);

        // 设置值序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();

        return template;
    }

C. 发布消息

消息发布,需要先指定一个ChannelTopic对象,随后通过RedisTemplate方法操作。

@Service
public class RedisPubSub {  
    private static final Logger logger = LoggerFactory.getLogger(RedisPubSub.class);

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private ChannelTopic topic = new ChannelTopic("/redis/pubsub");

  
    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    private void schedule() {
        logger.info("publish message");
        publish("admin", "hey you must go now!");
    }

    /**
     * 推送消息
     * 
     * @param publisher
     * @param message
     */
    public void publish(String publisher, String content) {
        logger.info("message send {} by {}", content, publisher);

        SimpleMessage pushMsg = new SimpleMessage();
        pushMsg.setContent(content);
        pushMsg.setCreateTime(new Date());
        pushMsg.setPublisher(publisher);

        redisTemplate.convertAndSend(topic.getTopic(), pushMsg);
    }

上述代码使用一个定时器(@Schedule)来做发布,为了保证运行需要在主类中启用定时器注解:

@EnableScheduling
@SpringBootApplication
public class BootSampleRedis{
...
}

D. 接收消息

定义一个消息接收处理的Bean:

    @Component
    public static class MessageSubscriber {

        public void onMessage(SimpleMessage message, String pattern) {
            logger.info("topic {} received {} ", pattern, JsonUtil.toJson(message));
        }
    }

接下来,利用 MessageListenerAdapter 可将消息通知到Bean方法:

       /**
         * 消息监听器,使用MessageAdapter可实现自动化解码及方法代理
         * 
         * @return
         */
        @Bean
        public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer,
                MessageSubscriber subscriber) {
            MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");
            adapter.setSerializer(jackson2JsonRedisSerializer);
            adapter.afterPropertiesSet();
            return adapter;
        }

最后,关联到消息发布的Topic:

        /**
         * 将订阅器绑定到容器
         * 
         * @param connectionFactory
         * @param listenerAdapter
         * @return
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                MessageListenerAdapter listener) {

            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.addMessageListener(listener, new PatternTopic("/redis/*"));
            return container;
        }

运行结果
启动程序,从控制台可输出:

.RedisPubSub : publish message
.RedisPubSub : message send hey you must go now! by admin
.RedisPubSub : topic /redis/* received {"publisher":"admin","content":"hey you must go now!","createTime":1543418694007} 

这样,我们便完成了订阅发布功能。

小结

消息订阅发布是分布式系统中的常用手段,也经常用来实现系统解耦、性能优化等目的;
当前小节结合SpringBoot 演示了 Redis订阅发布(pub/sub)的实现,在部分场景下可以参考使用。
欢迎继续关注"美码师的补习系列-springboot篇" ,期待更多精彩内容^-^

原文链接:https://www.cnblogs.com/littleatp/p/10035786.html

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
16天前
|
NoSQL Redis
Redis 发布订阅
10月更文挑战第18天
24 1
Redis 发布订阅
|
1月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
38 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
1月前
|
缓存 NoSQL Java
springboot的缓存和redis缓存,入门级别教程
本文介绍了Spring Boot中的缓存机制,包括使用默认的JVM缓存和集成Redis缓存,以及如何配置和使用缓存来提高应用程序性能。
85 1
springboot的缓存和redis缓存,入门级别教程
|
19天前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
51 2
|
27天前
|
JSON NoSQL Java
springBoot:jwt&redis&文件操作&常见请求错误代码&参数注解 (九)
该文档涵盖JWT(JSON Web Token)的组成、依赖、工具类创建及拦截器配置,并介绍了Redis的依赖配置与文件操作相关功能,包括文件上传、下载、删除及批量删除的方法。同时,文档还列举了常见的HTTP请求错误代码及其含义,并详细解释了@RequestParam与@PathVariable等参数注解的区别与用法。
|
5天前
|
JavaScript NoSQL Java
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
CC-ADMIN后台简介一个基于 Spring Boot 2.1.3 、SpringBootMybatis plus、JWT、Shiro、Redis、Vue quasar 的前后端分离的后台管理系统
23 0
|
27天前
|
NoSQL Java Redis
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
这篇文章介绍了如何使用Spring Boot整合Apache Shiro框架进行后端开发,包括认证和授权流程,并使用Redis存储Token以及MD5加密用户密码。
22 0
shiro学习四:使用springboot整合shiro,正常的企业级后端开发shiro认证鉴权流程。使用redis做token的过滤。md5做密码的加密。
|
1月前
|
缓存 NoSQL Java
Springboot自定义注解+aop实现redis自动清除缓存功能
通过上述步骤,我们不仅实现了一个高度灵活的缓存管理机制,还保证了代码的整洁与可维护性。自定义注解与AOP的结合,让缓存清除逻辑与业务逻辑分离,便于未来的扩展和修改。这种设计模式非常适合需要频繁更新缓存的应用场景,大大提高了开发效率和系统的响应速度。
49 2
|
25天前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 实现动态路由和菜单功能,快速搭建前后端分离的应用框架
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 实现动态路由和菜单功能,快速搭建前后端分离的应用框架。首先,确保开发环境已安装必要的工具,然后创建并配置 Spring Boot 项目,包括添加依赖和配置 Spring Security。接着,创建后端 API 和前端项目,配置动态路由和菜单。最后,运行项目并分享实践心得,包括版本兼容性、安全性、性能调优等方面。
126 1
|
9天前
|
JavaScript 安全 Java
如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个具有动态路由和菜单功能的前后端分离应用。
本文介绍了如何使用 Spring Boot 和 Ant Design Pro Vue 构建一个具有动态路由和菜单功能的前后端分离应用。首先,创建并配置 Spring Boot 项目,实现后端 API;然后,使用 Ant Design Pro Vue 创建前端项目,配置动态路由和菜单。通过具体案例,展示了如何快速搭建高效、易维护的项目框架。
86 62