JUC系列学习(一):线程池Executor框架及其实现ThreadPoolExecutor

简介: `Executor` 框架将任务的提交与任务的执行**解耦**了。

Executor框架

Executor 框架将任务的提交与任务的执行解耦了。

image.png

  • Executor 顶层接口,Executor中只有一个execute方法,用于执行任务。线程的创建、调度等细节均由其子类实现
  • ExecutorService 继承并扩展了Executor,在ExecutorService内部提供了更全面的提交机制以及线程池关闭等方法。
  • ThreadPoolExecutor:实现了ExecutorService接口,线程池机制都封装在这个类中。
  • ScheduledExecutorService:实现了ExecutorService接口,增加了定时任务的相关方法。
  • ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor并实现了ScheduledExecutorService接口
  • ForkJoinPool:是一种支持任务分解的线程池,一般配合接口ForkJoinTask使用。

ThreadPoolExecutor

配置ThreadPoolExecutor线程池时,要避免线程池大小出现过大或过小的情况。如果线程池过大,那么大量的线程将会在cpu及内存资源上发生竞争,从而导致更高的内存使用量,严重情况下会导致资源耗尽;如果线程池过小,会导致空闲的cpu处理器无法工作,从而降低了吞吐率。要设置合适的线程池大小,需要考虑下面的几个因素:

  • CPU个数( 可以通过Runtime.getRuntime().availableProcessors()获取)
  • 内存大小
  • 任务类型:计算密集型I/O密集型还是两者皆可。如:任务为计算密集型,当CPU处理器个数为N时,线程池大小为N+1比较合适。
public class ThreadPoolExecutor extends AbstractExecutorService {

    public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
}

ThreadPoolExecutor继承自AbstractExecutorService类(实现了ExecutorService接口),ThreadPoolExecutor中有四个构造器,但最后都会调用到上面代码中的这个,来看构造器中的这些参数代表的意义:

  • corePoolSize(int类型): 核心线程数,默认一直存活(即使线程处于空闲状态)。
  • maximumPoolSize(int类型): 线程池允许的最大线程数,其值大小>=corePoolSize
  • keepAliveTime(long类型): 线程的存活时间,默认是超过corePoolSize大小之后启动的非核心线程的存活时间,当线程池设置allowCoreThreadTimeOut=true时,对核心线程也会起作用。
  • unit(TimeUnit类型): 时间单位
NANOSECONDS   //纳秒
MICROSECONDS  //微秒
MILLISECONDS  //毫秒
SECONDS       //秒
MINUTES       //分
HOURS         //小时
DAYS          //天
  • workQueue(BlockingQueue<Runnable>) : 阻塞队列(实现了BlockingQueue接口),当线程数超过核心线程数corePoolSize大小时,会将任务放入阻塞队列中,ThreadPoolExecutor中使用了下面几种队列:

    ArrayBlockingQueue :数组实现的有界阻塞队列,队列满时,后续提交的任务通过handler中的拒绝策略去处理。

    LinkedBlockingQueue:链表实现的阻塞队列,默认大小是Integer.MAX_VALUE(无界队列),也可以通过传入指定队列大小capacity

    SynchronousQueue:内部并没有缓存数据,缓存的是线程。当生产者线程进行添加操作(put)时必须等待消费者线程的移除操作(take)才会返回。SynchronousQueue可以用于实现生产者与消费者的同步。

    PriorityBlockingQueue:二叉堆实现的优先级阻塞队列,传入队列的元素不能为null并且必须实现Comparable接口。

