多线程与高并发学习:ThreadPoolExecutor源码解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 多线程与高并发学习:ThreadPoolExecutor源码解析

目录

前言

正文

源码解析———基本属性

源码解析———execute

源码解析———addWorker

源码解析———runWorker

源码解析———getTask

源码解析———processWorkerExit

源码解析———tryTerminate

源码解析———shutdown

源码解析———shutdownNow

总结

前言

线程池在工作中的应用非常广泛,学习其源码可以更好掌握并发相关的思想。

正文

源码解析———基本属性

// ctl=11100000  00000000  00000000  00000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
//(1 << COUNT_BITS)     00100000  00000000  00000000  00000000
//(1 << COUNT_BITS)-1   00011111 11111111 11111111 11111111 
private static final int CAPACITY   = (1 << COUNT_BITS)  - 1;
//大小顺序为:RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED
// runState is stored in the high-order bits
// RUNNING= 11100000  00000000  00000000  00000000
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN = 00000000  00000000 00000000 00000000  
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP= 00100000  00000000  00000000  00000000
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING= 01000000  00000000  00000000  00000000
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED= 01100000  00000000  00000000  00000000
private static final int TERMINATED =  3 << COUNT_BITS;
// Packing and unpacking ctl
//最终值大小取决于高三位
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//最终值大小取决于低29位
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

