基于smart-retry源码改造升级

简介: 本文主要讲述基于smart-retry进行优化改造思路

前瞻

阅读本篇前,请先阅读smart-retry源码阅读文章

背景

smart-retry有以下缺点

  • 只支持入参有且仅有一个
  • 每一个重试方法都对应一个定时任务,会造成线程的过度使用
  • 不支持抛出指定异常后重试

基于此,对smart-retry做了升级改造

改造后特性

针对smart-retry的确定的改进

  • 支持多个入参方法重试
  • 只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加
  • 支持抛出指定异常后重试


新增的功能

  • 支持配置在注解上配置是否在执行方法前入库
  • 增加了重试记录表,把每次重试都记录下来
  • 重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂

代码跟进改造点

下边会讲述代码如何改造

支持多个入参方法重试

改造前入参,以及参数类型都是单个,改成数组,我列举这两种的前后对比,因为改的地方比较多,我不一一列举

改造前入参

   public Object handle(Object arg) {

改造后入参

public Object handle(Object arg[]) {

改造前入参类型(用于序列化和反序列化)

   public Class<?> getInputArgsType() {

       return inputArgsType;

   }

改造后入参类型(用于序列化和反序列化)

   public Class<?>[] getInputArgsType() {

       return inputArgsType;

   }

只提供重试接口给用户,具体定时任务选用由用户决定,灵活性大大增加

我们先看原来的代码

   @Override

   public void afterSingletonsInstantiated() {

       postedClasseCache.clear();


       this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

       this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);


       boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);

       this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);

       if (this.retrySerializer == null) {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);

       } else {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);

       }


       retryHandlers.forEach(this::registerJobBean);


       retryHandlers.clear();

   }

   protected void registerJobBean(RetryHandler retryHandler) {

       if (retryHandler.identity().length() > 50) {

           throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

       }


       RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

       RetryHandlerRegistration.registry(retryHandlerProxy);


       RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


       retryRegistry.register(retryHandler, retryProcessor);

   }

   @Override

   public void register(RetryHandler retryHandler, RetryProcessor retryProcessor) {

       if (StringUtils.isBlank(retryHandler.cron())) {

           throw new IllegalArgumentException("identity=" + retryHandler.identity() + ", 使用Elastic-Job注册器,必须指定RetryHandler/RetryFunction的cron表达式");

       }

       BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);

       beanDefinitionBuilder.addConstructorArgValue(new RetryJob(retryProcessor));

       beanDefinitionBuilder.addConstructorArgValue(registryCenter);

       beanDefinitionBuilder.addConstructorArgValue(createLiteJobConfiguration(retryHandler));

       if (jobEventConfiguration != null) {

           beanDefinitionBuilder.addConstructorArgValue(jobEventConfiguration);

       }

       beanDefinitionBuilder.addConstructorArgValue(getElasticJobListeners());

       beanDefinitionBuilder.setInitMethodName("init");


       String jobBeanName = getJobBeanName(retryHandler);

       defaultListableBeanFactory.registerBeanDefinition(jobBeanName, beanDefinitionBuilder.getBeanDefinition());


       //此处的getBean调用是为了手工触发Bean的初始化

       defaultListableBeanFactory.getBean(jobBeanName);

       log.info("identity={}已成功注册到Elastic-Job", retryHandler.identity());

   }

可以看到,每一个retryHandler会单独注册一个定时任务,并且注册的时候需要指定注册到哪个定时任务

再看改造后代码,先拿到所有retryHandler,封装为可执行的RetryFunctionProcessor

   @Override

   public void afterSingletonsInstantiated() {

       postedClasseCache.clear();


       this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

       RetryFunctionProcessor retryFunctionProcessor = defaultListableBeanFactory.getBean(RetryFunctionProcessor.class);


       this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);


       List<RetryProcessor> retryProcessorList = new ArrayList<>();

       for (RetryHandler retryHandler : retryHandlers) {

           if (retryHandler.identity().length() > 50) {

               throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

           }


           RetryHandler retryHandlerProxy =new ImmediatelyRetryHandler(retryHandler, new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, retryRule);

           RetryHandlerRegistration.registry(retryHandlerProxy);


           RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer, retryRule);

           retryProcessorList.add(retryProcessor);

       }

       

       retryFunctionProcessor.register(retryProcessorList);


       retryHandlers.clear();

   }

RetryFunctionProcessor其实就是把retryProcessorList保存为成员变量,提供给用户去调用,调用方式将在如何使用篇章讲解

支持抛出特定异常后重试

smart-retry只能在抛出RuntimeException的时候,进行重试,改造后,可以支持抛出特定异常后重试,这样当自定义异常时,更加灵活,代码改造点如下

RetryFunction注解增加变量

   /**

    * 如果方法抛出该异常则会创建重试任务

    */

   Class<? extends RuntimeException>[] retryException() default {RuntimeException.class};

在执行方法抛出异常时,判断是否在配置的这些异常范围,来决定是否要进行重试

       } catch (RuntimeException e) {

           boolean isIncludeException = RetryHandlerUtils.isIncludeException(e, retryException);

           if (!isIncludeException) {

               throw e;

           }

支持配置在注解上配置是否在执行方法前入库

smart-retry是支持是否在执行方法前入库的,但是是全局的配置,改造后可以对每一个任务单独配置

   /**

    * 是否在执行任务之前插入数据库 |配置false则表示,只有任务执行报错才插入数据库|

    * @return

    */

   boolean beforeTask() default false;

具体逻辑就比较简单了,就是根据配置,来决定是否在抛出异常前插入数据

增加了重试记录表,把每次重试都记录下来

smart-retry只有一张重试表,比如重试过5次,没办法知道这5次重试的具体详情,只能知道最后一次是成功还是失败等,所以增加了重试记录表,来记录每一次重试的记录,利用了spring的切面的原理,对原代码没有任何侵入

@Aspect

@Slf4j

public class RetryTaskRecordAspect implements ApplicationContextAware {


   private ApplicationContext applicationContext;

   

   @Pointcut("execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.insert(..)) ||" +

           "execution(* com.aliyun.gts.bpaas.retry.core.RetryTaskMapper.update(..))")

   public void pointCut(){

       

   }

   

   @Around(value = "pointCut()")

   public Object insertTaskRecord(ProceedingJoinPoint proceedingJoinPoint) {

       Object o = null;

       try {

           o = proceedingJoinPoint.proceed();

       } catch (Throwable throwable) {

           log.error("切面insertTaskRecord报错,", throwable);

       }


       Object[] args = proceedingJoinPoint.getArgs();

       RetryTask retryTask = (RetryTask) args[0];

       RetryTaskMapper retryTaskMapper = applicationContext.getBean(RetryTaskMapper.class);

       retryTaskMapper.insertTaskRecord(retryTask);

       return o;

   }


   @Override

   public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

       this.applicationContext = applicationContext;  

   }

}

重试规则由cron表达式改为按时间,或者间隔重试,更通俗易懂

改造前:smart-retry原来的重试规则,是为每一条重试任务配置一个cron表达式

改造后:重试规则在application.properties中全局配置,更方便管理,cron表达式改为按时间,或者间隔,更容易配置,不易配置错

   /**

    * 重试规则,有两种方式,指定时间或者间隔

    * 1.时间,格式为:onTime-时间-最大重试次数,例如  onTime-23:59:59-3,onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开

    * 2.间隔,格式为:onInterval-间隔重试规则,例如 onInterval-10,20,30,onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒

    */

   private String retryRule;

指定时间重试的实现逻辑:获取到下次的执行时间,如果执行时间小于当前时间,则需要加一天,否则,就是该时间

指定间隔重试的实现逻辑:现在的时间,加上重试次数对应的间隔时间即可

