一、什么是阻塞队列
阻塞队列(Blocking Queue)是一种特殊的队列,因其为队列,因此遵循“先进先出”的原则,此外,其是一种线程安全的数据结构,具有以下两点特性:
1. 当队列满时,继续入队列就会阻塞,直到有线程将元素从队列中取出
2. 当队列空时,继续出队列就会阻塞,直到有线程向队列中插入元素
阻塞队列常用于“生产者消费者模型”
什么是“生产者消费者模型”?
“生产者消费者模式”是一种典型的开发模型,通过一个容器来解决生产者和消费者的强耦合问题
生产者:向队列中添加元素的线程
消费者:向队列中取出元素的线程
生产者和消费者之间不直接通讯,而是通过阻塞队列来通讯,生产者产出数据后,不用等待消费者处理数据,而是将数据放入阻塞队列中;消费者不用等待消费者产出数据,而是直接从阻塞队列中取出元素。
生产者和消费者之间直接通讯:
通过阻塞队列通讯:
为什么要在生产者和消费者之间使用阻塞队列?
1. 阻塞队列相当于一个缓冲区,平衡了生产者和消费者的处理能力
当生产者突然产出大量数据,若直接将这些数据交给消费者处理,消费者可能一时处理不了这么多数据,而若生产者将这些数据放入阻塞队列中,消费者则可慢慢从阻塞队列中取出数据来处理
2. 阻塞队列能够使生产者和消费者之间解耦合
若生产者直接将数据交给消费者处理,则生产者要考虑消费者处理数据的能力,一旦消费者未能及时处理数据,生产者则要等待消费者处理完数据;同样,一旦生产者未能及时产出数据,消费者就得等待生产者产出数据。
而若加上阻塞队列,生产者只负责产出数据,并将其放入阻塞队列中,消费者只负责从阻塞队列中取出数据并处理。此时就相当于“流水线”,每个线程只专注完成自己的任务。
二、阻塞队列的使用
Java标准库中内置了阻塞队列,当我们需要使用阻塞队列时,可直接使用标准库中提供的阻塞队列。
BlockingQueue是一个接口,其实现类是ArrayBlockingQueue,LinkedBlockingQueue等
Java中提供的阻塞队列有:
ArrayBlockingQueue:基于数组实现的有界阻塞队列
LinkedBlockingQueue:基于链表实现的有界阻塞队列
PriorityBlockingQueue:支持优先级的无界阻塞队列
DelayQueue:基于PriorityBlockingQueue实现的无界延迟队列
SynchronousQueue:不存储元素的阻塞队列
LinkedTransferQueue:基于链表实现的无界阻塞队列
LinkedBlockingDeque:基于链表实现的双向阻塞队列
阻塞队列类中的常用方法
void put(E e) throws InterruptedException
put 方法用于插入元素,当队列未满时,将元素插入队列尾,若队列已满,则无法继续插入元素,阻塞,直到队列中有空闲空间,才解除阻塞状态,并将元素插入到队列中
E take() throws InterruptedException
take 方法用于取出队列中队首元素, 并返回该元素,若队列中有元素,则正常取出数据,若队列为空,则阻塞,直到队列中有元素,解除阻塞状态,并取出该元素
boolean add(E e)
add 方法用于向队列中添加元素,若添加成功,则返回true,若队列已满,添加失败,add 方法会抛出异常
E remove()
remove 方法用于删除元素,并返回,当队列为空时,remove 方法会抛出异常
E element()
element 方法用于查看队首元素,并返回,但是不将其从队列中删除,若队列为空,则抛出异常
boolean offer(E e)
offer 方法用于添加元素,若添加成功,则返回true,若队列已满,添加失败时,返回false
E poll()
poll 方法用于移除并返回队首元素,若队列为空,则返回null,因此,不允许向队列中插入null,否则我们无法区分返回的null是队列中的元素还是队列为空时的提示
E peek()
peek 方法用于查看队首元素,并返回,但是不将其从队列中删除,当队列为空时,返回null
总结:
方法 | 抛出异常 | 通过返回值判断 | 阻塞 |
插入元素 | add(e) | offer(e) | put(e) |
获取并移除队首元素 | remove() | poll() | take() |
获取但不移除队首元素 | element() | peek() | 无 |
三、模拟实现阻塞队列
了解了什么是阻塞队列和阻塞队列的常用方法,那么阻塞队列是如何实现的呢?
我们通过模拟实现阻塞队列中带有阻塞功能的 put 和 take 方法来进一步了解阻塞队列
在这里我们想通过模拟实现 put 和 take 来了解阻塞队列,因此为了简单,我们将存储的元素定义为Integer,而不写作泛型的形式
如何实现 put 和 take 方法?
首先我们分析 put 和 take 方法分别要实现哪些功能
put :
1. 实现入队列操作
2. 保证线程安全
3. 当队列满时,阻塞
take :
1. 实现出队列操作
2. 保证线程安全
3. 当队列为空时,出队列阻塞
1. 实现入队列和出队列操作:
要想实现队列满后,若有元素出队列,则可继续插入元素,需要使用循环队列,这里选择使用数组来实现循环队列
实现循环队列:
1. 定义front 指向队首元素,rear 指向队尾元素,usedSize 标记队列中已有元素
2. 若队列中有空闲空间,可以进行入队列
3. 若队列中有元素,可以进行出队列
public class MyArrayBlockingQueue { private int[] elems = null; private int front = 0; private int tail = 0; private int usedSize = 0; //初始化循环队列容量 public MyArrayBlockingQueue(int capacity){ if(capacity <= 0){ throw new IllegalArgumentException("队列容量应大于0!"); } elems = new int[capacity]; } //实现入队列 public void put(int elem){ //若队列满,则不能向队列中添加元素 if(usedSize >= elems.length){ return; } //向队列中添加元素 elems[tail++] = elem; //若tail指向队列最后一个位置之后,则将其放到0位置处 if(tail >= elems.length){ tail = 0; } usedSize++; } //实现出队列 public int take(){ //若队列为空,则不能入队列 if(usedSize == 0){ //此时我们暂时先用抛出异常的方式来标记不能入队列 throw new RuntimeException(); } //取出front位置元素 int val = elems[front++]; //若front指向队列最后一个位置之后,则将其放到0位置处 if(front >= elems.length){ front = 0; } usedSize--; return val; } }
2. 保证线程安全
上面实现的循环队列操作是不安全的
例如:
由于 put 和 take 方法都有写操作,因此都需要加锁
这里我们通过锁对象来进行加锁操作(也可以通过 this 进行加锁操作)
在哪里加锁?
若我们仅对添加元素和删除元素加锁,仍不能上图中存在的问题,因此我们要对 判断和入队列(或出队列)操作 加锁
public class MyArrayBlockingQueue { private int[] elems = null; private int front = 0; private int tail = 0; private int usedSize = 0; private Object locker = new Object();//锁对象 //初始化循环队列容量 public MyArrayBlockingQueue(int capacity){ if(capacity <= 0){ throw new IllegalArgumentException("队列容量应大于0!"); } elems = new int[capacity]; } //实现入队列 public void put(int elem){ //加锁 synchronized (locker){ //若队列满,则不能向队列中添加元素 if(usedSize >= elems.length){ return; } //向队列中添加元素 elems[tail++] = elem; //若tail指向队列最后一个位置之后,则将其放到0位置处 if(tail >= elems.length){ tail = 0; } usedSize++; } } //实现出队列 public int take(){ int val = 0; //加锁 synchronized (locker){ //若队列为空,则不能入队列 if(usedSize == 0){ //此时我们暂时先用抛出异常的方式来标记不能入队列 throw new RuntimeException(); } //取出front位置元素 val = elems[front++]; //若front指向队列最后一个位置之后,则将其放到0位置处 if(front >= elems.length){ front = 0; } usedSize--; } return val; } }
3. 当队列为空时,出队列阻塞;当队列满时,入队列阻塞
在队列满的情况下,当有元素出队列时,解除入队列阻塞状态;
在队列为空的情况下,当有元素入队列,解除出队列阻塞状态;
//实现入队列 public void put(int elem) throws InterruptedException { //加锁 synchronized (locker){ //若队列满,则不能向队列中添加元素,阻塞 if(usedSize >= elems.length){ locker.wait(); } //向队列中添加元素 elems[tail++] = elem; //若tail指向队列最后一个位置之后,则将其放到0位置处 if(tail >= elems.length){ tail = 0; } usedSize++; // 入队列成功,此时唤醒一个阻塞的出队列线程 locker.notify(); } } //实现出队列 public int take() throws InterruptedException { int val = 0; //加锁 synchronized (locker){ //若队列为空,则不能入队列,阻塞 if(usedSize == 0){ locker.wait(); } //取出front位置元素 val = elems[front++]; //若front指向队列最后一个位置之后,则将其放到0位置处 if(front >= elems.length){ front = 0; } usedSize--; //出队列成功,此时唤醒一个入队列阻塞的线程 locker.notify(); } return val; }
然而,此时代码还存在一些问题
我们本想通过 put 方法中的 notify 唤醒在队列空时 出队列阻塞的线程,但在上图中,我们却通过notify 唤醒了 队列满时 入队列阻塞的线程,从而导致在队列满的情况下,仍向其中插入元素
如何解决上述问题?
我们可以在判断队列为空时,将 if 改为 while ,每唤醒一次线程,再判断一次
即在wait之前进行一次判断,唤醒之后再进行一次判断。
因而在上图情况中,虽然t3线程被唤醒,但由于又进行了一次判定,此时判断队列仍满,因而再次阻塞,而不会向下执行入队列操作
因此,模拟实现的阻塞队列代码为:
public class MyArrayBlockingQueue { private int[] elems = null; private int front = 0; private int tail = 0; private int usedSize = 0; private Object locker = new Object();//锁对象 //初始化循环队列容量 public MyArrayBlockingQueue(int capacity){ if(capacity <= 0){ throw new IllegalArgumentException("队列容量应大于0!"); } elems = new int[capacity]; } //实现入队列 public void put(int elem) throws InterruptedException { //加锁 synchronized (locker){ //若队列满,则不能向队列中添加元素,阻塞 while (usedSize >= elems.length){ locker.wait(); } //向队列中添加元素 elems[tail++] = elem; //若tail指向队列最后一个位置之后,则将其放到0位置处 if(tail >= elems.length){ tail = 0; } usedSize++; // 入队列成功,此时唤醒一个阻塞的出队列线程 locker.notify(); } } //实现出队列 public int take() throws InterruptedException { int val = 0; //加锁 synchronized (locker){ //若队列为空,则不能入队列,阻塞 while (usedSize == 0){ locker.wait(); } //取出front位置元素 val = elems[front++]; //若front指向队列最后一个位置之后,则将其放到0位置处 if(front >= elems.length){ front = 0; } usedSize--; //出队列成功,此时唤醒一个入队列阻塞的线程 locker.notify(); } return val; } }
此时,我们再通过一个简单的生产者消费者模型来验证一下我们实现的阻塞队列是否有问题
public class Test { public static void main(String[] args) { MyArrayBlockingQueue queue = new MyArrayBlockingQueue(10); //生产者 Thread producer = new Thread(() -> { int n = 1; while (true){ try { queue.put(n); System.out.println("生产元素:" + n); n++; Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); //消费者 Thread customer = new Thread(() -> { while (true){ try { int n = queue.take(); System.out.println("消费元素:" + n); Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); producer.start(); customer.start(); } }
运行结果: