带你手搓阻塞队列——自定义实现

简介: 带你手搓阻塞队列——自定义实现



 

一、阻塞队列的作用

一个分布式系统中,会经常出现这样的情况:有的机器能承担的压力更大,有的能承担的压力更小:

如果按照生产者消费者模型,那就另当别论了。

假设此时通过队列来让A和B进行交互:

 

二、阻塞队列实现

2.1 普通队列实现

在实现阻塞队列之前,我们先把普通的队列(基于数组的循环队列)进行一个简单的实现,然后通过进一步的改进,把普通的队列改造成一个阻塞队列。

class MyBlockingQueue{
    private int[] items;
    private int head = 0;//队列头指针
    private int tail = 0;//队列尾指针
    private int size = 0;//队列当前元素个数
    public MyBlockingQueue(){}
    //入队列
    public void put(int elem){}
    //出队列
    public int take(){}
}

2.1.1 构造方法

public MyBlockingQueue(){
        this.items = new int[100];
    }

2.1.2 入队列

//入队列
    public void put(int elem){
        if (size >= items.length){
            return;
        }
        items[tail] = elem;
        if (tail >= items.length){//判断尾指针是否到达末尾
            tail = 0;
        }
        tail++;
        size++;
    }

2.1.3 出队列

//出队列
    public int take(){
        if(size == 0){
            return -1;
        }
        int elem = items[head];
        if (head >= items.length){
            head = 0;
        }
        head++;
        size--;
        return elem;
    }

2.2 阻塞队列实现

现在,我们就把上面的队列改造成阻塞队列。

2.2.1 保证线程安全

在当前的代码下,如果是多线程的情况,调用put或者take,这两个方法中都涉及到了对变量的修改,这样就会出现线程安全问题。这就需要我们进行加锁。

//入队列
    public void put(int elem){
        synchronized (this){
            if (size >= items.length){
                return;
            }
            items[tail] = elem;
            if (tail >= items.length){
                tail = 0;
            }
            tail++;
            size++;
        }
    }
//出队列
    public int take(){
        synchronized (this){
            if(size == 0){
                return -1;
            }
            int elem = items[head];
            if (head >= items.length){
                head = 0;
            }
            head++;
            size--;
            return elem;
        }
    }

2.2.2 保证内存可见性

光加锁就够吗?我们可以看到,多线程的情况下,不光是对变量进行修改,还有读操作等等,那就有可能出现一个线程在读,另外一个线程在修改,这个读的线程没有读到。所以,此处除了加锁之外,还需要考虑内存可见性问题。也就是说,当其他线程进行修改的时候,我们要保证当前线程可以读到这个修改,所以我们把变量加上volatile关键字。

volatile private int head = 0;
    volatile private int tail = 0;
    volatile private int size = 0;

2.2.3 阻塞功能的实现

解决了上述问题后,我们就需要考虑一下如何实现阻塞功能了。

实现阻塞有两方面:

  • 当队列满的时候,再进行put(入队),就会产生阻塞。阻塞到队列中元素出队后,就去唤醒当前因队列满而被阻塞的状态。
  • 当队列空的时候,再进行take(出队),就会产生阻塞。阻塞到队列中有元素入队时,去唤醒当前因队列空而被阻塞的状态。
//入队列
    public void put(int elem) throws InterruptedException {
        synchronized (this){
            while (size >= items.length){
                //队列满了
                //return;
                this.wait();
            }
            items[tail] = elem;
            if (tail >= items.length){
                tail = 0;
            }
            tail++;
            size++;
            //成功入队
            this.notify();//唤醒因队列空而被阻塞的状态
        }
    }
    //出队列
    public int take() throws InterruptedException {
        synchronized (this){
            while (size == 0){
                //队列空
                //return -1;
                this.wait();
            }
            int elem = items[head];
            if (head >= items.length){
                head = 0;
            }
            head++;
            size--;
            this.notify();//使用这个notify唤醒队列满的阻塞状态
            return elem;
        }
    }
}

好了,经过上面的改进,我们就已经实现了一个简单的阻塞队列,下面是改进后的完整代码:

