【多线程:线程池】ThreadPoolExecutor类-提交、停止

简介: 【多线程:线程池】ThreadPoolExecutor类-提交、停止

【多线程:线程池】ThreadPoolExecutor类-提交、停止

01.提交任务

// 执行任务
    void execute(Runnable command);
    
    // 提交任务 task,用返回值 Future 获得任务执行结果
    <T> Future<T> submit(Callable<T> task);
    
    // 提交 tasks 中所有任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
     throws InterruptedException;
     
    // 提交 tasks 中所有任务,带超时时间
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException;
     
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
     throws InterruptedException, ExecutionException;
     
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
     long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException;

submit方法

submit方法与execute方法的区别在于,execute方法接收的参数是Runnable类型的参数 没有返回值,而submit方法接收的是Callable类型的参数有返回值 且 返回值用Future<>接收,Future<>获取线程的返回值 实现原理 就是之前所讲的保护性暂停模式。
例子

@Slf4j(topic = "c.TestPool1")
public class TestPool1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Future<String> future = threadpool.submit(new Callable<String>() {

            @Override
            public String call() throws Exception {
                log.debug("running");
                Thread.sleep(1000);
                return "ok";
            }

        });
        log.debug("{}",future.get());
    }
}

结果

17:12:30.718 c.TestPool1 [pool-1-thread-1] - running
17:12:31.731 c.TestPool1 [main] - ok

解释
可以看出我们实现了Callable接口 并返回了字符串"ok" 给Future< String>,之后通过get方法获取到返回数据。

invokeAll方法

invokeAll方法接收的是一个任务集合 且有返回值,线程池中的线程执行这个任务集合。
例子

@Slf4j(topic = "c.TestPool2")
public class TestPool2 {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        List<Future<String>> futures = threadpool.invokeAll(Arrays.asList(
                () -> {
                    log.debug("begin1");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("begin2");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("begin3");
                    Thread.sleep(2000);
                    return "3";
                }
        ));

