【Spring Cloud】新闻头条微服务项目:使用Reids延迟队列实现文章定时发布(下)

简介: 主要介绍了如何使用redis的zset及list数据类型实现延迟队列完成文章的定时发布功能。

目录

一:未来数据定时刷新

1.redis key值匹配

方案一:keys模糊匹配

方案二:scan

2.redis管道

3.定时刷新功能实现

二:分布式锁解决集群下的方法抢占执行

1.问题描述

2.分布式锁

3.redis分布式锁

4.实现

(1)方法添加

(2) 代码修改

5.数据库同步

三:延迟队列实现定时发布

1.提供对外接口

2.具体实现

(1)前期准备

(2)添加任务到延迟队列

(3)修改发布文章代码

(4)消费任务进行文章审核


一:未来数据定时刷新

1.redis key值匹配

       将未来5分钟之内要发布的文章加入到redis之后,我们需要定时对这部分数据(也就是zset中的数据)进行扫描,以便将zset中时间到了文章存入list中准备发布,但是这时候扫描zset中的数据有两种选择,见下面分析:

方案一:keys模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

image.gif编辑

方案二:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。  

image.gif编辑

2.redis管道

普通redis客户端和服务器交互模式

image.gif编辑

Pipeline请求模型

image.gif编辑

       两者的区别从图中就可以看出来,第一种方式对每一个命令都需要向服务端发送一次请求,假如命令过多会不断创建连接,降低执行效率;而第二种方式则是将一批命令积攒到一起再开启通道一次性执行,大大减少了连接数。  

3.定时刷新功能实现

在taskserviceImpl中添加如下方法,并且引导类中开启任务调度注解@EnableScheduling

@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    // 获取所有未来数据集合的key值
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
    for (String futureKey : futureKeys) { // future_250_250
        String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
        //获取该组key下当前需要消费的任务数据
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        if (!tasks.isEmpty()) {
            //将这些任务数据添加到消费者队列中
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            log.info("成功将{}下的当前需要执行的任务添加到{}",futureKey,topicKey);
        }
    }
}

image.gif

二:分布式锁解决集群下的方法抢占执行

1.问题描述

       假如启动两台tbug-headlines-schedule服务,这时候两者都会去执行refresh方法,但是我们只需要其中一台去执行扫描任务即可,这时候就需要加入分布式锁来进行控制。

2.分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

解决方案:

image.gif编辑

3.redis分布式锁

setnx (Set if Not Exists) 命令在指定的 key 不存在时,为 key 设置指定的值。

