smart-retry源代码阅读

简介: 本文主要讲述smart-retry主流程源码阅读

背景

基础技术组的接口重推组件基于smart-retry源码进行了改造

smart-retry信息


仓库地址

https://gitee.com/hack3389/smart-retry/

阅读分支

分支:master

commit

主要功能

Smart Retry主要是用来进行方法重试的。和Guava Retry、Spring Retry相比,Smart Retry最大的特点是异步重试,支持持久化,系统重启之后可以继续重试。

功能特点

  • 方法重试持久化,系统重启之后可以继续重试
  • 异步重试(不支持同步重试)
  • 支持接口实现和声明式方式

架构图


如何使用

引入依赖


 

     com.github.hadoop002.smartretry

     retry-spring4

     使用最新版本

 


初始化表

create table sys_retry_task (

task_id bigint not null primary key auto_increment,

identity_name varchar(50) not null COMMENT '任务的唯一标识',

params text COMMENT '参数',

status tinyint not null COMMENT '状态。1: 处理中,2: 成功,3: 失败',

retry_count int not null default 0 COMMENT '重试次数',

remark varchar(1000) COMMENT '备注',

create_date datetime not null,

edit_date datetime) ENGINE=InnoDB COMMENT='系统重试表';


create index idx_identityname_status ON sys_retry_task(identity_name asc,status asc);

编写业务逻辑

 @RetryFunction(identity = "order.payment")

 public void payOrderAndUpdateStatus(Order order) {

     boolean success = paymentBusiness.doPayment(order);

     if (success) {

         orderBusiness.updateOrderPayStatus(order);

     } else {

         orderBusiness.updateOrderPayFail(order);

     }

 }

或者

 @Slf4j

 @Service("orderPaymentBusiness")

 public class OrderPaymentBusiness implements RetryHandler {

 

     @Autowired

     private PaymentBusiness paymentBusiness;

 

     @Autowired

     private OrderBusiness orderBusiness;

 

     @Override

     public String identity() {

         return "order.payment";

     }

 

     @Override

     public Void handle(Order order) {

         boolean success = paymentBusiness.doPayment(order);

         if (success) {

             orderBusiness.updateOrderPayStatus(order);

         } else {

             orderBusiness.updateOrderPayFail(order);

         }

         return null;

     }

 }

打开开关

在启动入口上加上@EnableRetrying 注解

源码阅读

源码结构

  • retry-cpre:重试模块的核心,定义了一系列的接口和扩展点
  • retry-spring4:基于spring4实现的重试模块
  • retry-serializer-jackson2:使用jackson2来实现参数的序列化和反序列化
  • retry-serializer-gson:使用gson来实现参数的序列化和反序列化
  • retry-serializer-fastjson:使用fastjson来实现参数的序列化和反序列化
  • retry-samples:配套的示例demo,可直接使用

大致流程

  • 系统启动后,把所有com.github.smartretry.core.RetryHandler和带有@RetryFunction注解的方法注册为定时任务。
  • 所有com.github.smartretry.core.RetryHandler和带有@RetryFunction注解的方法都会被Spring进行代理,执行的时候,会先把参数序列化,然后把执行任务插入到数据库。最后根据任务执行的成功与否,更新任务的相应状态。
  • 定时任务定时从表里面获取未成功的任务,进行重试

根据流程走读代码

根据整个流程走读代码应该会对代码有更清晰的认识

系统启动

系统启动的核心处理逻辑主要是在类RetryAnnotationBeanPostProcessor中,下面通过流程仔细分析该类


  • 扫描所有带有@RetryFunction注解和实现RetryHandler接口的类

   @Override

   public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

       if (bean instanceof AopInfrastructureBean) {

           // Ignore AOP infrastructure such as scoped proxies.

           return bean;

       }


       Class targetClass = AopProxyUtils.ultimateTargetClass(bean);

       if (!this.postedClasseCache.contains(targetClass)) {

           Object targetObject = AopProxyUtils.getSingletonTarget(bean);

           if (RetryHandler.class.isAssignableFrom(targetClass)) {

               RetryHandlerUtils.validateRetryHandler(targetClass);

               log.info("发现RetryHandler的实例:{},准备注册", targetClass);

               retryHandlers.add((RetryHandler) targetObject);

               return bean;

           }

           ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;

           Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);

           methods.forEach(method -> processRetryFunction(targetObject, method));


           postedClasseCache.add(targetClass);

       }

       return bean;

   }

