五、Executors工厂类详解

简介: 本文深入解析Java中Executors类提供的12种线程池创建方法,涵盖newFixedThreadPool、newCachedThreadPool、newWorkStealingPool及ScheduledExecutorService等,对比其核心参数、工作原理与适用场景,并结合源码分析任务调度机制、线程复用策略与队列行为,重点探讨周期任务调度延迟、死循环任务影响等实际问题,帮助开发者准确选择和使用线程池,提升并发编程能力。

首先列出了Executors这个类提供的一些方法。

Executors方法

本文需要对以上12个类做一些区分,从其特点出发,然后分析其应用场景。

public static ExecutorService newFixedThreadPool(int nThreads)

使用这个方法会产生这样一个线程池:线程池最多会保持nThreads个线程处于活动状态,如果当前所有任务都处于活动状态,那么新提交的任务会被添加到任务阻塞队列中去。总结一下就是:使用固定大小的线程池,并发数是固定的。

  • Creates a thread pool that reuses a fixed number of threads
    • operating off a shared unbounded queue. At any point, at most
    • {@code nThreads} threads will be active processing tasks.
    • If additional tasks are submitted when all threads are active,
    • they will wait in the queue until a thread is available.
    • If any thread terminates due to a failure during execution
    • prior to shutdown, a new one will take its place if needed to
    • execute subsequent tasks. The threads in the pool will exist
    • until it is explicitly {@link ExecutorService#shutdown shutdown}.

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

相比于newFixedThreadPool(int nThreads), 你可以使用这个方法来传递你自己的线程工厂,线程工厂是用来干嘛的?就是用来生成线程的,你可以使用线程工厂做一些个性化的线程特性定制。

public static ExecutorService newWorkStealingPool(int parallelism)

在了解或者使用这个方法之前,你你该对java的Fork/Join并行框架有一些了解,如果你想要快速了解一下该部分的内容,可以参考这篇文章:Java Fork/Join并行框架。

从名字上我们就知道这个方法生产出来的线程池具有某种“小偷”的行为,在Fork/Join里面,线程的工作模式为“盗窃算法”,也就是在自己的任务队列消费完了之后不是进入等到状态,而是会主动去偷窃别的线程的任务来做,其实是没有一种奖励机制来鼓励这些线程去帮助别的线程去消费任务的,所以可以认为这些线程都是好人,都为了快速完成任务协调作战。这种工作方式的重点在于,每个线程都将有一个任务队列,线程之间通过“偷窃”的方式互相帮助来完成任务的消费。
可以看下这个方法的实现:

return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);

可以发现,这个方法不是使用我们在第一篇文章中分析了ThreadPoolExecutor来生成线程池。而是使用了ForkJoinPool,也就是Fork/Join里面的线程池,关于ForkJoinPool更为深入的分析不再本文的涉及范围内,你只要知道Fork/Join框架的一般运行原理就可以了,下面的描述可以帮助你决策你是否需要该方法提供的线程池来工作:

  • Creates a thread pool that maintains enough threads to support
    • the given parallelism level, and may use multiple queues to
    • reduce contention. The parallelism level corresponds to the
    • maximum number of threads actively engaged in, or available to
    • engage in, task processing. The actual number of threads may
    • grow and shrink dynamically. A work-stealing pool makes no
    • guarantees about the order in which submitted tasks are
    • executed.

public static ExecutorService newWorkStealingPool()

参考newWorkStealingPool(int parallelism)。

public static ExecutorService newSingleThreadExecutor()

下面是对该方法的描述:

  • Creates an Executor that uses a single worker thread operating
    • off an unbounded queue. (Note however that if this single
    • thread terminates due to a failure during execution prior to
    • shutdown, a new one will take its place if needed to execute
    • subsequent tasks.) Tasks are guaranteed to execute
    • sequentially, and no more than one task will be active at any
    • given time. Unlike the otherwise equivalent
    • {@code newFixedThreadPool(1)} the returned executor is
    • guaranteed not to be reconfigurable to use additional threads.

