ThreadPoolExecutor解析

简介: 本文深入解析Java线程池核心类ThreadPoolExecutor的实现原理,涵盖工作队列、线程工厂、拒绝策略等关键组件,剖析任务提交与执行流程,揭示Worker工作机制及线程复用原理,帮助理解线程池如何高效管理并发任务。

上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。
下面是几个比较关键的类成员:

// 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行
private final BlockingQueue workQueue;

//任务的执行值集合,来消费workQueue里面的任务
private final HashSet workers = new HashSet();

//线程工厂
private volatile ThreadFactory threadFactory;

//拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下:
private volatile RejectedExecutionHandler handler;

1、CallerRunsPolicy:在调用者线程里面运行该任务
2、DiscardPolicy:丢弃任务
3、DiscardOldestPolicy:丢弃workQueue的头部任务

//最下保活work数量
private volatile int corePoolSize;

//work上限
private volatile int maximumPoolSize;

我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。
step 1:
Future<?> submit(Runnable task);

step 2:<AbstractExecutorService>
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

step 3:<Executor>
void execute(Runnable command);

step 4:<ThreadPoolExecutor>
 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { //提交我们的额任务到workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //使用maximumPoolSize作为边界
        reject(command); //还不行?拒绝提交的任务
}

step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core) 


step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包装任务
final Thread t = w.thread; //获取线程(包含任务)
workers.add(w);   // 任务被放到works中
t.start(); //执行任务

上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。
thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:
我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:
编译前 编译后 while (1); mov eax,1 test eax,eax je foo+23h jmp foo+18h
编译前 编译后 for (;;); jmp foo+23h   

对比之下,for (;;)指令少,不占用寄存器,而且没有判断跳转,比while (1)好。
也就是说两者在在宏观上完全一样的逻辑,但是底层完全不一样,for相对于来说更加简洁明了
其实核心也就一句:
我们再回头看一下execute,其实我们上面只走了一条逻辑,在execute的时候,我们的worker的数量还没有到达我们设定的corePoolSize的时候,会走上面我们分析的逻辑,而如果达到了我们设定的阈值之后,execute中会尝试去提交任务,如果提交成功了就结束,否则会拒绝任务的提交。我们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,但是我们上面的分析是corePoolSize,这是因为我们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,否则使用maximumPoolSize来设定边界。
直观的解释一下,当线程池里面的Worker数量还没有到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,如果Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,如果没有办法再向阻塞队列放任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,如果线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。可以参考下面的描述来理解:
|---corePoolSize[创建]---||---workQueue[等待keepAliveTime]---||---maximumPoolSize[创建]---||---拒绝策略---|
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

  • When a new task is submitted in method {@link #execute(Runnable)},
    • and fewer than corePoolSize threads are running, a new thread is
    • created to handle the request, even if other worker threads are
    • idle. If there are more than corePoolSize but less than
    • maximumPoolSize threads running, a new thread will be created only
    • if the queue is full. By setting corePoolSize and maximumPoolSize
    • the same, you create a fixed-size thread pool. By setting
    • maximumPoolSize to an essentially unbounded value such as {@code
    • Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
    • number of concurrent tasks. Most typically, core and maximum pool
    • sizes are set only upon construction, but they may also be changed
    • dynamically using {@link #setCorePoolSize} and {@link
    • setMaximumPoolSize}.

在方法{@link #execute(Runnable)}中提交新任务时,
如果运行的线程小于corePoolSize,则创建新线程处理请求,即使其他工作线程闲置。
如果运行的线程大于corePoolSize,但是小于maximumPoolSize,当线程运行时,如果队列已满则会创建一个新线程
同样通过设置corePoolSize和maximumPoolSize,创建一个固定大小的线程池。通过设置maximumPoolSize到一个
本质上无界的值,比如{@code Integer.MAX_VALUE},您允许池容纳任意的并发任务的数量。
最典型的是核心池和最大池尺寸只在构造时设置,但也可以更改动态使用{@link #setCorePoolSize}和{@link #setMaximumPoolSize}。
在此需要说明一点,有一个重要的成员:keepAliveTime,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime之后被terminated。可以参考下面的文档:
Plain Text
复制代码
1
2
3

  • If the pool currently has more than corePoolSize threads,
    • excess threads will be terminated if they have been idle for more
    • than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
相关文章
|
13天前
|
数据采集 人工智能 安全
|
8天前
|
编解码 人工智能 自然语言处理
⚽阿里云百炼通义万相 2.6 视频生成玩法手册
通义万相Wan 2.6是全球首个支持角色扮演的AI视频生成模型,可基于参考视频形象与音色生成多角色合拍、多镜头叙事的15秒长视频,实现声画同步、智能分镜,适用于影视创作、营销展示等场景。
657 4
|
8天前
|
机器学习/深度学习 人工智能 前端开发
构建AI智能体:七十、小树成林,聚沙成塔:随机森林与大模型的协同进化
随机森林是一种基于决策树的集成学习算法,通过构建多棵决策树并结合它们的预测结果来提高准确性和稳定性。其核心思想包括两个随机性:Bootstrap采样(每棵树使用不同的训练子集)和特征随机选择(每棵树分裂时只考虑部分特征)。这种方法能有效处理大规模高维数据,避免过拟合,并评估特征重要性。随机森林的超参数如树的数量、最大深度等可通过网格搜索优化。该算法兼具强大预测能力和工程化优势,是机器学习中的常用基础模型。
350 164
|
7天前
|
机器学习/深度学习 自然语言处理 机器人
阿里云百炼大模型赋能|打造企业级电话智能体与智能呼叫中心完整方案
畅信达基于阿里云百炼大模型推出MVB2000V5智能呼叫中心方案,融合LLM与MRCP+WebSocket技术,实现语音识别率超95%、低延迟交互。通过电话智能体与座席助手协同,自动化处理80%咨询,降本增效显著,适配金融、电商、医疗等多行业场景。
359 155

热门文章

最新文章