【多线程:线程池】自定义线程池

简介: 【多线程:线程池】自定义线程池

【多线程:线程池】自定义线程池

01.介绍

建议:学习本文章前最好对生产者消费者模式熟悉,可以看我之前的文章https://blog.csdn.net/m0_71229547/article/details/125435005

这个图就是自定义线程池的实现结构,它是用生产者消费者模式 实现的,它有三个部分 Thread Pool、Blocking Queue、main 组成。
Thread Pool是线程池 它的主要作用是 进行线程的创建与任务的执行 以及 把超过线程池部分的任务交给主线程进行拒绝策略处理,我们可以把线程池中的线程当做消费者。
main是任务的生产者,主要作用是 任务的生产、线程池的创建、规定拒绝策略、执行拒绝策略
Blocking Queue是阻塞队列,包括两个功能 生产者生产任务 放入任务队列、消费者获取任务 从任务队列中取出。当任务数大于线程数时 把任务放入到任务队列 假如任务队列也放满了 则wait生产者生产 然后如果之后消费者进行了消费 则唤醒生产者 然后执行生产。当任务数为空时 消费者会持续获取任务队列里的任务 具体表现是 wait消费者线程 当有任务时唤醒 然后执行消费。

02.具体实现与解释

实现

@Slf4j(topic = "c.TestPool")
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,
                1000, TimeUnit.MILLISECONDS, 2, (queue, task)->{
            // 1. 死等
            queue.put(task);
            // 2) 带超时等待
//            queue.offer(task, 1500, TimeUnit.MILLISECONDS);
            // 3) 让调用者放弃任务执行
//            log.debug("放弃{}", task);
            // 4) 让调用者抛出异常
//            throw new RuntimeException("任务执行失败 " + task);
            // 5) 让调用者自己执行任务
//            task.run();
        });
        for (int i = 0; i < 4; i++) {
            int j = i;
            System.out.println(i);
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            });
        }
    }
}

@FunctionalInterface // 拒绝策略,函数式接口
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();

    // 核心线程数
    private int coreSize;

    // 获取任务时的超时时间
    private long timeout;

    // 统一时间格式
    private TimeUnit timeUnit;

    // 拒接策略
    private RejectPolicy<Runnable> rejectPolicy;

    // 执行任务
    public void execute(Runnable task) {
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增 worker{}, {}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
                // 1) 死等
                // 2) 带超时等待
                // 3) 让调用者放弃任务执行
                // 4) 让调用者抛出异常
                // 5) 让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当 task 不为空,执行任务
            // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行

            // 这种处理是如果没有任务我们就死等,也可以理解为 这个线程始终在线程池内,等待任务的出现
//          while(task != null || (task = taskQueue.take()) != null) {

            // 这种处理方式是如果在规定时间内没有任务,我们就任务现在没有任务了 然后 移除线程,
            // 也可以理解为在规定时间内这个线程在线程池中 超过一定时间我们就这个线程从线程池中移除
            while(task != null || (task = taskQueue.pull(timeout, timeUnit)) != null) {

                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }
}


