java并发编程笔记--ThreadPoolExecutor实现

简介:     ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销和提高性能的目的。这里的对象可以理解为某种"资源",比如:数据库连接、线程、socket连接...创建这种资源的消耗比较大,如果每次使用都新建的话,会造成额外的开销。

    ThreadPoolExecutor是jdk自带的线程池实现。看到了"池"一定会想到对象池模式,它是单例模式的一个变种,主要思想是通过共享复用已有的空闲对象,达到限制开销提高性能的目的。这里的对象可以理解为某种"资源",比如:数据库连接、线程、socket连接...创建这种资源的消耗比较大,如果每次使用都新建的话,会造成额外的开销。同时因为对资源的创建没有限制,可能会重复创建大量的对象,导致资源枯竭。如果能够提前创建好一批对象放到一个"池"中,每次使用时都从池中选取空闲的对象,用完后再还入池中,这样便提高了资源的使用效率,又因为限制了创建对象的数量,也可以避免大量创建对象导致资源枯竭。我们常用的线程池、DB连接池、socket连接池等都是用的这个思路。

适合使用线程池的场景是建立在如下基础上的:
0)串行执行的响应性和吞吐量无法满足需求:串行程序会在一个线程中逐处理所有任务,因此可能受任务执行时间的长度影响,具有较差的响应性。同时,因为单线程程序往往不能充分利用CPU的资源,无法达到最优的吞吐量。
1)线程的生命周期管理(时间)开销很高:因为创建、销毁线程开销很大,所以频繁的创建线程会造成较大的时间开销,从而减慢任务的处理速率,增加延迟。
2)物理资源的消耗:活跃的线程会消耗操作系统资源,包括:内存、文件句柄等。如果应用程序中存在大量活跃的线程,将会占用大量的资源,有可能导致系统资源枯竭。
3)稳定性:随意的创建线程而不加管控,不利于整个应用程序的稳定。
综上所述,使用多线程提升并发应该是使用合适的线程数达到最好的性能即可,并非线程越多越好。一些场景下(如任务规模很小或者资源需要串行访问的场景),单线程处理可能比多线程更简单且性能更好。

线程池的组成

抛开ThreadPoolExecutor的实现,我们先自己设想一下线程池的概念模型:

image

    首先我们需要一个用于全局管控线程池运行的类,负责线程池的创建、任务的运行以及最后线程池关闭...完整流程的管理,这个类构成了线程池的主体,即线程管理类。线程池中包含多个用于执行任务的工作线程,工作线程由线程管理类创建和维护,能够执行外部提交的任务,并响应取消任务的操作。

    由于不同的业务场景创建的线程具有不同的特征,比如:以相同的前缀命名,设置为守护线程,归属于相同的ThreadGroup等...这些参数的配置对于线程池而言是重复性工作,因此单独抽取一个线程工厂类用作工作线程的创建,由于不同的场景设置的线程参数不同,因而需要客户端自己根据需求定制,故抽象为接口。

    线程池中使用阻塞队列作为任务提交和任务执行的"缓冲",当工作线程忙不过来时,线程池可以先将任务存放到一个任务队列中,待后续有工作线程空闲时,再从队列中获取新的任务执行。由于有了任务队列这个"缓冲",便增加了执行任务的灵活度,任务队列可以根据不同的策略对其中的任务进行编排,比如按优先级执行、延迟执行...存在这灵活性,就意味着要为客户端留有扩展的余地,因而任务队列也抽象为接口。

    线程池作为生产者-消费者模式的实现,也要考虑生产速率和消费速率的平衡。当生产-消费失衡时,即任务的提交速率大于消费速率时,为了保障程序的稳定性,需要对任务做驳回处理,以便降低线程池的负载。同时,为了减少驳回任务造成的损失,需要对被驳回的任务提供必要的驳回处理策略。最简单快速且合理的驳回策略便是抛出异常,即我们在服务降级中常用的快速失败策略,为了避免处理驳回任务带来额外的负载压力,直接放弃任务执行,通过抛出异常告知客户端任务失败。当然,为了满足各式各样的业务场景,也需要留给客户端定制策略的余地,因此驳回策略也被抽象为接口。

    线程池管理类、工作线程、任务队列、线程初始化类、任务驳回策略构成了一个线程池的基本组成。有了基本思路,下面对照着看看ThreadPoolExecutor的组成。

image

ThreadPoolExecutor,线程池管理类,继承AbstractExecutorService类,作为ExecutorService的一种实现,通过线程池调度任务;

Worker,工作线程类,属于ThreadPoolExecutor的私有内部类,实现Runnable接口,用作运行线程任务;继承AbstractQueuedSynchronizer接口,用作控制工作线程的空闲/运行状态的同转换;