可以从方法的名字上知道,该方法产生的线程池仅仅有一个Worker,任何时刻都将只有一个Worker在工作,添加的任务有很大概率被放在阻塞任务队列中等待执行。这些任务会被顺序执行,这个方法的返回值其实是对ThreadPoolExecutor的一层包装,下面的代码展示了最终执行任务的类:

static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public Future submit(Callable task) {
return e.submit(task);
}
public Future submit(Runnable task, T result) {
return e.submit(task, result);
}
public List> invokeAll(Collection<? extends Callable> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public List> invokeAll(Collection<? extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public T invokeAny(Collection<? extends Callable> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public T invokeAny(Collection<? extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}

从上面的代码可以看出,这个类其实就是使用了构造时传递的参数e来完成,更像是代理。而e是什么?看下面的代码:

ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())

其实就是一个只有一个线程的ThreadPoolExecutor。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

参考newSingleThreadExecutor(),多了一个线程工厂参数

public static ExecutorService newCachedThreadPool()

首先看它的方法体内容:

return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());

可以看到,核心线程数量为0,而上限为Integer.MAX_VALUE,而且keepAliveTime为60秒,那么这个线程池的工作模式为:只要有任务呗提交,而且当前没有空闲的线程可用,那么就会创建一个新的Worker来工作,一个线程工作完了之后会缓存(idle)60秒,如果60秒之内有新的任务提交,则会被唤醒进入工作模式,否则60秒后就会被回收。可以参考下面的描述:

  • Creates a thread pool that creates new threads as needed, but
    • will reuse previously constructed threads when they are
    • available. These pools will typically improve the performance
    • of programs that execute many short-lived asynchronous tasks.
    • Calls to {@code execute} will reuse previously constructed
    • threads if available. If no existing thread is available, a new
    • thread will be created and added to the pool. Threads that have
    • not been used for sixty seconds are terminated and removed from
    • the cache. Thus, a pool that remains idle for long enough will
    • not consume any resources. Note that pools with similar
    • properties but different details (for example, timeout parameters)
    • may be created using {@link ThreadPoolExecutor} constructors.

从描述上,我们可以想到,其实这种类型的线程池比较适合于短期高流量的场景,也就是我们所说的“秒杀”场景,在那样的场景下,需要的线程数量较多,那么使用该类型的线程池可以满足,而且该线程池还有自动收缩的功能,在不需要那么多线程的时候,会自动回收线程,释放资源。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

参考newCachedThreadPool()。

public static ScheduledExecutorService newSingleThreadScheduledExecutor()

只有一个线程的调度线程池,类似于newSingleThreadExecutor,但是该方法生产的线程池具备调度功能,下面是对该方法的描述:

  • Creates a single-threaded executor that can schedule commands
    • to run after a given delay, or to execute periodically.
    • (Note however that if this single
    • thread terminates due to a failure during execution prior to
    • shutdown, a new one will take its place if needed to execute
    • subsequent tasks.) Tasks are guaranteed to execute
    • sequentially, and no more than one task will be active at any
    • given time. Unlike the otherwise equivalent
    • {@code newScheduledThreadPool(1)} the returned executor is
    • guaranteed not to be reconfigurable to use additional threads.

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

参考newSingleThreadExecutor和newSingleThreadScheduledExecutor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

参考newFixedThreadPool,但是返回类型不一样。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

参考newFixedThreadPool。

通过上面的分析,我们应该对java线程池的理解更为深入,再次说明,第五节是对前面四节内容的补充,你应该首先前四节之后再来阅读第五节,那样内容上更完整,但是单独阅读本文一样具备独立性,但是收获肯定没有同时阅读那样多。

六、ScheduleExecutorService

如果在一个ScheduleExecutorService中提交一个任务,这个任务的调度周期设置
的时间比任务本身执行的时间短的话会出现什么情况?也就是在线程调度时间已经到了
但是上次的任务还没有做完的情况下,ScheduleExecutorService是怎么处理的?

这个问题曾经困扰了我很久,我们都知道,ScheduleExecutorService是一个支持周期调度的线程池,我们可以设置调度的周期period,ScheduleExecutorService会按照设定好的周期调度我们的任务,如果我们设定的调度周期小于任务运行时间,那么很好理解,比如说我们设置的调度周期为1秒,而任务实际只需要10毫秒就可以执行完成一次,那么执行完成之后放到调度队列即可,下次调度时间到了再次调度执行。那么,如果我们的任务执行时间大于我们设定的调度时间会怎么样?比如我们设定的调度周期为1秒,但是我们的任务每次需要执行2秒,这个情况是不是很奇怪呢?
对于ScheduleExecutorService来说,你给我设定的调度周期是1秒,那么我当然1秒就会去运行一次你,但是运行1秒后发现你还在运行,那我是再次运行你还是等你运行完成再调度你运行?
当然,这都是我的主观臆断来猜测ScheduleExecutorService的原理,ScheduleExecutorService的真正原理需要去阅读源码来理解,下面带着这个问题,以解决这个问题为目标去看一下ScheduleExecutorService的源码吧。
首先,我们使用下面的代码作为测试:

private static Runnable blockRunner = () -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("one round:" + new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
};
private static ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(2);
public static void main(String ... args) {
scheduledExecutorService
.scheduleAtFixedRate(blockRunner, 0, 100, TimeUnit.MILLISECONDS);
}
我们设定了调度周期为100毫秒,但是blockRunner实际上需要执行2秒才能返回。关于java的线程池,已经在前面写到了。
先来看一下scheduleAtFixedRate这个方法:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask sft =
new ScheduledFutureTask(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

我们的任务command被包装了两次,一次变成了一个ScheduledFutureTask类型的对象,然后又变成了RunnableScheduledFuture类型的对象。然后执行了一个方法delayedExecute,这个方法字面意思上看起来像是延时执行的意思,看一下它的代码:

private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}

它的执行逻辑是:如果线程池被关闭了,那么拒绝提交的任务,否则,将该任务添加队列中去。这个队列就是ThreadPoolExecutor中的workQueue,而这个workQueue是在ThreadPoolExecutor的构造函数中被初始化的,也就是下面这关键的一句:

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

也就是说,我们的任务被添加到了一个DelayedWorkQueue队列中去了,而DelayedWorkQueue我们在Java阻塞队列详解中已经分析过,它是一个可以延迟消费的阻塞队列。而延时的时间是通过接口Delayed的getDelay方法来获得的,我们最后找到ScheduledFutureTask实现了Delayed的getDelay方法。

public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}

