如何解决JDK线程池中不超过最大线程数下快速消费任务

简介: 如何解决JDK线程池中不超过最大线程数下快速消费任务

在这里插入图片描述

前言

文章需要对线程池执行任务流程有一定的了解

记得之前我写通过模版设计来解决 线程池参数自定义痛点, 然后宽哥在下面灵魂发问, 也就是咱们这篇文章讲到的重点

日常灵魂发问

来来来, 我给大家复制粘贴出来

如何解决 JDK 线程池中不超过最大线程数下即时快速消费任务, 而不是在队列中堆积

因为最近业务落地改造中需要线程池, 又去看了一遍源码, 防止线上埋雷, 也再次回顾了这个问题

然后发现网上也有这种问题提问, 虽然是不同的提问, 但是核心思想是一致的, 点击跳转

业务是多变的, 而 JDK 中的线程池消费流程却是固定的, 所以 基于阻塞队列、线程池扩展改变了原有流程

01、线程池参数

我们这里讲解以 ThreadPoolExecutor#execute(Runnable runnable) 举例, 这里先说下线程池的一些参数

本篇只是说明上述问题, 不会对线程池做详细讲解
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {...}

corePoolSize

线程池中的核心线程数量, 如果没有全局设置池内线程的过期时间, 池内会维持此数量线程

maximumPoolSize

线程池中的最大线程数量, 当核心线程都在运行任务, 并且阻塞队列中任务数量已满, 此时会创建非核心线程

keepAliveTime & unit

线程池中线程过期时间以及时间单位

workQueue

存放线程池内任务的阻塞队列, 如 ArrayBlockingQueue、LinkedBlockingQueue...

threadFactory

创建线程池中线程的线程工厂, 可以在创建线程时初始化优先级、名称、守护状态...

handler

当线程池中全部线程都在运行, 阻塞队列也满的时候, 会将添加的任务执行拒绝策略, JDK 线程池中实现了四种拒绝策略, 默认 AbortPolicy, 抛出异常

02、线程池任务添加流程

相信大家在网上看到过许多类似的线程池执行流程图哈, 这里还是简要赘述下, 源码如下:

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } else if (!addWorker(command, false))
        reject(command);
}

1、线程池提交任务首先判断当前线程数是否大于核心线程数, 否则创建核心线程执行任务

2、如果当前线程超过了核心线程数, 判断阻塞队列是否已满, 否则将任务添加到队列中

3、如果阻塞队列已满, 判断当前线程是否大于最大线程数, 否则创建非核心线程执行任务

4、如果当前线程大于或等于最大线程数, 执行拒绝策略

线程池任务提交流程

这道问题的意图就是要将第二步就行改写

如果当前线程大于核心线程数, 不将任务放入阻塞队列, 而是创建非核心线程执行任务

举例说明一下:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(1, 3, 60,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue(10));

    for (int i = 0; i < 7; i++) {
        threadPoolExecutor.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "-执行任务");
            LockSupport.park();
        });
    }
    threadPoolExecutor.shutdown();
    /**
     * pool-1-thread-1执行任务
     */
}

看到这段代码, 正常情况下只会有一个任务会被执行, 其余任务会被放置阻塞队列中

而我们需要做的就是, 发现池内线程大于核心线程数, 不放入阻塞队列, 而是创建非核心线程进行消费任务

本地代码实现参考 Dubbo 源码中 EagerThreadPoolExecutor, 确实能实现对应效果, 这里就不演示了, 一起看一下 Dubbo 如何做的

03、Dubbo 中实现的快速消费

Dubbo 中涉及到的类有两个, EagerThreadPoolExecutorTaskQueue

这里贴一下重点代码

3.1 TaskQueue

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
        ...
    // 队列中持有线程池的引用
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
                ...
        // 获取线程池中线程数
        int currentPoolThreadSize = executor.getPoolSize();
        // 如果有核心线程正在空闲, 将任务加入阻塞队列, 由核心线程进行处理任务
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

          /**
           *【重点】当前线程池线程数量小于最大线程数
           * 返回false, 根据线程池源码, 会创建非核心线程
           */
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // 如果当前线程池数量大于最大线程数, 任务加入阻塞队列
        return super.offer(runnable);
    }
}  

存在一个疑点, getSubmittedTaskCount() 是如何获取提交任务数量的?

这里就需要看一下 EagerThreadPoolExecutor 实现了, 也比较简单, 只是 重写了线程池的两个方法: afterExecute()、execute()

3.2 EagerThreadPoolExecutor

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }
  
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

EagerThreadPoolExecutor 继承了 ThreadPoolExecutor, 在 execute() 上做了个性化设计

并在线程池内新增了一个任务数量的字段, 是一个原子类, 添加任务时自增, 任务异常及结束时递减

这样就能保证 TaskQueue#offer(Runnable runnable) 做出逻辑处理

相关文章
|
25天前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
49 1
|
6天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
7天前
|
监控 数据可视化 Java
使用JDK自带的监控工具JConsole来监控线程池的内存使用情况
使用JDK自带的监控工具JConsole来监控线程池的内存使用情况
|
1月前
|
缓存 Java
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
这篇文章详细介绍了Java中线程的四种初始化方式,包括继承Thread类、实现Runnable接口、实现Callable接口与FutureTask结合使用,以及使用线程池。同时,还深入探讨了线程池的七大参数及其作用,解释了线程池的运行流程,并列举了四种常见的线程池类型。最后,阐述了在开发中使用线程池的原因,如降低资源消耗、提高响应速度和增强线程的可管理性。
异步&线程池 线程池的七大参数 初始化线程的4种方式 【上篇】
|
17天前
|
监控 Java
线程池中线程异常后:销毁还是复用?技术深度剖析
在并发编程中,线程池作为一种高效利用系统资源的工具,被广泛用于处理大量并发任务。然而,当线程池中的线程在执行任务时遇到异常,如何妥善处理这些异常线程成为了一个值得深入探讨的话题。本文将围绕“线程池中线程异常后:销毁还是复用?”这一主题,分享一些实践经验和理论思考。
32 3
|
29天前
|
存储 监控 Java
|
27天前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
34 2
|
20天前
|
前端开发 JavaScript 大数据
React与Web Workers:开启前端多线程时代的钥匙——深入探索计算密集型任务的优化策略与最佳实践
【8月更文挑战第31天】随着Web应用复杂性的提升,单线程JavaScript已难以胜任高计算量任务。Web Workers通过多线程编程解决了这一问题,使耗时任务独立运行而不阻塞主线程。结合React的组件化与虚拟DOM优势,可将大数据处理等任务交由Web Workers完成,确保UI流畅。最佳实践包括定义清晰接口、加强错误处理及合理评估任务特性。这一结合不仅提升了用户体验,更为前端开发带来多线程时代的全新可能。
22 0
|
23天前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
|
24天前
|
Java
线程池中线程抛了异常,该如何处理?
【8月更文挑战第27天】在Java多线程编程中,线程池(ThreadPool)是一种常用的并发处理工具,它能够有效地管理线程的生命周期,提高资源利用率,并简化并发编程的复杂性。然而,当线程池中的线程在执行任务时抛出异常,如果不妥善处理,这些异常可能会导致程序出现未预料的行为,甚至崩溃。因此,了解并掌握线程池异常处理机制至关重要。
104 0