改类实现了BeanPostProcessor接口,重写了postProcessAfterInitialization方法(每个bean初始化之后执行)

主要为两处判断/过滤

1.判断是否实现了RetryHandler

if (RetryHandler.class.isAssignableFrom(targetClass)) {

2.过滤打了@RetryFunction的方法

ReflectionUtils.MethodFilter methodFilter = method -> method.getAnnotation(RetryFunction.class) != null;

Set methods = MethodIntrospector.selectMethods(targetClass, methodFilter);

  • 把打了@RetryFunction注解的都转化为RetryHandler,即最终都是走的RetryHandler

   protected void processRetryFunction(Object bean, Method method) {

       log.info("发现@RetryFunction的实例:{},准备注册", method.toString());

       Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());

       RetryHandlerUtils.validateRetryFunction(method);


       RetryFunction retryFunction = method.getAnnotation(RetryFunction.class);

       Supplier retryListenerSupplier = () -> {

           RetryListener retryListener = null;

           String retryListenerName = retryFunction.retryListener();

           if (StringUtils.isNotBlank(retryListenerName)) {

               retryListener = defaultListableBeanFactory.getBean(retryListenerName, RetryListener.class);

           }

           return retryListener;

       };

       retryHandlers.add(new MethodRetryHandler(bean, invocableMethod, retryFunction, retryListenerSupplier));

   }

  • 把所有的retryHandlers遍历注册为定时任务,默认用的quartz

   @Override

   public void afterSingletonsInstantiated() {

       postedClasseCache.clear();


       this.retryTaskMapper = defaultListableBeanFactory.getBean(RetryTaskMapper.class);

       this.retryRegistry = defaultListableBeanFactory.getBean(RetryRegistry.class);


       boolean beforeTask = environment.getProperty(EnvironmentConstants.RETRY_BEFORETASK, Boolean.class, Boolean.TRUE);

       this.retrySerializer = getRetrySerializerFromBeanFactory(defaultListableBeanFactory);

       if (this.retrySerializer == null) {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(retryTaskMapper, beforeTask);

       } else {

           this.retryHandlerPostProcessor = new DefaultRetryHandlerPostProcessor(new DefaultRetryTaskFactory(retrySerializer), retryTaskMapper, beforeTask);

       }


       retryHandlers.forEach(this::registerJobBean);


       retryHandlers.clear();

   }


   protected void registerJobBean(RetryHandler retryHandler) {

       if (retryHandler.identity().length() > 50) {

           throw new IllegalArgumentException("identity=" + retryHandler.identity() + " is too long, it must be less than 50");

       }


       RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

       RetryHandlerRegistration.registry(retryHandlerProxy);


       RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


       retryRegistry.register(retryHandler, retryProcessor);

   }

至此,系统启动所做的任务就完成了

打有@RetryFunction的注解和实现RetryHandler的接口的方法都会被Spring代理

  • @RetryFunction注解的方法如何被代理

public class RetryHandlerMethodPointcut implements Pointcut {


   @Override

   public ClassFilter getClassFilter() {

       return ClassFilter.TRUE;

   }


   @Override

   public MethodMatcher getMethodMatcher() {

       return new StaticMethodMatcher() {


           @Override

           public boolean matches(Method method, Class targetClass) {

               return RetryHandlerUtils.isRetryFunctionMethod(method);

           }

       };

   }

}


   public static boolean isRetryFunctionMethod(Method method) {

       if (method.getAnnotation(RetryFunction.class) != null && method.getParameterCount() == 1) {

           return !Object.class.equals(method.getParameterTypes()[0]);

       }

       return false;

   }

实现Pointcut接口通过isRetryFunctionMethod(Method method)方法判断是否是需要代理的方法

  • 实现RetryHandler接口的方法如何被代理

public class RetryHandlerClassPointcut implements Pointcut {


   @Override

   public ClassFilter getClassFilter() {

       return RetryHandler.class::isAssignableFrom;

   }


   @Override

