Java多线程案例之阻塞队列

简介: Java多线程案例之阻塞队列

一. 认识阻塞队列

1. 什么是阻塞队列

阻塞队列本质上还是一种队列, 和普通队列一样, 遵循先进先出, 后进后出的规则, 但阻塞队例相比于普通队列的特殊之处在于阻塞队列的阻塞功能, 主要基于多线程使用.


如果队列为空, 执行出队列操作, 就会使线程陷入阻塞, 阻塞到另一个线程往队列里添加元素(队列不空)为止.

如果队列满了,执行入队列操作, 也会使线程阻塞, 阻塞到另一个线程从队列取走元素位置(队列不满)为止.

2. 生产者消费者模型

基于阻塞队列的阻塞特性是可以实现 “生产者消费者模型” 的, 那么什么是生产者消费者模型呢?


还是用生活中的例子来解释, 这不还有半个月就要过年了, 大年当晚我们会吃年夜饭, 饺子就是年夜饭当中的一份主食, 那么要想吃到饺子, 最好就是一家人在一起把饺子包好, 简单来讲包饺子的步骤有: 擀饺子皮+包饺子.


有下面两种包饺子方式 :


每个人, 都分别进行 擀饺子皮+包饺子这样的操作, 但毕竟家里面的擀面杖不会准备那么多吧, 大家会竞争面杖, 一个人使用擀面杖的使用, 其他人就得阻塞等待, 这就影响了包饺子的效率了.

一个人专门负责擀饺子皮, 另外三个人负责包, 擀饺子的人每次擀好一个皮, 就放到 盖帘 上, 其他人每次都从盖帘上取一个皮包饺子.

其实第二种包饺子方式就是生产者消费者模型的运用, 擀饺子皮的那个人就是生产者, 其他负责包饺子的人就是消费者, 放饺子皮的盖帘就相当于阻塞队列, 如果擀饺子皮的人擀的太慢生产的饺子皮供不上使用, 不一会盖帘上没皮了, 包饺子的人就得等一会儿再包; 如果擀饺子的人擀的太快了, 包的速度跟不上擀的速度, 盖帘上放满了饺子皮, 擀饺子皮的人就得等一会儿再擀.

73d8c9be8b2a4960a39693770de0ac9a.png

生产者消费者模型能够给程序带来两个非常重要的好处, 一是可以实现实现了发送方和接收方之间的 “解耦” , 二是可以 “削峰填谷” , 保证系统的稳定性, 具体理解如下:


在服务器相互调用的场景中假设有两个服务器A(请求服务器), B(应用服务器), A把请求转发给B处理, B处理完了把结果反馈给A, 这种情况下, A和B之间的耦合是比较高的, A要调用B, A 务必要知道B的存在, 如果B挂了, 很容易引起A的bug, 再比如再加一个C服务器, 此时也需要对A修改不少代码, 就需要针对A重新修改代码, 重新测试, 重新发布, 重新部署等, 这就非常麻烦了.

73d8c9be8b2a4960a39693770de0ac9a.png

而针对上述场景, 使用生产者消费者模型就可以有效的降低耦合,

73d8c9be8b2a4960a39693770de0ac9a.png

A和B之间通过一个阻塞队列来通信, 此时A是不知道B的, A只知道队列, 也就是说A的代码中没有任何一行代码和B相关; 同样的, B也是不知道A的, B也是只知道队列, B的代码中,也没有任何一行代码和A相关.


如果B挂了, 对于A没有任何影响, 因为队列还是正常的, A仍然可以给队列插入元素, 如果队列满就先阻塞等待; 同样如果A挂了, 也对于B没啥影响, 队列是正常的B就仍然可以从队列取元素, 如果队列空了, 也就阻塞等待就好了; 也就是说, AB任何一方挂了不会对对方造成影响, 同时, 新增一个C来作为消费者, 对于A来说仍然是无感知的.


“削峰填谷” 可以联想三峡大坝的水库, 三峡大坝的水库,

73d8c9be8b2a4960a39693770de0ac9a.png

如果上游水多了, 三峡大坝就会关闸蓄水, 此时就相当于由三峡大坝承担了上游的冲击, 对下游起到了很好的保护左右, 这就是 “削峰” 作用; 如果上游水少了, 三峡大坝开闸放水, 有效保证下游的用水情况, 避免出现干旱灾害, 这就是 “填谷” 作用.


而在服务器开发中, 上游就是用户发送的请求, 下游就是一些执行具体业务的服务器, 用户发多少请求这是不可控的, 有的时候请求多, 有的时候请求少, 而如果没有使用生产者消费者模型, A服务器用户请求暴涨, 此时如果没有充分的准备, B服务器来不及响应一下没抗住, 就可能会挂掉.


但是, 如果使用生产者消费者模型, 那么即使A服务器请求暴涨, 也不会影响到B, 这是因为A请求暴涨后, 用户的请求都被打包到阻塞队列中(如果阻塞队列有界, 则会引起队列阻塞, 不会影响到B), B可以从阻塞队列中取出元素以合适的速度来处理这些请求, 这就是 “削峰填谷” 的作用了.

