ScheduleExecutorService

简介: 当ScheduledExecutorService的调度周期小于任务执行时间时,任务不会并发执行,而是等待前一次完成后再立即执行。因任务被放入延时队列,下次触发时间基于上一次设定周期计算,若未完成则延迟执行,实际调度周期等于任务执行时间,避免了任务堆积和并发冲突。

如果在一个ScheduleExecutorService中提交一个任务,这个任务的调度周期设置
的时间比任务本身执行的时间短的话会出现什么情况?也就是在线程调度时间已经到了
但是上次的任务还没有做完的情况下,ScheduleExecutorService是怎么处理的?

这个问题曾经困扰了我很久,我们都知道,ScheduleExecutorService是一个支持周期调度的线程池,我们可以设置调度的周期period,ScheduleExecutorService会按照设定好的周期调度我们的任务,如果我们设定的调度周期小于任务运行时间,那么很好理解,比如说我们设置的调度周期为1秒,而任务实际只需要10毫秒就可以执行完成一次,那么执行完成之后放到调度队列即可,下次调度时间到了再次调度执行。那么,如果我们的任务执行时间大于我们设定的调度时间会怎么样?比如我们设定的调度周期为1秒,但是我们的任务每次需要执行2秒,这个情况是不是很奇怪呢?
对于ScheduleExecutorService来说,你给我设定的调度周期是1秒,那么我当然1秒就会去运行一次你,但是运行1秒后发现你还在运行,那我是再次运行你还是等你运行完成再调度你运行?
当然,这都是我的主观臆断来猜测ScheduleExecutorService的原理,ScheduleExecutorService的真正原理需要去阅读源码来理解,下面带着这个问题,以解决这个问题为目标去看一下ScheduleExecutorService的源码吧。
首先,我们使用下面的代码作为测试:

private static Runnable blockRunner = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("one round:" + new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
};
private static ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(2);
public static void main(String ... args) {
scheduledExecutorService
.scheduleAtFixedRate(blockRunner, 0, 100, TimeUnit.MILLISECONDS);
}
我们设定了调度周期为100毫秒,但是blockRunner实际上需要执行2秒才能返回。关于java的线程池,已经在前面写到了。
先来看一下scheduleAtFixedRate这个方法:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask sft =
new ScheduledFutureTask(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

我们的任务command被包装了两次,一次变成了一个ScheduledFutureTask类型的对象,然后又变成了RunnableScheduledFuture类型的对象。然后执行了一个方法delayedExecute,这个方法字面意思上看起来像是延时执行的意思,看一下它的代码:

private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}

它的执行逻辑是:如果线程池被关闭了,那么拒绝提交的任务,否则,将该任务添加队列中去。这个队列就是ThreadPoolExecutor中的workQueue,而这个workQueue是在ThreadPoolExecutor的构造函数中被初始化的,也就是下面这关键的一句:

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

也就是说,我们的任务被添加到了一个DelayedWorkQueue队列中去了,而DelayedWorkQueue我们在Java阻塞队列详解中已经分析过,它是一个可以延迟消费的阻塞队列。而延时的时间是通过接口Delayed的getDelay方法来获得的,我们最后找到ScheduledFutureTask实现了Delayed的getDelay方法。

public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}

time变量是什么?原来是delay,好像和period无关啊!!分析了这么久,发现这是第一次执行任务的逻辑啊,我想知道的是第二次、第三次以后和初始的delay无关之后的周期调度的情况啊,继续找吧!
然后发现了ScheduledFutureTask的run方法,很明显这就是任务调度被执行的关键所在,看下代码:

public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}

最为关键的地方在于:

else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}

首先是:runAndReset()这个方法,然后是setNextRunTime()这个方法,然后是reExecutePeriodic(outerTask)这个方法。

第一个方法runAndReset()貌似是执行我们的提交的任务的,我们看下代码:

protected boolean runAndReset() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

关键的地方是c.call()这一句,这个c就是我们提交的任务。
第二个方法setNextRunTime()的意思是设置下次执行的时间,下面是他的代码细节:

private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}

我们只需要看p>0这个分支就可以了,其实这是两种策略。我们的示例对应了第一个分支的策略,所以很显然,time这个变量会被加p,而p则是我们设定好的period。下面我们找一下这个time是在哪里初始化的,回忆一下scheduleAtFixedRate这个方法的内,我们说我们的任务被包装了两次,而time就是在这里被初始化的:

/**

  • Returns the trigger time of a delayed action.
    */
    private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

/**

  • Returns the trigger time of a delayed action.
    */
    long triggerTime(long delay) {
    return now() +
      ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    
    }

无论如何,我们知道一个任务会被运行完一次之后再次设置时间,然后线程池会获取任务来执行,而任务队列是一个延时阻塞队列,所以也就造成了周期性运行的假象。可以看下下面获取任务的take方法:

public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

可以看到,如果delay小于等于0,那么就是说需要被立即调度,否则延时delay这样一段时间。也就是延时消费。
结论就是:一个任务会被重复添加到一个延时任务队列,所以同一时间任务队列中会有多个任务待调度,线程池会首先获取优先级高的任务执行。如果我们的任务运行时间大于设置的调度时间,那么效果就是任务运行多长时间,调度时间就会变为多久,因为添加到任务队列的任务的延时时间每次都是负数,所以会被立刻执行

上面列出了最近写的关于java线程池ScheduleExecutorService的内容,可以作为参考,本文是对ScheduleExecutorService学习和总结的一个收尾,对java线程池技术更为深入的学习和总结将在未来适宜的时候进行。

相关文章
|
18小时前
3.5 文档的分页查询
统计查询使用count()方法,可统计集合中符合条件的记录数量。语法为db.collection.count(query, options)。如统计comment集合全部记录:db.comment.count();按条件统计userid为1003的记录数:db.comment.count({userid:&quot;1003&quot;})。默认返回所有匹配文档的数量。
|
18小时前
|
XML NoSQL Java
5.3 技术选型
本节介绍MongoDB的Java连接驱动mongodb-driver及SpringDataMongoDB持久层框架,通过搭建文章微服务工程,配置Spring Boot整合MongoDB,实现项目初始化与启动。
|
18小时前
|
存储 NoSQL MongoDB
3.2 数据库操作
MongoDB中使用`use 数据库名`选择或创建数据库,若不存在则自动创建。通过`show dbs`查看所有数据库,`db`查看当前库。集合需插入文档后才真正创建。数据库名须为非空、小写、不超过64字节的UTF-8字符串,不可含特殊字符。保留库包括admin(权限管理)、local(本地数据)和config(分片信息)。
|
18小时前
|
存储 NoSQL Linux
2.4 Linux系统中的安装启动和连接
本文介绍在Linux系统部署单机MongoDB用于生产环境的完整步骤,包括下载、解压、目录配置、日志与数据路径设置、配置文件编写及服务启停方法。操作类似Windows,通过配置`mongod.conf`实现后台运行,支持命令行与图形工具连接,并提供防火墙处理与安全关闭服务方案,确保稳定运行。
|
18小时前
|
NoSQL MongoDB
2.3 Compass-图形化界面客户端
前往MongoDB官网下载MongoDB Compass,安装版按提示安装,压缩版解压后运行MongoDBCompassCommunity.exe。打开后输入主机地址、端口等信息,点击连接即可使用。
|
18小时前
|
存储 NoSQL 关系型数据库
4-MongoDB索引知识
MongoDB索引通过B树结构提升查询效率,避免全表扫描。支持单字段和复合索引,前者适用于单一字段排序与查询,后者按字段顺序构建,优化多条件查询与排序操作,显著提升大数据量下的查询性能。
|
18小时前
5.5 文章评论实体类的编写
创建实体类Comment,位于cn.itcast.article.po包中,包含评论的ID、内容、发布时间、用户信息、点赞数、回复数、状态及关联文章ID等属性,并提供getter/setter及toString方法,用于封装评论数据。
|
18小时前
3.5.3 排序查询
sort()方法用于对查询结果排序,1为升序,-1为降序,可按多字段排序。语法:db.集合名.find().sort({字段:1/-1})。与skip()、limit()联用时,执行顺序为先排序,再跳过,最后限制显示。
|
18小时前
3.5.2 分页列表查询
使用limit()限制查询返回的记录数,skip()跳过指定数量数据,实现分页。如每页2条:第一页skip(0).limit(2),第二页skip(2).limit(2),以此类推,默认limit为20,skip为0。