线程池

简介: 线程池是将多个线程统一管理的“池化”技术,避免频繁创建销毁线程带来的开销。Java中通过`ExecutorService`和`ThreadPoolExecutor`等类实现,核心原理是复用线程、任务队列调度及合理的拒绝策略。`ScheduledThreadPoolExecutor`支持延时与周期性任务,基于`DelayedWorkQueue`实现延迟调度。`Executors`工厂类提供多种线程汛建造方法,如固定大小、缓存型、单线程等,适用于不同并发场景,提升系统性能与资源利用率。

一、线程池初探
所谓线程池,就是将多个线程放在一个池子里面(所谓池化技术),然后需要线程的时候不是创建一个线程,而是从线程池里面获取一个可用的线程,然后执行我们的任务。线程池的关键在于它为我们管理了多个线程,我们不需要关心如何创建线程,我们只需要关系我们的核心业务,然后需要线程来执行任务的时候从线程池中获取线程。任务执行完之后线程不会被销毁,而是会被重新放到池子里面,等待机会去执行任务。
我们为什么需要线程池呢?首先一点是线程池为我们提高了一种简易的多线程编程方案,我们不需要投入太多的精力去管理多个线程,线程池会自动帮我们管理好,它知道什么时候该做什么事情,我们只要在需要的时候去获取就可以了。其次,我们使用线程池很大程度上归咎于创建和销毁线程的代价是非常昂贵的,甚至我们创建和销毁线程的资源要比我们实际执行的任务所花费的时间还要长,这显然是不科学也是不合理的,而且如果没有一个合理的管理者,可能会出现创建了过多的线程的情况,也就是在JVM中存活的线程过多,而存活着的线程也是需要销毁资源的,另外一点,过多的线程可能会造成线程过度切换的尴尬境地。
对线程池有了一个初步的认识之后,我们来看看如何使用线程池。
创建一个线程池
提交任务
创建一个调度线程池
提交一个周期性执行的任务
shutdown
可以发现使用线程池非常简单,只需要极少的代码就可以创建出我们需要的线程池,然后将我们的任务提交到线程池中去。我们只需要在结束之时记得关闭线程池就可以了。本文的重点并非在于如何使用线程池,而是试图剖析线程池的实现,比如一个调度线程池是怎么实现的?是靠什么实现的?为什么能这样实现等等问题。
二、Java线程池实现架构
Java中与线程池相关的类有下面一些:
Executor
ExecutorService
ScheduledExecutorService
ThreadPoolExecutor
ScheduledThreadPoolExecutor
Executors
通过上面一节中的使用示例,可以发现Executors类是一个创建线程池的有用的类,事实上,Executors类的角色也就是创建线程池,它是一个工厂类,可以产生不同类型的线程池,而Executor是线程池的鼻祖类,它有两个子类是ExecutorService和ScheduledExecutorService,而ThreadPoolExecutor和ScheduledThreadPoolExecutor则是真正的线程池,我们的任务将被这两个类交由其所管理者的线程池运行,可以发现,ScheduledThreadPoolExecutor是一个集大成者类,下面我们可以看看它的类关系图:
图片加载失败
ScheduledThreadPoolExecutor的类关系图
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,ThreadPoolExecutor实现了一般的线程池,没有调度功能,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor的实现,然后增加了调度功能。
ScheduledThreadPoolExecutor相较于ThreadPoolExecutor增加了调度功能
最为原始的Executor只有一个方法execute,它接受一个Runnable类型的参数,意思是使用线程池来执行这个Runnable,可以发现Executor不提供有返回值的任务。ExecutorService继承了Executor,并且极大的增强了Executor的功能,不仅支持有返回值的任务执行,而且还有很多十分有用的方法来为你提供服务。
Executor不提供有返回值的任务,ExecutorService继承自Executor,支持有返回值的任务执行
下面展示了ExecutorService提供的方法:
图片加载失败
ExecutorService提供的方法
ScheduledExecutorService继承了ExecutorService,并且增加了特有的调度(schedule)功能。关于Executor、ExecutorService和ScheduledExecutorService的关系,可以见下图:
图片加载失败
Executor、ExecutorService和ScheduledExecutorService的关系
总结一下,经过我们的调研,可以发现其实对于我们编写多线程代码来说,最为核心的是Executors类,根据我们是需要ExecutorService类型的线程池还是ScheduledExecutorService类型的线程池调用相应的工厂方法就可以了,而ExecutorService的实现表现在ThreadPoolExecutor上,ScheduledExecutorService的实现则表现在ScheduledThreadPoolExecutor上,下文将分别剖析这两者,尝试弄清楚线程池的原理。
三、ThreadPoolExecutor解析
上文中描述了Java中线程池相关的架构,了解了这些内容其实我们就可以使用java的线程池为我们工作了,使用其提供的线程池我们可以很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。下面的图片展示了ThreadPoolExecutor的类图:
图片加载失败
ThreadPoolExecutor的类图
下面是几个比较关键的类成员:
我们尝试执行submit方法,下面是执行的关键路径,总结起来就是:如果Worker数量还没达到上限则继续创建,否则提交任务到workQueue,然后让worker来调度运行任务。
上面的流程是高度概括的,实际情况远比这复杂得多,但是我们关心的是怎么打通整个流程,所以这样分析问题是没有太大的问题的。观察上面的流程,我们发现其实关键的地方在于Worker,如果弄明白它是如何工作的,那么我们也就大概明白了线程池是怎么工作的了。下面分析一下Worker类。
worker类图
上面的图片展示了Worker的类关系图,关键在于他实现了Runnable接口,所以问题的关键就在于run方法上。在这之前,我们来看一下Worker类里面的关键成员:
thread是Worker的工作线程,上面的分析我们也发现了在addWorker中会获取worker里面的thread然后start,也就是这个线程的执行,而Worker实现了Runnable接口,所以在构造thread的时候Worker将自己传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:
我们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了我们刚提交的任务firstTask,然后会循环从workQueue里面获取任务来执行,获取任务的方法如下:
编译前 编译后 while (1); mov eax,1 test eax,eax je foo+23h jmp foo+18h
编译前 编译后 for (;;); jmp foo+23h   