class MyBlockingQueue{
    private int[] items;
    volatile private int head = 0;
    volatile private int tail = 0;
    volatile private int size = 0;
    public MyBlockingQueue(){
        this.items = new int[100];
    }
    //入队列
    public void put(int elem) throws InterruptedException {
        synchronized (this){
            while (size >= items.length){
                //队列满了
                //return;
                this.wait();
            }
            items[tail] = elem;
            if (tail >= items.length){
                tail = 0;
            }
            tail++;
            size++;
            //成功入队
            this.notify();//唤醒因队列空而被阻塞的状态
        }
    }
    //出队列
    public int take() throws InterruptedException {
        synchronized (this){
            while (size == 0){
                //队列空
                //return -1;
                this.wait();
            }
            int elem = items[head];
            if (head >= items.length){
                head = 0;
            }
            head++;
            size--;
            this.notify();//使用这个notify唤醒队列满的阻塞状态
            return elem;
        }
    }
}

三、基于自定义阻塞队列,模拟生产者消费者模型

实现阻塞队列之后,我们利用阻塞队列简单模拟一下生产者消费者模型:

public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue();
        //生产者线程
        Thread product = new Thread(()->{
            int count = 0;
            while (true){
                try {
                    queue.put(count);
                    System.out.println("生产元素:>"+count);
                    count++;
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //消费者线程
        Thread consummer = new Thread(()->{
            while (true){
                try {
                    int elem = queue.take();
                    System.out.println("消费元素:>"+elem);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        product.start();
        consummer.start();
    }

运行结果:


🌈🌈🌈好啦,今天的分享就到这里!

🛩️🛩️🛩️希望各位看官读完文章后,能够有所提升。

🎉🎉🎉创作不易,还希望各位大佬支持一下!

✈️✈️✈️点赞,你的认可是我创作的动力!

⭐⭐⭐收藏,你的青睐是我努力的方向!

✏️✏️✏️评论:你的意见是我进步的财富!

 

目录
相关文章
|
6月前
|
Java 程序员
惊呆了!LinkedList的这些队列功能,99%的程序员都没用过!
【6月更文挑战第18天】`LinkedList`不仅是Java集合中的列表实现,还可作队列(`peek()`,`add()`,`remove()`)和双端队列(`Deque`,`addFirst()`,`addLast()`,`peekFirst()`,`peekLast()`),甚至栈(`push()`,`pop()`,`peek()`)。常被低估,其实它具备从两端操作数据的强大能力,适合多种数据结构需求。
41 6
|
6月前
|
Java Apache Spring
面试官:如何自定义一个工厂类给线程池命名,我:现场手撕吗?
【6月更文挑战第3天】面试官:如何自定义一个工厂类给线程池命名,我:现场手撕吗?
40 0
|
7月前
|
存储 缓存 Java
【linux线程(四)】初识线程池&手撕线程池
【linux线程(四)】初识线程池&手撕线程池
|
前端开发 JavaScript 容器
【一个让你停不下来的动效】——难道不来瞅瞅?(含源码+思路)
【一个让你停不下来的动效】——难道不来瞅瞅?(含源码+思路)
167 0
【一个让你停不下来的动效】——难道不来瞅瞅?(含源码+思路)
|
设计模式 架构师 Java
为什么有些蛮厉害的人,后来都不咋样了
写这篇文章目的是之前在一篇文章中谈到,我实习那会有个老哥很牛皮,业务能力嘎嘎厉害,但是后面发展一般般,这引起我的思考,最近有个同事发了篇腾讯pcg的同学关于review 相关的文章,里面也谈到架构师的层次,也再次引起我关于架构师的相关思考,接下来我们展开聊聊吧~
160 0
|
监控 Java
手撕Java线程池
原理和源码解析
3832 0
手撕Java线程池
10分钟手撸Java线程池,yyds!!
10分钟手撸Java线程池,yyds!!
277 0
10分钟手撸Java线程池,yyds!!
|
缓存 负载均衡 前端开发
手撸一款简单高效的线程池(一)
线程池大家应该都用过,不过如何从 0 到 1 的设计一款简单好用且性能较好的线程池?我们在接下来的几篇文章中,为您一一介绍。
498 0
手撸一款简单高效的线程池(一)
|
缓存 负载均衡 Java
手撸一款简单高效的线程池(二)
大家好,我是不会写代码的纯序员——Chunel Feng[1]。这篇文章是线程池优化系列连载的第二篇。主要跟大家介绍几种线程池优化思路,包括:local-thread 机制、lock-free 机制、work-stealing 机制。
532 0
手撸一款简单高效的线程池(二)
|
人工智能 负载均衡 算法
手撸一款简单高效的线程池(四)
在前几章的内容中,我们给大家介绍了一些 C++线程池中的优化思路和实现方案。这一章中,我们来聊一聊在编程实现过程中,一些工程层面的优化。让我们的代码执行的速度,跟得上自己的思路。
244 0
手撸一款简单高效的线程池(四)