三、ThreadPoolExecutor解析
上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。下面的图片展示了ThreadPoolExecutor的类图:
图片加载失败
ThreadPoolExecutor的类图
下面是几个比较关键的类成员:
我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。
上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。下面分析一下Worker类。
worker类图
上面的图片展示了Worker的类关系图,关键在于他实现了Runnable接口,所以问题的关键就在于run方法上。在这之前,我们来看一下Worker类里面的关键成员:
thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:
我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:
编译前 编译后 while (1); mov eax,1 test eax,eax je foo+23h jmp foo+18h
编译前 编译后 for (;;); jmp foo+23h
对比之下,for (;;)指令少,不占用寄存器,而且没有判断跳转,比while (1)好。
也就是说两者在在宏观上完全一样的逻辑,但是底层完全不一样,for相对于来说更加简洁明了
其实核心也就一句:
我们再回头看一下execute,其实我们上面只走了一条逻辑,在execute的时候,我们的worker的数量还没有到达我们设定的corePoolSize的时候,会走上面我们分析的逻辑,而如果达到了我们设定的阈值之后,execute中会尝试去提交任务,如果提交成功了就结束,否则会拒绝任务的提交。我们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,但是我们上面的分析是corePoolSize,这是因为我们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,否则使用maximumPoolSize来设定边界。
直观的解释一下,当线程池里面的Worker数量还没有到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,如果Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,如果没有办法再向阻塞队列放任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,如果线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。可以参考下面的描述来理解:
|---corePoolSize[创建]---||---workQueue[等待keepAliveTime]---||---maximumPoolSize[创建]---||---拒绝策略---|
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- When a new task is submitted in method {@link #execute(Runnable)},
- and fewer than corePoolSize threads are running, a new thread is
- created to handle the request, even if other worker threads are
- idle. If there are more than corePoolSize but less than
- maximumPoolSize threads running, a new thread will be created only
- if the queue is full. By setting corePoolSize and maximumPoolSize
- the same, you create a fixed-size thread pool. By setting
- maximumPoolSize to an essentially unbounded value such as {@code
- Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
- number of concurrent tasks. Most typically, core and maximum pool
- sizes are set only upon construction, but they may also be changed
- dynamically using {@link #setCorePoolSize} and {@link
setMaximumPoolSize}.
在方法{@link #execute(Runnable)}中提交新任务时,
如果运行的线程小于corePoolSize,则创建新线程处理请求,即使其他工作线程闲置。
如果运行的线程大于corePoolSize,但是小于maximumPoolSize,当线程运行时,如果队列已满则会创建一个新线程
同样通过设置corePoolSize和maximumPoolSize,创建一个固定大小的线程池。通过设置maximumPoolSize到一个
本质上无界的值,比如{@code Integer.MAX_VALUE},您允许池容纳任意的并发任务的数量。
最典型的是核心池和最大池尺寸只在构造时设置,但也可以更改动态使用{@link #setCorePoolSize}和{@link #setMaximumPoolSize}。
在此需要说明一点,有一个重要的成员:keepAliveTime,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime之后被terminated。可以参考下面的文档:
Plain Text
复制代码
1
2
3
- If the pool currently has more than corePoolSize threads,
- excess threads will be terminated if they have been idle for more
- than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).