堵塞队列BlockingQueue 使用与理解

简介: 堵塞队列本质就是队列,底层数据结构 通常是由数组,或者链表构成。实现FIFO思想当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。

堵塞队列BlockingQueue

什么是堵塞队列

堵塞队列本质就是队列,底层数据结构 通常是由数组,或者链表构成。实现FIFO思想

当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。

当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。

注意:bolckingQueue是在多线程环境下提供的线程安全的队列

与ArrayList区别
1、ArrayList线程不安全,blockingQueue线程安全
2、ArrayList可以扩容,blockingQueue队列不能
在这里插入图片描述

为什么要使用堵塞队列

1、我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,BlockingQueue都给你一手包办了

2、如果有很多任务要处理,我们当前处理不了,总不能不处理。我们可以延迟处理,总比不处理要好

阻塞队列使用场景

1、生产者消费者模式

    传统版(synchronized, wait, notify)
    阻塞队列版(lock, await, signal)

2、线程池
3、消息中间件

怎么使用堵塞队列

blockingQueue实现类
在这里插入图片描述
ArrayBlockingQueue:由数组结构组成的有界阻塞队列。

LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列。

PriorityBlockingQueue:支持优先级排序的无界阻塞队列。

DelayQueue:使用优先级队列实现延迟无界阻塞队列。

SynchronousQueue:不存储元素的阻塞队列(生产一个消费一个)。

LinkedTransferQueue:由链表结构绒成的无界阻塞队列。

LinkedBlockingDeque:由链表结构组成的双向阻塞队列。

BlockingQueue核心方法组
在这里插入图片描述
在这里插入图片描述
offer和poll组

 public static void main(String[] args) {

        ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        
        new Thread(()->{
            System.out.println("元素成功入队否 "+blockingQueue.offer("1"));
            System.out.println("元素成功入队否 "+blockingQueue.offer("2"));
        },"producer1").start();
        
        new Thread(()->{
            System.out.println("元素成功入队否 "+blockingQueue.offer("3"));
            System.out.println("元素成功入队否 "+blockingQueue.offer("4"));
            System.out.println("阻塞队列中当前拥有数据个数: "+blockingQueue.size());
        },"producer2").start();

        new Thread(()->{
            System.out.println("成功消费数据 "+blockingQueue.poll());
            System.out.println("阻塞队列中当前拥有数据个数: "+blockingQueue.size());
        },"consumer").start();
    }

结果:
在这里插入图片描述
阻塞队列中只能存放指定个数的数据,如果使用offer(),将数据放入队列,当前队列已满,消费线程没有来得及消费,那么offer放入数据会失败

超时的 offer和poll组 与上面代码类似,只不过加了时间限制

put和take组

 public static void main(String[] args) {

        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        new Thread(()->{
            try {
                blockingQueue.put("1");
                blockingQueue.put("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"producer1").start();

        new Thread(()->{
            try {
                blockingQueue.put("3");
                blockingQueue.put("4");
                System.out.println("阻塞队列中当前拥有数据个数: "+blockingQueue.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"producer2").start();
    }

结果:
在这里插入图片描述
使用put将数据消息进入队列,如果队列满了,并且没有消费者线程进行消费,那么一直会堵塞线程,只要队列不为满时,将元素放入才不会堵塞线程

SynchronousQueue队列

SynchronousQueue没有容量。

与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。

每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "\t put A ");
                blockingQueue.put("A");

                System.out.println(Thread.currentThread().getName() + "\t put B ");
                blockingQueue.put("B");

                System.out.println(Thread.currentThread().getName() + "\t put C ");
                blockingQueue.put("C");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            try {

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take A ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take B ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take C ");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }

放一个拿一个,存在一个就不能放了哦

传统模式下的生产者消费者

1、synchronized控制的

class Data{

    int number = 0;

    AtomicInteger atomicInteger = new AtomicInteger(0);

    public void increment(){
        synchronized (this){
            // 不等于0进行,等待消费者消费
            while (number != 0){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 生产
            number++;
            System.out.println(Thread.currentThread().getName()+" 生产了一个产品: " +number);

            // 通知消费者消费
            this.notify();
        }
    }


    public void decrement(){
        synchronized (this){
            // 等待生产者生产
            while (number == 0){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 消费
            number--;
            System.out.println(Thread.currentThread().getName()+" 消费了一个产品: " +number);

            // 通知生产者生产
            this.notify();
        }
    }

}

public static void main(String[] args) {

        // 任务: 生产一个消费一个
        Data data = new Data();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                data.increment();
            }
        },"producer").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                data.decrement();
            }
        },"consumer").start();


    }

2、lock(ReentrantLock)

class Data{

    int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void increment(){
         lock.lock();
         try {
             // 不等于0进行,等待消费者消费
             while (number != 0){
                 try {
                     condition.await();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }

             // 生产
             number++;
             System.out.println(Thread.currentThread().getName()+" 生产了一个产品: " +number);

             // 通知消费者消费
             condition.signal();
         }finally {
             lock.unlock();
         }

    }


    public void decrement(){
        lock.lock();
        try {
            // 等待生产者生产
            while (number == 0){
                try {
                   condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 消费
            number--;
            System.out.println(Thread.currentThread().getName()+" 消费了一个产品: " +number);

            // 通知生产者生产
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

}

public static void main(String[] args) {

        // 任务: 生产一个消费一个
        Data data = new Data();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                data.increment();
            }
        },"producer").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                data.decrement();
            }
        },"consumer").start();


    }

两者运行结果:
在这里插入图片描述

虚假唤醒问题

存在多个线程并发争抢一个资源。以生产者消费者为例:

我们任务要求,只能生产一个产品消费一个产品。两个生产者生产,两个消费者消费。

当生产者生产完一个产品时,要唤醒等待的线程(notify是随机唤醒)。注意此时有两个消费者线程,一个生产者线程等待。如果cpu的调度权被等待的生产者获取到了,此时生产者在 wait()方法处 会直接往下执行,实际上就生产了两个产品。同理消费者也可能同时消费两个产品

根源在于:换性的线程是直接往下执行的并没有判断是否满足对应条件
在这里插入图片描述
产生虚假唤醒的源码

class Data{

    int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void increment(){
         lock.lock();
         try {
             // 不等于0进行,等待消费者消费
             if (number != 0){
                 try {
                     condition.await();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }

             // 生产
             number++;
             System.out.println(Thread.currentThread().getName()+" 生产了一个产品: " +number);

             // 通知消费者消费
             condition.signal();
         }finally {
             lock.unlock();
         }

    }


    public void decrement(){
        lock.lock();
        try {
            // 等待生产者生产
            if (number == 0){
                try {
                   condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 消费
            number--;
            System.out.println(Thread.currentThread().getName()+" 消费了一个产品: " +number);

            // 通知生产者生产
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

}

public static void main(String[] args) {

        // 任务: 生产一个消费一个
        Data data = new Data();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                data.increment();
            }
        },"producer").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                data.decrement();
            }
        },"consumer").start();


        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                data.increment();
            }
        },"producer1").start();

        new Thread(()->{
            for (int i = 0; i < 5; i++) {
                data.decrement();
            }
        },"consumer2").start();


    }