源码解析———execute

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
      //判断工作线程数量是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //添加为核心线程,true参数代表是添加一个核心线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
      //到这里证明当前工作线程超过了核心线程数
      //判断如果线程池是处于运行状态,则将任务添加到工作队列中
        if (isRunning(c) && workQueue.offer(command)) {
            //重新检查
            int recheck = ctl.get();
            //如果线程池不是运行状态,则将任务从工作队列中移除掉,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //判断当前的工作线程是否为0,如果为0创建一个新的工作线程用来消费处理队列中的任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
      //走到这里并且能添加成功,证明此时的队列已经满了
      //添加失败则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

流程总结:


1、如果当前空闲的工作线程小于核心线程数时,创建核心线程,并直接运行任务


2、如果当前空闲的工作线程大于核心线程数时,线程池处于运行状态,将任务丢到队列中,等待消费


3、重复检查,判断线程池状态是否处于运行状态,如果不是则将上面添加到队列的任务移除掉


4、如果当前空闲的工作线程为0,则创建一个非核心工作线程,创建完之后该工作线程会去消费队列中的任务


5、如果线程中的任务队列已经满了,则尝试添加非核心工作线程直接进行处理

源码解析———addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    //外层循环标记
    retry:
    for (;;) {
        int c = ctl.get();
        //计算此时线程池的运行状态
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        //这里换种写法好理解写
        // if(rs>=SHUTDOWN){
        //     这里控制除了SHUTDOWN之外的其它状态,都不能添加新的工作线程
        //     if(rs != SHUTDOWN){
        //         return false;
        //     }
        //     能到这里证明此时线程池状态为SHUTDOWN
        //     如果任务为空,而队列不为空这种情况代表是创建新的非核心工作线程来消费处理队列,如:addWorker(null, false);
        //     if(!(firstTask == null&&!workQueue.isEmpty()){
        //         return false;
        //     }
        //  }
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            //计算当前工作线程数量
            int wc = workerCountOf(c);
            //判断工作线程总数量(包括核心线程)不能大于最大线程池数量阈值,或者核心工作线程数量不能大于核心工作线程的最大阈值
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //使用CAS将工作线程数量+1
            if (compareAndIncrementWorkerCount(c))
                //停止掉最外层的循环
                break retry;
            c = ctl.get();  // Re-read ctl
            //线程池状态未改变前,一直尝试添加工作线程
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建工作线程,将任务放到工作线程中
        w = new Worker(firstTask);
        //thread中的Runnable就是Worker
        final Thread t = w.thread;
        if (t != null) {
            //加锁,这样可以防止多线程情况下,其它线程调用了shutdown()方法,shutdownNow()方法
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                //重新获取状态
                int rs = runStateOf(ctl.get());
        //如果线程池处于运行状态,或者线程池处于SHUTDOWN状态并且任务参数为null这种情况(创建非核心工作线程来消费队列)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //将工作线程添加到集合中
                    workers.add(w);
                    //记录线程池中曾经存在的最大工作线程数量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //工作线程添加完毕
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //开启调用work的run方法,而run方法调用了runWorker(this);
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //工作线程启动失败,则将该工作线程从集合中移除,并将当前工作线程数量-1
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

'流程总结:


1、校验当前线程池状态是否为运行状态,如果不是运行状态下,判断是否为SHUTDOWN,且任务参数为空,队列不为空(针对addWorker(null, false)场景),这种情况允许创建工作线程,因为可能任务添加完队列后的瞬间,线程池状态被改为了SHUTDOWN,那么这种情况如果队列还有需要消费的任务,是可以多开启一个工作线程帮忙消费的;


2、尝试通过CAS的方式将工作线程总数加1,如果加1成功则证明能成功添加线程(应对多线程场景),否则一直自旋尝试添加工作线程


3、创建工作线程,将任务设置进去;由于Worker本身实现了Runnable接口,所以其thread属性为将本身包装成了Thread


4、开启工作线程的run方法,由于run方法中调用了runWorker(this),这个方法才是真正处理器任务的核心方法

源码解析———runWorker

final void runWorker(Worker w) {
      //获取当前的工作线程
        Thread wt = Thread.currentThread();
      //获取任务
        Runnable task = w.firstTask;
      //将任务置为空,因为后面执行完这个任务后,工作线程还会去处理队列中的任务
        w.firstTask = null;
      //由于Worker继承了AQS,这里调用unlock(),实际是把AQS中的状态设置为0,此时调用shutdown或者shutdownNow方法时,可以获取到锁,将当前线程中断掉
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //1、当前有任务处理,执行execute、submit传入的任务
            //2、从队列中获得到任务
            while (task != null || (task = getTask()) != null) {
                //获取到任务进行锁后,由于是非重入锁,此时不能中断在执行中的工作线程;所以SHUTDOWN状态下是没办法中断非空闲线程的;空闲线程由于没有lock所以可以被中断
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //线程池状态为STOP或者STOP后面的状态时,不会再执行任务了
                //Thread.interrupted()会返回当前线程的中断标志,并且清除中断标志
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      //如果是STOP及其后面的状态则为true,!wt.isInterrupted()判断中断标志是否为false
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    //先当前线程中断
                    wt.interrupt();
                try {
                    //拓展方法,留给子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //拓展方法,留给子类实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //处理完任务的数量+1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

流程总结


1、刚开始将AQS中的state设置为0,允许调用shutdown或者shutdownNow方法时,可以获取到锁,将当前线程中断掉


2、如果当前有传入任务则直接执行,否则去队列中获取。获取不到时会将当前的工作线程销毁;


3、线程不是中断状态则执行任务,执行成功将当前线程已处理器任务数累加


4、最后将线程销毁

源码解析———getTask

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        //获取线程池状态
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        //如果线程池处于SHUTDOWN并且队列为空,则没有任务需要处理
        //如果线程池状态为STOP,TYDING,TERMINATED,则不需要在处理
        //以上两种情况会将当前的工作线程销毁掉,这里先将数量-1。在外层方法中的processWorkerExit进行销毁
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
    //统计当前工作线程数量
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        //timed为true时只有两种可能,一种是allowCoreThreadTimeOut=true允许核心线程在规定时间内获取不到任务时,进行销毁,此时的核心线程就跟非核心线程没有什么区别了
        //另一种情况是,核心线程不允许超时,也就是一直保持核心线程处于活跃状态,并且存在非核心线程(比较总工作线程数是否大于核心线程数)
        //核心工作核心和非核心工作线程没有什么区别,只是最终一直处于活跃状态就是核心线程。刚开始是核心线程后面可能就转为非核心线程,最终可能被销毁。
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    //1、超过最大线程池数量,直接销毁当前的工作线程
        //2、time为true代表允许超时销毁,而timeout=true代表获取任务超时,此时会判断工作线程数量是否大于1,如果<=1时,任务队列必须为空。这样判断的目的是为了保证将目前的工作线程销毁后,还存在线程可以处理任务,而如果任务都没有了,就不需要工作线程的存在了。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //如果允许超时,则keepAliveTime时间内获取不到任务时,跳出来
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            //设置超时标识
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

流程总结:


1、如果线程池处于SHUTDOWN并且队列为空,则没有任务需要处理了,此时可以销毁当前的工作线程了,或线程池状态为STOP,TYDING,TERMINATED,则不需要在处理


2、判断当前的工作线程是否允许超时;有两种情况,一是allowCoreThreadTimeOut=true允许核心线程在队列进行任务获取时设置超时时间,超时后进行销毁,此时的核心线程就跟非核心线程没有什么区别了,因为核心线程就是一直保持活跃状态,就算队列没有任务了,也会进行等待;二是核心线程不允许超时,存在非核心线程(比较总工作线程数是否大于核心线程数),则允许超时,非核心工作线程在获取队列时设置超时时间,过期获取不到时,会将其销毁;

源码解析———processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果线程异常,则completedAbruptly=true,此时将工作线程数-1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
  //加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //将已完成任务数量汇总
        completedTaskCount += w.completedTasks;
        //移除集合中的工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
  //将符合条件的工作线程中断,并设置线程池最终状态
    tryTerminate();
    int c = ctl.get();
    //如果线程池状态为RUNNING或SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
          // min表示最小线程数,若allowCoreThreadTimeOut为true表示设置了允许核心线程数超时,则最小核心线程数为0,否则就是corePoolSize值
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //如果队列中还有任务需要处理,则将最小核心数设置为1,保证有线程可以进行处理
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //创建工作线程
        addWorker(null, false);
    }
}

流程总结:


1、如果completedAbruptly=true,证明是发生了异常,则将总工作线程数量-1;因为如果completedAbruptly=false,则流程是正常执行的,由于线程找不到任务执行了,所以才来销毁当前线程,而线程数量-1步骤则是在getTask()步骤中已经完成了;


2、将已完成的处理任务数量汇总


3、将符合条件的工作线程中断,并设置线程池最终状态


4、如果线程池状态为RUNNING或SHUTDOWN,并且流程是正常执行没有异常,判断是否允许核心线程超时,因为如果允许核心线程超时那么核心线程就跟非核心线程没有区别,都可能因为没有任务而被销毁,此时将最小线程数设置为0,否则设置为核心线程数的阈值;


5、判断队列是否还有任务,有的话保留步骤4中计算的最小工作线程数用来执行队列任务

源码解析———tryTerminate

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //SHUTDOWN状态下队列没有任务了 或者  状态为STOP会往下走
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //如果线程池存在工作线程,将工作线程中断
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //将线程池状态设置为TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //执行中断钩子函数
                    terminated();
                } finally {
                    //将线程池状态设置为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

流程总结:


1、判断是否需要中断工作线程


线程池处于运行状态不需要中断

线程池处于SHUTDOWN状态,但是队列还有任务没处理完,不需要中断

TYDING、TEMINATED状态不需要中断,因为在进入这两者状态前的STOP状态已经进行线程中断操作了

2、如果线程池存在工作线程,将工作线程中断


3、加锁后,将线程池状态修改为将线程池状态设置为TIDYING


4、执行钩子函数后,将线程池状态设置为TERMINATED

源码解析———shutdown

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //设置线程池状态为SHUTDOWN
        advanceRunState(SHUTDOWN);
        //将所有工作线程中断标志设置为true
        interruptIdleWorkers();
        //尝试将工作线程打断
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //尝试中断,会中断空闲的线程因为空闲线程还没有lock
    tryTerminate();
}

源码解析———shutdownNow

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //设置线程池状态为STOP
        advanceRunState(STOP);
        //中断所有线程
        interruptWorkers();
        //清空队列任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //尝试中断,会中断空闲的线程因为空闲线程还没有lock,而那些有lock的由于队列任务被清空,执行完任务后也会被销毁
    tryTerminate();
    return tasks;
}

总结

我们从上面的源码知道核心线程跟非核心线程本质上是一样的,只是为了当任务较少的情况下,让几个线程处于活跃状态,可以处理队列中的任务,而核心线程数量就是处于活跃状态,可以随时处理队列任务;但是有一种情况下核心线程也会被销毁,就是设置了“允许核心线程超时”标志,此时的核心线程就跟非核心线程没什么区别,在获取队列中的任务超时后,会被销毁掉,但是如果队列中有任务的话,会设置最小线程池数为1,维持线程池有一个线程可以用来处理任务;


从源码中可以知道当调用shutdown方法后,线程池状态变为SHUTDOWN状态,此时会尝试将空闲状态的线程中断掉,也就是没有进行lock的线程;而那些持有lock的线程还会继续处于队列中的任务,直到任务被消费完之后,销毁;处于SHUTDOWN状态下不允许添加新的任务,但是允许执行完已添加到队列任务;


调用shutdownNow方法后,线程池状态变为STOP,此时会中断所有空闲的线程,并将队列任务清空。而那些还处于运行状态下的则执行完后,就获取不到任务而被销毁。


当所有线程池中不存在工作线程后,并且状态为STOP,通过CAS的方式将状态改为TYDING,在执行完钩子函数后,将线程池状态改为TERMINATED


目录
相关文章
|
24天前
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
110 0
|
3天前
|
安全 程序员 API
|
11天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
11 1
|
1月前
|
安全 Java 数据库连接
Python多线程编程:竞争问题的解析与应对策略
Python多线程编程:竞争问题的解析与应对策略
18 0
|
1月前
|
安全 Java 数据库连接
Python多线程编程:竞争问题的解析与应对策略【2】
Python多线程编程:竞争问题的解析与应对策略【2】
20 0
|
6月前
|
消息中间件 Java Linux
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
|
5月前
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
474 0
|
3月前
|
监控 算法 Java
企业应用面临高并发等挑战,优化Java后台系统性能至关重要
随着互联网技术的发展,企业应用面临高并发等挑战,优化Java后台系统性能至关重要。本文提供三大技巧:1)优化JVM,如选用合适版本(如OpenJDK 11)、调整参数(如使用G1垃圾收集器)及监控性能;2)优化代码与算法,减少对象创建、合理使用集合及采用高效算法(如快速排序);3)数据库优化,包括索引、查询及分页策略改进,全面提升系统效能。
45 0
|
5月前
|
存储 NoSQL Java
探索Java分布式锁:在高并发环境下的同步访问实现与优化
【6月更文挑战第30天】Java分布式锁在高并发下确保数据一致性,通过Redis的SETNX、ZooKeeper的临时节点、数据库操作等方式实现。优化策略包括锁超时重试、续期、公平性及性能提升,关键在于平衡同步与效率,适应大规模分布式系统的需求。
155 1
|
4月前
|
算法 Java 调度
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决

推荐镜像

更多