前言:
最近在做一个基于SpringCloud+Springboot+Docker的新闻头条微服务项目,用的是黑马的教程,现在项目开发进入了尾声,我打算通过写文章的形式进行梳理一遍,并且会将梳理过程中发现的Bug进行修复,有需要改进的地方我也会继续做出改进。这一系列的文章我将会放入微服务项目专栏中,这个项目适合刚接触微服务的人作为练手项目,假如你对这个项目感兴趣你可以订阅我的专栏进行查看,需要资料可以私信我,当然要是能给我点个小小的关注就更好了,你们的支持是我最大的动力。
目录
一:前期准备
1.需求分析
当创作者创作好文章之后可以选择立马发布,还能选择定时发布(见下图),这个相信大家在CSDN创作时候都知道,我们需要使用延迟任务来实现文章的定时发布。
编辑
2. 延迟任务概述
在介绍延迟任务之前,我们先了解一下定时任务,定时任务就是有固定的的执行频率,每隔一段时间就执行一次,定时任务与延迟任务区别如下:
- 定时任务:有固定周期的,有明确的触发时间
- 延迟任务:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟。
- 延迟任务使用场景:
- 商品下单30分钟之内没有付款则将订单取消
- 接口对接出现网络问题,1分钟之后重试,如果再失败则2分钟之后重试,直至达到阈值
3.技术对比
- DelayQueue
JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。 DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法
getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。
compareTo方法:用于排序,确定元素出队列的顺序。
需要注意的是,使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,要保证数据不丢失,就需要持久化(磁盘)。
- RabbitMQ实现延迟任务
RabbitMQ实现延迟任务有两种形式,一种是传统的TTL+DLX(死信交换机)来实现,另一种是直接使用插件。TTL+DLX的实现原理是给每个队列设置过期时间,当消息过期之后变成Dead message,就将死信消息发送到另一个交换机,这个交换机叫做死信交换机。
不过更方便的还是直接使用RabbitMQ提供的延迟队列插件比较方便,实践过程可以翻看我前面关于微信支付的文章,里面就是使用了RabbitMQ提供的延迟插件来实现订单的管理。
- Redis实现
由于项目后面还需要用到Redis缓存热度靠前的文章,所以这里我选择了使用Redis来实现延迟队列,也正好可以了解其实现原理。
我们都知道Redis中的ZSet数据类型可以实现对数据的排序,那么我们就可以利用这个特点来实现延迟队列,使用时间戳作为score来进行排序。
4.实现思路
编辑
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。
2.为什么redis中使用两种数据类型,list和zset?
效率问题,算法的时间复杂度
3.在添加zset数据的时候,为什么不需要预加载?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
二:环境搭建
1.搭建模块
①在service模块下创建一个tbug-headlines-schedule模块
编辑
②添加bootstrap.yml
server: port: 51701 spring: application: name: headlines-schedule cloud: nacos: discovery: server-addr: 49.234.52.192:8848 config: server-addr: 49.234.52.192:8848 file-extension: yml
③在nacos添加相关配置
spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/headlines_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false username: root password: 440983 # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置 mybatis-plus: mapper-locations: classpath*:mapper/*.xml # 设置别名包扫描路径,通过该属性可以给包中的类注册别名 type-aliases-package: com.my.model.schedule.pojos
2.数据库准备
taskinfo任务表
编辑
taksinfo_log任务日志表
编辑 3.在docker中安装redis
略
4.项目中集成redis
①在项目中导入redis依赖
<!--spring data redis & cache--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- redis依赖commons-pool 这个依赖一定要添加 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency>
②在nacos添加redis配置信息
spring: redis: host: 49.234.52.192 password: 440983 port: 6379
③拷贝CacheService类到tbug-headlines-common模块下,并添加自动配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.my.common.exception.ExceptionCatch,\ com.my.common.swagger.SwaggerConfiguration,\ com.my.common.swagger.Swagger2Configuration,\ com.my.common.tencentcloud.TextDetection,\ com.my.common.tencentcloud.ImageDetection,\ com.my.common.tess4j.Tess4jClient,\ com.my.common.redis.CacheService,\
三:代码编写
1.实体类导入
①创建task类,用于接收添加任务的参数
package com.my.model.schedule.dtos; import lombok.Data; import java.io.Serializable; @Data public class Task implements Serializable { /** * 任务id */ private Long taskId; /** * 类型 */ private Integer taskType; /** * 优先级 */ private Integer priority; /** * 执行id */ private long executeTime; /** * task参数 */ private byte[] parameters; }
②创建mapper
package com.my.schedule.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.my.model.schedule.pojos.Taskinfo; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import java.util.Date; import java.util.List; /** * <p> * Mapper 接口 * </p> * * @author itheima */ @Mapper public interface TaskInfoMapper extends BaseMapper<Taskinfo> { List<Taskinfo> queryFutureTime(@Param("taskType")int type, @Param("priority")int priority, @Param("future")Date future); }
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.my.schedule.mapper.TaskInfoMapper"> <select id="queryFutureTime" resultType="com.my.model.schedule.pojos.Taskinfo"> select * from taskinfo where task_type = #{taskType} and priority = #{priority} and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date} </select> </mapper>
package com.my.schedule.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.my.model.schedule.pojos.TaskinfoLogs; import org.apache.ibatis.annotations.Mapper; /** * <p> * Mapper 接口 * </p> * * @author itheima */ @Mapper public interface TaskInfoLogsMapper extends BaseMapper<TaskinfoLogs> { }
2.创建taskService
package com.my.schedule.service; import com.my.model.schedule.dtos.Task; /** * 对外访问接口 */ public interface TaskService { /** * 添加任务接口 * @param task 任务对象 * @return 任务ID */ long addTask(Task task); /** * 取消任务 * @param taskId 任务id * @return 取消结果 */ boolean cancelTask(long taskId); /** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ Task poll(int type,int priority); }
3.功能实现
ScheduleConstants常量类
package com.my.model.common.enums; import lombok.AllArgsConstructor; import lombok.Getter; @Getter @AllArgsConstructor public enum TaskTypeEnum { NEWS_SCAN_TIME(1001, 1,"文章定时审核"), REMOTEERROR(1002, 2,"第三方接口调用失败,重试"); private final int taskType; //对应具体业务 private final int priority; //业务不同级别 private final String desc; //描述信息 }
TaskServiceImpl
package com.my.schedule.service.serviceImpl; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.my.common.constans.ScheduleConstants; import com.my.common.redis.CacheService; import com.my.model.schedule.dtos.Task; import com.my.model.schedule.pojos.Taskinfo; import com.my.model.schedule.pojos.TaskinfoLogs; import com.my.schedule.mapper.TaskInfoLogsMapper; import com.my.schedule.mapper.TaskInfoMapper; import com.my.schedule.service.TaskService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.PostConstruct; import java.util.Calendar; import java.util.Date; import java.util.List; import java.util.Set; @Slf4j @Service @Transactional public class TaskServiceImpl implements TaskService { /** * 添加延迟任务 * @param task 任务对象 * @return */ @Override public long addTask(Task task) { //1.将任务添加到数据库中 boolean success = addTaskToDb(task); if(success) { //2.将任务添加到redis addTaskToCache(task); } return task.getTaskId(); } @Autowired private TaskInfoMapper taskInfoMapper; @Autowired private TaskInfoLogsMapper taskInfoLogsMapper; /** * 将任务添加到数据库中 * @param task * @return */ private boolean addTaskToDb(Task task) { boolean flag = false; try { //1.保存任务表 Taskinfo taskinfo = new Taskinfo(); BeanUtils.copyProperties(task,taskinfo); taskinfo.setExecuteTime(new Date(task.getExecuteTime())); taskInfoMapper.insert(taskinfo); //2.设置任务id task.setTaskId(taskinfo.getTaskId()); //3.保存任务日志数据 TaskinfoLogs taskinfoLogs = new TaskinfoLogs(); BeanUtils.copyProperties(taskinfo,taskinfoLogs); taskinfoLogs.setVersion(1); taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); taskInfoLogsMapper.insert(taskinfoLogs); flag = true; } catch (Exception e) { e.printStackTrace(); } return flag; } @Autowired private CacheService cacheService; /** * 将任务添加到redis * @param task */ private void addTaskToCache(Task task) { //1.构造key String key = task.getTaskType()+"_"+task.getPriority(); //2.获取5分钟之后的时间 毫秒级 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE,5); long nextScheduleTime = calendar.getTimeInMillis(); //3.如果任务执行时间小于等于当前时间,存入list if(task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task)); } else if(task.getExecuteTime() <= nextScheduleTime) { //4.如果任务执行时间在5分钟之内,存入zSet cacheService.zAdd(ScheduleConstants.FUTURE + key,JSON.toJSONString(task),task.getExecuteTime()); } } /** * 删除任务 * @param taskId 任务id * @return */ @Override public boolean cancelTask(long taskId) { boolean flag = false; //1.删除任务并更新日志 Task task = UpdateDb(taskId,ScheduleConstants.EXECUTED); //2.从Redis中删除任务 if(task != null) { removeFromCache(task); log.info("删除Redis中的任务成功:{}",taskId); flag = true; } return flag; } /** * 从redis中删除任务 * @param task */ private void removeFromCache(Task task) { String key = task.getTaskType() + "_" + task.getPriority(); if(task.getExecuteTime() <= System.currentTimeMillis()) { log.info("删除正要执行的任务..."); cacheService.lRemove(ScheduleConstants.TOPIC + key,0,JSON.toJSONString(task)); } else { log.info("删除将要执行的任务..."); cacheService.zRemove(ScheduleConstants.FUTURE + key,JSON.toJSONString(task)); } } /** * 删除任务,更新日志 * @param taskId * @param status * @return */ private Task UpdateDb(long taskId, int status) { Task task = null; try { //1.删除任务 log.info("删除数据库中的任务..."); taskInfoMapper.deleteById(taskId); //2.更新日志 log.info("更新任务日志..."); TaskinfoLogs taskinfoLogs = taskInfoLogsMapper.selectById(taskId); taskinfoLogs.setStatus(status); taskInfoLogsMapper.updateById(taskinfoLogs); //3.设置返回值 task = new Task(); BeanUtils.copyProperties(taskinfoLogs,task); task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); } catch (BeansException e) { throw new RuntimeException(e); } return task; } /** * 消费任务 * @param type 任务类型 * @param priority 任务优先级 * @return Task */ @Override public Task poll(int type, int priority) { Task task = null; try { String key = type + "_" + priority; String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key); if(StringUtils.isNotBlank(task_json)) { task = JSON.parseObject(task_json,Task.class); //更新数据库 UpdateDb(task.getTaskId(),ScheduleConstants.EXECUTED); } } catch (Exception e) { e.printStackTrace(); log.error("poll task exception"); } return task; } }
主要包括添加任务、取消任务、消费任务三个功能。
下篇预告:实现数据定时刷新