可能的结果
在这里插入图片描述

解决:
if该while即可,唤醒的同时,进行再次判断
在这里插入图片描述
总结具有 await/wait方法时,需要使用while

Synchronized和Lock区别

1、synchronized属于JVM层面,属于java的关键字

monitorenter(底层是通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象 只能在同步块或者方法中才能调用 wait/ notify等方法)

Lock是具体类(java.util.concurrent.locks.Lock)是api层面的锁

2、使用方法:
synchronized:不需要用户去手动释放锁,当synchronized代码执行后,系统会自动让线程释放对锁的占用。

ReentrantLock:则需要用户去手动释放锁,若没有主动释放锁,就有可能出现死锁的现象,需要lock() 和 unlock() 配置try catch语句来完成

3、等待是否中断
synchronized:不可中断,除非抛出异常或者正常运行完成。

ReentrantLock:可中断,可以设置超时方法
设置超时方法,trylock(long timeout, TimeUnit unit)
lockInterrupible() 放代码块中,调用interrupt() 方法可以中断

4、加锁是否公平
synchronized:非公平锁

ReentrantLock:默认非公平锁,构造函数可以传递boolean值,true为公平锁,false为非公平锁

5、锁绑定多个条件Condition
synchronized:没有,要么随机,要么全部唤醒
ReentrantLock:用来实现分组唤醒需要唤醒的线程,可以精确唤醒,而不是像synchronized那样,要么随机,要么全部唤醒

Condition实现精准唤醒线程

任务:
多线程之间按顺序调用,实现 A-> B -> C 三个线程启动,要求如下:
AA打印5次,BB打印10次,CC打印15次

class ShareData{

    // 1,2,3 分别标识3个不同的线程A,B,C
    int number = 1;

    Lock lock = new ReentrantLock();

    // condition在哪个线程中就表示是哪个线程的条件
    Condition c1 =lock.newCondition();
    Condition c2 =lock.newCondition();
    Condition c3 =lock.newCondition();