   public MethodMatcher getMethodMatcher() {

       return new StaticMethodMatcher() {


           @Override

           public boolean matches(Method method, Class targetClass) {

               return RetryHandlerUtils.isRetryHandlerMethod(targetClass, method);

           }

       };

   }

}

   public static boolean isRetryHandlerMethod(Class targetClass, Method method) {

       if ("handle".equals(method.getName()) && method.getParameterCount() == 1 && method.isBridge() && method.isSynthetic()) {

           //RetryHandler接口有泛型,需要特殊处理

           return true;

       }

       Type interfaceType = getRetryHandlerGenericInterface(targetClass);

       if (interfaceType == null) {

           return false;

       }

       Class argsInputType = Object.class;

       if (interfaceType instanceof ParameterizedType) {

           argsInputType = (Class) ((ParameterizedType) interfaceType).getActualTypeArguments()[0];

       }

       Class parameterType = argsInputType;

       return "handle".equals(method.getName()) && method.getParameterCount() == 1 && method.getParameterTypes()[0].equals(parameterType);

   }

先对类进行过滤,要求实现RetryHandler

   @Override

   public ClassFilter getClassFilter() {

       return RetryHandler.class::isAssignableFrom;

   }

再对方法进行过滤,详细请看isRetryHandlerMethod(Class targetClass, Method method)方法


打有@RetryFunction注解的方法被调用时

当执行到带有@RetryFunction方法时(实现了RetryHandler也差不多的逻辑,就不再赘述了),会被方法拦截器拦截,

public class RetryHandlerMethodInterceptor implements MethodInterceptor {


   @Override

   public Object invoke(MethodInvocation invocation) {

       RetryFunction retryFunction = invocation.getMethod().getAnnotation(RetryFunction.class);

       Object[] args = invocation.getArguments();

       String identity = retryFunction.identity();

       if (StringUtils.isBlank(identity)) {

           identity = RetryHandlerUtils.getMethodIdentity(invocation.getMethod());

       }

       Optional optional = RetryHandlerRegistration.get(identity);

       if (optional.isPresent()) {

           return optional.get().handle(ArrayUtils.isEmpty(args) ? null : args[0]);

       }

       throw new IllegalArgumentException("找不到对应的RetryHandler代理,identity=" + identity);

   }

}

因为RetryHandlerRegistration中注册的是ImmediatelyRetryHandler,所以执行的是ImmediatelyRetryHandler的handle方法

       RetryHandler retryHandlerProxy = retryHandlerPostProcessor.doPost(retryHandler);

       RetryHandlerRegistration.registry(retryHandlerProxy);

doPost方法创建的是ImmediatelyRetryHandler

   @Override

   public RetryHandler doPost(RetryHandler retryHandler) {

       if (retryHandler instanceof GenericRetryHandler) {

           return new ImmediatelyRetryHandler((GenericRetryHandler) retryHandler, retryTaskFactory, retryTaskMapper, beforeTask);

       }

       return new ImmediatelyRetryHandler(new DefaultRetryHandler(retryHandler), retryTaskFactory, retryTaskMapper, beforeTask);

   }

接下来我们看看ImmediatelyRetryHandler.handle方法做了什么

   @Override

   public Object handle(Object arg) {

       RetryContext retryContext = new RetryContext(genericRetryHandler, arg);

       Object result;

       RetryTask retryTask;

       // 是否在执行任务之前插入数据库 |配置false则表示,只有任务执行报错才插入数据库|

       if (beforeTask) {

           retryTask = retryTaskFactory.create(genericRetryHandler, arg);

           retryTaskMapper.insert(retryTask);

           try {

               result = genericRetryHandler.handle(arg);

               retryContext.setResult(result);

               completeTask(retryTask);

               onRetry(retryContext);

               onComplete(retryContext);

           } catch (NoRetryException e) {

               retryContext.setException(e);

               failureTask(retryTask, retryContext);


               onRetry(retryContext);

               onError(retryContext);

               throw e;

           } catch (RuntimeException e) {

               retryContext.setException(e);


               if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                   //只有最大可重试次数为0,才会执行到这里

                   failureTask(retryTask, retryContext);


                   onRetry(retryContext);

                   onError(retryContext);

               } else {

                   updateRemark(retryTask, e);

                   onRetry(retryContext);

               }


               throw e;

           }

           return result;

       } else {

           try {

               result = genericRetryHandler.handle(arg);

               retryContext.setResult(result);

               onRetry(retryContext);

               onComplete(retryContext);

           } catch (NoRetryException e) {

               retryContext.setException(e);


               onRetry(retryContext);

               onError(retryContext);


               throw e;

           } catch (RuntimeException e) {

               retryContext.setException(e);

               if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

                   //只有最大可重试次数为0,才会执行到这里

                   onRetry(retryContext);

                   onError(retryContext);

               } else {

                   //等待重试

                   retryTask = retryTaskFactory.create(genericRetryHandler, arg);

                   retryTask.setRemark(StringUtils.left(e.getMessage(), 1000));

                   retryTaskMapper.insert(retryTask);

                   onRetry(retryContext);

               }


               throw e;

           }

       }

       return result;

   }