@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();

    // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();

    // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    // 5. 容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时阻塞获取
    public T pull(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    // 返回值是剩余时间
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞添加
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                try {
                    log.debug("等待加入任务队列 {} ...", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capcity) {
                try {
                    if(nanos <= 0) {
                        return false;
                    }
                    log.debug("等待加入任务队列 {} ...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否满
            if(queue.size() == capcity) {
                rejectPolicy.reject(this, task);
            } else {  // 有空闲
                log.debug("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}

解释

可以看出来代码量还是很大的,我们逐个分析每个部分。

BlockingQueue

这个类的阻塞队列类,比较重要的是put方法与take方法。
put方法的主要作用是生产者线程生产的任务大于了线程池容量 然后生产者线程把多出的任务通过put方法放到了任务队列里,如果任务超出了任务队列的大小 则wait生产者线程 不再生产 直到有消费者线程执行了任务队列的任务 此时唤醒生产者线程继续生产。
take方法的主要作用是如果此时线程池中的消费者线程执行完了自己的任务 然后就会通过take方法从任务队列中获取任务 并且从任务队列中移除 如果此时获取不到任务 说明任务队列为空 然后wait当前这个消费者线程 直到任务队列重新有任务 唤醒消费者线程继续消费任务。
put方法与offer方法的对比:put方法是如果任务队列已经满了 我们就生产线程还会生产一个任务 只不过这个任务陷入了等待 直到任务队列不满时才会重新执行。offer方法是有时限的添加任务 如果此时任务队列满了 我们生产线程还会生产一个任务 只不过这个任务会进入有时限的等待 如果超过这个时间我们就放弃这个任务 转而执行下一个任务。
take方法与pull方法:take方法是如果任务队列为空 则此消费线程此时获取不到任务 然后陷入等待 直到任务队列不为空 然后获取到这个任务并把这个任务从任务队列中移除,可以理解为这个线程一直在线程池内等待新的任务的到来。pull方法是如果任务队列为空 此时消费线程此时获取不到任务 然后会进入有时限的等待 如果超过这个时限还没有获取到任务 则把这个线程从线程池中移除。

ThreadPool

这个类是线程池类,需要关注execute方法与有参构造
有参构造:参数是线程池需要的配置,有coreSize 线程核心数(线程池大小)、queueCapcity 任务队列大小 用来创建任务队列时传参、timeout take方法的超时时间、timeUnit 统一时间配置、rejectPolicy拒绝策略
execute方法是执行任务的方法,线程数没有超过核心数时:我们 创建线程 把线程加入线程集合 并启动线程执行run方法 run方法通过take/pull方法从任务队列里获取任务 如果获取到任务就执行任务 如果没有获取到任务就等待。线程数超过核心数时:说明我们的任务数量已经大于线程数了 此时又因为我们的线程池已经满了 所以转而由生产线程执行拒绝策略。

RejectPolicy

拒绝策略函数式接口,需要关注它的实现,它的作用是当任务数大于核心数时 此时消费线程不能再执行多出的任务,此时生产线程执行拒绝策略 具体讨论这部分的任务应该怎么处理。
实现:拒绝策略是在主线程创建线程池对象时写的,在代码中我们也能看到 它分为五种处理策略,==死等==:可以看出它执行的是put方法 也就是多出的任务放入任务队列 直到任务队列满 不过如果此时仍然有多出任务 就会一直等待任务队列到不满时 然后等待的任务加入任务队列。==带超时等待==:也就是执行offer方法 和put方法基本一样 只不过在任务队列满了后 进入有时限的等待 如果超过时间则放弃这个任务 执行下一个任务。==让调用者放弃任务执行==:也就是如果任务数超过核心数 且任务队列已满,超出部分的任务生产者直接放弃。==让调用者抛出异常==:如果任务数超过核心数 且任务队列已满,生产者直接抛出异常。==让调用者自己执行任务==:如果任务数超过核心数 且任务队列已满,生产者自己把这个任务执行了。

03.不同情况下线程池的执行情况

我们以下结果的拒绝策略都为死等,消费线程获取任务队列用的是pull方法

核心数+任务队列=任务的数量

我们把核心数设置为2 任务队列设置为2 任务数量设置为4

结果

解释
上来先新增两个任务0 1,然后直接被Thread0 Thread1执行了,多出的两个任务2 3我们放入了任务队列 也就是图片里的加入任务队列,1s后 线程池的两个线程执行任务完毕 从线程池中获取新的任务,所以此时2 3任务被从任务队列中取出 然后Thread0 Thread1执行 2 3任务,最后 因为我们用的是pull方法有时限获取在1s后没有获取到任务 所以把Thread0 Thread1移除线程池

核心数+任务队列<任务的数量

我们把核心数设置为2 任务队列设置为2 任务数量设置为6
结果

任务数>线程数+任务队列,又因为我们每一个任务都要执行1s,所以势必会有任务等待加入任务队列,我们看上图,可以发现0 1任务被执行 2 3任务进入任务队列 4任务等待加入任务队列,因为此时生产者线程陷入等待 所以任务5就没有被生产出来,当0 1任务执行完后,Thread0 线程执行任务2 任务4加入任务队列 此时任务队列又满了 所以任务5此时等待加入任务队列,然后Thread1执行任务3 然后任务5加入任务队列,最后任务5 任务4被执行,没有任务后 两个消费线程被移除线程池。

目录
相关文章
|
6天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
7天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android多线程编程的重要性及其实现方法,涵盖了基本概念、常见线程类型(如主线程、工作线程)以及多种多线程实现方式(如`Thread`、`HandlerThread`、`Executors`、Kotlin协程等)。通过合理的多线程管理,可大幅提升应用性能和用户体验。
25 15
一个Android App最少有几个线程?实现多线程的方式有哪些?
|
9天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android应用开发中的多线程编程,涵盖基本概念、常见实现方式及最佳实践。主要内容包括主线程与工作线程的作用、多线程的多种实现方法(如 `Thread`、`HandlerThread`、`Executors` 和 Kotlin 协程),以及如何避免内存泄漏和合理使用线程池。通过有效的多线程管理,可以显著提升应用性能和用户体验。
29 10
|
16天前
|
存储 Ubuntu Linux
C语言 多线程编程(1) 初识线程和条件变量
本文档详细介绍了多线程的概念、相关命令及线程的操作方法。首先解释了线程的定义及其与进程的关系,接着对比了线程与进程的区别。随后介绍了如何在 Linux 系统中使用 `pidstat`、`top` 和 `ps` 命令查看线程信息。文档还探讨了多进程和多线程模式各自的优缺点及适用场景,并详细讲解了如何使用 POSIX 线程库创建、退出、等待和取消线程。此外,还介绍了线程分离的概念和方法,并提供了多个示例代码帮助理解。最后,深入探讨了线程间的通讯机制、互斥锁和条件变量的使用,通过具体示例展示了如何实现生产者与消费者的同步模型。
|
17天前
|
监控 Java
线程池中线程异常后:销毁还是复用?技术深度剖析
在并发编程中,线程池作为一种高效利用系统资源的工具,被广泛用于处理大量并发任务。然而,当线程池中的线程在执行任务时遇到异常,如何妥善处理这些异常线程成为了一个值得深入探讨的话题。本文将围绕“线程池中线程异常后:销毁还是复用?”这一主题,分享一些实践经验和理论思考。
31 3
|
24天前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
48 1
|
24天前
|
监控 安全 Java
Java多线程调试技巧:如何定位和解决线程安全问题
Java多线程调试技巧:如何定位和解决线程安全问题
71 2
【多线程面试题 一】、 创建线程有哪几种方式?
创建线程的三种方式包括继承Thread类、实现Runnable接口和实现Callable接口,其中Runnable和Callable接口方式更受推荐,因为它们允许多重继承并更好地体现面向对象思想。
|
30天前
|
Java 调度
【多线程面试题 五】、 介绍一下线程的生命周期
线程的生命周期包括新建、就绪、运行、阻塞和死亡状态,线程状态会根据线程的执行情况在这些状态之间转换。
【多线程面试题 五】、 介绍一下线程的生命周期