DelayQueue: 延时阻塞队列,队列元素需要实现Delayed接口。

  • threadFactory(ThreadFactory): 线程工厂,用于创建线程。Executors中使用了默认的DefaultThreadFactory

     private static class DefaultThreadFactory implements ThreadFactory {
       private static final AtomicInteger poolNumber = new AtomicInteger(1);
       private final ThreadGroup group;
       private final AtomicInteger threadNumber = new AtomicInteger(1);
       private final String namePrefix;
    
       DefaultThreadFactory() {
           SecurityManager s = System.getSecurityManager();
           group = (s != null) ? s.getThreadGroup() :
                                 Thread.currentThread().getThreadGroup();
           namePrefix = "pool-" +
                         poolNumber.getAndIncrement() +
                        "-thread-";
       }
    
       public Thread newThread(Runnable r) {
           Thread t = new Thread(group, r,
                                 namePrefix + threadNumber.getAndIncrement(),
                                 0);
           if (t.isDaemon())
               t.setDaemon(false);
           if (t.getPriority() != Thread.NORM_PRIORITY)
               t.setPriority(Thread.NORM_PRIORITY);
           return t;
       } }

也可以自定义ThreadFactory,在newThread中自行配置Thread(如:配置线程名、是否是守护线程、线程优先级等)。

  • handler(RejectedExecutionHandler): 提交的任务被拒绝执行的饱和策略,通常有下面几种形式:
AbortPolicy: 默认饱和策略,丢弃任务并抛出RejectedExecutionException异常。调用者可以捕获这个异常并自行处理。

CallerRunsPolicy:线程池中不再处理该任务,由调用线程处理该任务。

DiscardPolicy:当任务无法添加到队列中等待执行时,DiscardPolicy策略会丢弃任务,并且不抛异常。

DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试提交新的任务。

具体使用哪个饱和策略要根据具体的业务场景指定。当然也可以自定义RejectedExecutionHandler来自行决定拒绝任务的处理策略。

执行流程

线程池ThreadPoolExecutor的工作流程大致分为下面几步

  • 当工作线程数小于核心线程数(corePoolSize)时,直接创建核心线程去执行任务
  • 当线程数超过核心线程数(corePoolSize)时,将任务加入等待队列(BlockingQueue)中
  • 队列满时,继续创建非核心线程去执行任务(注意:核心线程corePoolSize+非核心线程<=maximumPoolSize)

简单总结一下执行顺序:核心线程数满->队列满->最大线程数满->任务拒绝策略

用流程图大致表示为:

threadpoolExecutor.png

线程池状态

ThreadPoolExecutor有三种状态:运行、关闭、终止

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING: 运行状态,接收并处理队列中的任务
  • SHUTDOWN: 不再接收新任务,队列中已提交的任务执行完成。
  • STOP: 尝试取消所有正在运行中的任务,并且不再处理队列中尚未开始执行的任务。
  • TIDYING: 所有任务已终止,线程池中线程数为变为0,此时线程池状态变为TIDYING
  • TERMINATED: 线程池终止。

AsyncTask

AsyncTaskAndroid内置线程池,相当于封装了Thread+Handler,可以很方便地在后台执行耗时任务并将结果更新到UI线程AsyncTask内部的线程池是通过ThreadPoolExecutor实现的。

public abstract class AsyncTask<Params, Progress, Result> {
......
}

AsyncTask是一个抽象类,几个关键方法如下:

  • onPreExecute():运行在UI线程,可以用来做一些初始化工作
  • doInBackground(Params…):运行在子线程,用于执行一些耗时操作,执行过程中可以通过publishProgress(Progress…values)来通知UI线程任务的进度。
  • onPostExecute(Result result):运行在UI线程,doInBackground执行完毕后返回Result,并将该值传递到onPostExecute(Result result)中。
  • onProgressUpdate(Progress…):运行在UI线程,用于显示doInBackground的执行进度,在doInBackground()中执行publishProgress(Progress…values)后会回调到此方法。
  • onCancelled():运行在UI线程,当调用cancel(true)后会回调此方法。此时onPostExecute()不会再执行。

注:AsyncTask有三种泛型类型,Params,Progress、Result(可以都指定为空,如AsyncTask<Void, Void, Void>),其中:

  • Params:在execute(Params... params)传递,会传递到doInBackground(Params…)中。
  • Progress:后台任务进度值,在onProgressUpdate(Progress…)使用
  • Result:最终返回的结果类型,在onPostExecute(Result result)使用