这个方法有点长,不过代码还算简单,简而言之就是重试的方法发生异常后入库( retryTaskFactory.create(genericRetryHandler, arg);)的操作,当然里边有序列化参数,修改重试表的状态等操作,就不再详细讲了(比较简单,相信大家都看得懂(*╹▽╹*)

至此,调用带有@RetryFunction注解的方法第一被调用,以及如何把重试任务入库的操作就完成了,下面讲解重试的逻辑

定时重试逻辑

上边有讲到把重试任务注册为定时任务的逻辑,再看一下代码吧

       RetryProcessor retryProcessor = new DefaultRetryProcessor(retryHandler, retryTaskMapper, retrySerializer);


       retryRegistry.register(retryHandler, retryProcessor);

可以看到,注册的是一个DefaultRetryProcessor,就是说,每次定时任务调用的是该类的doRetry方法,以quartz为例

public class RetryJob implements Job {


   private RetryProcessor retryProcessor;


   public RetryJob() {

   }


   public RetryJob(RetryProcessor retryProcessor) {

       this.retryProcessor = retryProcessor;

   }


   @Override

   public void execute(JobExecutionContext context) {

       retryProcessor.doRetry();

   }

}

下边我们看看doRetry都做了些什么

   @Override

   public void doRetry() {

       log.info("开始执行Identity={}的重试,maxRetryCount={}, initialDelay={}", genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());

       List tasks = retryTaskMapper.queryNeedRetryTaskList(genericRetryHandler.identity(), genericRetryHandler.maxRetryCount(), genericRetryHandler.initialDelay());

       if (tasks == null) {

           return;

       }

       log.info("Identity={}当前有{}个任务准备重试", genericRetryHandler.identity(), tasks.size());

       if (genericRetryHandler.ignoreException()) {

           tasks.forEach(this::doRetryWithIgnoreException);

       } else {

           tasks.forEach(this::doRetry);

       }

   }

相信聪明的你肯定猜到了,没错,取出之前入库的数据开始进行重试

   private void doRetry(RetryTask retryTask) {

       log.info("开始重试Identity={},Id={}的任务", retryTask.getIdentity(), retryTask.getTaskId());

       retryedRetryHandler.setRetryTask(retryTask);

       String json = retryTask.getParams();

       if (StringUtils.isBlank(json)) {

           retryedRetryHandler.handle(null);

       } else {

           retryedRetryHandler.parseArgsAndhandle(json);

       }

   }

重试调用的是retryedRetryHandler.handle()的方法

   @Override

   public Object handle(Object arg) {

       retryTask.setRetryCount(retryTask.getRetryCount() + 1);

       RetryContext retryContext = new RetryContext(genericRetryHandler, arg, retryTask.getRetryCount());

       Object result;

       try {

           result = genericRetryHandler.handle(arg);

           retryContext.setResult(result);

           completeTask(retryTask);

           onRetry(retryContext);

           onComplete(retryContext);

       } catch (NoRetryException e) {

           retryContext.setException(e);


           failureTask(retryTask, retryContext);

           onRetry(retryContext);

           onError(retryContext);

           throw e;

       } catch (RuntimeException e) {

           retryContext.setException(e);


           if (retryTask.getRetryCount() == genericRetryHandler.maxRetryCount()) {

               failureTask(retryTask, retryContext);

           } else {

               update(retryTask, retryContext);

           }


           onRetry(retryContext);


           if (retryContext.getRetryCount() == genericRetryHandler.maxRetryCount()) {

               //重试次数达到最大,触发失败回调

               onError(retryContext);

           }


           throw e;

       }


       return result;

   }

retryedRetryHandler.handle方法主要是调用目标方法后,如果目标方法没报错,则把表中的状态修改成功,发生异常后更新表中的异常信息,达到最大重试次数后,把表中的状态改为失败,当然其中有把数据库中的参数反序列的操作.


到这里,smart-retry的大致流程,源码解读就完成了,当然,这并不是全部代码,只是主流程的代码,有兴趣的同学可以把代码拉下来,详细阅读以下

总结

smart-retry支持异步重试,支持重试持久化,用着还是相当不错的,但是还是有缺点的,比如,1. 只支持有且仅有一个参数2.每一个重试方法都对应一个定时任务,会造成线程的过度使用

所以,我在该源码的基础上,对smart-retry进行了改造,改造点如下

  • 支持重试的方法有多个参数
  • 支持指定抛出哪些异常后重试
  • 支持配置在注解上是否在执行方法前入库
  • 只提供重试的接口给用户,具体定时任务让用户自己去实现,比如改造后例子中的定时任务用的是xxl-job


如果想详细了解改造后的smart-retry,请参照smart-retry改造升级文档




相关文章
Glide Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname not verified:
Glide Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname not verified:
1601 0
|
6月前
|
机器学习/深度学习 存储 缓存
129_量化技术:INT8与动态量化 - 推导压缩的精度损失公式
在2025年的大语言模型(LLM)时代,随着模型规模的指数级增长,部署这些庞然大物变得越来越具有挑战性。GPT-5和Claude 3等最新模型的参数量已经达到数千亿甚至上万亿,这给计算资源和内存带来了巨大压力。模型量化作为一种有效的压缩技术,正在成为解决这一挑战的关键方案。本文将深入探讨LLM量化技术,特别是INT8和动态量化方法,推导其精度损失公式,并提供2025年最新的优化策略和实现代码。
660 4
|
10月前
|
SQL 存储 缓存
海量数据分页查询效率低?一文解析阿里云AnalyticDB深分页优化方案
本文介绍了AnalyticDB(简称ADB)针对深分页问题的优化方案。深分页是指从海量数据中获取靠后页码的数据,常导致性能下降。ADB通过快照缓存技术解决此问题:首次查询生成结果集快照并缓存,后续分页请求直接读取缓存数据。该方案在数据导出、全量结果分页展示及业务报表并发控制等场景下表现出色。测试结果显示,相比普通分页查询,开启深分页优化后查询RT提升102倍,CPU使用率显著降低,峰值内存减少至原方案的几分之一。实际应用中,某互联网金融客户典型慢查询从30秒优化至0.5秒,性能提升60+倍。
738 1
|
存储 网络安全 数据安全/隐私保护
Docker--harbor私有仓库部署与管理
Docker--harbor私有仓库部署与管理
Docker--harbor私有仓库部署与管理
|
供应链 网络协议 数据安全/隐私保护
|
消息中间件 Java Kafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
1620 7
|
消息中间件
分布式篇问题之通过本地消息表实现分布式事务的最终一致性问题如何解决
分布式篇问题之通过本地消息表实现分布式事务的最终一致性问题如何解决
589 0
|
Java fastjson
springboot切面编程基础实践
springboot切面编程基础实践
238 0
|
存储
C 标准库 - <stdio.h> 详解1
C 标准库 - <stdio.h> 详解
731 0
|
运维 数据可视化 持续交付
如何解决技术债
本文介绍了技术债的概念及其影响。技术债是指在开发过程中因选择快速解决方案而非最优方法而产生的额外工作量。文章指出,技术债可能导致项目中出现如流水线失败、无用代码、难以理解的代码等问题。还强调了管理技术债的重要性,因为它会影响软件的交付速率和质量。有效的管理包括识别技术债、可视化问题、分析优先级、制定执行计划和持续改进。建议团队通过价值/成本矩阵来确定优先解决的技术债,并通过建立技术规范、服务责任人制度和持续关注技术趋势来预防和解决技术债。此外,应确保持续投入资源进行技术优化,并与团队和客户分享改进成果,以维持软件的高质量和稳定性。
736 1