目录
1、前言
我们继续前面的《【JUC基础】12.线程池(一)》。
2、Java实现线程池
2.1、Executors框架
Executors类是Java并发工具包(java.util.concurrent)中提供的一个工具类,用于创建和管理线程池。它提供了一些静态方法,用于创建不同类型的线程池,简化了线程池的创建和配置过程。
Executor框架提供了各种类型的线程池,主要方法有:
/** * 固定线程大小的线程池 */ public static ExecutorService newFixedThreadPool(int nThreads) /** * 单线程的线程池 */ public static ExecutorService newSingleThreadExecutor() /** * 可根据实际情况调整线程数量的线程池 */ public static ExecutorService newCachedThreadPool() /** * 单线程的线程池,扩展了延时和周期性执行的功能 */ public static ExecutorService newSingleThreadScheduledExecutor() /** * 可执行线程数量的线程池,扩展了延时和周期性执行的功能 */ public static ExecutorService newScheduledThreadPool(int corePoolSize)
2.2、newFixedThreadPool
newFixedThreadPool()方法。返回一个固定线程数量的线程池。线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲数量,则立即执行。如果没有,则新的任务会被暂存在一个队列中,等到有空闲的线程时,再从任务队列中取出任务执行。
示例代码:
public class FixedThreadPoolTest { public static void main(String[] args) { // 固定线程数量为3 ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 6; i++) { executorService.submit(() -> { System.out.println("Thread Id:" + Thread.currentThread().getId()); ThreadUtil.sleep(1000); }); } } }
执行结果如下:我们创建了固定3个线程的线程池,然后我们依次提交6个任务,线程池就会安排这6个任务,然后执行。执行期间我们发现前3个任务和后3个任务的执行时间相差1s,且前3和后3个任务的线程ID是一致的,这就说明线程被分成了2批执行。
2.3、newCachedThreadPool
newCachedThreadPool()方法。返回一个可根据实际情况调整线程数量的线程池。线程池的数量不确定,但如果有空闲线程可以复用,则优先使用可复用线程。如果所有线程都在运行,又有新的任务提交,则会创建新的线程处理任务,处理结束后,线程池回收多余线程。
我们拿2.2示例代码来改造:
public static void main(String[] args) { // 可调整大小线程池 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { executorService.submit(() -> { System.out.println("Thread Id:" + Thread.currentThread().getId() + " is running..."); ThreadUtil.sleep(1000); System.out.println("Thread Id:" + Thread.currentThread().getId() + " done!"); }); // 这里多睡2秒,验证是否复用了空闲线程 if(i == 1){ ThreadUtil.sleep(2000); } } }
运行结果:
当我们i==1的时候睡了2秒。2秒过后,ID为9和10的线程已经执行结束。所以当第二批开始执行的时候,我们看到线程9和10被复用执行了,而与fixedThreadPool不同的是,他自动调整了线程池的线程数量大小,而非固定。因此我们看到了11、12、13......后的线程被创建。
2.4、newSingleThreadExecutor
newSingleThreadExecutor()。返回一个只有一个线程的线程池。若多于1个任务提交到线程池,任务会被存在任务等待队列中,直到当前线程空闲后,再取出执行。
示例代码:
public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { executorService.submit(() -> { System.out.println("Thread Id:" + Thread.currentThread().getId() + " is running..."); ThreadUtil.sleep(1000); System.out.println("Thread Id:" + Thread.currentThread().getId() + " done!"); }); } }
执行结果:
2.5、newScheduledThreadPool
newScheduledThreadPool()。可以根据时间需要对线程进行调度的线程池。主要有两个方法:
/** * 创建并执行在给定的初始延迟之后,随后以给定的时间段首先启用的周期性动作; 那就是执行将在initialDelay之后开始,然后是initialDelay+period * ,然后是initialDelay + 2 * period ,等等。 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 创建并执行在给定的初始延迟之后首先启用的定期动作,随后在一个执行的终止和下一个执行的开始之间给定的延迟。 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
与其他线程池不同,该线程池不一定会立即安排任务执行。更多是起到了定时计划的作用。
2.5.1、scheduleAtFixedRate
使用scheduleAtFixedRate()来调度一个任务。这个任务执行1秒,调度周期是2秒。那么这个任务就会每2秒执行一次。
public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.scheduleAtFixedRate(() -> { System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running..."); ThreadUtil.sleep(1000); // 参数值0表示:立即执行,不延迟 // 参数值2表示:计划周期为2秒 }, 0, 2, TimeUnit.SECONDS); }
执行结果:
当然,scheduleAtFixedRate是不会允许任务堆叠的情况。当一个任务执行时间大于周期时间时,那么周期计划就会等待任务结束。
举个例子:
如周期为2秒,一个任务执行了1秒。那么该计划周期为2秒;
如周期为2秒,一个任务执行了5秒。那么该计划周期会等待任务5秒执行结束,周期就变为5秒;
public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.scheduleAtFixedRate(() -> { System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running..."); // 这里任务执行改为5秒 ThreadUtil.sleep(5000); }, 0, 2, TimeUnit.SECONDS); }
执行结果:
2.5.2、scheduleWithFixedDelay
使用scheduleWithFixedDelay()来调度一个任务。这个任务执行1秒,调度周期是2秒。那么这个任务就会每(2+1)秒执行一次。
public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.scheduleWithFixedDelay(() -> { System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running..."); ThreadUtil.sleep(1000); }, 0, 2, TimeUnit.SECONDS); }
执行结果:
同样,scheduleWithFixedDelay是不会允许任务堆叠的情况。当一个任务执行时间大于周期时间时,那么周期计划就会等待任务结束。
举个例子:
如周期为2秒,一个任务执行了1秒。那么该计划周期为(2+1)秒;
如周期为2秒,一个任务执行了5秒。那么该计划周期会等待任务5秒执行结束,周期就变为(5+2)秒;
public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.scheduleWithFixedDelay(() -> { System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running..."); ThreadUtil.sleep(5000); }, 0, 2, TimeUnit.SECONDS); }
执行结果:
2.5.3、异常中断
需要注意的是,如果任务本身抛出异常,那么后续的所有任务都会被中断。
public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); AtomicInteger i = new AtomicInteger(0); scheduledExecutorService.scheduleWithFixedDelay(() -> { System.out.println(DateUtil.now() + ":Thread Id:" + Thread.currentThread().getId() + " is running..."); if(i.get() == 3){ // 这里抛个异常 try { int number = 10 / 0; } catch (Exception e) { e.printStackTrace(); throw e; } } i.getAndIncrement(); ThreadUtil.sleep(1000); }, 0, 2, TimeUnit.SECONDS); }
可以看到后续都不会继续执行:
3、execute()和submit()
ThreadPoolExecutor提供了两种提交任务的方法:submit和execute。
execute:将任务提交给线程池进行执行,但无法获取任务的执行结果。适用于不关心任务执行结果的场景。例如,执行一些简单的异步操作或无需返回结果的任务。
ExecutorService executor = Executors.newFixedThreadPool(5); executor.execute(() -> { // 执行任务的代码 });
submit:将任务提交给线程池进行执行,并返回一个Future对象,通过该对象可以获取任务的执行状态和结果。适用于需要获取任务执行结果或对任务进行异常处理的场景。
ExecutorService executor = Executors.newFixedThreadPool(5); Future<String> future = executor.submit(() -> { // 执行任务的代码 return "Task Result"; }); try { String result = future.get(); // 获取任务执行结果 System.out.println("Task Result: " + result); } catch (InterruptedException | ExecutionException e) { // 处理异常 }
4、线程池关闭
前面我们讲了如何创建线程池,线程池类型,以及如何提交任务到线程池中执行。那么当线程池执行完任务,线程处于空闲状态,依旧会占用系统资源。此时我们就需要讲线程池进行关闭,以待垃圾回收器回收。
关闭线程池通常有两种方式:
- shutdown()方法:调用此方法后,线程池会停止接收新的任务,并尝试将已提交的任务执行完成。已提交但未执行的任务会继续执行,而不会被丢弃。
- shutdownNow()方法:调用此方法后,线程池会尝试停止所有正在执行的任务,并丢弃所有未执行的任务。该方法会通过中断(interrupt)线程来终止任务的执行。
较为优雅的方式:
此外还可以使用awaitTermination(timeout, unit)方法等待线程池中的任务执行完成。该方法会阻塞当前线程,直到线程池中的任务全部完成或超过指定的超时时间。如果等待超时,调用shutdownNow()方法中断执行中的任务,并尝试终止线程池。最后,调用isTerminated()方法判断线程池是否已经终止,确认所有任务都已完成。
ExecutorService executor = Executors.newFixedThreadPool(5); // 关闭线程池 executor.shutdown(); try { // 等待线程池中的任务执行完成,最多等待5秒 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { // 等待超时,调用shutdownNow()方法终止执行中的任务 executor.shutdownNow(); // 再次等待线程池中的任务执行完成,最多等待5秒 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { // 等待超时后仍有任务未完成,可能需要其他处理方式 } } } catch (InterruptedException e) { // 捕获中断异常,可能需要其他处理方式 } finally { // 判断线程池是否已终止 if (executor.isTerminated()) { // 线程池已终止,进行相关资源的释放 } }
通过以上步骤,可以保证线程池能够优雅地终止,并确保所有任务都得到执行或被中断。这样可以避免应用程序中出现线程池资源泄漏或未处理的任务。
5、小结
到此为止,线程池相关的基本知识都介绍完了。当然这些只是线程池的一些基本用法以及常规使用。面对基础入门也是足够了。至于类似线程池的源码,自定义扩展线程池等,放到后面看看进阶篇再写吧~