线程池之刨根问底

简介: 线程池之刨根问底

前言

说起刨根问底,首先想到的竟然是------小沈阳。。。。。

其实有关线程池的文章一搜都有一大堆,但。。。感觉都不全(手动狗头)

什么是线程?

先不说线程池的概念了,先说下什么是线程。线程究竟是什么?百度这个问题的一般会得到如下答案:

线程是指进程中的一个执行流程,一个进程中可以运行多个线程。

奥,线程是进程里的,听着这些名词感觉都很熟悉,感觉一直在和它们打交道,但,什么是进程呢?不就是一个内存中运行的应用程序嘛!而且有它自己独立的一块内存空间,一个程序至少有一个进程,一个进程至少有一个线程。

是不是感觉挺绕,其实一点都不绕,安卓中不也有多进程嘛,直接在AndroidManifest给四大组件添加android:process属性不得了,所以一个程序中并不是只能有一个进程啊,可以共同存在运行,一个进程中如果没有线程还运行什么呢,对不?

在Java中,每次程序运行至少启动2个线程:一个是main线程,一个是垃圾收集线程。因为每当使用java命令执行一个类的时候,实际上都会启动一个JVM,每一个JVM实际上就是在操作系统中启动了一个进程。

那JVM又是啥呢?JVM是虚拟机的英文简称。他是java运行环境的一部分。它是一个虚构出来的计算机,是通过在实际的计算机上仿真模拟各种计算机功能来实现的,JVM 中的内存可以划分为若干个不同的数据区域,主要分为:程序计数器、虚拟机栈、本地方法栈、堆、方法区。

好了好了,不能再说了,已经偏离主题了,现在说的是线程,从线程说到了进程,又说到了JVM。。。简单总结下吧:进程(Process) 是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础,线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。

Java中的线程

在平时的代码编写中会遇到很多需要开启线程来执行耗时的任务操作,因为安卓中的刷新机制,UI线程并不能执行耗时操作,所以要开启子线程来执行,一般会有以下几种方式:

直接继承自Thread类,并重写Thread中的run方法:

public class TestThread extends Thread {
    @Override
    public void run() {
        //执行线程操作
    }
}

直接new一个Thread对象并start执行:

new Thread(new Runnable() {
            @Override
            public void run() {
                //执行线程操作
            }
        }).start();

实现Runable接口,当我们查看Thread源码的时候会发现Thread类也是实现了Runable接口:

public class MyRunnable implements Runnable{
        @Override
        public void run() {
      //执行线程操作
        }
    }

初识线程池

我们都知道,Java 线程的创建以及上下文切换是比消耗性能的,所以引入了轻量级锁、偏向锁等优化,目的就是减少用户态和核心态之间的切换频率。既然创建和销毁线程非常损耗性能,那可不可以复用一些被创建好的线程呢?当然可以了,用线程池啊。

终于说到线程池了。。。进入了今天的正题。。。

先放一张表示线程池体系的图吧:

20200531210610386.png

Executor

来看看线程池最顶层的接口:

public interface Executor {
    void execute(Runnable command);
}

可以看到最顶层的Executor接口中只有一个execute方法,线程的创建、调度等细节由子类ExecutorService实现。


ExecutorService

接下来就看看ExecutorService:

public interface ExecutorService extends Executor {
  void shutdown();
  List<Runnable> shutdownNow();
  boolean isShutdown();
  boolean isTerminated();
  boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

上面就是ExecutorService的源码,我把里面的注释去掉了,大家想看的话直接点进去查看即可,可以看到ExecutorService 继承并拓展了 Executor,在 ExecutorService 内部提供了更全面的任务提交机制以及线程池关闭方法。

ScheduledExecutorService

再来看下ScheduledExecutorService的源码吧:

public interface ScheduledExecutorService extends ExecutorService {
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

可以看到ScheduledExecutorService继承自 ExecutorService,增加了四个定时任务相关方法。

ScheduledThreadPoolExecutor
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {}

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口。

ForkJoinPool

public class ForkJoinPool extends AbstractExecutorService {}

ForkJoinPool 是一种支持任务分解的线程池,一般要配合可分解任务接口 ForkJoinTask 来使用。

ThreadPoolExecutor

ThreadPoolExecutor 是 ExecutorService 的默认实现,所谓的线程池机制也大多封装在此类当中。

了解线程池

通过上面的介绍大家应该对线程池已经有了大概了解,那么。。。我们该怎样使用呢?上面也提到了ThreadPoolExecutor 是 ExecutorService 的默认实现,那么直接看看 ThreadPoolExecutor 的构造方法不就得了嘛!来,看看:

20200531210631617.png

废话不多说,先来看第一个构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

代码很简单,外部传入参数后直接调用其他构造方法,来看一下需要传入的参数都是啥意思吧:

  • corePoolSize:即使处于空闲状态依然保留在池中的线程数(核心),除非设置了allowCoreThreadTimeOut,当 allowCoreThreadTimeOut 设置为 true 时,核心线程超时后也会被销毁。
  • maximumPoolSize:池中允许的最大线程数;
  • keepAliveTime:线程池空闲时线程的存活时长;
  • unit:keepAliveTime的时间单位;
  • workQueue:存放任务的队列,使用的是阻塞队列;

上面的几个参数都挺好理解,但是workQueue阻塞队列不太好理解,下面来说一下吧:

  • ArrayBlockingQueue:有界队列,一个用数组实现的有界阻塞队列,按FIFO排序量。
  • LinkedBlockingQueue:可设置容量的队列,基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列(这里有坑)。
  • DelayQueue:延迟队列,是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。
  • PriorityBlockingQueue:优先级队列,具有优先级的无界阻塞队列。
  • SynchronousQueue:同步队列,一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。

再来看看第二个:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

可以看到第二个构造方法和第一个方法相比只多了一个参数,那就来说下多的这个参数的意思吧:

  • threadFactory:执行程序创建新线程时要使用的工厂;

第三个构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

OK,又多了一个参数,来看看吧:

  • 在队列(workQueue)和线程池达到最大线程数(maximumPoolSize)均满时仍有任务的情况下的处理方式;

这里需要详细说一下了,拒绝策略是线程池的一种保护机制,目的就是当这种无节制的线程资源申请发生时,拒绝新的任务保护线程池。默认拒绝策略会直接报异常,但是 JDK 中一共提供了 4 种保护策略,如下图所示:

20200531210648477.png



最后一个构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

这个方法的参数和第三个一样,只是进行了一些空值判断、异常的抛出以及参数的赋值操作。

使用

构造方法有了,来看看怎么使用吧:

ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(corePoolSize,// 核心线程数
                        maximumPoolSize, // 最大线程数
                        keepAliveTime, // 闲置线程存活时间
                        TimeUnit.MILLISECONDS,// 时间单位
                        new LinkedBlockingDeque<Runnable>(),// 线程队列
                        Executors.defaultThreadFactory(),// 线程工厂
                        new ThreadPoolExecutor.AbortPolicy()// 队列已满,而且当前线程数已经超过最大线程数时的异常处理策略

先来创建一个线程池,然后直接使用就可以了:

mExecutor.execute(runnable);

是不是很简单啊?不不不,还有更简单的,为了方便开发者可以更方便的使用线程池,JDK 中给我们提供了一个线程池的工厂类—Executors。在 Executors 中定义了多个静态方法,用来创建不同配置的线程池。常见有以下几种。

newSingleThreadExecutor

顾名思义,这是一个单线程化的线程池,只会用唯一的工作线程来执行任务,保证所有任务按先进先出的顺序执行。

private void one(){
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //执行线程操作
            }
        });
    }

那么。。。它是怎么实现只有一个工作线程呢?来看看源码不得了:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

很简单吧,只是把核心线程数和最大线程数都设置为了1,缓存队列为LinkedBlockingQueue(可设置容量队列)

newCachedThreadPool

这个是创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

private void two(){
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
        //执行线程操作
            }
        });
    }

再来看看可缓存的线程池是怎么实现的:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可以看到可缓存的线程池是没有核心线程的,但是最大线程数为Integer.MAX_VALUE(2147483647),缓存队列为SynchronousQueue(同步队列)

newFixedThreadPool

这个方法是创建一个固定数目的、可重用的线程池。

private void three(){
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //执行线程操作
            }
        });
    }

来看下固定数目的线程池的实现吧:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

很简单,只是传入一个数字,核心线程数和最大线程数都设置为输入的值的大小,缓存队列为LinkedBlockingQueue(可设置容量队列)

newScheduledThreadPool

创建一个定时线程池,支持定时及周期性任务执行。

private void four(){
        ExecutorService executorService = Executors.newScheduledThreadPool(3);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                //执行线程操作
            }
        });
    }

最后再看一下定时线程池的实现:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

上面提到过,定时线程池的实现是ScheduledThreadPoolExecutor,那咱就再来看下ScheduledThreadPoolExecutor的构造方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

核心线程数也是传进的值,最大线程是也是Integer.MAX_VALUE(2147483647),缓存队列为DelayedWorkQueue(延迟队列)

小总结

是不是发现Exectors中的这几个静态方法很方便啊,特别省事,直接调用即可使用,比自己来创建ThreadPoolExecutor方便的多。

但是。。。

凡事就怕但是,先来看一段代码:

private static void three(){
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10000000; i++) {
            final int taskId = i;
            System.out.println(taskId);
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                   try{
                       System.out.println("线程:"+Thread.currentThread().getName()+" 正在执行:"+taskId);
                       Thread.sleep(1000);
                   }catch (Exception e){
                       e.printStackTrace();
                   }
                }
            });
        }
    }

上面说过LinkedBlockingQueue如果容量不设置的话将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,源码中也没有进行设置,所以。。。。会OOM。。此处即为上面提到的坑。。

由于newFixedThreadPool和newSingleThreadExecutor实现基本一样,就不再测试newSingleThreadExecutor。

下面来看下newCachedThreadPool:

private void two(){
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10000000; i++) {
            final int taskId = i;
            System.out.println(taskId);
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        System.out.println("线程:"+Thread.currentThread().getName()+" 正在执行:"+taskId);
                        Thread.sleep(1000);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            });
        }
    }

同样会报OOM,原因很简单,它的最大线程数为Integer.MAX_VALUE(2147483647),没有限制,所以。。。。

CachedThreadPool 和 ScheduledThreadPool都允许的创建线程数量为 Integer.MAX_VALUE,此处不再测试ScheduledThreadPool。

所以。。。

有了上面的但是,也就有了下面的所以,相信阿里的开发规范大家都或多或少的看过,里面有这么一条:

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

那么。。。我到底该怎么做?

很简单,通过ThreadPoolExecutor的方式自己创建线程池,根据业务逻辑选择阻塞队列、拒绝策略等。

当然安卓开发其实无需考虑那么多,使用Kotlin的朋友应该知道一个更香的东西------协程,直接使用协程不得了,也不用考虑这么多乱七八糟的东西。。。谷歌大法好。。协程的使用在这里我就不现眼了,扔物线的课程中讲的明明白白的。

总结

本来感觉没多少东西,但为什么写了这么久

个人编写,难免出错,文中如有错误请指出,万分感激🙏。下次再见。


目录
相关文章
|
23天前
|
存储 缓存 监控
线程池夺命十四问
线程池夺命十四问
线程池夺命十四问
|
5月前
|
存储 缓存 Java
老程序员分享:Java并发编程:线程池的使用
老程序员分享:Java并发编程:线程池的使用
|
6月前
|
存储 缓存 Java
【linux线程(四)】初识线程池&手撕线程池
【linux线程(四)】初识线程池&手撕线程池
|
6月前
|
Java 测试技术
Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义
Java多线程实战-从零手搓一个简易线程池(二)线程池实现与拒绝策略接口定义
|
6月前
|
存储 缓存 Oracle
Java线程池,白话文vs八股文,原来是这么回事!
一、线程池原理 1、白话文篇 1.1、正式员工(corePoolSize) 正式员工:这些是公司最稳定和最可靠的长期员工,他们一直在工作,不会被解雇或者辞职。他们负责处理公司的核心业务,比如生产、销售、财务等。在Java线程池中,正式员工对应于核心线程(corePoolSize),这些线程会一直存在于线程池中。他们负责执行线程池中的任务,如果没有任务,他们会等待新的任务到来。 1.2、所有员工(maximumPoolSize) 所有员工:这些是公司所有的员工,包括正式员工和外包员工。他们共同组成了公司的团队,协作完成公司的各种业务。在Java线程池中,所有员工对应于所有线程(maxim
|
6月前
|
存储 分布式计算 Java
不是吧?线程池这样搞?
学习线程池能够帮助我们更好地处理多线程编程,并提高程序的性能和稳定性。线程池指定线程数这块,首先要考量自己的业务是什么样的?是cpu密集型的还是io密集型的,假设运行应用的机器CPU核心数是N。 cpu密集型的可以先给到N+1,io密集型的可以给到2N 。
54 1
【多线程】线程池如何复用,怎么才能让面试官听懂我说的?
今天来说一下面试中常问到问题,我们知道线程池是帮助我们对线程资源的管理,只有我们合理的使用使用线程池,他才能做到事倍功半,但是你知道线程池是如何复用的吗?
|
缓存 监控 Java
Java多线程-死磕ThreadPoolExecutor线程池
Java线程池ThreadPoolExecutor基本实现原理
249 0
Java多线程-死磕ThreadPoolExecutor线程池
|
存储 缓存 监控
老生常谈的线程池希望你已经都会了
老生常谈的线程池希望你已经都会了
434 0
老生常谈的线程池希望你已经都会了
|
缓存 Java 程序员
产品经理问我:手动创建线程不香吗,为什么非要用线程池呢?
每次写线程池的文章时,总会想起自己大三第一次面试就是挂在这上面,当时年少轻狂,连SpringBoot是什么都不知道就敢面阿里,真是初生牛犊不怕虎。