一行一行读Java源码——LinkedBlockingQueue

简介: 借助源码分析了LinkedBlockingQueue。

1、LinkedBlockingQueue概述

LinkedBlockingQueue,顾名思义,一个链式的(linked)、阻塞的(Blocking)队列(Queue)。
Queue,首先想到的是FIFO特性。
Linked,Queue其结构本质上也是线性表,可以由链表和顺序表实现,LinkedBlockingQueue就是链表实现,ArrayBlockingQueue是顺序表实现。因Queue 只在首尾操作,所以操作链表和顺序表的时间复杂度是一样的,但顺序表的实现会占用更少的空间,因为不需要“指针”域(next),但空间必须是连续的;链式实现不需要连续空间,但需要使用next 来指向下一个节点位置,以下LinkedBlockingQueue的节点结构。

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

Blocking,阻塞,LinkedBlockingQueue是线程安全的,当队列满了以后,所有的入队操作将会被阻塞;当队列空了,所有的出队操作将会被阻塞。队列初始化的时候,我们可以指定队列长度capacity,如果没有指定,LinkedBlockingQueue的默认capacity是Integer.MAX_VALUE。显然,capacity还是一个不可更改的值。

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

2、LinkedBlockingQueue实现代码详解

如果要看懂LinkedBlockingQueue的实现,需要熟悉wait/notify以及AbstractQueuedSynchronizer(AQS)。题外话,个人认为并发编程中有三个非常重要的东西:等待通知机制、CAS以及AQS。

2.1 head和tail

  1. head和tail分别指示队列的首尾,可快速地定位take和put操作位置。注意头结点head和首节点first的区分。
transient Node<E> head;
private transient Node<E> last;

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

Screen_Shot_2018_01_09_at_11_01_13_PM

2.2 count

  1. count表示当前队列中元素的个数,其使用并发包下的AtomicInteger类来实现原子操作,该类的核心还是cas操作。AtomicInteger类型的count对于队列的线程安全有着至关重要的作用,因为接下来会看到take和put操作是两个独立的锁。
  2. 有兴趣的话,可以看看ArrayBlockingQueue,其只使用了一个锁来保证线程安全,所以它的count没有使用AtomicInteger,而是一个int类型。
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

2.3 锁与条件

  1. takeLock以及putLock分别定义了take操作以及put操作锁。
  2. take操作的条件是notEmpty,所以在执行take操作时会先判断当前队列是否还有元素可以take,如果没有那么就要执行notEmpty.await() 让take线程阻塞。
  3. put操作的条件是notFull,所以在执行put操作时会先判断当前队列是否还有空间可以put元素,如果没有剩余空间那么就要执行notFull.await()
  4. 成功 take以后,会判断一下take之前队列是不是满的,如果是,说明可能会有put线程被阻塞了,所以会调用signalNotFull() 方法去唤醒那些put线程。
  5. 成功put以后,会判断一下在put之前队列是不是空的,如果是,说明可能会有take线程是阻塞的,所以会调用signalNotEmpty() 去唤醒那些take线程。
  6. 4、5两步设计的相当好,先判断是不是Empty或者Full,然后再去调用唤醒方法,避免无谓的唤醒操作。但是这一步在理解的时候有点费解。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

2.4 put

  1. 在对尾插入一个指定的元素e,如果没有空间,线程将会等待。
  2. e不允许为空,该队列不存储null元素,否则抛NullPointerException
  3. 局部变量c初始值为-1,其存储当前队列的元素个数,准确地说是put操作之前的元素个数,因为c = count.getAndIncrement(),而getAndIncrement()返回的是previous值(c的值很重要,不然无法理解唤醒操作)。
  4. putLock.lockInterruptibly() ,获取put lock。
  5. 如果 count.get() == capacity ,即队列已经没有剩余空间了,那么条件为not Full 的操作,即put操作线程要执行语句notFull.await()进入等待;否则正常入队。
  6. 正常入队后,count加1,c获取的是入队前的值(这点需要注意)。
  7. c + 1 < capacity 表示如果当前队列的元素个数小于capacity,那么就可以唤醒一下那些条件为not Empty 的put操作线程(当然,此时不一定会有等待线程)。
  8. 如果(c == 0),即put之前队列是空的,那么就有可能有take操作线程在等待,所以执行signalNotEmpty(),该方法会先获取take锁,然后唤醒等待的take线程来take。
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

2.5 take

  1. take和put原理上是相同的,take是从first节点开始出队,注意区分head;如果队列中没有节点,那么take线程就需要等待。
  2. 局部变量c初始值为-1,其存储当前队列的元素个数,准确地说是take操作之前的元素个数。
  3. takeLock.lockInterruptibly() 获取take lock。
  4. count.get() == 0,如果当前队列中没有元素,那么条件为not Empty 的take操作线程将要等待;否则正常出队。
  5. c > 1 表示take以前队列中至少是有2个元素,那么可以唤醒其它在等待的take线程,操作为notEmpty.signal()
  6. c == capacity 表示take操作前队列是满的,那么就有可能有put线程在等待着,因此执行signalNotFull(),该方法首先获取put锁,然后唤醒那些可能在等待的put线程。
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

2.6 offer

  1. 重载的两个offer方法本质上也是put操作,但在操作上略有不同。
  2. 一个offer方法提供了线程等待时间,其先进入条件的等待队列等待。
  3. 另一个offer方法是能入队就入队,不能就返回false,不等了。
  4. 这两种offer方法可根据实际需要来适当选择。
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

2.7 poll与peek

  1. 两个poll方法也是take的改版,一个是超时等待,一个干脆就不等了,有就取,没有就算了。两种方法可在实际应用中按需选用。
  2. peek方法和take方法不同的是没有出队,只是"看看"首元素first.item。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

3、总结

  1. LinkedBlockingQueue体现了生产者/消费者模型,借助wait/notify机制,可实现take、put操作线程的等待与唤醒。
  2. AtomicInteger类型的count(队列中当前元素个数)以及双锁机制(take和put锁)共同使得LinkedBlockingQueue是线程安全的。实现方式值得学习和体会。

能力与时间有限(当然主要是能力),错漏之处还请评论指正。

目录
相关文章
|
1月前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
71 7
|
2月前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
46 4
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
110 2
|
13天前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
1月前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
130 13
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
65 12
|
1月前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
2月前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
59 3
|
2月前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
148 4
|
1月前
|
人工智能 移动开发 安全
家政上门系统用户端、阿姨端源码,java家政管理平台源码
家政上门系统基于互联网技术,整合大数据分析、AI算法和现代通信技术,提供便捷高效的家政服务。涵盖保洁、月嫂、烹饪等多元化服务,支持多终端访问,具备智能匹配、在线支付、订单管理等功能,确保服务透明、安全,适用于家庭生活的各种需求场景,推动家政市场规范化发展。