ScheduledThreadPoolExecutor解析

简介: ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,支持延时及周期性任务调度。其核心在于ScheduledFutureTask和DelayedWorkQueue:前者通过重设执行时间实现周期性,后者基于延迟队列实现任务的定时触发,结合addWorker机制确保线程池持续运行,从而完成精准调度。

ScheduledThreadPoolExecutor适用于延时执行或者周期性执行的任务调度,ScheduledThreadPoolExecutor在实现上继承了ThreadPoolExecutor,所以你依然可以将ScheduledThreadPoolExecutor当成ThreadPoolExecutor来使用,但是ScheduledThreadPoolExecutor的功能要强大得多,因为ScheduledThreadPoolExecutor可以根据设定的参数来周期性调度运行,下面的图片展示了四个和周期性相关的方法:
四个Scheduled方法
如果你想延时一段时间之后运行一个Runnable,那么使用第一个方法
如果你想延时一段时间然后运行一个Callable,那么使用的第二个方法
如果你想要延时一段时间,然后根据设定的参数周期执行Runnable,那么可以选择第三个和第四个方法,第三个方法和第四个方法的区别在于:第三个方法严格按照规划的时间路径来执行,比如周期为2,延时为0,那么执行的序列为0,2,4,6,8....,而第四个方法将基于上次执行时间来规划下次的执行,也就是在上次执行完成之后再次执行。比如上面的执行序列0,2,4,6,8...,如果第2秒没有被调度执行,而在第三秒的时候才被调度,那么下次执行的时间不是4,而是5,以此类推。
下面来看一下这四个方法的一些细节:
通过上面的代码我们可以发现,前两个方法是类似的,后两个方法也是类似的。前两个方法属于一次性调度,所以period都为0,区别在于参数不同,一个是Runnable,而一个是Callable,可笑的是,最后都变为了Callable了,见下面的构造函数:
对于后两个方法,区别仅仅在于period的,scheduleWithFixedDelay对参数进行了操作,将原来的时间变为负数了,而后面在计算下次被调度的时间的时候会根据这个参数的正负值来分别处理,正数代表scheduleAtFixedRate,而负数代表了scheduleWithFixedDelay。
一个需要被我们注意的细节是,以上四个方法最后都会调用一个方法: delayedExecute(t),下面看一下这个方法:
大概的意思就是先判断线程池是否被关闭了,如果被关闭了,则拒绝任务的提交,否则将任务加入到任务队列中去等待被调度执行。最后的ensurePrestart的意思是需要确保线程池已经被启动起来了。下面是这个方法:
主要是增加了一个没有任务的worker,有什么用呢?我们还记得Worker的逻辑吗?addWorker方法的执行,会触发Worker的run方法的执行,然后runWorker方法就会被执行,而runWorker方法是循环从workQueue中取任务执行的,所以确保线程池被启动起来是重要的,而只需要简单的执行addWorker便会触发线程池的启动流程。 对于调度线程池来说,只要执行了addWorker方法,那么线程池就会一直在后台周期性的调度执行任务。到此,似乎我们还是没有闹明白ScheduledThreadPoolExecutor是如何实现周期性的,上面讲到四个scheduled方法时,我们没有提一个重要的类:ScheduledFutureTask,对,所有神奇的事情将会发生在这个类中,下面来分析一下这个类。
ScheduledFutureTask类图
看上面的类图,貌似这个类非常复杂,还好,我们发现他实现了Runnable接口,那么必然会有一个run方法,而这个run方法必然是整个类的核心,下面来看一下这个run方法的内容:
首先,判断是否是周期性的任务,如果不是,则直接执行(一次性),否则执行,然后设置下次执行的时间,然后重新调度,等待下次执行。这里有一个方法需要注意,也就是setNextRunTime,上面我们提到scheduleAtFixedRate和scheduleWithFixedDelay在传递参数时不一样,后者将delay值变为了负数,所以下面的处理正好印证了前文所述。
下面来看一下reExecutePeriodic方法是如何做的,他的目标是将任务再次被调度执行,下面的代码展示了这个功能的实现:
可以看到,这个方法就是将我们的任务再次放到了workQueue里面,那这个参数是什么?在上面的run方法中我们调用了reExecutePeriodic方法,参数为outerTask,而这个变量是什么?看下面的代码:
这个变量指向了自己,而this的类型是什么?是ScheduledFutureTask,也就是可以被调度的task,这样就实现了循环执行任务了。
上面的分析已经到了循环执行,但是ScheduledThreadPoolExecutor的功能是周期性执行,所以我们接着分析ScheduledThreadPoolExecutor是如何根据我们的参数走走停停的。这个时候,是应该看一下ScheduledThreadPoolExecutor的构造函数了,我们来看一个最简单的构造函数:
我们知道ScheduledThreadPoolExecutor的父类是ThreadPoolExecutor,所以这里的super其实是ThreadPoolExecutor的构造函数,我们发现其中有一个参数DelayedWorkQueue,看名字貌似是一个延迟队列的样子,进一步跟踪代码,发现了下面的一行代码(构造函数中):
所以在ScheduledThreadPoolExecutor中,workQueue是一个DelayedWorkQueue类型的队列,我们暂且认为DelayedWorkQueue是一种具备延迟功能的队列吧,那么,到此我们便可以想明白了,上面的分析我们明白了ScheduledThreadPoolExecutor是如何循环执行任务的,而这里我们明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue来达到延迟的目标,所以组合起来,就可以实现ScheduledThreadPoolExecutor周期性执行的目标。下面我们来看一下DelayedWorkQueue是如何做到延迟的吧,上文中提到一个方法:getTask,这个方法的作用是从workQueue中取出任务来执行,而在ScheduledThreadPoolExecutor里面,getTask方法是从DelayedWorkQueue中取任务的,而取任务无非两个方法:poll或者take,下面我们对DelayedWorkQueue的take方法来分析一下:
在for循环里面,首先从queue中获取第一个任务,然后从任务中取出延迟时间,而后使用available变量来实现延迟效果。这里面需要几个点需要探索一下:
这个queue是什么东西?
延迟时间的来龙去脉?
available变量的来龙去脉?
对于第一个问题,看下面的代码:
它是一个RunnableScheduledFuture类型的数组,下面是RunnableScheduledFuture类的类关系图:
RunnableScheduledFuture类关系
数组里面保存了我们的RunnableScheduledFuture,对queue的操作,主要来看一下增加元素和消费元素的操作。首先,假设使用add方法来增加RunnableScheduledFuture到queue,调用的链路如下:
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean add(Runnable e) {
return offer(e);
}

