源码分析
工作线程内部类Worker源码分析
线程池中的每一个具体的工作线程被包装为内部类Worker
实例,Worker
继承于AbstractQueuedSynchronizer(AQS)
,实现了Runnable
接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; // 保存ThreadFactory创建的线程实例,如果ThreadFactory创建线程失败则为null final Thread thread; // 保存传入的Runnable任务实例 Runnable firstTask; // 记录每个线程完成的任务总数 volatile long completedTasks; // 唯一的构造函数,传入任务实例firstTask,注意可以为null Worker(Runnable firstTask) { // 禁止线程中断,直到runWorker()方法执行 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 通过ThreadFactory创建线程实例,注意一下Worker实例自身作为Runnable用于创建新的线程实例 this.thread = getThreadFactory().newThread(this); } // 委托到外部的runWorker()方法,注意runWorker()方法是线程池的方法,而不是Worker的方法 public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. // 是否持有独占锁,state值为1的时候表示持有锁,state值为0的时候表示已经释放锁 protected boolean isHeldExclusively() { return getState() != 0; } // 独占模式下尝试获取资源,这里没有判断传入的变量,直接CAS判断0更新为1是否成功,成功则设置独占线程为当前线程 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 独占模式下尝试是否资源,这里没有判断传入的变量,直接把state设置为0 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 加锁 public void lock() { acquire(1); } // 尝试加锁 public boolean tryLock() { return tryAcquire(1); } // 解锁 public void unlock() { release(1); } // 是否锁定 public boolean isLocked() { return isHeldExclusively(); } // 启动后进行线程中断,注意这里会判断线程实例的中断标志位是否为false,只有中断标志位为false才会中断 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } 复制代码
Worker
的构造函数里面的逻辑十分重要,通过ThreadFactory
创建的Thread
实例同时传入Worker
实例,因为Worker
本身实现了Runnable
,所以可以作为任务提交到线程中执行。只要Worker
持有的线程实例w
调用Thread#start()
方法就能在合适时机执行Worker#run()
。简化一下逻辑如下:
// addWorker()方法中构造 Worker worker = createWorker(); // 通过线程池构造时候传入 ThreadFactory threadFactory = getThreadFactory(); // Worker构造函数中 Thread thread = threadFactory.newThread(worker); // addWorker()方法中启动 thread.start(); 复制代码
Worker
继承自AQS
,这里使用了AQS
的独占模式,这里有个技巧是构造Worker
的时候,把AQS
的资源(状态)通过setState(-1)
设置为-1,这是因为Worker
实例刚创建时AQS
中state
的默认值为0,此时线程尚未启动,不能在这个时候进行线程中断,见Worker#interruptIfStarted()
方法。Worker
中两个覆盖AQS
的方法tryAcquire()
和tryRelease()
都没有判断外部传入的变量,前者直接CAS(0,1)
,后者直接setState(0)
。接着看核心方法ThreadPoolExecutor#runWorker()
:
final void runWorker(Worker w) { // 获取当前线程,实际上和Worker持有的线程实例是相同的 Thread wt = Thread.currentThread(); // 获取Worker中持有的初始化时传入的任务对象,这里注意存放在临时变量task中 Runnable task = w.firstTask; // 设置Worker中持有的初始化时传入的任务对象为null w.firstTask = null; // 由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断 w.unlock(); // allow interrupts // 记录线程是否因为用户异常终结,默认是true boolean completedAbruptly = true; try { // 初始化任务对象不为null,或者从任务队列获取任务不为空(从任务队列获取到的任务会更新到临时变量task中) // getTask()由于使用了阻塞队列,这个while循环如果命中后半段会处于阻塞或者超时阻塞状态,getTask()返回为null会导致线程跳出死循环使线程终结 while (task != null || (task = getTask()) != null) { // Worker加锁,本质是AQS获取资源并且尝试CAS更新state由0更变为1 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 如果线程池正在停止(也就是由RUNNING或者SHUTDOWN状态向STOP状态变更),那么要确保当前工作线程是中断状态 // 否则,要保证当前线程不是中断状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法,任务执行前 beforeExecute(wt, task); try { task.run(); // 钩子方法,任务执行后 - 正常情况 afterExecute(task, null); } catch (Throwable ex) { // 钩子方法,任务执行后 - 异常情况 afterExecute(task, ex); throw ex; } } finally { // 清空task临时变量,这个很重要,否则while会死循环执行同一个task task = null; // 累加Worker完成的任务数 w.completedTasks++; // Worker解锁,本质是AQS释放资源,设置state为0 w.unlock(); } } // 走到这里说明某一次getTask()返回为null,线程正常退出 completedAbruptly = false; } finally { // 处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出 processWorkerExit(w, completedAbruptly); } } 复制代码
这里重点拆解分析一下判断当前工作线程中断状态的代码:
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); // 先简化一下判断逻辑,如下 // 判断线程池状态是否至少为STOP,rs >= STOP(1) boolean atLeastStop = runStateAtLeast(ctl.get(), STOP); // 判断线程池状态是否至少为STOP,同时判断当前线程的中断状态并且清空当前线程的中断状态 boolean interruptedAndAtLeastStop = Thread.interrupted() && runStateAtLeast(ctl.get(), STOP); if (atLeastStop || interruptedAndAtLeastStop && !wt.isInterrupted()){ wt.interrupt(); } 复制代码
Thread.interrupted()
方法获取线程的中断状态同时会清空该中断状态,这里之所以会调用这个方法是因为在执行上面这个if
逻辑同时外部有可能调用shutdownNow()
方法,shutdownNow()
方法中也存在中断所有Worker
线程的逻辑,但是由于shutdownNow()
方法中会遍历所有Worker
做线程中断,有可能无法及时在任务提交到Worker
执行之前进行中断,所以这个中断逻辑会在Worker
内部执行,就是if
代码块的逻辑。这里还要注意的是:STOP
状态下会拒绝所有新提交的任务,不会再执行任务队列中的任务,同时会中断所有Worker
线程。也就是,即使任务Runnable已经runWorker()
中前半段逻辑取出,只要还没走到调用其Runnable#run(),都有可能被中断。假设刚好发生了进入if
代码块的逻辑同时外部调用了shutdownNow()
方法,那么if
逻辑内会判断线程中断状态并且重置,那么shutdownNow()
方法中调用的interruptWorkers()
就不会因为中断状态判断出现问题导致二次中断线程(会导致异常)。
小结一下上面runWorker()
方法的核心流程:
Worker
先执行一次解锁操作,用于解除不可中断状态。- 通过
while
循环调用getTask()
方法从任务队列中获取任务(当然,首轮循环也有可能是外部传入的firstTask任务实例)。 - 如果线程池更变为
STOP
状态,则需要确保工作线程是中断状态并且进行中断处理,否则要保证工作线程必须不是中断状态。 - 执行任务实例
Runnale#run()
方法,任务实例执行之前和之后(包括正常执行完毕和异常执行情况)分别会调用钩子方法beforeExecute()
和afterExecute()
。 while
循环跳出意味着runWorker()
方法结束和工作线程生命周期结束(Worker#run()
生命周期完结),会调用processWorkerExit()
处理工作线程退出的后续工作。
接下来分析一下从任务队列中获取任务的getTask()
方法和处理线程退出的后续工作的方法processWorkerExit()
。
getTask方法源码分析
getTask()
方法是工作线程在while
死循环中获取任务队列中的任务对象的方法:
private Runnable getTask() { // 记录上一次从队列中拉取的时候是否超时 boolean timedOut = false; // Did the last poll() time out? // 注意这是死循环 for (;;) { int c = ctl.get(); // Check if queue empty only if necessary. // 第一个if:如果线程池状态至少为SHUTDOWN,也就是rs >= SHUTDOWN(0),则需要判断两种情况(或逻辑): // 1. 线程池状态至少为STOP(1),也就是线程池正在停止,一般是调用了shutdownNow()方法 // 2. 任务队列为空 // 如果在线程池至少为SHUTDOWN状态并且满足上面两个条件之一,则工作线程数wc减去1,然后直接返回null if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 跑到这里说明线程池还处于RUNNING状态,重新获取一次工作线程数 int wc = workerCountOf(c); // Are workers subject to culling? // timed临时变量勇于线程超时控制,决定是否需要通过poll()此带超时的非阻塞方法进行任务队列的任务拉取 // 1.allowCoreThreadTimeOut默认值为false,如果设置为true,则允许核心线程也能通过poll()方法从任务队列中拉取任务 // 2.工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 第二个if: // 1.wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量 // 或者 2.timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null // 并且 3. 工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null, // CAS把线程数减去1失败会进入下一轮循环做重试 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null) Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 这里很重要,只有非null时候才返回,null的情况下会进入下一轮循环 if (r != null) return r; // 跑到这里说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } 复制代码
这个方法中,有两处十分庞大的if
逻辑,对于第一处if
可能导致工作线程数减去1直接返回null
的场景有:
- 线程池状态为
SHUTDOWN
,一般是调用了shutdown()
方法,并且任务队列为空。 - 线程池状态为
STOP
。
对于第二处if
,逻辑有点复杂,先拆解一下:
// 工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量 boolean b1 = wc > maximumPoolSize; // 允许线程超时同时上一轮通过poll()方法从任务队列中拉取任务为null boolean b2 = timed && timedOut; // 工作线程总数大于1 boolean b3 = wc > 1; // 任务队列为空 boolean b4 = workQueue.isEmpty(); boolean r = (b1 || b2) && (b3 || b4); if (r) { if (compareAndDecrementWorkerCount(c)){ return null; }else{ continue; } } 复制代码
这段逻辑大多数情况下是针对非核心线程。在execute()
方法中,当线程池总数已经超过了corePoolSize
并且还小于maximumPoolSize
时,当任务队列已经满了的时候,会通过addWorker(task,false)
添加非核心线程。而这里的逻辑恰好类似于addWorker(task,false)
的反向操作,用于减少非核心线程,使得工作线程总数趋向于corePoolSize
。如果对于非核心线程,上一轮循环获取任务对象为null
,这一轮循环很容易满足timed && timedOut
为true,这个时候getTask()
返回null会导致Worker#runWorker()
方法跳出死循环,之后执行processWorkerExit()
方法处理后续工作,而该非核心线程对应的Worker
则变成“游离对象”,等待被JVM回收。当allowCoreThreadTimeOut
设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。那么可以总结出keepAliveTime
的意义:
- 当允许核心线程超时,也就是
allowCoreThreadTimeOut
设置为true的时候,此时keepAliveTime
表示空闲的工作线程的存活周期。 - 默认情况下不允许核心线程超时,此时
keepAliveTime
表示空闲的非核心线程的存活周期。
在一些特定的场景下,配置合理的keepAliveTime
能够更好地利用线程池的工作线程资源。
processWorkerExit方法源码分析
processWorkerExit()
方法是为将要终结的Worker
做一次清理和数据记录工作(因为processWorkerExit()
方法也包裹在runWorker()
方法finally
代码块中,其实工作线程在执行完processWorkerExit()
方法才算真正的终结)。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 因为抛出用户异常导致线程终结,直接使工作线程数减1即可 // 如果没有任何异常抛出的情况下是通过getTask()返回null引导线程正常跳出runWorker()方法的while死循环从而正常终结,这种情况下,在getTask()中已经把线程数减1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 全局的已完成任务记录数加上此将要终结的Worker中的已完成任务数 completedTaskCount += w.completedTasks; // 工作线程集合中移除此将要终结的Worker workers.remove(w); } finally { mainLock.unlock(); } // 见下一小节分析,用于根据当前线程池的状态判断是否需要进行线程池terminate处理 tryTerminate(); int c = ctl.get(); // 如果线程池的状态小于STOP,也就是处于RUNNING或者SHUTDOWN状态的前提下: // 1.如果线程不是由于抛出用户异常终结,如果允许核心线程超时,则保持线程池中至少存在一个工作线程 // 2.如果线程由于抛出用户异常终结,或者当前工作线程数,那么直接添加一个新的非核心线程 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { // 如果允许核心线程超时,最小值为0,否则为corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果最小值为0,同时任务队列不空,则更新最小值为1 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 工作线程数大于等于最小值,直接返回不新增非核心线程 if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } 复制代码
代码的后面部分区域,会判断线程池的状态,如果线程池是RUNNING
或者SHUTDOWN
状态的前提下,如果当前的工作线程由于抛出用户异常被终结,那么会新创建一个非核心线程。如果当前的工作线程并不是抛出用户异常被终结(正常情况下的终结),那么会这样处理:
allowCoreThreadTimeOut
为true,也就是允许核心线程超时的前提下,如果任务队列空,则会通过创建一个非核心线程保持线程池中至少有一个工作线程。allowCoreThreadTimeOut
为false,如果工作线程总数大于corePoolSize
则直接返回,否则创建一个非核心线程,也就是会趋向于保持线程池中的工作线程数量趋向于corePoolSize
。
processWorkerExit()
执行完毕之后,意味着该工作线程的生命周期已经完结。
tryTerminate方法源码分析
每个工作线程终结的时候都会调用tryTerminate()
方法:
final void tryTerminate() { for (;;) { int c = ctl.get(); // 判断线程池的状态,如果是下面三种情况下的任意一种则直接返回: // 1.线程池处于RUNNING状态 // 2.线程池至少为TIDYING状态,也就是TIDYING或者TERMINATED状态,意味着已经走到了下面的步骤,线程池即将终结 // 3.线程池至少为STOP状态并且任务队列不为空 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) return; // 工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // CAS设置线程池状态为TIDYING,如果设置成功则执行钩子方法terminated() if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 最后更新线程池状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 唤醒阻塞在termination条件的所有线程,这个变量的await()方法在awaitTermination()中调用 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } // 中断空闲的工作线程,onlyOne为true的时候,只会中断工作线程集合中的某一个线程 private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 这里判断线程不是中断状态并且尝试获取锁成功的时候才进行线程中断 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } // 这里跳出循环,也就是只中断集合中第一个工作线程 if (onlyOne) break; } } finally { mainLock.unlock(); } } 复制代码
这里有疑惑的地方是tryTerminate()
方法的第二个if
代码逻辑:工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程。方法API注释中有这样一段话:
If otherwise eligible to terminate but workerCount is nonzero, interrupts an idle worker to ensure that shutdown signals propagate. 当满足终结线程池的条件但是工作线程数不为0,这个时候需要中断一个空闲的工作线程去确保线程池关闭的信号得以传播。
下面将会分析的shutdown()
方法中会通过interruptIdleWorkers()
中断所有的空闲线程,这个时候有可能有非空闲的线程在执行某个任务,执行任务完毕之后,如果它刚好是核心线程,就会在下一轮循环阻塞在任务队列的take()
方法,如果不做额外的干预,它甚至会在线程池关闭之后永久阻塞在任务队列的take()
方法中。为了避免这种情况,每个工作线程退出的时候都会尝试中断工作线程集合中的某一个空闲的线程,确保所有空闲的线程都能够正常退出。
interruptIdleWorkers()
方法中会对每一个工作线程先进行tryLock()
判断,只有返回true
才有可能进行线程中断。我们知道runWorker()
方法中,工作线程在每次从任务队列中获取到非null的任务之后,会先进行加锁Worker#lock()
操作,这样就能避免线程在执行任务的过程中被中断,保证被中断的一定是空闲的工作线程。
shutdown方法源码分析
线程池关闭操作有几个相关的变体方法,先看shutdown()
:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 权限校验,安全策略相关判断 checkShutdownAccess(); // 设置SHUTDOWN状态 advanceRunState(SHUTDOWN); // 中断所有的空闲的工作线程 interruptIdleWorkers(); // 钩子方法 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 调用上面分析果敢的尝试terminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED tryTerminate(); } // 升提状态 private void advanceRunState(int targetState) { // assert targetState == SHUTDOWN || targetState == STOP; for (;;) { int c = ctl.get(); // 线程池状态至少为targetState或者CAS设置状态为targetState则跳出循环 if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } // 中断所有的空闲的工作线程 private void interruptIdleWorkers() { interruptIdleWorkers(false); } 复制代码
接着看shutdownNow()
方法:
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 权限校验,安全策略相关判断 checkShutdownAccess(); // 设置STOP状态 advanceRunState(STOP); // 中断所有的工作线程 interruptWorkers(); // 清空工作队列并且取出所有的未执行的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 调用上面分析果敢的尝试terminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED tryTerminate(); return tasks; } // 遍历所有的工作线程,如果state > 0(启动状态)则进行中断 private void interruptWorkers() { // assert mainLock.isHeldByCurrentThread(); for (Worker w : workers) w.interruptIfStarted(); } 复制代码
shutdownNow()
方法会把线程池状态先更变为STOP
,中断所有的工作线程(AbstractQueuedSynchronizer
的state
值大于0的Worker
实例,也就是包括正在执行任务的Worker
和空闲的Worker
),然后遍历任务队列,取出(移除)所有任务存放在一个列表中返回。
最后看awaitTermination()
方法:
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { // 转换timeout的单位为纳秒 long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 循环等待直到线程池状态更变为TERMINATED,每轮循环等待nanos纳秒 while (runStateLessThan(ctl.get(), TERMINATED)) { if (nanos <= 0L) return false; nanos = termination.awaitNanos(nanos); } return true; } finally { mainLock.unlock(); } } 复制代码
awaitTermination()
虽然不是shutdown()
方法体系,但是它的处理逻辑就是确保调用此方法的线程会阻塞到tryTerminate()
方法成功把线程池状态更新为TERMINATED
后再返回,可以使用在某些需要感知线程池终结时刻的场景。
有一点值得关注的是:shutdown()
方法只会中断空闲的工作线程,如果工作线程正在执行任务对象Runnable#run()
,这种情况下的工作线程不会中断,而是等待下一轮执行getTask()
方法的时候通过线程池状态判断正常终结该工作线程。
理解可重入锁mainLock成员变量
private final ReentrantLock mainLock = new ReentrantLock(); private final Condition termination = mainLock.newCondition();
先看了ThreadPoolExecutor
内部成员属性mainLock
的引用情况:
归结一下mainLock
的使用场景:
方法 | 主要作用 |
tryTerminate |
保证状态TIDYING -> TERMINATED ,钩子方法terminated() 回调和条件变量唤醒 |
interruptIdleWorkers |
保护工作线程中断的串行化,避免"中断风暴" |
addWorker |
保护工作线程集合避免并发增加工作线程、保护度量统计数据变更 |
processWorkerExit |
保护度量统计数据变更 |
shutdown 、shutdownNow 和awaitTermination |
见下文分析 |
getPoolSize 、getActiveCount 、getLargestPoolSize 、getTaskCount 和getCompletedTaskCount |
保护度量统计数据读取,这些统计数据来一般源于Worker 集合的属性统计 |
这里分析一下线程池如何通过可重入锁和条件变量实现相对优雅地关闭。先看shutdown()
方法:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } 复制代码
这里shutdown()
中除了tryTerminate()
,其他它方法都是包裹在锁里面执行,确保工作线程集合稳定性以及关闭权限、确保状态变更串行化,中断所有工作线程并且避免工作线程"中断风暴"(多次并发调用shutdown()
如果不加锁,会反复中断工作线程)。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); # <--- 多了这一步 } finally { mainLock.unlock(); } tryTerminate(); return tasks; } 复制代码
shutdownNow()
方法其实加锁的目的和shutdown()
差不多,不过多了一步:导出任务队列中的剩余的任务实例列表。awaitTermination()
方法中使用到前面提到过的条件变量termination
:
// 条件变量必须在锁代码块中执行,和synchronized关键字用法差不多 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 死循环确保等待执行和状态变更为TERMINATED while (runStateLessThan(ctl.get(), TERMINATED)) { if (nanos <= 0L) return false; nanos = termination.awaitNanos(nanos); # <-- 确保当前调用线程阻塞等待对应的时间或者线程池状态变更为TERMINATED,再退出等待 } return true; } finally { mainLock.unlock(); } } 复制代码
awaitTermination()
方法的核心功能是:确保当前调用awaitTermination()
方法的线程阻塞等待对应的时间或者线程池状态变更为TERMINATED
,再退出等待返回结果,这样能够让使用者输入一个可以接受的等待时间进行阻塞等待,或者线程池在其他线程中被调用了shutdown()
方法状态变更为TERMINATED
就能正常解除阻塞。awaitTermination()
方法的返回值为布尔值,true
代表线程池状态变更为TERMINATED
或者等待了输入时间范围内的时间周期被唤醒,意味则线程池正常退出,结果为false
代表等待了超过输入时间范围内的时间周期,线程池的状态依然没有更变为TERMINATED
。
线程池中的工作线程如何优雅地退出,不导致当前任务执行丢失、任务状态异常或者任务持有的数据异常,是一个很值得探讨的专题,以后有机会一定会分析一下这个专题。
reject方法源码分析
reject(Runnable command)
方法很简单:
final void reject(Runnable command) { handler.rejectedExecution(command, this); } 复制代码
调用线程池持有的成员RejectedExecutionHandler
实例回调任务实例和当前线程池实例。
钩子方法分析
到JDK11
为止,ThreadPoolExecutor
提供的钩子方法没有增加,有以下几个:
beforeExecute(Thread t, Runnable r)
:任务对象Runnable#run()
执行之前触发回调。afterExecute(Runnable r, Throwable t)
:任务对象Runnable#run()
执行之后(包括异常完成情况和正常完成情况)触发回调。terminated()
:线程池关闭的时候,状态更变为TIDYING
成功之后会回调此方法,执行此方法完毕后,线程池状态会更新为TERMINATED
。onShutdown()
:shutdown()
方法执行时候会回调此方法,API注释中提到此方法主要提供给ScheduledThreadPoolExecutor
使用。
其中onShutdown()
的方法修饰符为default
,其他三个方法的修饰符为protected
,必要时候可以自行扩展这些方法,可以实现监控、基于特定时机触发具体操作等等。
其他方法
线程池本身提供了大量数据统计相关的方法、扩容方法、预创建方法等等,这些方法的源码并不复杂,这里不做展开分析。
核心线程相关:
getCorePoolSize()
:获取核心线程数。setCorePoolSize()
:重新设置线程池的核心线程数。prestartCoreThread()
:预启动一个核心线程,当且仅当工作线程数量小于核心线程数量。prestartAllCoreThreads()
:预启动所有核心线程。
线程池容量相关:
getMaximumPoolSize()
:获取线程池容量。setMaximumPoolSize()
:重新设置线程池的最大容量。
线程存活周期相关:
setKeepAliveTime()
:设置空闲工作线程的存活周期。getKeepAliveTime()
:获取空闲工作线程的存活周期。
其他监控统计相关方法:
getTaskCount()
:获取所有已经被执行的任务总数的近似值。getCompletedTaskCount()
:获取所有已经执行完成的任务总数的近似值。getLargestPoolSize()
:获取线程池的峰值线程数(最大池容量)。getActiveCount()
:获取所有活跃线程总数(正在执行任务的工作线程)的近似值。getPoolSize()
:获取工作线程集合的容量(当前线程池中的总工作线程数)。
任务队列操作相关方法:
purge()
:移除任务队列中所有是Future
类型并且已经处于Cancelled
状态的任务。remove()
:从任务队列中移除指定的任务。BlockingQueue<Runnable> getQueue()
:获取任务队列的引用。
有部分属性值的设置有可能影响到线程池中的状态或者工作线程的增减等,例如核心线程数改变,有可能会直接增减Worker
,这里就以ThreadPoolExecutor#setCorePoolSize()
为例:
// 设置核心线程数量 public void setCorePoolSize(int corePoolSize) { // 输入值不能小于0或者大于线程池的容量 if (corePoolSize < 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); // delta = 传入核心线程数和现存的核心线程数的差值 int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; // 如果当前线程池工作线程的总量大于传入核心线程数,则中断所有的工作线程 if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { // 传入核心线程数和现存的核心线程数的差值大于0,也就是核心线程扩容 // 计算传入核心线程数和现存的核心线程数的差值和任务队列中任务个数的最小值,并且添加这个最小值个数的工作线程池 // 任务队列为空的情况下,k === 0,此时第一个条件 k--> 0就不满足,不会进入循环,那么这delta个需要创建的工作线程应该是在提交新任务的时候懒创建 int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { // 如果任务队列为空,则跳出循环 if (workQueue.isEmpty()) break; } } } 复制代码
这里else if (delta > 0)后面的代码块中有一段描述,翻译一下:我们并不知道真正情况下"需要"多少新的工作线程。作为一种启发式处理方式,预先启动足够多的新的工作线程(直到数量为核心线程池大小)来处理队列中当前的任务,但如果在这样做时队列变为空,则停止创建新的工作线程。
小结
本文花大量功夫基于每一行代码分析JUC
线程池ThreadPoolExecutor
的核心方法execute()
的实现,这个方法是整个线程池相关体系的基石,有了它才能扩展出带回调的异步执行和基于时间进行任务调度的功能,后面将会编写两篇文章分别详细分析线程池扩展服务ExecutorService
的功能源码实现以及调度线程池ScheduledThreadPoolExecutor
的源码实现,预计要耗时2-3周。