前言
通常情况下,SpringMVC接收到请求后会将请求具体分发给单个线程进行处理。如果请求处理中涉及到比较耗时的操作,为了能更快地将响应返回给用户,那么就需要将耗时的业务操作交由别的线程进行异步处理,而SpringBoot已经为我们提供了这样的实现。
@Async注解
新建一个AsyncController,给需要异步执行的方法加上@Async注解,代码如下:
kotlin复制代码package geek.springboot.application.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RequestMapping("/async/") @RestController public class AsyncController { @GetMapping public void processGet() { log.info("start process get"); this.doSomeThing(); } // 给需要异步执行的方法加上@Async @Async public void doSomeThing() { log.info("do some thing"); } }
需要开启SpringBoot异步任务执行功能,还需要加上@EnableAsync注解,在SpringApplication启动类加上,代码如下:
typescript复制代码package geek.springboot.application; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableAsync; @EnableAsync // 开启异步任务执行功能 @Slf4j @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
@Async无效的原因
启动后Get /async/ ,控制台输出如下:
可以看到执行doSomeThing()的线程和处理Get请求的线程是同一个,都是nio-8080-exec-1,那这里之所以@Async注解无效,这是因为Spring是给扫描到的每个Bean都创建代理对象(@Component、@Service、@Controller...),而只有这些代理对象才做了方法增强,调用代理对象的方法才能实现我们期望的行为。比如@Async、@Transactional...
所以@Async无效的问题在于AsyncController中,this.doSomeThing();调用的不是代理对象的doSomeThing(),而是当前对象this的doSomeThing().
明白了问题的根源,那么新建一个AsyncService,代码如下:
java复制代码package geek.springboot.application.service; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.concurrent.TimeUnit; @Slf4j @Service public class AsyncService { // 给需要异步执行的方法加上@Async @Async public void doSomeThing() { // 为了演示线程池的效果,这里模拟耗时操作 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } log.info("do some thing"); } }
将AsyncController稍作调整
kotlin复制代码package geek.springboot.application.controller; import geek.springboot.application.service.AsyncService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RequestMapping("/async/") @RestController public class AsyncController { @Autowired private AsyncService asyncService; @GetMapping public void processGet() { log.info("start process get"); // 调用代理对象上@Async修饰的方法 this.asyncService.doSomeThing(); } }
Get /asysnc/ ,输出如下:
可以看到doSomeThing()和processGet()不是同一个线程,异步处理成功。
@Async无效的其他原因
- @Async修饰的方法不能是private的,最好是public,可以保证别的Bean访问该方法的权限,也保证Spring方法增强成功。
- @Async修饰的方法不能是static的。
- @Async修饰的方法返回值只能是void或者Future 。
- 没有加上@EnableAsync注解,或者添加@Async的类没有被Spring扫描到。
- 调用对象中@Async修饰的方法时,该对象一定要是被Spring托管的,如果是自行new出来的,因为不受Spring托管,并没有做方法增强。
自定义异步线程池
@Async本质,其实就是SpringBoot默认给我们封装好了一个线程池,所有Spring代理对象的@Async修饰方法,都会被扔到线程池中执行,从而不影响Http请求线程处理逻辑。而自定义的方式有如下几种:
application.yml
最简单的,通过application.yml自定义线程池
yaml复制代码spring: task: execution: # 核心线程数1,最大线程数2,任务队列容量为0,除核心线程外的线程空闲时存活时间60秒 pool: core-size: 1 max-size: 2 queue-capacity: 0 keep-alive: "60s"
短时间内多次Get /async/ ,输出如下:
说明线程池配置生效,每一次doSomeThing()执行,都是由线程池中的线程轮流处理。
还可能出现以下错误,原因是当时线程池全部线程都在执行doSomeThing(),同时因为任务队列容量为0,所以再有新的doSomeThing()任务,直接被拒绝
less复制代码2023-07-28 05:24:14.496 ERROR 39436 --- [nio-8080-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@1a67c1cf[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 2]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$665/842994983@196a33ce] with root cause java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@17adcf4b rejected from java.util.concurrent.ThreadPoolExecutor@1a67c1cf[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 2] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[na:1.8.0_241] at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[na:1.8.0_241] at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) ~[na:1.8.0_241] at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134) ~[na:1.8.0_241] at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:388) ~[spring-context-5.3.29.jar:5.3.29] at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:292) ~[spring-aop-5.3.29.jar:5.3.29] at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129) ~[spring-aop-5.3.29.jar:5.3.29] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.29.jar:5.3.29] at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.29.jar:5.3.29] at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708) ~[spring-aop-5.3.29.jar:5.3.29] at geek.springboot.application.service.AsyncService$$EnhancerBySpringCGLIB$$ffc452ec.doSomeThing(<generated>) ~[classes/:na] at geek.springboot.application.controller.AsyncController.processGet(AsyncController.java:22) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_241] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_241] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_241] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_241] at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.29.jar:5.3.29] at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.29.jar:5.3.29] at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.29.jar:5.3.29] at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.29.jar:5.3.29] at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.29.jar:5.3.29] at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.29.jar:5.3.29] at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1072) ~[spring-webmvc-5.3.29.jar:5.3.29] at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:965) ~[spring-webmvc-5.3.29.jar:5.3.29] at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.29.jar:5.3.29] at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.29.jar:5.3.29] at javax.servlet.http.HttpServlet.service(HttpServlet.java:529) ~[tomcat-embed-core-9.0.78.jar:4.0.FR] at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.29.jar:5.3.29] at javax.servlet.http.HttpServlet.service(HttpServlet.java:623) ~[tomcat-embed-core-9.0.78.jar:4.0.FR] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:209) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-9.0.78.jar:9.0.78] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:178) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.29.jar:5.3.29] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.29.jar:5.3.29] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:178) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.29.jar:5.3.29] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.29.jar:5.3.29] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:178) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.29.jar:5.3.29] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.29.jar:5.3.29] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:178) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167) ~[tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:481) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:130) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:390) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:926) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1791) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) [tomcat-embed-core-9.0.78.jar:9.0.78] at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.78.jar:9.0.78] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]
ThreadPoolTaskExecutor
自定义ThreadPoolTaskExecutor,并作为Spring中的Bean存在,代码如下:
java复制代码package geek.springboot.application.configuration; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.atomic.AtomicInteger; /** * 任务执行Config * * @author Bruse */ @Slf4j @Configuration public class TaskExecutorConfig { private final AtomicInteger threadSeq = new AtomicInteger(0); @Bean public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 核心线程数 executor.setCorePoolSize(2); // 最大线程数 executor.setMaxPoolSize(2); // 空闲线程存活时间,单位:秒 executor.setKeepAliveSeconds(60); // 任务队列容量 executor.setQueueCapacity(0); // 线程名称前缀,如果设置了线程池工厂自定义线程,那么设置该参数无效 executor.setThreadNamePrefix("async task_"); // 自定义线程工厂,可以在这里自定义线程池中线程的一些参数 executor.setThreadFactory(r -> { Thread thread = new Thread(r); thread.setName("async task_" + threadSeq.getAndIncrement()); thread.setUncaughtExceptionHandler((t, e) -> log.error("线程抛出未捕获异常 {} {}", t.getName(), e.getMessage())); return thread; }); // 任务拒绝策略,实现RejectedExecutionHandler接口,自定义处理逻辑 executor.setRejectedExecutionHandler((r, executor1) -> log.error("任务被拒绝")); // 或者使用ThreadPoolExecutor现有的拒绝策略 // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); return executor; } }
再次短时间内多次请求,输出如下:
可以看到设置到拒绝策略,线程名称等符合预期。
TaskExecutorBuilder
除了上面实例化ThreadPoolTaskExecutor的方式,也可以通过TaskExecutorBuilder自定义线程池,使用TaskExecutorBuilder最终也是构建出ThreadPoolTaskExecutor,代码如下:
scss复制代码@Bean public ThreadPoolTaskExecutor taskExecutor() { TaskExecutorBuilder builder = new TaskExecutorBuilder() .corePoolSize(2) .maxPoolSize(2) .threadNamePrefix("async thread_") .queueCapacity(0) .keepAlive(Duration.ofSeconds(60)) .taskDecorator(runnable -> { // 装饰者模式,这里可以在线程执行任务前,做一些额外操作 log.info("任务将要执行"); return runnable; }); return builder.build(); }
输出如下:
或者直接定义一个TaskExecutorBuilder Bean,那么SpringBoot创建的ThreadPoolTaskExecutor将会使用该builder进行构建,代码如下:
scss复制代码@Bean public TaskExecutorBuilder executorBuilder() { return new TaskExecutorBuilder() .corePoolSize(2) .maxPoolSize(2) .threadNamePrefix("async thread_") .queueCapacity(0) .keepAlive(Duration.ofSeconds(60)) .taskDecorator(runnable -> { // 装饰者模式,这里可以在线程执行任务前,做一些额外操作 log.info("任务将要执行"); return runnable; }); }
定时任务
@Scheduled
SpringBoot提供了简单的定时任务实现,只需要使用@EnableScheduling和@Scheduled这两个注解,新建一个SyncTask,代码如下:
kotlin复制代码package geek.springboot.application.task; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * 定时同步 * * @author Bruse */ @Slf4j @EnableScheduling // 开启定时任务 @Component public class SyncTask { // 在要执行的方法上加上@Scheduled注解,这里表示每5秒执行一次sync() @Scheduled(cron = "0/5 * * * * ?") public void sync() { log.info("do sync task ..."); } }
启动后控制台输出如下:
application.yml
定时任务线程池也可以通过application.yml自定义,演示代码如下:
yaml复制代码spring: task: scheduling: thread-name-prefix: "scheduling-" pool: size: 2
再次重启,控制台输出如下,可以看到定时任务线程池的线程名称前缀已生效:
ThreadPoolTaskScheduler
SpringBoot定时任务执行底层实现依赖于ThreadPoolTaskScheduler,我们也可以自行创建ThreadPoolTaskScheduler并注册到Spring中来进行自定义,代码如下:
scss复制代码@Bean public ThreadPoolTaskScheduler taskScheduler() { return new TaskSchedulerBuilder() // 线程名称前缀 .threadNamePrefix("scheduling-") // 线程数 .poolSize(1) .build(); }
TaskSchedulerBuilder
也可以自行创建TaskSchedulerBuilder,并注册到Spring中,那么SpringBoot在创建定时任务线程池时,则会按照该builder来进行线程池的创建,代码如下:
typescript复制代码@Bean public TaskSchedulerBuilder schedulerBuilder() { return new TaskSchedulerBuilder() // 线程名称前缀 .threadNamePrefix("schedule-") // 线程数 .poolSize(1); }
注意
ThreadPoolTaskScheduler默认线程数只有1,所以如果执行的任务过于耗时,那么则不能执行任务时都非常准时,SyncTask稍作如下调整:
typescript复制代码@Scheduled(cron = "0/5 * * * * ?") public void sync() { // 模拟耗时操作 try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { throw new RuntimeException(e); } log.info("do sync task ..."); }
在不修改线程数的情况下,期望sync()每5秒执行一次,实际sync()执行耗时6秒,那么输出如下:
可以看到sync()里每次log打印的时间间隔都大于5秒。
那么是不是调整线程数就可以了呢?答案是否定的。就算把线程数调整为2,重启后输出可以看到,sync()每次打印的时间间隔还是大于5秒。
因为sync()这个需要定时执行的任务,已经和ThreadPoolTaskScheduler中的某个线程绑定了,sync()将由这个线程一直负责处理。
调整线程数的意义在于如果有多个定时任务需要处理的时候,需要将不同的定时任务交由更多的线程进行处理。
比如在线程数默认为1的情况下,新建另一个需要定时调用的方法doSomeThing():
typescript复制代码@Scheduled(cron = "0/5 * * * * ?") public void doSomeThing() { try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { throw new RuntimeException(e); } log.info("do some thing ..."); }
启动后输出如下,可以看到系统中已有的定时任务都交由单个线程处理:
那这个时候我们把线程数调整为2,重启后输出如下:
这次2个定时任务分别交由2个线程进行处理,互不干扰了。
作者:LBruse