一、什么是线程池
线程池能够存储一定数量的线程,当我们需要时,就从池中取出使用,用完再还给线程池。
为什么要使用线程池?
线程池能够减少每次启动、销毁线程的损耗,同时,在执行任务时,不需要等待线程创建就可直接执行,提高了响应速度,此外,由线程池统一调度和分配线程,有利于线程的管理。
二、线程池的使用
ThreadPoolExecutor
在标准库中,线程池由 ThreadPoolExecutor 类实现,有 4 中构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; String name = Objects.toIdentityString(this); this.container = SharedThreadContainer.create(name); }
其中,
int corePoolSize:核心线程数。在默认情况下,核心线程会一直存活,但当将allowCoreThreadTimeout 设置为 true 时,核心线程超时后也会回收
int maximumPoolSize:最大线程数。线程池所能容纳的最大线程数。最大线程数 = 核心线程数 + 非核心线程数(即临时存在的线程,当不需要后会被回收)。
long keepAliveTime:保存存活时间。非核心线程闲置时开始计时,若超过改该时间,非核心线程被回收,若设置 allowCoreThreadTimeout 为true,则核心线程在超时后也会被回收
TimeUnit unit:keepAliveTime 的时间单位。例如:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)、TimeUnit.HOURS(小时)
BlockingQueue<Runnable> workQueue:任务队列。用于存储要执行的任务。通过 execute()方法将任务(Runnable对象)添加到队列(阻塞队列)中
ThreadFactory threadFactory:线程工厂。指定线程池创建新线程的方式RejectedExecutionHandler handler:拒绝策略。当达到最大线程数时线程池采取的策略
线程工厂
线程工厂用于指定创建线程的方式。ThreadFactory是一个接口类
public interface ThreadFactory { Thread newThread(Runnable r); }
其中只有一个newThread方法,用于接收一个Runnable对象,并将其封装到Thread对象中。
在自定义创建线程方式时,需要实现ThreadFactory接口,并实现new ThreadFactory方法。也可以使用默认的线程工厂 Executors.defaultThreadFactory
总而言之,其就是通过封装 new 操作,在方法内部设定不同的属性完成对象的初始化,构造对象的过程
拒绝策略
线程池提交一个任务时会先判断核心线程是否有空余,若没有,则将其添加到阻塞队列中。而当阻塞队列也满时,此时则会扩充线程池中线程数,直到到达最大线程数。而此时(即任务数 > 最大线程数 + 阻塞队列大小),提交的任务就会触发拒绝策略(即如何处理新提交的任务)
线程池提供了 4 种拒绝策略:
1. ThreadPoolExecutor.AbortPolicy:继续提交新任务,则抛出异常,中止任务
2. ThreadPoolExecutor.CallerRunsPolicy:提交的新任务,由添加任务的线程负责执行
3.ThreadPoolExecutor.DiscardOldestPolicy:丢弃最老的任务,执行新任务
4. ThreadPoolExecutor.DiscardPolicy:丢弃新任务
Executors
由于ThreadPoolExecutor的使用比较复杂,因此标准库中提供了 Executors 将ThreadPoolExecutors封装了一下(即在内部将ThreadPoolExecutors对象创建好并设置好不同的参数)
封装好的 4 种常见功能线程池:
1. FixedThreadPool:定长线程池
2. CachedThreadPool:可缓存线程池
3. SingleThreadExecutor:单线程化线程池
4.ScheduledThreadPool:定时线程池
如何选择使用ThreadPoolExecutor还是Executors?
根据需求进行选择。当希望高度定制化时,可选用ThreadPoolExecutor;当只是想简单使用线程池时,可选用Executors。
创建线程池时,如何设定线程池的线程数量?
不同的程序设定的线程池线程数量是不同的,因此,我们要根据具体情况具体分析
当一个进程中,所有线程都是cpu密集型(即这个线程大部分时间都要在cpu上运行),此时设定的线程数目就不应该超过cpu逻辑核心数
当一个进程中,所有线程都是IO密集型(即这个线程大部分时间都在等待IO,而不是在cpu上运行),此时的设定的线程数目就可以超过cpu逻辑核心数
然而,实际上在一个进程中大多数都是一部分线程是cpu密集型,一部分是IO密集型,它们的比例很难确定,因此,我们可以通过实验、测试的方式来找到合适的线程数量(即尝试设定不同的线程数量,分别进行性能测试,衡量不同线程数量的情况下,其时间开销、系统资源占用开销,从而找到合适的线程数量)
三、简单模拟实现线程池
在了解了什么是线程池和线程池的使用后,我们可以通过简单模拟实现线程池的方式来进一步理解线程池
在这里,我们模拟实现一个固定线程数量的线程池
思路分析
要实现一个线程池,我们首先要分析其应该具备哪些功能:
1. 创建一个阻塞队列,用于存放要执行的任务
2. 提供submit方法,用于添加新的任务
3. 提供构造方法,指定应该创建多少个线程
4. 在构造方法中,创建好这些线程
具体实现
我们首先创建出一个阻塞队列,用于存放要执行的任务
//用于存放任务的队列,且阻塞队列线程安全 private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(500);
然后实现submit方法,用来将任务添加到线程池中
public void submit(Runnable runnable) throws InterruptedException { queue.put(runnable); }
最后我们实现其构造方法,通过传入的参数n 指定创建线程数量为n的线程池,并在方法中让这些线程不断取出任务队列中的任务来执行
public MyThreadPoolExecutor(int n){ for (int i = 0; i < n; i++) { Thread t = new Thread(()->{ while (true){ try { Runnable runnable = queue.take(); runnable.run(); }catch (InterruptedException e){ e.printStackTrace(); } } }); t.start(); } }
由于阻塞队列是线程安全的,因此能够保证其线程安全
关于阻塞队列,可参考:Java阻塞队列-CSDN博客
完整代码
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class MyThreadPoolExecutor { //用于存放任务的队列,且阻塞队列线程安全 private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100); public MyThreadPoolExecutor(int n){ for (int i = 0; i < n; i++) { Thread t = new Thread(()->{ while (true){ try { Runnable runnable = queue.take(); runnable.run(); }catch (InterruptedException e){ e.printStackTrace(); } } }); t.start(); } } //添加任务 public void submit(Runnable runnable) throws InterruptedException { queue.put(runnable); } }
我们可以简单地测试所编写的代码是否正确:
class Demo{ public static void main(String[] args) throws InterruptedException { MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(5); for (int i = 0; i < 100; i++) { int n = i; myThreadPoolExecutor.submit(new Runnable() { @Override public void run() { System.out.println("当前线程:" + Thread.currentThread().getName() + "执行任务:" + n); } }); } } }
运行结果:
为什么会出现上述结果?
这是由于多个线程之间的执行顺序是不确定的,当某个线程取到任务后,并非立即执行,在此过程中另一个线程就可能插到其前面了。