ThreadFactory,线程工厂接口,实现该接口,定制线程创建参数;

BlockingQueue,阻塞队列接口,用于缓存客户端提交的任务,传入不同的实现类,实现不同的任务编排策略;

RejectedExecutionHandler,驳回处理策略接口,定制驳回任务的处理策略,ThreadPoolExecutor默认提供4种实现:

  • CallerRunsPolicy:使用提交任务的线程处理被驳回任务;
  • AbortPolicy::中止驳回任务,抛出RejectedExecutionException异常;
  • DiscardPolicy:忽略驳回任务,不做任何处理;
  • DiscardOldestPolicy:忽略阻塞队列header元素,然后将任务提交ThreadPoolExecutor重新执行,如果失败,则继续忽略header元素;

线程池的生命周期

线程池的生命周期,即线程池从创建、使用到最终销毁所经历的过程。ThreadPoolExecutor将线程池的生命周期划分为5个状态,分别是:running、shutdown、stop、tidying、terminated。状态图如下:

image

  • running:线程池创建后的状态,能够接收新任务,同时处理任务队列中的任务;
  • shutdown:执行shutdown()后,不接收新任务,但是会处理任务队列中的任务;
  • stop:执行shutdownNow()后,不接收新任务,也不处理任务队列中的任务,同时会中断正在执行的任务;
  • tidying:当所有任务都已被终止,工作线程数为0后,状态变为tidying状态,然后运行terminated()钩子方法;
  • terminated:当terminated()钩子方法执行完毕后,变为此状态。此时ThreadPoolExecutor已被完全终止;

    ThreadPoolExecutor将线程池状态runState和工作线程数workCount放入同一个字段。其中,1~29位存放workCount;30~32位存放runState,包括1位符号位。

// 使用一个原子整型存放runState和workCount两个字段;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// workCount的最大位数,此处为32位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大线程数:2^29-1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 存放runState的5个状态常量 
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取runState值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 获取workCount值
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 封装指定的runState和workCount到ctl字段中
private static int ctlOf(int runState, int workCount) { return runState | workCount; }

/**
 * 控制runState字段
 */
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

