前提
很早之前就打算看一次JUC线程池ThreadPoolExecutor
的源码实现,由于近段时间比较忙,一直没有时间整理出源码分析的文章。之前在分析扩展线程池实现可回调的Future
时候曾经提到并发大师Doug Lea
在设计线程池ThreadPoolExecutor
的提交任务的顶层接口Executor
只有一个无状态的执行方法:
public interface Executor { void execute(Runnable command); } 复制代码
而ExecutorService
提供了很多扩展方法底层基本上是基于Executor#execute()
方法进行扩展。本文着重分析ThreadPoolExecutor#execute()
的实现,笔者会从实现原理、源码实现等角度结合简化例子进行详细的分析。ThreadPoolExecutor
的源码从JDK8
到JDK11
基本没有变化,本文编写的时候使用的是JDK11
。
ThreadPoolExecutor的原理
ThreadPoolExecutor
里面使用到JUC同步器框架AbstractQueuedSynchronizer
(俗称AQS
)、大量的位操作、CAS
操作。ThreadPoolExecutor
提供了固定活跃线程(核心线程)、额外的线程(线程池容量 - 核心线程数这部分额外创建的线程,下面称为非核心线程)、任务队列以及拒绝策略这几个重要的功能。
JUC同步器框架
ThreadPoolExecutor
里面使用到JUC同步器框架,主要用于四个方面:
- 全局锁
mainLock
成员属性,是可重入锁ReentrantLock
类型,主要是用于访问工作线程Worker
集合和进行数据统计记录时候的加锁操作。 - 条件变量
termination
,Condition
类型,主要用于线程进行等待终结awaitTermination()
方法时的带期限阻塞。 - 任务队列
workQueue
,BlockingQueue<Runnable>
类型,任务队列,用于存放待执行的任务。 - 工作线程,内部类
Worker
类型,是线程池中真正的工作线程对象。
关于AQS
笔者之前写过一篇相关源码分析的文章:JUC同步器框架AbstractQueuedSynchronizer源码图文分析。
核心线程
这里先参考ThreadPoolExecutor
的实现并且进行简化,实现一个只有核心线程的线程池,要求如下:
- 暂时不考虑任务执行异常情况下的处理。
- 任务队列为无界队列。
- 线程池容量固定为核心线程数量。
- 暂时不考虑拒绝策略。
public class CoreThreadPool implements Executor { private BlockingQueue<Runnable> workQueue; private static final AtomicInteger COUNTER = new AtomicInteger(); private int coreSize; private int threadCount = 0; public CoreThreadPool(int coreSize) { this.coreSize = coreSize; this.workQueue = new LinkedBlockingQueue<>(); } @Override public void execute(Runnable command) { if (++threadCount <= coreSize) { new Worker(command).start(); } else { try { workQueue.put(command); } catch (InterruptedException e) { throw new IllegalStateException(e); } } } private class Worker extends Thread { private Runnable firstTask; public Worker(Runnable runnable) { super(String.format("Worker-%d", COUNTER.getAndIncrement())); this.firstTask = runnable; } @Override public void run() { Runnable task = this.firstTask; while (null != task || null != (task = getTask())) { try { task.run(); } finally { task = null; } } } } private Runnable getTask() { try { return workQueue.take(); } catch (InterruptedException e) { throw new IllegalStateException(e); } } public static void main(String[] args) throws Exception { CoreThreadPool pool = new CoreThreadPool(5); IntStream.range(0, 10) .forEach(i -> pool.execute(() -> System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i)))); Thread.sleep(Integer.MAX_VALUE); } } 复制代码
某次运行结果如下:
Thread:Worker-0,value:0 Thread:Worker-3,value:3 Thread:Worker-2,value:2 Thread:Worker-1,value:1 Thread:Worker-4,value:4 Thread:Worker-1,value:5 Thread:Worker-2,value:8 Thread:Worker-4,value:7 Thread:Worker-0,value:6 Thread:Worker-3,value:9 复制代码
设计此线程池的时候,核心线程是懒创建的,如果线程空闲的时候则阻塞在任务队列的take()
方法,其实对于ThreadPoolExecutor
也是类似这样实现,只是如果使用了keepAliveTime
并且允许核心线程超时(allowCoreThreadTimeOut
设置为true
)则会使用BlockingQueue#poll(keepAliveTime)
进行轮询代替永久阻塞。
其他附加功能
构建ThreadPoolExecutor
实例的时候,需要定义maximumPoolSize
(线程池最大线程数)和corePoolSize
(核心线程数)。当任务队列是有界的阻塞队列,核心线程满负载,任务队列已经满的情况下,会尝试创建额外的maximumPoolSize - corePoolSize
个线程去执行新提交的任务。当ThreadPoolExecutor
这里实现的两个主要附加功能是:
- 一定条件下会创建非核心线程去执行任务,非核心线程的回收周期(线程生命周期终结时刻)是
keepAliveTime
,线程生命周期终结的条件是:下一次通过任务队列获取任务的时候并且存活时间超过keepAliveTime
。 - 提供拒绝策略,也就是在核心线程满负载、任务队列已满、非核心线程满负载的条件下会触发拒绝策略。
源码分析
先分析线程池的关键属性,接着分析其状态控制,最后重点分析ThreadPoolExecutor#execute()
方法。
关键属性
public class ThreadPoolExecutor extends AbstractExecutorService { // 控制变量-存放状态和线程数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 任务队列,必须是阻塞队列 private final BlockingQueue<Runnable> workQueue; // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合 private final HashSet<Worker> workers = new HashSet<>(); // 全局锁 private final ReentrantLock mainLock = new ReentrantLock(); // awaitTermination方法使用的等待条件变量 private final Condition termination = mainLock.newCondition(); // 记录峰值线程数 private int largestPoolSize; // 记录已经成功执行完毕的任务数 private long completedTaskCount; // 线程工厂,用于创建新的线程实例 private volatile ThreadFactory threadFactory; // 拒绝执行处理器,对应不同的拒绝策略 private volatile RejectedExecutionHandler handler; // 空闲线程等待任务的时间周期,单位是纳秒 private volatile long keepAliveTime; // 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效 private volatile boolean allowCoreThreadTimeOut; // 核心线程数 private volatile int corePoolSize; // 线程池容量 private volatile int maximumPoolSize; // 省略其他代码 } 复制代码
下面看参数列表最长的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 复制代码
可以自定义核心线程数、线程池容量(最大线程数)、空闲线程等待任务周期、任务队列、线程工厂、拒绝策略。下面简单分析一下每个参数的含义和作用:
corePoolSize
:int类型,核心线程数量。maximumPoolSize
:int类型,最大线程数量,也就是线程池的容量。keepAliveTime
:long类型,线程空闲等待时间,也和工作线程的生命周期有关,下文会分析。unit
:TimeUnit
类型,keepAliveTime
参数的时间单位,实际上keepAliveTime
最终会转化为纳秒。workQueue
:BlockingQueue<Runnable>
类型,等待队列或者叫任务队列。threadFactory
:ThreadFactory
类型,线程工厂,用于创建工作线程(包括核心线程和非核心线程),默认使用Executors.defaultThreadFactory()
作为内建线程工厂实例,一般自定义线程工厂才能更好地跟踪工作线程。handler
:RejectedExecutionHandler
类型,线程池的拒绝执行处理器,更多时候称为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包括核心线程和非核心线程)并且继续提交任务。提供了4种内建的拒绝策略实现:
AbortPolicy
:直接拒绝策略,也就是不会执行任务,直接抛出RejectedExecutionException
,这是默认的拒绝策略。DiscardPolicy
:抛弃策略,也就是直接忽略提交的任务(通俗来说就是空实现)。DiscardOldestPolicy
:抛弃最老任务策略,也就是通过poll()
方法取出任务队列队头的任务抛弃,然后执行当前提交的任务。CallerRunsPolicy
:调用者执行策略,也就是当前调用Executor#execute()
的线程直接调用任务Runnable#run()
,一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用。
状态控制
状态控制主要围绕原子整型成员变量ctl
:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 通过ctl值获取运行状态 private static int runStateOf(int c) { return c & ~COUNT_MASK; } // 通过ctl值获取工作线程数 private static int workerCountOf(int c) { return c & COUNT_MASK; } // 通过运行状态和工作线程数计算ctl的值,或运算 private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } // CAS操作线程数增加1 private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } // CAS操作线程数减少1 private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } // 线程数直接减少1 private void decrementWorkerCount() { ctl.addAndGet(-1); } 复制代码
接下来分析一下线程池的状态变量,工作线程上限数量位的长度是COUNT_BITS
,它的值是Integer.SIZE - 3
,也就是正整数29:
我们知道,整型包装类型Integer实例的大小是4 byte,一共32 bit,也就是一共有32个位用于存放0或者1。 在ThreadPoolExecutor实现中,使用32位的整型包装类型存放工作线程数和线程池状态。 其中,低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有2^3种。 工作线程上限数量为2^29 - 1,超过5亿,这个数量在短时间内不用考虑会超限。
接着看工作线程上限数量掩码COUNT_MASK
,它的值是(1 < COUNT_BITS) - l
,也就是1左移29位,再减去1,如果补全32位,它的位视图如下:
然后就是线程池的状态常量,这里只详细分析其中一个,其他类同,这里看RUNNING
状态:
// -1的补码为:111-11111111111111111111111111111 // 左移29位后:111-00000000000000000000000000000 // 10进制值为:-536870912 // 高3位111的值就是表示线程池正在处于运行状态 private static final int RUNNING = -1 << COUNT_BITS; 复制代码
控制变量ctl
的组成就是通过线程池运行状态rs
和工作线程数wc
通过或运算得到的:
// rs=RUNNING值为:111-00000000000000000000000000000 // wc的值为0:000-00000000000000000000000000000 // rs | wc的结果为:111-00000000000000000000000000000 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int ctlOf(int rs, int wc) { return rs | wc; } 复制代码
那么我们怎么从ctl
中取出高3位?上面源码中提供的runStateOf()
方法就是提取运行状态:
// 先把COUNT_MASK取反(~COUNT_MASK),得到:111-00000000000000000000000000000 // ctl位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy // 两者做一次与运算即可得到高3位xxx private static int runStateOf(int c){ return c & ~COUNT_MASK; } 复制代码
同理,取出低29位只需要把ctl
和COUNT_MASK
(000-11111111111111111111111111111
)做一次与运算即可。
小结一下线程池的运行状态常量:
状态名称 | 位图 | 十进制值 | 描述 |
RUNNING |
111-00000000000000000000000000000 |
-536870912 | 运行中状态,可以接收新的任务和执行任务队列中的任务 |
SHUTDOWN |
000-00000000000000000000000000000 |
0 | shutdown状态,不再接收新的任务,但是会执行任务队列中的任务 |
STOP |
001-00000000000000000000000000000 |
536870912 | 停止状态,不再接收新的任务,也不会执行任务队列中的任务,中断所有执行中的任务 |
TIDYING |
010-00000000000000000000000000000 |
1073741824 | 整理中状态,所有任务已经终结,工作线程数为0,过渡到此状态的工作线程会调用钩子方法terminated() |
TERMINATED |
011-00000000000000000000000000000 |
1610612736 | 终结状态,钩子方法terminated() 执行完毕 |
这里有一个比较特殊的技巧,由于运行状态值存放在高3位,所以可以直接通过十进制值(甚至可以忽略低29位,直接用ctl
进行比较,或者使用ctl
和线程池状态常量进行比较)来比较和判断线程池的状态:
RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)
下面这三个方法就是使用这种技巧:
// ctl和状态常量比较,判断是否小于 private static boolean runStateLessThan(int c, int s) { return c < s; } // ctl和状态常量比较,判断是否小于或等于 private static boolean runStateAtLeast(int c, int s) { return c >= s; } // ctl和状态常量SHUTDOWN比较,判断是否处于RUNNING状态 private static boolean isRunning(int c) { return c < SHUTDOWN; } 复制代码
最后是线程池状态的跃迁图:
PS:线程池源码中有很多中间变量用了简单的单字母表示,例如c就是表示ctl、wc就是表示worker count、rs就是表示running status。
execute方法源码分析
线程池异步执行任务的方法实现是ThreadPoolExecutor#execute()
,源码如下:
// 执行命令,其中命令(下面称任务)对象是Runnable的实例 public void execute(Runnable command) { // 判断命令(任务)对象非空 if (command == null) throw new NullPointerException(); // 获取ctl的值 int c = ctl.get(); // 判断如果当前工作线程数小于核心线程数,则创建新的核心线程并且执行传入的任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) // 如果创建新的核心线程成功则直接返回 return; // 这里说明创建核心线程失败,需要更新ctl的临时变量c c = ctl.get(); } // 走到这里说明创建新的核心线程失败,也就是当前工作线程数大于等于corePoolSize // 判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务(放入任务失败返回false) if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 这里是向任务队列投放任务成功,对线程池的运行中状态做二次检查 // 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务调用拒绝策略处理之(也就是移除前面成功入队的任务实例) if (! isRunning(recheck) && remove(command)) // 调用拒绝策略处理任务 - 返回 reject(command); // 走到下面的else if分支,说明有以下的前提: // 0、待执行的任务已经成功加入任务队列 // 1、线程池可能是RUNNING状态 // 2、传入的任务可能从任务队列中移除失败(移除失败的唯一可能就是任务已经被执行了) // 如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null - 返回 // 也就是创建的非核心线程不会马上运行,而是等待获取任务队列的任务去执行 // 如果前工作线程数量不为0,原来应该是最后的else分支,但是可以什么也不做,因为任务已经成功入队列,总会有合适的时机分配其他空闲线程去执行它 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 走到这里说明有以下的前提: // 0、线程池中的工作线程总数已经大于等于corePoolSize(简单来说就是核心线程已经全部懒创建完毕) // 1、线程池可能不是RUNNING状态 // 2、线程池可能是RUNNING状态同时任务队列已经满了 // 如果向任务队列投放任务失败,则会尝试创建非核心线程传入任务执行 // 创建非核心线程失败,此时需要拒绝执行任务 else if (!addWorker(command, false)) // 调用拒绝策略处理任务 - 返回 reject(command); } 复制代码
这里简单分析一下整个流程:
- 如果当前工作线程总数小于
corePoolSize
,则直接创建核心线程执行任务(任务实例会传入直接用于构造工作线程实例)。 - 如果当前工作线程总数大于等于
corePoolSize
,判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务,这里会二次检查线程池运行状态,如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null。 - 如果向任务队列投放任务失败(任务队列已经满了),则会尝试创建非核心线程传入任务实例执行。
- 如果创建非核心线程失败,此时需要拒绝执行任务,调用拒绝策略处理任务。
这里是一个疑惑点:为什么需要二次检查线程池的运行状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null?这个可以看API注释:
如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程的前提下新建一个工作线程。
任务提交流程从调用者的角度来看如下:
addWorker方法源码分析
boolean addWorker(Runnable firstTask, boolean core)
方法的第一的参数可以用于直接传入任务实例,第二个参数用于标识将要创建的工作线程是否核心线程。方法源码如下:
// 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功 private boolean addWorker(Runnable firstTask, boolean core) { retry: // 注意这是一个死循环 - 最外层循环 for (int c = ctl.get();;) { // 这个是十分复杂的条件,这里先拆分多个与(&&)条件: // 1. 线程池状态至少为SHUTDOWN状态,也就是rs >= SHUTDOWN(0) // 2. 线程池状态至少为STOP状态,也就是rs >= STOP(1),或者传入的任务实例firstTask不为null,或者任务队列为空 // 其实这个判断的边界是线程池状态为shutdown状态下,不会再接受新的任务,在此前提下如果状态已经到了STOP、或者传入任务不为空、或者任务队列为空(已经没有积压任务)都不需要添加新的线程 if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; // 注意这也是一个死循环 - 二层循环 for (;;) { // 这里每一轮循环都会重新获取工作线程数wc // 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败 // 1. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // 成功通过CAS更新工作线程数wc,则break到最外层的循环 if (compareAndIncrementWorkerCount(c)) break retry; // 走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN c = ctl.get(); // Re-read ctl // 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行 if (runStateAtLeast(c, SHUTDOWN)) continue retry; // 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可 // else CAS failed due to workerCount change; retry inner loop } } // 标记工作线程是否启动成功 boolean workerStarted = false; // 标记工作线程是否创建成功 boolean workerAdded = false; Worker w = null; try { // 传入任务实例firstTask创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象,所以下面可以直接操作Thread t = w.thread // 这一步Worker实例已经创建,但是没有加入工作线程集合或者启动它持有的线程Thread实例 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 这里需要全局加锁,因为会改变一些指标值和非线程安全的集合 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); // 这里主要在加锁的前提下判断ThreadFactory创建的线程是否存活或者判断获取锁成功之后线程池状态是否已经更变为SHUTDOWN // 1. 如果线程池状态依然为RUNNING,则只需要判断线程实例是否存活,需要添加到工作线程集合和启动新的Worker // 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker // 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker // 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 把创建的工作线程实例添加到工作线程集合 workers.add(w); int s = workers.size(); // 尝试更新历史峰值工作线程数,也就是线程池峰值容量 if (s > largestPoolSize) largestPoolSize = s; // 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例 workerAdded = true; } } finally { mainLock.unlock(); } // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例 if (workerAdded) { t.start(); // 标记线程启动成功 workerStarted = true; } } } finally { // 线程启动失败,需要从工作线程集合移除对应的Worker if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // 添加Worker失败 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 从工作线程集合移除之 if (w != null) workers.remove(w); // wc数量减1 decrementWorkerCount(); // 基于状态判断尝试终结线程池 tryTerminate(); } finally { mainLock.unlock(); } } 复制代码
笔者发现了Doug Lea
大神十分喜欢复杂的条件判断,而且单行复杂判断不喜欢加花括号,像下面这种代码在他编写的很多类库中都比较常见:
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; // .... // 代码拆分一下如下 boolean atLeastShutdown = runStateAtLeast(c, SHUTDOWN); # rs >= SHUTDOWN(0) boolean atLeastStop = runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty(); if (atLeastShutdown && atLeastStop){ return false; } 复制代码
上面的分析逻辑中需要注意一点,Worker
实例创建的同时,在其构造函数中会通过ThreadFactory
创建一个Java线程Thread
实例,后面会加锁后二次检查是否需要把Worker
实例添加到工作线程集合workers
中和是否需要启动Worker
中持有的Thread
实例,只有启动了Thread
实例实例,Worker
才真正开始运作,否则只是一个无用的临时对象。Worker
本身也实现了Runnable
接口,它可以看成是一个Runnable
的适配器。