time变量是什么?原来是delay,好像和period无关啊!!分析了这么久,发现这是第一次执行任务的逻辑啊,我想知道的是第二次、第三次以后和初始的delay无关之后的周期调度的情况啊,继续找吧!
然后发现了ScheduledFutureTask的run方法,很明显这就是任务调度被执行的关键所在,看下代码:

public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}

最为关键的地方在于:

else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}

首先是:runAndReset()这个方法,然后是setNextRunTime()这个方法,然后是reExecutePeriodic(outerTask)这个方法。

第一个方法runAndReset()貌似是执行我们的提交的任务的,我们看下代码:

protected boolean runAndReset() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

关键的地方是c.call()这一句,这个c就是我们提交的任务。
第二个方法setNextRunTime()的意思是设置下次执行的时间,下面是他的代码细节:

private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}

我们只需要看p>0这个分支就可以了,其实这是两种策略。我们的示例对应了第一个分支的策略,所以很显然,time这个变量会被加p,而p则是我们设定好的period。下面我们找一下这个time是在哪里初始化的,回忆一下scheduleAtFixedRate这个方法的内,我们说我们的任务被包装了两次,而time就是在这里被初始化的:

/**

  • Returns the trigger time of a delayed action.
    */
    private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

/**

  • Returns the trigger time of a delayed action.
    */
    long triggerTime(long delay) {
    return now() +
      ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    
    }

无论如何,我们知道一个任务会被运行完一次之后再次设置时间,然后线程池会获取任务来执行,而任务队列是一个延时阻塞队列,所以也就造成了周期性运行的假象。可以看下下面获取任务的take方法:

public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

可以看到,如果delay小于等于0,那么就是说需要被立即调度,否则延时delay这样一段时间。也就是延时消费。
结论就是:一个任务会被重复添加到一个延时任务队列,所以同一时间任务队列中会有多个任务待调度,线程池会首先获取优先级高的任务执行。如果我们的任务运行时间大于设置的调度时间,那么效果就是任务运行多长时间,调度时间就会变为多久,因为添加到任务队列的任务的延时时间每次都是负数,所以会被立刻执行