73d8c9be8b2a4960a39693770de0ac9a.png

3. 标准库中阻塞队列类

Java标准库也提供了阻塞队列的标准类, 常用的有下面几个:


ArrayBlockingQueue : 基于数组实现界阻塞队列

LinkedBlockingQueue : 基于链表实现的有界阻塞队列

PriorityBlockingQueue : 带有优先级(堆)的无界阻塞队列

BlockingQueue接口 : 上面的类实现了该接口

阻塞队列类的核心方法:

方法 解释
void put(E e) throws InterruptedException 带有阻塞特性的入队操作方法
E take() throws InterruptedException 带有阻塞特性的出队操作方法
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException 带有阻塞特性的入队操作方法, 并且可以设置最长等待时间
E poll(long timeout, TimeUnit unit) throws InterruptedException 带有阻塞特性的出队操作方法, 并且可以设置最长等待时间

代码示例:


下面的代码是基于标准库的阻塞队列简单实现的生产者消费者模型.

public class TestDemo19 {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
        //消费者线程
        Thread customer = new Thread(() -> {
           while (true) {
               try {
                   Integer result = blockingQueue.take();
                   System.out.println("消费元素: " + result);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
        });
        customer.start();
        //生产者线程
        Thread producer = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    blockingQueue.put(count);
                    System.out.println("生产元素: " + count);
                    count++;
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

二. 基于循环队列实现的简单阻塞队列

1. 循环队列的简单实现

要实现一个阻塞队列, 需要先实现一个普通的循环队列, 循环队列是基于数组实现的, 这里最重要的是如何将队列为空状态与满状态区分开来, 对于这里的实现不懂得可以看看看我前面博客中关于循环队列的内容, 这里就是不做过多的赘述了 : 队列与集合Queue,Deque的理解和使用 , 用栈实现队列,用队列实现栈,设计循环队列 , 阻塞队列最核心的就是出队和入队操作, 所以我们这里重点实现这两个方法, 代码如下:

//普通的循环队列
class MyBlockingQueue {
    //存放元素的数数组
    private int[] items = new int[1000];
    //队头指针
    private int head = 0;
    //队尾指针
    private int tail = 0;
    //记录队列元素的个数
    private int size = 0;
    //入队操作
    public void put (int val) {
        if (size == items.length) {
            //队列满了
            return;
        }
        items[tail++] = val;
        //等价于 tail %= items.length
        if (tail >= items.length) {
            tail = 0;
        }
        size++;
    }
    //出队操作
    public Integer take() {
        int resulet = 0;
        if (size == 0) {
            //队列空了
            return null;
        }
        resulet = items[head++];
        //等价于 head %= elem.length
        if (head >= items.length) {
            head = 0;
        }
        size--;
        return resulet;
    }
}

2. 阻塞队列的简单实现

首先要实现的阻塞队列在大多数情况下是在多线程情况下使用的, 所以要考虑线程安全问题, 上面循环队列的代码take与put方法都有写操作, 直接加锁即可.

//线程安全的循环队列
class MyBlockingQueue {
    //存放元素的数数组
    private int[] items = new int[1000];
    //队头指针
    private int head = 0;
    //队尾指针
    private int tail = 0;
    //记录队列元素的个数
    private int size = 0;
    //入队操作
    public void put (int val) {
        synchronized (this) {
            if (size == items.length) {
                //队列满了
                return;
            }
            items[tail++] = val;
            //等价于 tail %= items.length
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
        }
    }
    //出队操作
    public Integer take() {
        int resulet = 0;
        synchronized (this) {
            if (size == 0) {
                //队列空了
                return null;
            }
            resulet = items[head++];
            //等价于 head %= elem.length
            if (head >= items.length) {
                head = 0;
            }
            size--;
            return resulet;
        }
    }
}

然后就是实现阻塞效果了, 主要是使用wait和notify实现线程的阻塞等待.


入队时, 队列满了需要使用wait方法使线程阻塞, 直到有元素出队队列不满了再使用notify通知线程执行.


出队时, 队列为空也需要使用wait方法使线程阻塞, 直到有新元素入队再使用notify通知线程执行.


代码如下:

class MyBlockingQueue {
    //存放元素的数数组
    private int[] items = new int[1000];
    //队头指针
    private int head = 0;
    //队尾指针
    private int tail = 0;
    //记录队列元素的个数
    private int size = 0;
    //入队操作
    public void put (int val) throws InterruptedException {
        synchronized (this) {
            if (size == items.length) {
                //队列满了,阻塞等待
                this.wait();
            }
            items[tail++] = val;
            //等价于 tail %= items.length
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            //唤醒因队列空造成的阻塞wait
            this.notify();
        }
    }
    //出队操作
    public Integer take() throws InterruptedException {
        int resulet = 0;
        synchronized (this) {
            if (size == 0) {
                //队列空了,阻塞等待
                this.wait();
            }
            resulet = items[head++];
            //等价于 head %= elem.length
            if (head >= items.length) {
                head = 0;
            }
            size--;
            //唤醒因队列满造成的阻塞wait
            this.notify();
            return resulet;
        }
    }
}

上述代码已经基本实现了阻塞队列的功能, 但不够完善, 这里的代码再改良一下把判断队列满或者空的wait部分的代码, 把if改成while是更好的, 为什么这样写呢?


我们思考当代码中当wait被唤醒的时候, 此时if的条件一定就不成立了吗?


具体来说, 思考put方法中的wait被唤醒, 往下执行是要要求,队列不满但是wait被唤醒了之后, 队列一定是不满的嘛? 其实当前代码中是不会出现这样的问题的, 但是稳妥起见, 最好的办法就是wait唤醒之后再判定一下条件是否满足, 而且Java标准库当中就是建议这么写的.

73d8c9be8b2a4960a39693770de0ac9a.png

调整部分的代码如下:

//出队部分
while (size == items.length) {
    //队列满了,阻塞等待
    this.wait();
}
//入队部分
while (size == 0) {
    //队列空了,阻塞等待
    this.wait();
}

最后就是测试一下我们所写的这个阻塞队列了, 我们创建两个线程分别是消费者线程customer和生产者线程producer, 生产者生产数字, 消费者消费数字, 为了让执行结果中的阻塞效果明显一些, 我们可以使用sleep方法来控制一下生产者/消费者的生产/消费的频率, 这里我们让开始时生产的速度快一些, 消费的速度慢一些, 全部代码如下:

class MyBlockingQueue {
    //存放元素的数数组
    private int[] items = new int[1000];
    //队头指针
    private int head = 0;
    //队尾指针
    private int tail = 0;
    //记录队列元素的个数
    private int size = 0;
    //入队操作
    public void put (int val) throws InterruptedException {
        synchronized (this) {
            while (size == items.length) {
                //队列满了,阻塞等待
                this.wait();
            }
            items[tail++] = val;
            //等价于 tail %= items.length
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            //唤醒因队列空造成的阻塞wait
            this.notify();
        }
    }
    //出队操作
    public Integer take() throws InterruptedException {
        int resulet = 0;
        synchronized (this) {
            while (size == 0) {
                //队列空了,阻塞等待
                this.wait();
            }
            resulet = items[head++];
            //等价于 head %= elem.length
            while (head >= items.length) {
                head = 0;
            }
            size--;
            //唤醒因队列满造成的阻塞wait
            this.notify();
            return resulet;
        }
    }
}
public class Test {
    public static void main(String[] args) {
        //消费线程
        MyBlockingQueue queue = new MyBlockingQueue();
        Thread customer = new Thread(() -> {
            while(true) {
                try {
                    int result = queue.take();
                    System.out.println("消费元素: " + result);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        customer.start();
        //生产线程
        Thread producer = new Thread(() -> {
            int count = 0;
            while (true) {
                try {
                    queue.put(count);
                    System.out.println("生产元素: " + count);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
    }
}

执行结果:

73d8c9be8b2a4960a39693770de0ac9a.png

可以看到执行结果中因为生产者生产快, 消费者消费慢, 所以一开始生产者生产的速度是极快的, 当生产到阻塞队列满了之后生产者需要等待消费者消费后才能生产, 此时生产者的速度就跟消费者同步了.


目录
相关文章
|
11天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
13天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
13天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
14天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
37 3
|
14天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
96 2
|
22天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
46 6
|
缓存 Java 索引
JAVA并发之阻塞队列浅析
JAVA并发之阻塞队列浅析背景因为在工作中经常会用到阻塞队列,有的时候还要根据业务场景获取重写阻塞队列中的方法,所以学习一下阻塞队列的实现原理还是很有必要的。(PS:不深入了解的话,很容易使用出错,造成没有技术深度的样子) 阻塞队列是什么?要想了解阻塞队列,先了解一下队列是啥,简单的说队列就是一种先进先出的数据结构。
833 0
|
2月前
|
设计模式 Java 开发者
Java多线程编程的陷阱与解决方案####
本文深入探讨了Java多线程编程中常见的问题及其解决策略。通过分析竞态条件、死锁、活锁等典型场景,并结合代码示例和实用技巧,帮助开发者有效避免这些陷阱,提升并发程序的稳定性和性能。 ####
|
1月前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
2月前
|
缓存 Java 开发者
Java多线程编程的陷阱与最佳实践####
本文深入探讨了Java多线程编程中常见的陷阱,如竞态条件、死锁和内存一致性错误,并提供了实用的避免策略。通过分析典型错误案例,本文旨在帮助开发者更好地理解和掌握多线程环境下的编程技巧,从而提升并发程序的稳定性和性能。 ####