image.gif编辑

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

    • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
    • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
    • 客户端A执行代码完成,删除锁
    • 客户端B在等待一段时间后再去请求设置key的值,设置成功
    • 客户端B执行代码完成,删除锁

    4.实现

    (1)方法添加

    在工具类CacheService中添加如下方法:

    /**
     * 加锁
     *
     * @param name
     * @param expire
     * @return
     */
    public String tryLock(String name, long expire) {
        name = name + "_lock";
        String token = UUID.randomUUID().toString();
        RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
        RedisConnection conn = factory.getConnection();
        try {
            //参考redis命令:
            //set key value [EX seconds] [PX milliseconds] [NX|XX]
            Boolean result = conn.set(
                    name.getBytes(),
                    token.getBytes(),
                    Expiration.from(expire, TimeUnit.MILLISECONDS),
                    RedisStringCommands.SetOption.SET_IF_ABSENT //NX
            );
            if (result != null && result)
                return token;
        } finally {
            RedisConnectionUtils.releaseConnection(conn, factory,false);
        }
        return null;
    }

    image.gif

    参数name表示锁的名称,expire表示锁的过期时间,最重要的是set方法中最后一个参数RedisStringCommands.SetOption.SET_IF_ABSENT,这表示当有一个请求过来之后就会设置key值进行加锁,这样再有请求过来就获取不到了。

    (2) 代码修改

    /**
     * 定时器任务,每分钟扫描redis一次
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void refresh() {
        String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);//加锁,30秒过期
        if(StringUtils.isNotBlank(token)) {
            log.info("开始执行定时扫描redis任务...");
            //获取所有未来数据集合的key值
            Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
            for (String futureKey : futureKeys) {
                //截取后半部分 future_200_100 --> _200_100
                String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
                //获取该组key下当前需要清理的任务数据
                Set<String> tasks = cacheService.zRangeByScore(futureKey,0,System.currentTimeMillis());
                if(!tasks.isEmpty()) {
                    //将这些数据添加到消费队列中
                    cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
                    log.info("成功将{}下的当前需要执行的任务添加到{}",futureKey,topicKey);
                }
            }
        }
    }

    image.gif

    5.数据库同步

    在完成上述操作之后,我们需要知道的是redis中只是存放现在就需要发布和5分钟之内需要发布的文章,而那些超过5分钟之后才需要发布的文章(比如一天之后发布)我们是不将其存入redis中的,它们只是存放在数据库中,这时候就需要定时去扫描数据库查看哪些文章需要被放入redis进行处理,流程图还可以参考上一篇文章的:

    image.gif编辑

    /**
     * 数据库同步任务,每五分钟执行一次
     */
    @PostConstruct  //表示服务一启动便执行一次
    @Scheduled(cron = "0 */5 * * * ?")
    public void reloadData() {
        log.info("开始同步数据库任务到redis...");
        //1.清理缓存任务,避免重复
        clearCache();
        //2.获取5分钟之后的时间
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE,5);
        //3.查看未来所有小于5分钟的任务
        List<Taskinfo> tasks = taskInfoMapper.selectList
                (Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
        if(tasks != null && tasks.size() > 0) {
            for (Taskinfo taskinfo : tasks) {
                Task task = new Task();
                BeanUtils.copyProperties(taskinfo,task);
                task.setExecuteTime(taskinfo.getExecuteTime().getTime());
                addTaskToCache(task);
            }
        }
    }
    /**
     * 清理缓存任务
     */
    private void clearCache() {
        log.info("开始清理缓存任务...");
        //获取任务集
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        Set<String> topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*");
        cacheService.delete(futureKeys);
        cacheService.delete(topicKeys);
    }

    image.gif

    三:延迟队列实现定时发布

    1.提供对外接口

    提供远程的feign接口,在tbug-headlines-feign-api编写类如下:  

    package com.my.apis.schedule;
    import com.my.model.common.dtos.ResponseResult;
    import com.my.model.schedule.dtos.Task;
    import org.springframework.cloud.openfeign.FeignClient;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    @FeignClient(value = "headlines-schedule")
    public interface IScheduleClient {
        /**
         * 添加任务
         * @param task   任务对象
         * @return       任务id
         */
        @PostMapping("/api/v1/task/add")
        ResponseResult addTask(@RequestBody Task task);
        /**
         * 取消任务
         * @param taskId        任务id
         * @return              取消结果
         */
        @GetMapping("/api/v1/task/cancel/{taskId}")
        ResponseResult cancelTask(@PathVariable("taskId") long taskId);
        /**
         * 按照类型和优先级来拉取任务
         * @param type
         * @param priority
         * @return
         */
        @GetMapping("/api/v1/task/poll/{type}/{priority}")
        ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority")  int priority);
    }

    image.gif

    在tbug-headlines-schedule微服务下提供对应的实现

    package com.my.schedule.feign;
    import com.my.apis.schedule.IScheduleClient;
    import com.my.model.common.dtos.ResponseResult;
    import com.my.model.schedule.dtos.Task;
    import com.my.schedule.service.TaskService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.*;
    @RestController
    public class ScheduleClient implements IScheduleClient {
        @Autowired
        private TaskService taskService;
        /**
         * 添加任务
         * @param task   任务对象
         * @return       任务id
         */
        @Override
        @PostMapping("/api/v1/task/add")
        public ResponseResult addTask(@RequestBody Task task) {
            return ResponseResult.okResult(taskService.addTask(task));
        }
        /**
         * 取消任务
         * @param taskId        任务id
         * @return              取消结果
         */
        @Override
        @GetMapping("/api/v1/task/cancel/{taskId}")
        public ResponseResult cancelTask(@PathVariable long taskId) {
            return ResponseResult.okResult(taskService.cancelTask(taskId));
        }
        /**
         * 按照类型和优先级来拉取任务
         * @param type
         * @param priority
         * @return
         */
        @Override
        @GetMapping("/api/v1/task/poll/{type}/{priority}")
        public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority) {
            return ResponseResult.okResult(taskService.poll(type,priority));
        }
    }

    image.gif

    2.具体实现

    (1)前期准备

    ①枚举类

    package com.my.model.common.enums;
    import lombok.AllArgsConstructor;
    import lombok.Getter;
    @Getter
    @AllArgsConstructor
    public enum TaskTypeEnum {
        NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
        REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
        private final int taskType; //对应具体业务
        private final int priority; //业务不同级别
        private final String desc; //描述信息
    }

    image.gif

    ②序列化工具

           在添加任务到延迟队列的方法中,我们需要用到序列化工具进行序列化操作,而在任务消费时候又需要进行反序列化操作。java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化,但是这里我选用的是效率更高的Protostuff。Protostuff是google开源的,其采用更为紧凑的二进制数组,表现更加优异。

    将ProtostuffUtil拷贝到tbug-headlines-utils中,然后导入如下依赖

    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-core</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>io.protostuff</groupId>
        <artifactId>protostuff-runtime</artifactId>
        <version>1.6.0</version>
    </dependency>

    image.gif

    (2)添加任务到延迟队列

    创建WmNewsTaskService

    package com.my.wemedia.service;
    import java.util.Date;
    public interface WmNewsTaskService {
        /**
         * 添加任务到延迟队列中
         * @param id 文章id
         * @param publishTime  文章发布时间
         */
        void addNewsToTask(Integer id, Date publishTime);
        /**
         * 消费延迟队列数据
         */
        void scanNewsByTask();
    }

    image.gif

    实现类

    package com.my.wemedia.service.impl;
    import com.alibaba.fastjson.JSON;
    import com.my.apis.schedule.IScheduleClient;
    import com.my.common.constans.ScheduleConstants;
    import com.my.common.redis.CacheService;
    import com.my.model.common.dtos.ResponseResult;
    import com.my.model.common.enums.TaskTypeEnum;
    import com.my.model.schedule.dtos.Task;
    import com.my.model.wemedia.pojos.WmNews;
    import com.my.utils.common.ProtostuffUtil;
    import com.my.wemedia.service.WmAutoScanService;
    import com.my.wemedia.service.WmNewsTaskService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Service;
    import java.util.Calendar;
    import java.util.Date;
    @Slf4j
    @Service
    public class WmNewsTaskServiceImpl implements WmNewsTaskService {
        @Autowired
        private IScheduleClient scheduleClient;
        /**
         * 添加任务到延迟队列
         * @param id 文章id
         * @param publishTime  文章发布时间
         */
        @Override
        @Async
        public void addNewsToTask(Integer id, Date publishTime) {
            log.info("添加任务到延迟服务中---begin");
            Task task = new Task();
            task.setExecuteTime(publishTime.getTime());
            task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
            task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
            WmNews wmNews = new WmNews();
            wmNews.setId(id);
            task.setParameters(ProtostuffUtil.serialize(wmNews));
            scheduleClient.addTask(task);
            log.info("添加任务到延迟服务中---end");
        }
    }

    image.gif

    (3)修改发布文章代码

    将之前的异步调用审核文章修改为将文章数据加入延迟队列

    @Autowired
        private WmAutoScanService wmAutoScanService;
        @Autowired
        private WmNewsTaskService wmNewsTaskService;
        /**
         * 提交文章
         * @param dto
         * @return
         */
        @Override
        public ResponseResult submitNews(WmNewsDto dto) throws Exception {
            //1.参数校验
            if(dto == null || dto.getContent().length() == 0) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
            }
            //2.保存或修改文章
            //2.1属性拷贝
            WmNews wmNews = new WmNews();
            BeanUtils.copyProperties(dto,wmNews);
            //2.2设置封面图片
            if(dto.getImages() != null && dto.getImages().size() != 0) {
                String images = StringUtils.join(dto.getImages(), ",");
                wmNews.setImages(images);
            }
            //2.3封面类型为自动
            if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) {
                wmNews.setType(null);
            }
            saveOrUpdateWmNews(wmNews);
            //3.判断是否为草稿
            if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) {
                //直接保存结束
                return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
            }
            //4.不是草稿
            //4.1保存文章图片素材与文章关系
            //4.1.1提取图片素材列表
            List<String> imagesList = getImagesList(dto);
            //4.1.2保存
            saveRelatedImages(imagesList,wmNews.getId(),WemediaConstants.WM_CONTENT_REFERENCE);
            //4.2保存封面图片和文章关系
            saveRelatedCover(dto,imagesList,wmNews);
            //5.审核文章(异步调用)
            // wmAutoScanService.AutoScanTextAndImage(wmNews.getId());
            //5.将任务添加到延迟服务
            wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
            return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
        }

    image.gif

    (4)消费任务进行文章审核

    在WmNewsTaskServiceImpl中添加如下方法:

    @Autowired
    private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
    /**
         * 消费延迟队列数据
         */
    @Scheduled(fixedRate = 1000)
    @Override
    @SneakyThrows
    public void scanNewsByTask() {
        ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
        if(responseResult.getCode().equals(200) && responseResult.getData() != null){
            log.info("文章审核---消费任务执行---begin---");
            String json_str = JSON.toJSONString(responseResult.getData());
            Task task = JSON.parseObject(json_str, Task.class);
            byte[] parameters = task.getParameters();
            WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
            System.out.println(wmNews.getId()+"-----------");
            wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
            log.info("文章审核---消费任务执行---end---");
        }
    }

    image.gif

    下篇预告:定时发布文章优化策略

    相关文章
    |
    5月前
    |
    数据可视化 Java BI
    将 Spring 微服务与 BI 工具集成:最佳实践
    本文探讨了 Spring 微服务与商业智能(BI)工具集成的潜力与实践。随着微服务架构和数据分析需求的增长,Spring Boot 和 Spring Cloud 提供了构建可扩展、弹性服务的框架,而 BI 工具则增强了数据可视化与实时分析能力。文章介绍了 Spring 微服务的核心概念、BI 工具在企业中的作用,并深入分析了两者集成带来的优势,如实时数据处理、个性化报告、数据聚合与安全保障。同时,文中还总结了集成过程中的最佳实践,包括事件驱动架构、集中配置管理、数据安全控制、模块化设计与持续优化策略,旨在帮助企业构建高效、智能的数据驱动系统。
    314 1
    将 Spring 微服务与 BI 工具集成:最佳实践
    |
    5月前
    |
    存储 安全 Java
    管理 Spring 微服务中的分布式会话
    在微服务架构中,管理分布式会话是确保用户体验一致性和系统可扩展性的关键挑战。本文探讨了在 Spring 框架下实现分布式会话管理的多种方法,包括集中式会话存储和客户端会话存储(如 Cookie),并分析了它们的优缺点。同时,文章还涵盖了与分布式会话相关的安全考虑,如数据加密、令牌验证、安全 Cookie 政策以及服务间身份验证。此外,文中强调了分布式会话在提升系统可扩展性、增强可用性、实现数据一致性及优化资源利用方面的显著优势。通过合理选择会话管理策略,结合 Spring 提供的强大工具,开发人员可以在保证系统鲁棒性的同时,提供无缝的用户体验。
    118 0
    |
    5月前
    |
    消息中间件 Java 数据库
    Spring 微服务中的数据一致性:最终一致性与强一致性
    本文探讨了在Spring微服务中实现数据一致性的策略,重点分析了最终一致性和强一致性的定义、优缺点及适用场景。结合Spring Boot与Spring Cloud框架,介绍了如何根据业务需求选择合适的一致性模型,并提供了实现建议,帮助开发者在分布式系统中确保数据的可靠性与同步性。
    381 0
    |
    4月前
    |
    监控 Cloud Native Java
    Spring Boot 3.x 微服务架构实战指南
    🌟蒋星熠Jaxonic,技术宇宙中的星际旅人。深耕Spring Boot 3.x与微服务架构,探索云原生、性能优化与高可用系统设计。以代码为笔,在二进制星河中谱写极客诗篇。关注我,共赴技术星辰大海!(238字)
    Spring Boot 3.x 微服务架构实战指南
    |
    4月前
    |
    负载均衡 Java API
    《深入理解Spring》Spring Cloud 构建分布式系统的微服务全家桶
    Spring Cloud为微服务架构提供一站式解决方案,涵盖服务注册、配置管理、负载均衡、熔断限流等核心功能,助力开发者构建高可用、易扩展的分布式系统,并持续向云原生演进。
    |
    5月前
    |
    消息中间件 Java Kafka
    消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
    本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
    394 1
    消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
    |
    5月前
    |
    监控 Java 数据库
    从零学 Dropwizard:手把手搭轻量 Java 微服务,告别 Spring 臃肿
    Dropwizard 整合 Jetty、Jersey 等成熟组件,开箱即用,无需复杂配置。轻量高效,启动快,资源占用少,内置监控、健康检查与安全防护,搭配 Docker 部署便捷,是构建生产级 Java 微服务的极简利器。
    494 3
    |
    5月前
    |
    监控 安全 Java
    Spring Cloud 微服务治理技术详解与实践指南
    本文档全面介绍 Spring Cloud 微服务治理框架的核心组件、架构设计和实践应用。作为 Spring 生态系统中构建分布式系统的标准工具箱,Spring Cloud 提供了一套完整的微服务解决方案,涵盖服务发现、配置管理、负载均衡、熔断器等关键功能。本文将深入探讨其核心组件的工作原理、集成方式以及在实际项目中的最佳实践,帮助开发者构建高可用、可扩展的分布式系统。
    359 1
    |
    5月前
    |
    jenkins Java 持续交付
    使用 Jenkins 和 Spring Cloud 自动化微服务部署
    随着单体应用逐渐被微服务架构取代,企业对快速发布、可扩展性和高可用性的需求日益增长。Jenkins 作为领先的持续集成与部署工具,结合 Spring Cloud 提供的云原生解决方案,能够有效简化微服务的开发、测试与部署流程。本文介绍了如何通过 Jenkins 实现微服务的自动化构建与部署,并结合 Spring Cloud 的配置管理、服务发现等功能,打造高效、稳定的微服务交付流程。
    700 0
    使用 Jenkins 和 Spring Cloud 自动化微服务部署
    |
    设计模式 Java API
    微服务架构演变与架构设计深度解析
    【11月更文挑战第14天】在当今的IT行业中,微服务架构已经成为构建大型、复杂系统的重要范式。本文将从微服务架构的背景、业务场景、功能点、底层原理、实战、设计模式等多个方面进行深度解析,并结合京东电商的案例,探讨微服务架构在实际应用中的实施与效果。
    763 6