    // 功能聚合  任务写在共享资源中
    public void print5(){
       lock.lock();
       try {
           // 判断
           while (number != 1){
               try {
                   // 当前线程等待
                   c1.await();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }

           // 执行任务
           for (int i = 0; i < 5; i++) {
               System.out.println(Thread.currentThread().getName()+" "+i);
           }
           System.out.println();

           // 唤醒 (干完活后,需要通知B线程执行)
           number = 2;
           // 通知2号去干活了
           c2.signal();
       }finally {
           lock.unlock();
       }
    }

    public void print10(){
        lock.lock();
        try {
            // 判断
            while (number != 2){
                try {
                    // 当前线程等待
                    c2.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 执行任务
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName()+" "+i);
            }
            System.out.println();


            // 唤醒 (干完活后,需要通知C线程执行)
            number = 3;
            // 通知3号去干活了
            c3.signal();
        }finally {
            lock.unlock();
        }
    }

    public void print15(){
        lock.lock();
        try {
            // 判断
            while (number != 3){
                try {
                    // 当前线程等待
                    c3.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 执行任务
            for (int i = 0; i < 15; i++) {
                System.out.println(Thread.currentThread().getName()+" "+i);
            }
            System.out.println();


            // 唤醒 (干完活后,需要通知A线程执行)
            number = 1;
            // 通知1号去干活了
            c1.signal();
        }finally {
            lock.unlock();
        }
    }
}

/**
 * 1、多线程操作资源类
 * 2、判断需不需要等待
 * 3、执行任务
 * 4、通知其它线程执行
 */
public class ConditionTest {

    public static void main(String[] args) {

        ShareData shareData = new ShareData();

        // Condition在哪个线程,表示是哪个线程的条件
        new Thread(()->{
            shareData.print5();
        },"A").start();

        new Thread(()->{
            shareData.print10();
        },"B").start();

        new Thread(()->{
            shareData.print15();
        },"C").start();
    }
}

执行结果

A 0
A 1
A 2
A 3
A 4

B 0
B 1
B 2
B 3
B 4
B 5
B 6
B 7
B 8
B 9

C 0
C 1
C 2
C 3
C 4
C 5
C 6
C 7
C 8
C 9
C 10
C 11
C 12
C 13
C 14

注意: Condition在哪个线程,表示是哪个线程的条件,其它线程可以使用其线程的对应condition精准控制线程调用

BlockingQueue队列下的生产者和消费者

class MyResource {
    // 默认开启,进行生产消费
    // 这里用到了volatile是为了保持数据的可见性,也就是当TLAG修改时,要马上通知其它线程进行修改
    private volatile boolean FLAG = true;

    // 使用原子包装类,而不用number++
    private AtomicInteger atomicInteger = new AtomicInteger();

    // 这里不能为了满足条件,而实例化一个具体的SynchronousBlockingQueue
    BlockingQueue<String> blockingQueue = null;

    // 而应该采用依赖注入里面的,构造注入方法传入
    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        // 查询出传入的class是什么
        System.out.println(blockingQueue.getClass().getName());
    }


    public void myProducer() throws Exception{
        String data = null;
        boolean retValue;
        // 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
        // 当FLAG为true的时候,开始生产
        while(FLAG) {
            data = atomicInteger.incrementAndGet() + "";

            // 2秒存入1个data
            retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if(retValue) {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data  + "成功" );
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data  + "失败" );
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(Thread.currentThread().getName() + "\t 停止生产,表示FLAG=false,生产介绍");
    }


    public void myConsumer() throws Exception{
        String retValue;
        // 多线程环境的判断,一定要使用while进行,防止出现虚假唤醒
        // 当FLAG为true的时候,开始生产
        while(FLAG) {
            // 2秒存入1个data
            retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if(retValue != null && retValue != "") {
                System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue  + "成功" );
            } else {
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t 消费失败,队列中已为空,退出" );

                // 退出消费队列
                return;
            }
        }
    }

    /**
     * 停止生产的判断
     */
    public void stop() {
        this.FLAG = false;
    }

}

public class BlockingQueueProducerConsumer {

    public static void main(String[] args) {
        // 传入具体的实现类, ArrayBlockingQueue
        MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 生产线程启动\n\n");

            try {
                myResource.myProducer();
                System.out.println("\n");

            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "producer").start();


        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");

            try {
                myResource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "consumer").start();

        // 5秒后,停止生产和消费
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        System.out.println("\n\n5秒中后,生产和消费线程停止,线程结束");
        myResource.stop();
    }
}
相关文章
|
Java 调度
多线程学习(三)那些队列可用于线程池
多线程学习(三)那些队列可用于线程池
97 0
|
5月前
|
存储 设计模式 安全
使用BlockingQueue实现生产者-消费者模式
使用BlockingQueue实现生产者-消费者模式
|
算法 安全 Java
【阻塞队列BlockingQueue&非阻塞队列ConcurrentLinkedQueue&同步队列SyncQueue】
【阻塞队列BlockingQueue&非阻塞队列ConcurrentLinkedQueue&同步队列SyncQueue】
|
消息中间件
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
58 0
|
存储 缓存 安全
BlockingQueue阻塞队列原理以及实现
BlockingQueue阻塞队列原理以及实现
128 0
|
存储 缓存
并发编程之BlockingQueue队列
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:
229 0
|
安全 算法 API
非阻塞的无界线程安全队列 —— ConcurrentLinkedQueue
JUC 下面的相关源码继续往下阅读,这就看到了非阻塞的无界线程安全队列 —— ConcurrentLinkedQueue,来一起看看吧。
142 0
|
存储 Java
女同事问狗哥什么是线程池的阻塞队列?
女同事问狗哥什么是线程池的阻塞队列?
女同事问狗哥什么是线程池的阻塞队列?
|
安全 Java
BlockingQueue 阻塞队列详解(下)
BlockingQueue 是一个 Queue , 它是一个线程安全的阻塞队列接口。 ​
124 0