上面列出了最近写的关于java线程池ScheduleExecutorService的内容,可以作为参考,本文是对ScheduleExecutorService学习和总结的一个收尾,对java线程池技术更为深入的学习和总结将在未来适宜的时候进行。
6.1 ScheduleExecutorService提交死循环任务

首先提出一个问题,如果向一个调度线程池提交一个死循环任务会发生什么?为了内容的完整性,本文会提到一些在上面列出的文章中已经涉及到的内容。
比如我们运行下面的代码:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private static Runnable loopRunner = () -> {
for(;;){
}
};
scheduledExecutorService .scheduleAtFixedRate(loopRunner, 0, 100, TimeUnit.MILLISECONDS);

loopRunner里面只有一个死循环什么也不做,当然这是极端情况,更为一般的情况为在for(;;)里面做一些某种驱动类型的工作,比如Netty的EventLoop一样,那样的循环更有意义,但是本文只是为了学习当向一个调度线程池提交了一个死循环任务之后的运行情况。
下面我们就分析一下scheduleAtFixedRate方法的调用链路:
1、将loopRunner包装成一个ScheduledFutureTask对象,ScheduledFutureTask这个类对于调度线程池至关重要
2、再次包装变为RunnableScheduledFuture对象
3、delayedExecute方法运行,确保任务被正确处理,如果线程池已经被关闭了,那么拒绝任务的提交,否则将任务添加到一个延时队列(workQueue)中去,这是一个具有延时功能的阻塞队列,初始容量为16,每次扩容增加50%的容量,最大容量为Integer.MAX_VALUE
4、运行方法ensurePrestart,确保线程池已经开始工作了,如果线程池里面的线程数量还没有达到设定的corePoolSize,那么就添加一个新的Worker,然后让这个Worker去延时队列去获取任务来执行
5、方法addWorker执行,添加一个Worker,然后让他执行我们提交的任务,下面摘取一段addWorker的方法内容:

/**

  • 完整代码见源码,下面只是摘取了部分,去除了一些不影响阅读的部分
    */
    private boolean addWorker(Runnable firstTask, boolean core) {

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {

      w = new Worker(firstTask);
      final Thread t = w.thread;
      if (t != null) {
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              int rs = runStateOf(ctl.get());
              if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                  if (t.isAlive()) // precheck that t is startable
                      throw new IllegalThreadStateException();
                  workers.add(w); //添加一个新的worker
                  int s = workers.size();
                  if (s > largestPoolSize)
                      largestPoolSize = s; 
                  workerAdded = true;
              }
          } finally {
              mainLock.unlock();
          }
          if (workerAdded) { //如果添加新的Worker成功,那么就启动它来执行我们提交的任务
              t.start();
              workerStarted = true;
          }
      }
    

    }
    return workerStarted;
    }
    6、第五步中最为重要的一句话就是t.start(),这句话的执行会发生什么?首先看这个t是什么东西:

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

而this就是Worker自身,而Worker是实现了Runnable的,也就是说,t.start()这句话会执行worker自身的run方法

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

7、我们已经知道现在会执行Worker的run方法,下面是run方法的内容:

public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
首先从Worker中获取其负责的task,如果task为空,那么就去延时队列获取任务,如果没有获取到任务那么线程就可以休息了,如果获取到,那么继续执行下面的内容。主要的就是一句:task.run(),那这句话会发生什么呢?
8、想要知道task.run()之后会发生什么,就需要知道task是个什么东西,第二步的时候说过,也就是我们的任务,只是被包装成了一个RunnableScheduledFuture对象,那现在就去看RunnableScheduledFuture这个方法里面的run会发生什么,下面展示了其run方法的具体细节:

public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
如果这不是一个周期性的任务,那么就执行super的run方法,否则执行runAndReset方法,介于本文是问题导向的文章,所以在此不对super的run方法和runAndReset方法做分析,只要知道这就是执行我们实际提交的任务就好了。也就是说,走到这一步,我们的任务开始运行起来了,也就是我们的那个loopRunner开始无限循环了,下面的代码将永远得不到执行。所以,到这一步就可以解决问题了,向一个调度线程池提交一个死循环的任务,那么这个任务会霸占一个线程一直不会释放,如果很不幸线程池里面只允许有一个线程的话,那么其他提交的任务都将得不到调度执行。
9、为了走通整个流程,我们假设我们提交的不是一个死循环任务,那么提交的任务总是会被执行完的,线程总是会被释放的,那么就会执行setNextRunTime这个方法,下面是这个方法的细节:

private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
p > 0代表的是scheduleAtFixedRate,p < 0代表的是scheduleWithFixedDelay,两者的区别在于前者总是按照设定的轨迹来设定下次应该调度的时间,而后者总是在任务执行完成之后再根据周期设定下一次应该执行的时间。我们只分析前者。对于第一次提交的任务,time等于当前时间 + 首次延时执行的时间,对于delay等于0的情况下,首次提交任务的time就是当前时间,然后 + p代表的是下一次应该被调度的时间。
10、我们发现,每个任务都是在执行完一次之后再设定下次执行任务的时间的,这也特别关键。设定好下次调度的时间,那么就要开始去准备执行它吧,也就是reExecutePeriodic方法会执行,下面是reExecutePeriodic方法的内容:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
这个方法只是将任务重新提交到了延时队列而已,一次完整的流程到底也就结束了,为了内容的完整性,再来分析一下一个Worker从延时队列获取任务时的情况。回到第七步,我们有一个方法没有提到,那就是getTask():

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
我们主要来看两个方法: poll/take,这两个方法都是从延时队列获取一个任务,下面是poll的代码,take会阻塞一直到获取到内容,而poll则不会阻塞,take的代码就不粘了:

public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
poll的代码最为核心的内容就是,获取队列首部的任务,然后获取其延时时间,这个时间是我们在完成一次调度之后设置的下次调度时间,如果任务的运行时间大于我们设定的周期的话,这个延时时间就是负数的,那么就会被立即执行,否则会等到设定的时间,时间到了再返回给Worker执行。

相关文章
|
1天前
|
数据采集 人工智能 安全
|
11天前
|
云安全 监控 安全
|
3天前
|
自然语言处理 API
万相 Wan2.6 全新升级发布!人人都能当导演的时代来了
通义万相2.6全新升级,支持文生图、图生视频、文生视频,打造电影级创作体验。智能分镜、角色扮演、音画同步,让创意一键成片,大众也能轻松制作高质量短视频。
985 151
|
2天前
|
编解码 人工智能 机器人
通义万相2.6,模型使用指南
智能分镜 | 多镜头叙事 | 支持15秒视频生成 | 高品质声音生成 | 多人稳定对话
|
16天前
|
机器学习/深度学习 人工智能 自然语言处理
Z-Image:冲击体验上限的下一代图像生成模型
通义实验室推出全新文生图模型Z-Image,以6B参数实现“快、稳、轻、准”突破。Turbo版本仅需8步亚秒级生成,支持16GB显存设备,中英双语理解与文字渲染尤为出色,真实感和美学表现媲美国际顶尖模型,被誉为“最值得关注的开源生图模型之一”。
1686 8
|
8天前
|
人工智能 自然语言处理 API
一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸
一句话生成拓扑图!next-ai-draw-io 结合 AI 与 Draw.io,通过自然语言秒出架构图,支持私有部署、免费大模型接口,彻底解放生产力,绘图效率直接爆炸。
630 152
|
10天前
|
人工智能 安全 前端开发
AgentScope Java v1.0 发布,让 Java 开发者轻松构建企业级 Agentic 应用
AgentScope 重磅发布 Java 版本,拥抱企业开发主流技术栈。
604 14
|
9天前
|
人工智能 自然语言处理 API
Next AI Draw.io:当AI遇见Draw.io图表绘制
Next AI Draw.io 是一款融合AI与图表绘制的开源工具,基于Next.js实现,支持自然语言生成架构图、流程图等专业图表。集成多款主流大模型,提供智能绘图、图像识别优化、版本管理等功能,部署简单,安全可控,助力技术文档与系统设计高效创作。
679 151