开发者社区> 问答> 正文

简述线程池阻塞队列PriorityBlockingQueue

简述线程池阻塞队列PriorityBlockingQueue

展开
收起
huc_逆天 2021-01-11 10:08:33 1159 0
1 条回答
写回答
取消 提交回答
  • PriorityBlockingQueue

    这是一个无界有序的阻塞队列,排序规则和PriorityQueue一致,只是增加了阻塞操作。同样的该队列不支持插入null元素,同时不支持插入非comparable的对象。它的迭代器并不保证队列保持任何特定的顺序,如果想要顺序遍历,考虑使用Arrays.sort(pq.toArray())。该类不保证同等优先级的元素顺序,如果你想要强制顺序,就需要考虑自定义顺序或者是Comparator使用第二个比较属性。

    源码如下:

    /**第一种,创建容量的11的队列*/ 
    public PriorityBlockingQueue() {
            this(DEFAULT_INITIAL_CAPACITY, null);
        }
    /**第二种,指定初始化容量*/
    public PriorityBlockingQueue(int initialCapacity) {
            this(initialCapacity, null);
        }
    /**第三种,指定初始化容量和比较规则*/
    public PriorityBlockingQueue(int initialCapacity,
                                     Comparator<? super E> comparator) {
            if (initialCapacity < 1)
                throw new IllegalArgumentException();
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            this.comparator = comparator;
            this.queue = new Object[initialCapacity];
        }
    /**第四种,指定初始化集合,使用锁*/
    public PriorityBlockingQueue(Collection<? extends E> c) {
            this.lock = new ReentrantLock();
            this.notEmpty = lock.newCondition();
            boolean heapify = true; // true if not known to be in heap order
            boolean screen = true;  // true if must screen for nulls
            if (c instanceof SortedSet<?>) {
                SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
                this.comparator = (Comparator<? super E>) ss.comparator();
                heapify = false;
            }
            else if (c instanceof PriorityBlockingQueue<?>) {
                PriorityBlockingQueue<? extends E> pq =
                    (PriorityBlockingQueue<? extends E>) c;
                this.comparator = (Comparator<? super E>) pq.comparator();
                screen = false;
                if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                    heapify = false;
            }
            Object[] a = c.toArray();
            int n = a.length;
            // If c.toArray incorrectly doesn't return Object[], copy it.
            if (a.getClass() != Object[].class)
                a = Arrays.copyOf(a, n, Object[].class);
            if (screen && (n == 1 || this.comparator != null)) {
                for (int i = 0; i < n; ++i)
                    if (a[i] == null)
                        throw new NullPointerException();
            }
            this.queue = a;
            this.size = n;
            if (heapify)
                heapify();
        }
    

    使用如下方法进行扩容:

    /***/ 
    private void tryGrow(Object[] array, int oldCap) {
            lock.unlock(); // must release and then re-acquire main lock
            Object[] newArray = null;
            if (allocationSpinLock == 0 &&
                UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                         0, 1)) {
                try {
                    int newCap = oldCap + ((oldCap < 64) ?
                                           (oldCap + 2) : // grow faster if small
                                           (oldCap >> 1));
                    if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                        int minCap = oldCap + 1;
                        if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                            throw new OutOfMemoryError();
                        newCap = MAX_ARRAY_SIZE;
                    }
                    if (newCap > oldCap && queue == array)
                        newArray = new Object[newCap];
                } finally {
                    allocationSpinLock = 0;
                }
            }
            if (newArray == null) // back off if another thread is allocating
                Thread.yield();
            lock.lock();
            if (newArray != null && queue == array) {
                queue = newArray;
                System.arraycopy(array, 0, newArray, 0, oldCap);
            }
        }
    

    其先放开了锁,然后通过CAS设置allocationSpinLock来判断哪个线程获得了扩容权限,如果没抢到权限就会让出CPU使用权。最后还是要锁住开始真正的扩容。扩容权限争取到了就是计算大小,分配数组。扩容的大小时原有的一倍。

    2021-02-03 18:23:08
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
多IO线程优化版 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载