public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}

相关文章
|
18小时前
|
NoSQL MongoDB
3.4.1 文档的插入
MongoDB中使用insert()或insertMany()向集合插入文档,支持单条或批量添加。若集合不存在则自动创建,未指定_id时自动生成,整型需用NumberInt(),日期用new Date()。键值对有序,区分类型和大小写,不可重复,键命名需遵循UTF-8规范,避免特殊字符。批量插入失败不回滚已成功数据,建议用try-catch捕获异常。
|
18小时前
|
Java 调度
ScheduleExecutorService提交死循环任务
本文探讨向调度线程池提交死循环任务的影响。通过分析`scheduleAtFixedRate`的执行流程,揭示任务如何被包装、调度及执行。死循环将导致线程被永久占用,无法释放,若线程池容量有限,后续任务将被阻塞,影响整体调度。结合延时队列与任务重提交机制,完整呈现周期性任务的运行原理。
|
18小时前
|
Java 调度
ScheduleExecutorService
当ScheduledExecutorService的调度周期小于任务执行时间时,任务不会并发执行,而是等待前一次完成后再立即执行。因任务被放入延时队列,下次触发时间基于上一次设定周期计算,若未完成则延迟执行,实际调度周期等于任务执行时间,避免了任务堆积和并发冲突。
|
18小时前
3.3.3 集合的删除
集合删除语法为 `db.collection.drop()` 或 `db.集合.drop()`,成功返回 true,失败返回 false。例如:`db.mycollection.drop()` 可删除 mycollection 集合。
|
18小时前
|
缓存 算法 搜索推荐
Executors工厂类详解
本文详解Java中Executors类提供的12种线程池创建方法,涵盖newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor及newWorkStealingPool等,分析其核心参数、工作原理与适用场景,帮助开发者根据并发需求合理选择线程池类型,提升性能与资源利用率。
|
18小时前
3.5 文档的分页查询
统计查询使用count()方法,可统计集合中符合条件的记录数量。语法为db.collection.count(query, options)。如统计comment集合全部记录:db.comment.count();按条件统计userid为1003的记录数:db.comment.count({userid:&quot;1003&quot;})。默认返回所有匹配文档的数量。
|
18小时前
|
存储 NoSQL MongoDB
3.2 数据库操作
MongoDB中使用`use 数据库名`选择或创建数据库,若不存在则自动创建。通过`show dbs`查看所有数据库,`db`查看当前库。集合需插入文档后才真正创建。数据库名须为非空、小写、不超过64字节的UTF-8字符串,不可含特殊字符。保留库包括admin(权限管理)、local(本地数据)和config(分片信息)。
|
18小时前
|
XML NoSQL Java
5.3 技术选型
本节介绍MongoDB的Java连接驱动mongodb-driver及SpringDataMongoDB持久层框架,通过搭建文章微服务工程,配置Spring Boot整合MongoDB,实现项目初始化与启动。
|
18小时前
|
存储 NoSQL Linux
2.4 Linux系统中的安装启动和连接
本文介绍在Linux系统部署单机MongoDB用于生产环境的完整步骤,包括下载、解压、目录配置、日志与数据路径设置、配置文件编写及服务启停方法。操作类似Windows,通过配置`mongod.conf`实现后台运行,支持命令行与图形工具连接,并提供防火墙处理与安全关闭服务方案,确保稳定运行。