对比之下,for (;;)指令少,不占用寄存器,而且没有判断跳转,比while (1)好。
也就是说两者在在宏观上完全一样的逻辑,但是底层完全不一样,for相对于来说更加简洁明了
其实核心也就一句:
我们再回头看一下execute,其实我们上面只走了一条逻辑,在execute的时候,我们的worker的数量还没有到达我们设定的corePoolSize的时候,会走上面我们分析的逻辑,而如果达到了我们设定的阈值之后,execute中会尝试去提交任务,如果提交成功了就结束,否则会拒绝任务的提交。我们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,但是我们上面的分析是corePoolSize,这是因为我们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,否则使用maximumPoolSize来设定边界。
直观的解释一下,当线程池里面的Worker数量还没有到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,如果Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,如果没有办法再向阻塞队列放任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,如果线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。可以参考下面的描述来理解:
|---corePoolSize[创建]---||---workQueue[等待keepAliveTime]---||---maximumPoolSize[创建]---||---拒绝策略---|
在此需要说明一点,有一个重要的成员:keepAliveTime,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime之后被terminated。可以参考下面的文档:
四、ScheduledThreadPoolExecutor解析
ScheduledThreadPoolExecutor适用于延时执行或者周期性执行的任务调度,ScheduledThreadPoolExecutor在实现上继承了ThreadPoolExecutor,所以你依然可以将ScheduledThreadPoolExecutor当成ThreadPoolExecutor来使用,但是ScheduledThreadPoolExecutor的功能要强大得多,因为ScheduledThreadPoolExecutor可以根据设定的参数来周期性调度运行,下面的图片展示了四个和周期性相关的方法:
四个Scheduled方法
如果你想延时一段时间之后运行一个Runnable,那么使用第一个方法
如果你想延时一段时间然后运行一个Callable,那么使用的第二个方法
如果你想要延时一段时间,然后根据设定的参数周期执行Runnable,那么可以选择第三个和第四个方法,第三个方法和第四个方法的区别在于:第三个方法严格按照规划的时间路径来执行,比如周期为2,延时为0,那么执行的序列为0,2,4,6,8....,而第四个方法将基于上次执行时间来规划下次的执行,也就是在上次执行完成之后再次执行。比如上面的执行序列0,2,4,6,8...,如果第2秒没有被调度执行,而在第三秒的时候才被调度,那么下次执行的时间不是4,而是5,以此类推。
下面来看一下这四个方法的一些细节:
通过上面的代码我们可以发现,前两个方法是类似的,后两个方法也是类似的。前两个方法属于一次性调度,所以period都为0,区别在于参数不同,一个是Runnable,而一个是Callable,可笑的是,最后都变为了Callable了,见下面的构造函数:
对于后两个方法,区别仅仅在于period的,scheduleWithFixedDelay对参数进行了操作,将原来的时间变为负数了,而后面在计算下次被调度的时间的时候会根据这个参数的正负值来分别处理,正数代表scheduleAtFixedRate,而负数代表了scheduleWithFixedDelay。
一个需要被我们注意的细节是,以上四个方法最后都会调用一个方法: delayedExecute(t),下面看一下这个方法:
大概的意思就是先判断线程池是否被关闭了,如果被关闭了,则拒绝任务的提交,否则将任务加入到任务队列中去等待被调度执行。最后的ensurePrestart的意思是需要确保线程池已经被启动起来了。下面是这个方法:
主要是增加了一个没有任务的worker,有什么用呢?我们还记得Worker的逻辑吗?addWorker方法的执行,会触发Worker的run方法的执行,然后runWorker方法就会被执行,而runWorker方法是循环从workQueue中取任务执行的,所以确保线程池被启动起来是重要的,而只需要简单的执行addWorker便会触发线程池的启动流程。 对于调度线程池来说,只要执行了addWorker方法,那么线程池就会一直在后台周期性的调度执行任务。到此,似乎我们还是没有闹明白ScheduledThreadPoolExecutor是如何实现周期性的,上面讲到四个scheduled方法时,我们没有提一个重要的类:ScheduledFutureTask,对,所有神奇的事情将会发生在这个类中,下面来分析一下这个类。
ScheduledFutureTask类图
看上面的类图,貌似这个类非常复杂,还好,我们发现他实现了Runnable接口,那么必然会有一个run方法,而这个run方法必然是整个类的核心,下面来看一下这个run方法的内容:
首先,判断是否是周期性的任务,如果不是,则直接执行(一次性),否则执行,然后设置下次执行的时间,然后重新调度,等待下次执行。这里有一个方法需要注意,也就是setNextRunTime,上面我们提到scheduleAtFixedRate和scheduleWithFixedDelay在传递参数时不一样,后者将delay值变为了负数,所以下面的处理正好印证了前文所述。
下面来看一下reExecutePeriodic方法是如何做的,他的目标是将任务再次被调度执行,下面的代码展示了这个功能的实现:
可以看到,这个方法就是将我们的任务再次放到了workQueue里面,那这个参数是什么?在上面的run方法中我们调用了reExecutePeriodic方法,参数为outerTask,而这个变量是什么?看下面的代码:
这个变量指向了自己,而this的类型是什么?是ScheduledFutureTask,也就是可以被调度的task,这样就实现了循环执行任务了。
上面的分析已经到了循环执行,但是ScheduledThreadPoolExecutor的功能是周期性执行,所以我们接着分析ScheduledThreadPoolExecutor是如何根据我们的参数走走停停的。这个时候,是应该看一下ScheduledThreadPoolExecutor的构造函数了,我们来看一个最简单的构造函数:
我们知道ScheduledThreadPoolExecutor的父类是ThreadPoolExecutor,所以这里的super其实是ThreadPoolExecutor的构造函数,我们发现其中有一个参数DelayedWorkQueue,看名字貌似是一个延迟队列的样子,进一步跟踪代码,发现了下面的一行代码(构造函数中):
所以在ScheduledThreadPoolExecutor中,workQueue是一个DelayedWorkQueue类型的队列,我们暂且认为DelayedWorkQueue是一种具备延迟功能的队列吧,那么,到此我们便可以想明白了,上面的分析我们明白了ScheduledThreadPoolExecutor是如何循环执行任务的,而这里我们明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue来达到延迟的目标,所以组合起来,就可以实现ScheduledThreadPoolExecutor周期性执行的目标。下面我们来看一下DelayedWorkQueue是如何做到延迟的吧,上文中提到一个方法:getTask,这个方法的作用是从workQueue中取出任务来执行,而在ScheduledThreadPoolExecutor里面,getTask方法是从DelayedWorkQueue中取任务的,而取任务无非两个方法:poll或者take,下面我们对DelayedWorkQueue的take方法来分析一下:
在for循环里面,首先从queue中获取第一个任务,然后从任务中取出延迟时间,而后使用available变量来实现延迟效果。这里面需要几个点需要探索一下:
这个queue是什么东西?
延迟时间的来龙去脉?
available变量的来龙去脉?
对于第一个问题,看下面的代码:
它是一个RunnableScheduledFuture类型的数组,下面是RunnableScheduledFuture类的类关系图:
图片加载失败
RunnableScheduledFuture类关系
数组里面保存了我们的RunnableScheduledFuture,对queue的操作,主要来看一下增加元素和消费元素的操作。首先,假设使用add方法来增加RunnableScheduledFuture到queue,调用的链路如下:
解释一下,add方法直接转到了offer方法,该方法中,首先判断数组的容量是否足够,如果不够则grow,增长的策略如下:
每次增长50%,如此下去。增长完成后,如果这是第一个元素,则放在坐标为0的位置,否则,使用siftUp操作,下面是该方法的内容:
这个数组实现了堆这种数据结构,使用对象比较将最需要被调度执行的RunnableScheduledFuture放到数组的前面,而这得力于compareTo方法,下面是RunnableScheduledFuture类的compareTo方法的实现,主要是通过延迟时间来做比较。
上面是生产元素,下面来看一下消费数据。在上面我们提到的take方法中,使用了一个方法如下:
这个方法中调用了一个方法siftDown,这个方法如下:
对其的解释就是:
总结一下,当我们向queue插入任务的时候,会发生siftUp方法的执行,这个时候会把任务尽量往根部移动,而当我们完成任务调度之后,会发生siftDown方法的执行,与siftUp相反,siftDown方法会将任务尽量移动到queue的末尾。总之,大概的意思就是queue通过compareTo实现了类似于优先级队列的功能。
下面我们来看一下第二个问题:延迟时间的来龙去脉。在上面的take方法里面,首先获取了delay,然后再使用available来做延迟效果,那这个delay从哪里来的呢?通过上面的类图RunnableScheduledFuture的类图我们知道,RunnableScheduledFuture类实现了Delayed接口,而Delayed接口里面的唯一方法是getDelay,我们到RunnableScheduledFuture里面看一下这个方法的具体实现:
time是我们设定的下次执行的时间,所以延迟就是(time - now()),没毛病!
第三个问题:available变量的来龙去脉,至于这个问题,我们看下面的代码:
这是一个条件变量,take方法里面使用这个变量来做延迟效果。Condition可以在多个线程间做同步协调工作,更为具体细致的关于Condition的内容,可以参考更多的资料来学习,本文对此知识点点到为止。
到此为止,我们梳理了ScheduledThreadPoolExecutor是如何实现周期性调度的,首先分析了它的循环性,然后分析了它的延迟效果,对于线程池的学习现在才刚刚起步,需要更多更专业的知识类帮我理解更为底层的内容,当然,为了更进一步理解线程池的实现细节,首先需要对线程间通信有足够的把握,其次是要对各种数据结构有清晰的认识,比如队列、优先级队列、堆等高级的数据结构,以及java语言对于这些数据结构的实现,更为重要的是要结合实际情况分析问题,在工作和平时的学习中不断总结,不断迭代对于线程、线程池的认知
五、Executors工厂类详解
首先列出了Executors这个类提供的一些方法。
图片加载失败
Executors方法
本文需要对以上12个类做一些区分,从其特点出发,然后分析其应用场景。
public static ExecutorService newFixedThreadPool(int nThreads)
使用这个方法会产生这样一个线程池:线程池最多会保持nThreads个线程处于活动状态,如果当前所有任务都处于活动状态,那么新提交的任务会被添加到任务阻塞队列中去。总结一下就是:使用固定大小的线程池,并发数是固定的。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
相比于newFixedThreadPool(int nThreads), 你可以使用这个方法来传递你自己的线程工厂,线程工厂是用来干嘛的?就是用来生成线程的,你可以使用线程工厂做一些个性化的线程特性定制。
public static ExecutorService newWorkStealingPool(int parallelism)
在了解或者使用这个方法之前,你你该对java的Fork/Join并行框架有一些了解,如果你想要快速了解一下该部分的内容,可以参考这篇文章:Java Fork/Join并行框架。
从名字上我们就知道这个方法生产出来的线程池具有某种“小偷”的行为,在Fork/Join里面,线程的工作模式为“盗窃算法”,也就是在自己的任务队列消费完了之后不是进入等到状态,而是会主动去偷窃别的线程的任务来做,其实是没有一种奖励机制来鼓励这些线程去帮助别的线程去消费任务的,所以可以认为这些线程都是好人,都为了快速完成任务协调作战。这种工作方式的重点在于,每个线程都将有一个任务队列,线程之间通过“偷窃”的方式互相帮助来完成任务的消费。
可以看下这个方法的实现:
可以发现,这个方法不是使用我们在第一篇文章中分析了ThreadPoolExecutor来生成线程池。而是使用了ForkJoinPool,也就是Fork/Join里面的线程池,关于ForkJoinPool更为深入的分析不再本文的涉及范围内,你只要知道Fork/Join框架的一般运行原理就可以了,下面的描述可以帮助你决策你是否需要该方法提供的线程池来工作:
public static ExecutorService newWorkStealingPool()
参考newWorkStealingPool(int parallelism)。
public static ExecutorService newSingleThreadExecutor()
下面是对该方法的描述:
可以从方法的名字上知道,该方法产生的线程池仅仅有一个Worker,任何时刻都将只有一个Worker在工作,添加的任务有很大概率被放在阻塞任务队列中等待执行。这些任务会被顺序执行,这个方法的返回值其实是对ThreadPoolExecutor的一层包装,下面的代码展示了最终执行任务的类:
Java
运行代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public Future submit(Callable task) {
return e.submit(task);
}
public Future submit(Runnable task, T result) {
return e.submit(task, result);
}
public List> invokeAll(Collection<? extends Callable> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public List> invokeAll(Collection<? extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public T invokeAny(Collection<? extends Callable> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public T invokeAny(Collection<? extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
从上面的代码可以看出,这个类其实就是使用了构造时传递的参数e来完成,更像是代理。而e是什么?看下面的代码:
Java
运行代码
复制代码
1
ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
其实就是一个只有一个线程的ThreadPoolExecutor。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
参考newSingleThreadExecutor(),多了一个线程工厂参数
public static ExecutorService newCachedThreadPool()
首先看它的方法体内容:
Java
运行代码
复制代码
1
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.S

相关文章
|
12天前
|
数据采集 人工智能 安全
|
7天前
|
机器学习/深度学习 人工智能 前端开发
构建AI智能体:七十、小树成林,聚沙成塔:随机森林与大模型的协同进化
随机森林是一种基于决策树的集成学习算法,通过构建多棵决策树并结合它们的预测结果来提高准确性和稳定性。其核心思想包括两个随机性:Bootstrap采样(每棵树使用不同的训练子集)和特征随机选择(每棵树分裂时只考虑部分特征)。这种方法能有效处理大规模高维数据,避免过拟合,并评估特征重要性。随机森林的超参数如树的数量、最大深度等可通过网格搜索优化。该算法兼具强大预测能力和工程化优势,是机器学习中的常用基础模型。
344 164
|
6天前
|
机器学习/深度学习 自然语言处理 机器人
阿里云百炼大模型赋能|打造企业级电话智能体与智能呼叫中心完整方案
畅信达基于阿里云百炼大模型推出MVB2000V5智能呼叫中心方案,融合LLM与MRCP+WebSocket技术,实现语音识别率超95%、低延迟交互。通过电话智能体与座席助手协同,自动化处理80%咨询,降本增效显著,适配金融、电商、医疗等多行业场景。
345 155
|
7天前
|
编解码 人工智能 自然语言处理
⚽阿里云百炼通义万相 2.6 视频生成玩法手册
通义万相Wan 2.6是全球首个支持角色扮演的AI视频生成模型,可基于参考视频形象与音色生成多角色合拍、多镜头叙事的15秒长视频,实现声画同步、智能分镜,适用于影视创作、营销展示等场景。
579 4
|
15天前
|
SQL 自然语言处理 调度
Agent Skills 的一次工程实践
**本文采用 Agent Skills 实现整体智能体**,开发框架采用 AgentScope,模型使用 **qwen3-max**。Agent Skills 是 Anthropic 新推出的一种有别于mcp server的一种开发方式,用于为 AI **引入可共享的专业技能**。经验封装到**可发现、可复用的能力单元**中,每个技能以文件夹形式存在,包含特定任务的指导性说明(SKILL.md 文件)、脚本代码和资源等 。大模型可以根据需要动态加载这些技能,从而扩展自身的功能。目前不少国内外的一些框架也开始支持此种的开发方式,详细介绍如下。
1018 7