        futures.forEach(f -> {
            try {
                log.debug("{}",f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}

结果

17:48:49.429 c.TestPool2 [pool-1-thread-2] - begin2
17:48:49.429 c.TestPool2 [pool-1-thread-1] - begin1
17:48:49.930 c.TestPool2 [pool-1-thread-2] - begin3
17:48:51.937 c.TestPool2 [main] - 1
17:48:51.937 c.TestPool2 [main] - 2
17:48:51.937 c.TestPool2 [main] - 3

解释
我们可以看到 任务1与任务2同时被执行,但是因为我们的线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后 线程2执行任务3,中间差了0.5s 是因为 任务2执行了0.5s,最终我们遍历返回结果 并打印。

invokeAny方法

invokeAny方法的作用是得到一个最先返回的结果,之后的线程就不再运行。
例子

@Slf4j(topic = "c.TestPool3")
public class TestPool3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

       String future = threadpool.invokeAny(Arrays.asList(
                () -> {
                    log.debug("begin1");
                    Thread.sleep(1000);
                    log.debug("end1");
                    return "1";
                },
                () -> {
                    log.debug("begin2");
                    Thread.sleep(500);
                    log.debug("end2");
                    return "2";
                },
                () -> {
                    log.debug("begin3");
                    Thread.sleep(2000);
                    log.debug("end3");
                    return "3";
                }
        ));

        log.debug("{}",future);
    }
}

结果

18:03:18.892 c.TestPool3 [pool-1-thread-3] - begin3
18:03:18.892 c.TestPool3 [pool-1-thread-2] - begin2
18:03:18.892 c.TestPool3 [pool-1-thread-1] - begin1
18:03:19.412 c.TestPool3 [pool-1-thread-2] - end2
18:03:19.412 c.TestPool3 [main] - 2

解释
注意我们此时把核心线程数 设置为3,此时三个任务同时运行,且任务2先运行完 可以看到最后的结果只有任务2的返回值,但是注意一个场景,假如我们现在的核心线程数只有一个 那么很明显我们的线程会先运行任务1 剩下两个任务加入任务队列,等待任务1运行完才会 有空闲线程运行剩下的任务,也就是说任务1是先运行完的 所以返回的结果只有任务1的返回值。

02.关闭线程池

/*
线程池状态变为 SHUTDOWN
    - 不会接收新任务
    - 但已提交任务会执行完
    - 此方法不会阻塞调用线程的执行
*/
void shutdown();
// 源码
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(SHUTDOWN);
        // 仅会打断空闲线程
        interruptIdleWorkers();
        onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
    tryTerminate();
}

/*
线程池状态变为 STOP
    - 不会接收新任务
    - 会将队列中的任务返回
    - 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
// 源码
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(STOP);
        // 打断所有线程
        interruptWorkers();
        // 获取队列中剩余任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 尝试终结
    tryTerminate();
    return tasks;
}


// 其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();

// 线程池状态是否是 TERMINATED
boolean isTerminated();

// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

shutdown与shutdownNow例子

@Slf4j(topic = "c.TestShutDown")
public class TestShutDown {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
                20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        Future<Integer> result1 = threadpool.submit(() -> {
            log.debug("task 1 running...");
            Thread.sleep(1000);
            log.debug("task 1 finish...");
            return 1;
        });

        Future<Integer> result2 = threadpool.submit(() -> {
            log.debug("task 2 running...");
            Thread.sleep(1000);
            log.debug("task 2 finish...");
            return 2;
        });

        Future<Integer> result3 = threadpool.submit(() -> {
            log.debug("task 3 running...");
            Thread.sleep(1000);
            log.debug("task 3 finish...");
            return 3;
        });

        // shutdown() 部分的代码
        log.debug("shutdown");
        threadpool.shutdown();
        threadpool.submit(()->{
            log.debug("task 4 running...");
            Thread.sleep(1000);
            log.debug("task 4 finish");
            return "4";
        });
        threadpool.awaitTermination(3, TimeUnit.SECONDS);
        log.debug("other...");

        // shutdownNow部分代码
        // log.debug("shutdownNow");
//        List<Runnable> runnables = threadpool.shutdownNow();
//        log.debug("other.... {}" , runnables);
    }
}

shutdown代码结果

18:35:02.128 c.TestShutDown [main] - shutdown
18:35:02.128 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:35:02.128 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 1 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-2] - task 2 finish...
18:35:03.134 c.TestShutDown [pool-1-thread-1] - task 3 running...
18:35:04.139 c.TestShutDown [pool-1-thread-1] - task 3 finish...
18:35:04.139 c.TestShutDown [main] - other...

解释
可以看出我们在shutdown后依然可以把 shutdown之前的 任务运行完毕,但是shutdown之后的任务就没有再运行了。另外我们关注一下awaitTermination方法 它的作用是等待shutdown部分的任务运行完后 主线程再运行awaitTermination方法之后的代码。

shutdownNow代码结果

18:47:52.344 c.TestShutDown [main] - shutdownNow
18:47:52.344 c.TestShutDown [pool-1-thread-2] - task 2 running...
18:47:52.344 c.TestShutDown [pool-1-thread-1] - task 1 running...
18:47:52.354 c.TestShutDown [main] - other.... [java.util.concurrent.FutureTask@e580929]

解释
可以看出我们在shutdownNow之后 只有一个任务运行成功了,也就是别的任务都已经被打断了。

目录
相关文章
|
6天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
7天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android多线程编程的重要性及其实现方法,涵盖了基本概念、常见线程类型(如主线程、工作线程)以及多种多线程实现方式(如`Thread`、`HandlerThread`、`Executors`、Kotlin协程等)。通过合理的多线程管理,可大幅提升应用性能和用户体验。
25 15
一个Android App最少有几个线程?实现多线程的方式有哪些?
|
9天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android应用开发中的多线程编程,涵盖基本概念、常见实现方式及最佳实践。主要内容包括主线程与工作线程的作用、多线程的多种实现方法(如 `Thread`、`HandlerThread`、`Executors` 和 Kotlin 协程),以及如何避免内存泄漏和合理使用线程池。通过有效的多线程管理,可以显著提升应用性能和用户体验。
29 10
|
16天前
|
存储 Ubuntu Linux
C语言 多线程编程(1) 初识线程和条件变量
本文档详细介绍了多线程的概念、相关命令及线程的操作方法。首先解释了线程的定义及其与进程的关系,接着对比了线程与进程的区别。随后介绍了如何在 Linux 系统中使用 `pidstat`、`top` 和 `ps` 命令查看线程信息。文档还探讨了多进程和多线程模式各自的优缺点及适用场景,并详细讲解了如何使用 POSIX 线程库创建、退出、等待和取消线程。此外,还介绍了线程分离的概念和方法,并提供了多个示例代码帮助理解。最后,深入探讨了线程间的通讯机制、互斥锁和条件变量的使用,通过具体示例展示了如何实现生产者与消费者的同步模型。
|
17天前
|
监控 Java
线程池中线程异常后:销毁还是复用?技术深度剖析
在并发编程中,线程池作为一种高效利用系统资源的工具,被广泛用于处理大量并发任务。然而,当线程池中的线程在执行任务时遇到异常,如何妥善处理这些异常线程成为了一个值得深入探讨的话题。本文将围绕“线程池中线程异常后:销毁还是复用?”这一主题,分享一些实践经验和理论思考。
31 3
|
24天前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
48 1
|
24天前
|
监控 安全 Java
Java多线程调试技巧:如何定位和解决线程安全问题
Java多线程调试技巧:如何定位和解决线程安全问题
71 2
【多线程面试题 一】、 创建线程有哪几种方式?
创建线程的三种方式包括继承Thread类、实现Runnable接口和实现Callable接口,其中Runnable和Callable接口方式更受推荐,因为它们允许多重继承并更好地体现面向对象思想。
|
30天前
|
Java 调度
【多线程面试题 五】、 介绍一下线程的生命周期
线程的生命周期包括新建、就绪、运行、阻塞和死亡状态,线程状态会根据线程的执行情况在这些状态之间转换。
【多线程面试题 五】、 介绍一下线程的生命周期