/**
 * 通过CAS方式增加/减少workerCount
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

线程池的初始化

    ThreadPoolExecutor初始化至少需要指定4个参数:corePoolSize(核心工作线程数),maximumPoolSize(最大工作线程数),keepAliveTime(空闲线程存活时间),workQueue(任务队列);除此之外,可以选定2个参数:threadFactory(线程工厂,默认使用DefaultThreadFactory类),handler(驳回任务处理器,默认使用AbortPolicy类)。根据填写参数的不同,提供了4个构造函数,如下是最终的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
        
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
        
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

提交任务

image

    ThreadPoolExecutor继承了AbstractExecutorService抽象类。AbstractExecutorService实现了ExecutorService接口中提交任务的基本方法,包括:submit和invoke*等方法,用于提交任务,但没有实现execute方法。

    submit包含3个重载方法,均是将Runnable、Callable对象包装成RunnableFuture对象,然后调用execute方法,可见执行任务的具体方式仍是依赖子类实现,有一些模板方法模式的味道。

image

    invokeAll方法用于批量执行多个任务,并同步等待任务执行完成返回结果。invokeAny方法用于执行一批任务中的一个任务,并同步等待,直到有任意一个任务执行完成,便取消其它任务,并返回。invokeAny方法依赖于ExecutorCompletionService实现,ExecutorCompletionService作为CompletionService接口的实现类,可以理解为一个Executor对象+一个BlockingQueue,Executor用于执行提交的任务,可由调用方依赖注入,此处传入AbstractExecutorService本身,即AbstractExecutorService的实现类本身。BlockingQueue用于存放执行完成的任务。invokeAny方法调用doInvokeAny方法,doInvokeAny方法调用ExecutorCompletionService的submit方法,提交所有待执行的任务,然后调用ExecutorCompletionService的take方法,阻塞等待ExecutorCompletionService的BlockingQueue中有完成任务的Future放入。待收到第一个完成的任务结果后,调用Future的cancel方法取消其它还在执行的任务,随即返回。

    还有一个newTaskFor方法,标志为protected,用于将Runnable、Callable接口封装成RunnableFuture对象,AbstractExecutorService的子类可根据包装Future对象的不同重写。

执行任务

所有Executor接口的实现类,均以execute方法为执行入口。ThreadPoolExecutor的execute方法执行如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
        
    // step1. 如果poolSize < corePoolSize,则新建线程;
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // step2. 如果线程池状态为running,且poolSize >= corePoolSize,则放入任务队列;
    // 如果线程池状态不是running,则驳回任务,执行驳回策略;
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // step3. 如果放入任务队列失败,则创建一个新线程执行任务;
    // 如果创建线程失败,则驳回任务,执行驳回策略;
    else if (!addWorker(command, false))
        reject(command);
}

从代码可以看出,线程池执行任务主要分为3步:

  • 第一步:如果poolSize < corePoolSize,则新建线程;
  • 第二步:如果线程池状态为running,且poolSize >= corePoolSize,则放入任务队列;如果线程池状态不是running,则驳回任务,执行驳回策略;
  • 第三步:如果放入任务队列失败,则创建一个新线程执行任务; 如果创建线程失败,则驳回任务,执行驳回策略;

execute方法中调用addWorker方法用于创建一个工作线程,传入当前任务并启动;来看addWorker实现:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 通过CAS操作使workerCount++
    ... ... 
    
    // 标识工作线程是否正常启动
    boolean workerStarted = false;
    // 标识工作线程是否添加成功
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建工作线程对象,传入任务,调用ThreadFactory创建线程;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 加锁,保证如下操作的原子性:
            // 1)添加工作线程到线程集合;
            // 2)更新线程池运行数据;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            
            // 启动工作线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,则终止线程池运行,尝试调用ThreadPoolExecutor提供的terninated()回调函数;
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

创建Worker线程后,执行t.start()方法会启动工作线程,工作线程的run方法调用ThreadPoolExecutor的runWorker方法。该方法用于执行线程任务,期间会调用ThreadPoolExecutor提供的beforeExecute和afterExecute回调函数,这两个函数由ThreadPoolExecutor的子类实现。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    // 标识任务是否未被完整执行,true标识未被完整执行;
    boolean completedAbruptly = true;
    try {
        // getTask阻塞等待任务,当线程池停止或者获取任务超时,则返回null;循环退出条件如下:
        // 1)因为调用setMaximumPoolSize导致poolSize大于setMaximumPoolSize,需要回收掉部分工作线程;
        // 2)线程池被stop;
        // 3)线程池被shutdown并且任务队列为空;
        // 4)线程池获取任务超时,即超过keepAliveTimeout时限,需要被回收时;
        while (task != null || (task = getTask()) != null) {
            // 标识当前工作线程为已使用状态
            w.lock();
            // 检查当前线程是否因为执行shutdownNow方法而被中断;
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); // 在当前工作线程中执行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 更新统计数据
                w.completedTasks++;
                // 标识工作线程为未使用状态
                w.unlock();
            }
        }
        // 如果执行到此处,表明任务被完整执行
        completedAbruptly = false;
    } finally {
        // 此方法调用场景:
        // 1)终止线程池时,用于从线程集合中移除工作线程;
        // 2)当前线程因为异常而退出,重新创建线程替换之;
        // 3)线程池中因为设置allowCoreThreadTimeOut=true,导致工作线程全部被回收时,任务队列仍然有任务,则新建线程;
        processWorkerExit(w, completedAbruptly);
    }
}

关闭线程池

关闭线程池有2个入口:shutdown、shutdownNow。shutdown是相对"温和"的关闭方式,它不接受新提交任务,中断所有空闲线程,但不会干预正在运行的线程,以及已经提交的任务。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 检查权限
        checkShutdownAccess();
        // 设置线程池状态为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断所有等待任务的空闲线程
        interruptIdleWorkers();
        // 调用回调函数
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    
    // 执行线程池终止操作
    tryTerminate();
}

shutdownNow是相对"激进"的关闭方法,它会尝试中断所有工作线程,同时干预正在运行的任务中止,清空任务队列,并将任务队列中未执行的任务返回给调用方。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 检查权限
        checkShutdownAccess();
        // 设置线程池状态为STOP
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 导出未执行的任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    
    // 执行终止操作
    tryTerminate();
    return tasks;
}

shutdown和shutdownNow并不会同步等待线程池终止,而是提前异步返回,如果需要在调用shutdown或者shutdownNow后,同步等待线程池终止,则可以再调用awaitTermination方法。该方法会等到线程池状态变为terminated状态或者超时后返回。

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            // 状态变为terminated,返回true
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            // 超时,返回false
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

    以上是ThreadPoolExecutor的基本实现介绍,除了对主要流程的实现以外,ThreadPoolExecutor还提供了线程池监控、参数配置等方法,实现简单,详情可细读源码。

问题思考:为什么要把runState和workerCount放到同一个AtomicInteger中?

目录
相关文章
|
1月前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
31 0
|
1月前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
14天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
18天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
52 12
|
14天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
97 2
|
1月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
1月前
|
安全 Java 编译器
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
|
1月前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
51 3
|
1月前
|
Java 开发工具 Android开发
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)
|
1月前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
50 3