💪🏻 制定明确可量化的目标,坚持默默的做事。
一、继承实现关系图
二、低层数据存储结构
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { ... private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader; private final Condition available = lock.newCondition(); ... }
说明:
- lock:添加和获取锁(同一把锁)
- q:PriorityQueue存储数据(实现优化级队列)
- leader: 标记正在延迟等待出队的线程
- available: 出队阻塞对象
构造器:
public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
说明:
- 默认无参构造器
- 第二个传入一个集合初始化队列元素构造器
三、特点及优缺点
3.1 特点
- 支持优先级排序的无界阻塞延迟队列,优先级高的先出队,先先级低的后出队
- 存储数据结构为数组,以二叉堆形式存储于数组中,二叉堆叶子节点均小于父节点
- 只有一把锁(出队有阻塞)
- 不可添加为null的元素
- 出队取队头元素(只有队头元素的delay为0时才能取出元素)
- 初始容量大小为11
- 最大可扩容到Integer.MAX_VALUE
3.2 优缺点
- 根据排序规则排序(元素必须实现 Delayed接口)
- put不阻塞(若生产速度快于消费速度,会耗尽所有的可用堆内空间)
- 优先级排序无界阻塞队列(无界是说Integer.MAX_VALUE非常大,一般不会达这个数字内存就已经撑不住)
- 取数据锁为延迟阻塞出队
- 内部数组长度不够会进行扩容
- 新增元素进行最小二叉推排序
四、源码详解
读取重要源码:
- 添加任务方法
- 获取和删除任务方法
- 扩容
阅读明白这几个接口即可,其它都很简单。
4.1 put、offer
- put实际上是调offer方法
- 不允许添加null对象
- siftUp按优先级插入队列逻辑同 阻塞队列PriorityBlockingQueue
public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { // 刚插入就在队首,说明队列是空的,目前只有一个元素 // 则唤醒队列为空时的阻塞 // 唤醒队列leader不为空时的阻塞 leader = null; available.signal(); } return true; } finally { lock.unlock(); } } // p.offer(e) public boolean offer(E e) { // 插入元素不允许为空 if (e == null) throw new NullPointerException(); modCount++; int i = size; if (i >= queue.length) // 队列为空则扩容 grow(i + 1); // 按优先级插入到队列中 siftUp(i, e); // 队列元素个数+1 size = i + 1; return true; } /** * 按优先级插入到队列中 */ private void siftUp(int k, E x) { if (comparator != null) siftUpUsingComparator(k, x, queue, comparator); else siftUpComparable(k, x, queue); }
4.2 take
- 加锁
- 读取队首元素,若为空则阻塞,
- 若不为空且过期时间为0,则取出队首元素),队列元素重排
- 最后一个元素n 并且最后一个元素赋值为null
- 队列按排序规则从子节点中取元素填充父节点
- 子节点中取元素似为空的父节点,重复上一步骤,直到取出的子节点为叶子节点leaf
- first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题
- 判断leader元素是否为空,不为空的话阻塞当前线程
- leader元素为空,则设置leader为当前线程,阻塞delay时间,delay结束重复以上步骤
- 若无取出线程,且队列不为空,则唤配所有其它awai()线程
- 解锁
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) return q.poll(); // first = null 解决高并发情况下某一线程a延迟等待出队,而其它线程因leader!=null而available.await()等待,因线程a出取first而其它线程first一直不为空,导致JVM无法回收导致内存泄漏问题 first = null; if (leader != null) available.await(); else { // leader为空,则设置当前线程为取元素线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 延迟等待取出元素 available.awaitNanos(delay); } finally { // 延迟等待结束之后,还原现场 if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // 无正在取元素线程,且队列不为空,唤醒await的线程 available.signal(); // 释放锁 lock.unlock(); } } public E poll() { final Object[] es; final E result; if ((result = (E) ((es = queue)[0])) != null) { modCount++; final int n; final E x = (E) es[(n = --size)]; es[n] = null; if (n > 0) { final Comparator<? super E> cmp; if ((cmp = comparator) == null) siftDownComparable(0, x, es, n); else siftDownUsingComparator(0, x, es, n, cmp); } } return result; }
4.3 扩容原理
- 默认队列大小为11,扩容最大大小为Integer.MAX_VALUE
- 扩容时机:元素个数 >= 队列的大小
- 扩容规则:
- 队列大小 < 64,则新队列大小 = (旧队列大小 + 1) * 2
- 队列大小 >= 64,则新队列大小 = 旧队列大小 + 旧队列大小 / 2
- 当旧容量大于Integer.MAX_VALUE - 8时,新容量值=旧容量 + 1(下面看源码说明)
- 当超过Ineter.MAX_VALUE时,报OutOfMemoryError错误
- 创建新容量数组队列,原数组队列数据复制到新数组队列中
private void grow(int minCapacity) { int oldCapacity = queue.length; // 当旧容量大小小于64,新容量 = 旧容量 + (旧容量 + 2) // 当旧容量大小大于63,新容量 = 旧容量 + (旧容量 / 2) // 当旧容量大于Integer.MAX_VALUE - 8时,走hugeLength(int oldLength, int minGrowth)逻辑 // minCapacity = (i = size) + 1 (即oldCapacity+1) - oldCapacity = 1 int newCapacity = ArraysSupport.newLength(oldCapacity, minCapacity - oldCapacity, /* minimum growth */ oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1 /* preferred growth */); queue = Arrays.copyOf(queue, newCapacity); } public static int newLength(int oldLength, int minGrowth, int prefGrowth) { // preconditions not checked because of inlining // assert oldLength >= 0 // assert minGrowth > 0 int prefLength = oldLength + Math.max(minGrowth, prefGrowth); // might overflow if (0 < prefLength && prefLength <= SOFT_MAX_ARRAY_LENGTH) { return prefLength; } else { // put code cold in a separate method return hugeLength(oldLength, minGrowth); } } private static int hugeLength(int oldLength, int minGrowth) { // 当oldLength > Integer.MAX_VALUE - 8时 // minLength = oldLength + 1 // 当minLength < Integer.MAX_VALUE,此时minLength > SOFT_MAX_ARRAY_LENGTH(Integer.MAX_VALUE - 8)返回 minLength // 否则抛OutOfMemoryError错误 int minLength = oldLength + minGrowth; if (minLength < 0) { // overflow throw new OutOfMemoryError( "Required array length " + oldLength + " + " + minGrowth + " is too large"); } else if (minLength <= SOFT_MAX_ARRAY_LENGTH) { return SOFT_MAX_ARRAY_LENGTH; } else { return minLength; } }
五、作用及使用场景
- 下单:如唯品会,下单30分钟之内不支付自动取消
- 试课课程:试课结束前15分钟通知老师
- 空闲连接延迟自动关闭
- 缓存:超过时间自动清除
- 超时处理、业务办理排队叫号、插队和枪购活动等