ThreadPoolExecutor解析

简介: 本文深入分析Java线程池ThreadPoolExecutor的实现原理,通过类图与源码解析,揭示其核心组件与工作流程。重点探讨任务提交、线程创建、任务调度及拒绝策略等机制,剖析Worker如何通过runWorker循环消费任务队列,从而掌握线程池高效运行的内在逻辑。(238字)

上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。下面的图片展示了ThreadPoolExecutor的类图:

ThreadPoolExecutor的类图
下面是几个比较关键的类成员:

// 任务队列,我们的任务会添加到该队列里面,线程将从该队列获取任务来执行
private final BlockingQueue workQueue;

//任务的执行值集合,来消费workQueue里面的任务
private final HashSet workers = new HashSet();

//线程工厂
private volatile ThreadFactory threadFactory;

//拒绝策略,默认会抛出异异常,还要其他几种拒绝策略如下:
private volatile RejectedExecutionHandler handler;

1、CallerRunsPolicy:在调用者线程里面运行该任务
2、DiscardPolicy:丢弃任务
3、DiscardOldestPolicy:丢弃workQueue的头部任务

//最下保活work数量
private volatile int corePoolSize;

//work上限
private volatile int maximumPoolSize;

我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。

step 1:
Future<?> submit(Runnable task);

step 2:<AbstractExecutorService>
    public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

step 3:<Executor>
void execute(Runnable command);

step 4:<ThreadPoolExecutor>
 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { //提交我们的额任务到workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) //使用maximumPoolSize作为边界
        reject(command); //还不行?拒绝提交的任务
}

step 5:<ThreadPoolExecutor>
private boolean addWorker(Runnable firstTask, boolean core) 


step 6:<ThreadPoolExecutor>
w = new Worker(firstTask); //包装任务
final Thread t = w.thread; //获取线程(包含任务)
workers.add(w);   // 任务被放到works中
t.start(); //执行任务

上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。下面分析一下Worker类。

worker类图
上面的图片展示了Worker的类关系图,关键在于他实现了Runnable接口,所以问题的关键就在于run方法上。在这之前,我们来看一下Worker类里面的关键成员:

final Thread thread;

Runnable firstTask; //我们提交的任务,可能被立刻执行,也可能被放到队列里面

thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:

public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

相关文章
|
18小时前
3.5 文档的分页查询
统计查询使用count()方法,可统计集合中符合条件的记录数量。语法为db.collection.count(query, options)。如统计comment集合全部记录:db.comment.count();按条件统计userid为1003的记录数:db.comment.count({userid:&quot;1003&quot;})。默认返回所有匹配文档的数量。
|
18小时前
|
XML NoSQL Java
5.3 技术选型
本节介绍MongoDB的Java连接驱动mongodb-driver及SpringDataMongoDB持久层框架,通过搭建文章微服务工程,配置Spring Boot整合MongoDB,实现项目初始化与启动。
|
18小时前
|
存储 NoSQL MongoDB
3.2 数据库操作
MongoDB中使用`use 数据库名`选择或创建数据库,若不存在则自动创建。通过`show dbs`查看所有数据库,`db`查看当前库。集合需插入文档后才真正创建。数据库名须为非空、小写、不超过64字节的UTF-8字符串,不可含特殊字符。保留库包括admin(权限管理)、local(本地数据)和config(分片信息)。
|
18小时前
|
存储 NoSQL Linux
2.4 Linux系统中的安装启动和连接
本文介绍在Linux系统部署单机MongoDB用于生产环境的完整步骤,包括下载、解压、目录配置、日志与数据路径设置、配置文件编写及服务启停方法。操作类似Windows,通过配置`mongod.conf`实现后台运行,支持命令行与图形工具连接,并提供防火墙处理与安全关闭服务方案,确保稳定运行。
|
18小时前
|
NoSQL MongoDB
2.3 Compass-图形化界面客户端
前往MongoDB官网下载MongoDB Compass,安装版按提示安装,压缩版解压后运行MongoDBCompassCommunity.exe。打开后输入主机地址、端口等信息,点击连接即可使用。
|
18小时前
|
存储 NoSQL 关系型数据库
4-MongoDB索引知识
MongoDB索引通过B树结构提升查询效率,避免全表扫描。支持单字段和复合索引,前者适用于单一字段排序与查询,后者按字段顺序构建,优化多条件查询与排序操作,显著提升大数据量下的查询性能。
|
18小时前
5.5 文章评论实体类的编写
创建实体类Comment,位于cn.itcast.article.po包中,包含评论的ID、内容、发布时间、用户信息、点赞数、回复数、状态及关联文章ID等属性,并提供getter/setter及toString方法,用于封装评论数据。
|
18小时前
3.5.3 排序查询
sort()方法用于对查询结果排序,1为升序,-1为降序,可按多字段排序。语法:db.集合名.find().sort({字段:1/-1})。与skip()、limit()联用时,执行顺序为先排序,再跳过,最后限制显示。
|
18小时前
3.5.2 分页列表查询
使用limit()限制查询返回的记录数,skip()跳过指定数量数据,实现分页。如每页2条:第一页skip(0).limit(2),第二页skip(2).limit(2),以此类推,默认limit为20,skip为0。