代码如下

   /**

    * 获取下次执行时间

    * @param retryRule

    * @param retryCount

    * @return

    */

   public static LocalDateTime getNextExecTime(RetryRule retryRule, int retryCount) {

       

       if (retryRule instanceof OnTimeRetryRule) {

           OnTimeRetryRule onTimeRetryRule = (OnTimeRetryRule) retryRule;


           LocalTime retryTime = onTimeRetryRule.getRetryTime();


           LocalDateTime nextExecTime = LocalDateTime.of(LocalDate.now(), retryTime);

           

           if (nextExecTime.isAfter(LocalDateTime.now())) {

               return nextExecTime;

           } else {

               return nextExecTime.plusDays(1L);

           }

       }


       OnIntervalRetryRule onIntervalRetryRule = (OnIntervalRetryRule) retryRule;


       Long[] retryInterval = onIntervalRetryRule.getRetryInterval();

       

       return plusSeconds(retryInterval[retryCount]);

   }

如何使用


建表

接口重推依赖两张表重试表以及重试记录表

重试表sql

create table sys_retry_task (

task_id bigint not null primary key auto_increment,

identity_name varchar(50) not null COMMENT '任务的唯一标识',

params text COMMENT '参数',

status tinyint not null COMMENT '状态。1: 处理中,2: 成功,3: 失败',

retry_count int not null default 0 COMMENT '重试次数',

remark varchar(1000) COMMENT '备注',

create_date datetime not null,

edit_date datetime,

next_date DATETIME COMMENT '下次执行的时间') ENGINE=InnoDB COMMENT='系统重试表';


create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);

重试记录表sql

CREATE TABLE `sys_retry_task_record`

(

   `id`            BIGINT(20)    NOT NULL AUTO_INCREMENT,

   `task_id`       BIGINT(20)    NULL     DEFAULT NULL COMMENT '重试任务id',

   `identity_name` VARCHAR(50)   NOT NULL COMMENT '任务的唯一标识' COLLATE 'utf8_general_ci',

   `params`        TEXT          NULL     DEFAULT NULL COMMENT '参数' COLLATE 'utf8_general_ci',

   `status`        TINYINT(4)    NOT NULL COMMENT '状态。1: 处理中,2: 成功,3: 失败',

   `retry_count`   INT(11)       NOT NULL DEFAULT '0' COMMENT '重试次数',

   `remark`        VARCHAR(1000) NULL     DEFAULT NULL COMMENT '备注' COLLATE 'utf8_general_ci',

   `next_date`     DATETIME      NULL     DEFAULT NULL COMMENT '下次执行的时间',

   `edit_date`     DATETIME      NULL     DEFAULT NULL,

   `create_date`   DATETIME      NOT NULL,

   PRIMARY KEY (`id`) USING BTREE,

   INDEX `idx_identityname_status` (`identity_name`, `status`) USING BTREE

)

   COMMENT ='系统重试记录表'

   COLLATE = 'utf8_general_ci'

   ENGINE = InnoDB


引入依赖

       <dependency>

           <groupId>com.aliyun.gts.bpaas</groupId>

           <artifactId>aliyun-gts-retry-starter</artifactId>

           <version>1.0.0-SNAPSHOT</version>

       </dependency>

application.properties配置

# 数据源配置,注意要和建表的数据源和数据库在同一个

spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.datasource.url=jdbc:mysql://localhost:3306/test?characterEncoding=utf8&autoReconnect=true&useSSL=false

spring.datasource.username=root

spring.datasource.password=123456


# 重试开关,默认为false

gts.retry.enable=true

# 重试的规则

gts.retry.retry-rule=onInterval-10,30,30,30

# 序列化方式,支持fastjson,jackson以及gson,默认为fastjson

gts.retry.serialize-type=fastjson

下边对重试规则配置gts.retry.retry-rule进行详细说明