AsyncTask中任务的串行&并行

private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BlockingQueue<Runnable> sPoolWorkQueue =
        new LinkedBlockingQueue<Runnable>(128);

//并行线程池
public static final Executor THREAD_POOL_EXECUTOR;

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

//任务串行
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);
}

//任务并行
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }

    mStatus = Status.RUNNING;
    onPreExecute();
    mWorker.mParams = params;
    exec.execute(mFuture);
    return this;
}

@MainThread
public static void execute(Runnable runnable) {
    sDefaultExecutor.execute(runnable);
}

AsyncTask执行任务可以通过execute()或者executeOnExecutor()去提交并执行任务,其中execute()是串行执行任务,而executeOnExecutor是并行执行任务。其中并行线程池就是用的THREAD_POOL_EXECUTOR,来看串行是如何实现的:

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

串行并不是将任务直接交给THREAD_POOL_EXECUTOR去执行,而是内部自行维护了一个mTasks双向队列,每次有新Runnable任务来时,先添加到队列中,然后判断当前是否有正在执行的任务(mActive != null即表示有正在执行的任务),没有的话才会将任务提交给线程池执行;否则会等待前一个任务执行完成,然后从队列中取出新任务(按FIFO顺序)继续交给线程池执行。

参考

【1】Java多线程复习与巩固(六)--线程池ThreadPoolExecutor详解

【2】彻底弄懂 Java 线程池原理

【3】Executors框架:https://blog.csdn.net/tongdanping/article/details/79604637

【4】AsyncTask:https://juejin.im/post/5b65c71af265da0f9402ca4a

【5】AsyncTask:https://blog.csdn.net/SEU_Calvin/article/details/52172248

相关文章
|
4月前
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
|
3月前
|
Java C++
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
【多线程】JUC的常见类,Callable接口,ReentranLock,Semaphore,CountDownLatch
40 0
|
4月前
|
监控 Java 调度
【Java学习】多线程&JUC万字超详解
本文详细介绍了多线程的概念和三种实现方式,还有一些常见的成员方法,CPU的调动方式,多线程的生命周期,还有线程安全问题,锁和死锁的概念,以及等待唤醒机制,阻塞队列,多线程的六种状态,线程池等
231 6
|
5月前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
5月前
|
设计模式 Java 调度
JUC线程池: ScheduledThreadPoolExecutor详解
`ScheduledThreadPoolExecutor`是Java标准库提供的一个强大的定时任务调度工具,它让并发编程中的任务调度变得简单而可靠。这个类的设计兼顾了灵活性与功能性,使其成为实现复杂定时任务逻辑的理想选择。不过,使用时仍需留意任务的执行时间以及系统的实际响应能力,以避免潜在的调度问题影响应用程序的行为。
96 1
|
5月前
|
监控 Java
ThreadPoolExecutor 线程执行超时,释放线程
ThreadPoolExecutor 线程执行超时,释放线程
183 1
|
5月前
|
Java API 调度
JUC线程池: FutureTask详解
总而言之,FutureTask是Java并发编程中一个非常实用的类,它在异步任务执行及结果处理方面提供了优雅的解决方案。在实现细节方面可以搭配线程池的使用,以及与Callable接口的配合使用,来完成高效的并发任务执行和结果处理。
48 0
|
5月前
|
Java 程序员 容器
【多线程面试题二十四】、 说说你对JUC的了解
这篇文章介绍了Java并发包java.util.concurrent(简称JUC),它是JSR 166规范的实现,提供了并发编程所需的基础组件,包括原子更新类、锁与条件变量、线程池、阻塞队列、并发容器和同步器等多种工具。
|
6月前
|
并行计算 安全 Java
线程操纵术并行策略问题之任务执行器(Executor)问题如何解决
线程操纵术并行策略问题之任务执行器(Executor)问题如何解决
|
6月前
|
设计模式 存储 缓存
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
69 0