Executors工厂类详解

简介: 本文详解Java中Executors类提供的12种线程池创建方法,涵盖newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor及newWorkStealingPool等,分析其核心参数、工作原理与适用场景,帮助开发者根据并发需求合理选择线程池类型,提升性能与资源利用率。

首先列出了Executors这个类提供的一些方法。

Executors方法

本文需要对以上12个类做一些区分,从其特点出发,然后分析其应用场景。

public static ExecutorService newFixedThreadPool(int nThreads)

使用这个方法会产生这样一个线程池:线程池最多会保持nThreads个线程处于活动状态,如果当前所有任务都处于活动状态,那么新提交的任务会被添加到任务阻塞队列中去。总结一下就是:使用固定大小的线程池,并发数是固定的。

  • Creates a thread pool that reuses a fixed number of threads
    • operating off a shared unbounded queue. At any point, at most
    • {@code nThreads} threads will be active processing tasks.
    • If additional tasks are submitted when all threads are active,
    • they will wait in the queue until a thread is available.
    • If any thread terminates due to a failure during execution
    • prior to shutdown, a new one will take its place if needed to
    • execute subsequent tasks. The threads in the pool will exist
    • until it is explicitly {@link ExecutorService#shutdown shutdown}.

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

相比于newFixedThreadPool(int nThreads), 你可以使用这个方法来传递你自己的线程工厂,线程工厂是用来干嘛的?就是用来生成线程的,你可以使用线程工厂做一些个性化的线程特性定制。

public static ExecutorService newWorkStealingPool(int parallelism)

在了解或者使用这个方法之前,你你该对java的Fork/Join并行框架有一些了解,如果你想要快速了解一下该部分的内容,可以参考这篇文章:Java Fork/Join并行框架。

从名字上我们就知道这个方法生产出来的线程池具有某种“小偷”的行为,在Fork/Join里面,线程的工作模式为“盗窃算法”,也就是在自己的任务队列消费完了之后不是进入等到状态,而是会主动去偷窃别的线程的任务来做,其实是没有一种奖励机制来鼓励这些线程去帮助别的线程去消费任务的,所以可以认为这些线程都是好人,都为了快速完成任务协调作战。这种工作方式的重点在于,每个线程都将有一个任务队列,线程之间通过“偷窃”的方式互相帮助来完成任务的消费。
可以看下这个方法的实现:

return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);

可以发现,这个方法不是使用我们在第一篇文章中分析了ThreadPoolExecutor来生成线程池。而是使用了ForkJoinPool,也就是Fork/Join里面的线程池,关于ForkJoinPool更为深入的分析不再本文的涉及范围内,你只要知道Fork/Join框架的一般运行原理就可以了,下面的描述可以帮助你决策你是否需要该方法提供的线程池来工作:

  • Creates a thread pool that maintains enough threads to support
    • the given parallelism level, and may use multiple queues to
    • reduce contention. The parallelism level corresponds to the
    • maximum number of threads actively engaged in, or available to
    • engage in, task processing. The actual number of threads may
    • grow and shrink dynamically. A work-stealing pool makes no
    • guarantees about the order in which submitted tasks are
    • executed.

public static ExecutorService newWorkStealingPool()

参考newWorkStealingPool(int parallelism)。

public static ExecutorService newSingleThreadExecutor()

下面是对该方法的描述:

  • Creates an Executor that uses a single worker thread operating
    • off an unbounded queue. (Note however that if this single
    • thread terminates due to a failure during execution prior to
    • shutdown, a new one will take its place if needed to execute
    • subsequent tasks.) Tasks are guaranteed to execute
    • sequentially, and no more than one task will be active at any
    • given time. Unlike the otherwise equivalent
    • {@code newFixedThreadPool(1)} the returned executor is
    • guaranteed not to be reconfigurable to use additional threads.

可以从方法的名字上知道,该方法产生的线程池仅仅有一个Worker,任何时刻都将只有一个Worker在工作,添加的任务有很大概率被放在阻塞任务队列中等待执行。这些任务会被顺序执行,这个方法的返回值其实是对ThreadPoolExecutor的一层包装,下面的代码展示了最终执行任务的类:

static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public Future submit(Callable task) {
return e.submit(task);
}
public Future submit(Runnable task, T result) {
return e.submit(task, result);
}
public List> invokeAll(Collection<? extends Callable> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public List> invokeAll(Collection<? extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public T invokeAny(Collection<? extends Callable> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public T invokeAny(Collection<? extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}

从上面的代码可以看出,这个类其实就是使用了构造时传递的参数e来完成,更像是代理。而e是什么?看下面的代码:

ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())

其实就是一个只有一个线程的ThreadPoolExecutor。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

参考newSingleThreadExecutor(),多了一个线程工厂参数

public static ExecutorService newCachedThreadPool()

首先看它的方法体内容:

return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());

可以看到,核心线程数量为0,而上限为Integer.MAX_VALUE,而且keepAliveTime为60秒,那么这个线程池的工作模式为:只要有任务呗提交,而且当前没有空闲的线程可用,那么就会创建一个新的Worker来工作,一个线程工作完了之后会缓存(idle)60秒,如果60秒之内有新的任务提交,则会被唤醒进入工作模式,否则60秒后就会被回收。可以参考下面的描述:

  • Creates a thread pool that creates new threads as needed, but
    • will reuse previously constructed threads when they are
    • available. These pools will typically improve the performance
    • of programs that execute many short-lived asynchronous tasks.
    • Calls to {@code execute} will reuse previously constructed
    • threads if available. If no existing thread is available, a new
    • thread will be created and added to the pool. Threads that have
    • not been used for sixty seconds are terminated and removed from
    • the cache. Thus, a pool that remains idle for long enough will
    • not consume any resources. Note that pools with similar
    • properties but different details (for example, timeout parameters)
    • may be created using {@link ThreadPoolExecutor} constructors.

从描述上,我们可以想到,其实这种类型的线程池比较适合于短期高流量的场景,也就是我们所说的“秒杀”场景,在那样的场景下,需要的线程数量较多,那么使用该类型的线程池可以满足,而且该线程池还有自动收缩的功能,在不需要那么多线程的时候,会自动回收线程,释放资源。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

参考newCachedThreadPool()。

public static ScheduledExecutorService newSingleThreadScheduledExecutor()

只有一个线程的调度线程池,类似于newSingleThreadExecutor,但是该方法生产的线程池具备调度功能,下面是对该方法的描述:

  • Creates a single-threaded executor that can schedule commands
    • to run after a given delay, or to execute periodically.
    • (Note however that if this single
    • thread terminates due to a failure during execution prior to
    • shutdown, a new one will take its place if needed to execute
    • subsequent tasks.) Tasks are guaranteed to execute
    • sequentially, and no more than one task will be active at any
    • given time. Unlike the otherwise equivalent
    • {@code newScheduledThreadPool(1)} the returned executor is
    • guaranteed not to be reconfigurable to use additional threads.

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

参考newSingleThreadExecutor和newSingleThreadScheduledExecutor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

参考newFixedThreadPool,但是返回类型不一样。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

参考newFixedThreadPool。

通过上面的分析,我们应该对java线程池的理解更为深入,再次说明,第五节是对前面四节内容的补充,你应该首先前四节之后再来阅读第五节,那样内容上更完整,但是单独阅读本文一样具备独立性,但是收获肯定没有同时阅读那样多。

相关文章
|
18小时前
|
Java 调度
ScheduledThreadPoolExecutor解析
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,支持延时及周期性任务调度。其核心在于ScheduledFutureTask和DelayedWorkQueue:前者通过重设执行时间实现周期性,后者基于延迟队列实现任务的定时触发,结合addWorker机制确保线程池持续运行,从而完成精准调度。
|
18小时前
3.3.3 集合的删除
集合删除语法为 `db.collection.drop()` 或 `db.集合.drop()`,成功返回 true,失败返回 false。例如:`db.mycollection.drop()` 可删除 mycollection 集合。
|
17小时前
|
Java 调度
ScheduleExecutorService
当ScheduledExecutorService的调度周期小于任务执行时间时,任务不会并发执行,而是等待前一次完成后再立即执行。因任务被放入延时队列,下次触发时间基于上一次设定周期计算,若未完成则延迟执行,实际调度周期等于任务执行时间,避免了任务堆积和并发冲突。
|
17小时前
|
Java 调度
ScheduleExecutorService提交死循环任务
本文探讨向调度线程池提交死循环任务的影响。通过分析`scheduleAtFixedRate`的执行流程,揭示任务如何被包装、调度及执行。死循环将导致线程被永久占用,无法释放,若线程池容量有限,后续任务将被阻塞,影响整体调度。结合延时队列与任务重提交机制,完整呈现周期性任务的运行原理。
|
17小时前
|
NoSQL MongoDB
3.4.1 文档的插入
MongoDB中使用insert()或insertMany()向集合插入文档,支持单条或批量添加。若集合不存在则自动创建,未指定_id时自动生成,整型需用NumberInt(),日期用new Date()。键值对有序,区分类型和大小写,不可重复,键命名需遵循UTF-8规范,避免特殊字符。批量插入失败不回滚已成功数据,建议用try-catch捕获异常。
|
17小时前
|
存储 缓存 Java
ThreadLocal
ThreadLocal是线程本地变量,为每个线程提供独立的变量副本,避免线程竞争。每个线程可独享自己的数据,互不干扰。通过get、set、remove方法操作线程私有数据,底层由Thread中的ThreadLocalMap存储,线程结束时自动清理,也可手动调用remove防止内存泄漏。
|
17小时前
InheritableThreadLocal
ThreadLocal子线程无法继承父线程变量,而InheritableThreadLocal可实现父子线程间数据传递。其原理是在Thread初始化时,复制父线程的inheritableThreadLocals到子线程,通过createInheritedMap创建新的ThreadLocalMap,实现值的继承,适用于需传递上下文的场景。
|
17小时前
|
JSON NoSQL MongoDB
3.4.2 文档的基本查询
MongoDB使用`find()`查询数据,支持条件筛选与投影。`find({})`查所有文档,`_id`字段默认存在;可指定条件如`{userid:&#39;1003&#39;}`查询匹配记录,用`findOne()`返回第一条。投影参数控制字段显示,如`{userid:1,nickname:1,_id:0}`仅显指定字段,省略 `_id`。
|
18小时前
|
存储 NoSQL 关系型数据库
MongoDB相关概念
MongoDB是一款高性能、无模式的文档型数据库,适用于高并发、海量数据、高扩展性场景。广泛应用于社交、游戏、物联网、物流、视频直播等领域,适合数据量大、读写频繁、事务要求不高的应用。支持灵活的JSON-like文档存储,具备高可用、水平扩展、地理查询等优势,是Web2.0和大数据时代的理想选择。
|
18小时前
|
NoSQL MongoDB
3.4.3 文档的更新
MongoDB update语法用于修改集合中的文档,支持覆盖更新与局部更新。使用`$set`可避免字段丢失,`multi: true`实现批量修改,`$inc`则用于数值增减。注意指定查询条件与选项,确保数据准确更新。(238字)