重试规则,有两种方式,指定时间或者间隔
1. 时间,格式为:onTime-时间-最大重试次数,例如(onTime-23:59:59-3),onTime为类型,23:59:59为时间的时分秒,3位最大重试次数,用中划线隔开
2. 间隔,格式为:onInterval-间隔重试规则,例如 (onInterval-10,20,30),onInterval为类型,10,20,30表示每次重试的间隔时间,单位为秒

编程界面

接口重推需要的编程界面有两处,第一处是在方法上打上@RetryFunction注解,第二处是在定时任务中调用处理定时任务的方法

在方法上打上@RetryFunction注解

   @RetryFunction(identity = "demo.simplest", beforeTask = true,

           retryListener = SimpleTestRetryListener.class,

           retryException = {RuntimeException.class}, ignoreException = true)

   public void simplestWithId(int id) {

       

       log.info("simplestWithId[{}]执行开始", id);

       // doSomething()

       log.info("simplestWithId[{}]执行完成", id);

   }

下面对注解中的每一个属性做详细的解释

属性

类型

备注

默认值

identity

string

唯一标识,系统内不能重复

长度要小50个字节

类的全名称+方法名称

beforeTask

boolean

是否在执行任务之前插入数据

配置false则表示,只有任务执行报错才插入数据库|,true表示在方法执行前就会插入数据库

false

retryListener

Class<? extends RetryListener>

任务监听器。可以在任务重试、任务完成、任务失败时进行回调

onRetry():每次重试时触发(执行后触发)

onComplete():任务完成时触发

onError():失败时触发(超过最大重试次数)

不进行任务监听

retryException

Class<? extends RuntimeException>[]

如果方法抛出指定异常则会创建重试任务或执行重试动作

{RuntimeException.class}

ignoreException

boolean

当重试任务有多个的时候,上一个重试报错,是否忽略错误继续执行下一个任务

true

定时任务中触发重试任务示例

@Slf4j

@Component

public class DistributeRetrySchedule extends AbstractGtsSchedulerTaskProcessor {


   

   @Autowired

   private RetryFunctionProcessor retryFunctionProcessor;

   

   @Override

   public String getTaskId(){

       return "retry-job";

   }


   @Override

   public GtsSchedulerTaskResult process(GtsSchedulerTaskParameter parameter) throws Exception{

       DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

       log.info("执行定时任务,当前时间{}",format.format(new Date()));

       retryFunctionProcessor.processTask();

       return GtsSchedulerTaskResult.successResult();

   }

}

重试任务持久化示例

重试表

重试记录表

相关文章
|
开发框架 Linux C语言
C、C++、boost、Qt在嵌入式系统开发中的使用
C、C++、boost、Qt在嵌入式系统开发中的使用
624 1
|
6月前
|
Kubernetes Java 调度
无需接入执行器,0 代码改造实现微服务任务调度
本文提出了一种基于云原生的任务调度新方案,不需要依赖SDK,不依赖语言,实现定时调度和分布式跑批。
354 53
|
7月前
|
机器学习/深度学习 人工智能 机器人
文本分块大揭秘,五种境界让你的RAG系统从菜鸟变大神
如果你的AI应用程序返回的答案总是不着边际,问题可能出在文本分块上!本文用轻松幽默的方式,带你玩转从基础到高级的五种文本分块策略,让你的RAG系统检索效率提升10倍。无论你是RAG新手还是老手,这篇文章都能让你事半功倍!
493 0
|
消息中间件 Java Kafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
1620 7
|
消息中间件
分布式篇问题之通过本地消息表实现分布式事务的最终一致性问题如何解决
分布式篇问题之通过本地消息表实现分布式事务的最终一致性问题如何解决
589 0
|
Java fastjson
springboot切面编程基础实践
springboot切面编程基础实践
238 0
|
Java 调度 数据库
|
监控 Java API
如何在Spring Boot中集成Elastic APM进行应用性能监控
如何在Spring Boot中集成Elastic APM进行应用性能监控
|
缓存 NoSQL Java
spring cache整合redis实现springboot项目中的缓存功能
spring cache整合redis实现springboot项目中的缓存功能
866 1