Java线程池ThreadPoolExcutor源码解读详解07-阻塞队列之LinkedTransferQueue

本文涉及的产品
可观测监控 Prometheus 版,每月50GB免费额度
性能测试 PTS,5000VUM额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: `LinkedTransferQueue`是一个基于链表结构的无界并发队列,实现了`TransferQueue`接口,它使用预占模式来协调生产者和消费者的交互。队列中的元素分为数据节点(isData为true)和请求节点(isData为false)。在不同情况下,队列提供四种操作模式:NOW(立即返回,不阻塞),ASYNC(异步,不阻塞,但后续线程可能阻塞),SYNC(同步,阻塞直到匹配),TIMED(超时等待,可能返回)。`xfer`方法是队列的核心,它处理元素的转移过程。方法内部通过循环和CAS(Compare And Swap)操作来确保线程安全,同时避免锁的使用以提高性能。当找到匹



一、继承实现关系图

image.gif 1711819515575.png

二、底层数据存储结构

2.1 重要属性说明

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    static final class Node implements ForkJoinPool.ManagedBlocker {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; 
        ...
    }
    // 头节点
    transient volatile Node head;
    // 尾节点
    private transient volatile Node tail;
    // 放取元素的几种方式:
    // 立即返回,用于非超时的poll()和tryTransfer()方法中
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    // 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
    private static final int ASYNC = 1; // for offer, put, add
    // 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
    private static final int SYNC  = 2; // for transfer, take
    // 超时,用于有超时的poll()和tryTransfer()方法中
    private static final int TIMED = 3; // for timed poll, tryTransfer
}

image.gif

2.2 重要方法解析xfer

// 是否需要入队及阻塞有四种情况:
// NOW,立即返回,没有匹配到立即返回,不做入队操作
private static final int NOW   = 0; // for untimed poll, tryTransfer
// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
private static final int ASYNC = 1; // for offer, put, add
// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
private static final int SYNC  = 2; // for transfer, take
// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
private static final int TIMED = 3; // for timed poll, tryTransfer
private E xfer(E e, boolean haveData, int how, long nanos) {
    // 不允许放入空元素
    if (haveData && (e == null)) throw new NullPointerException();
    Node s = null;
    // the node to append, if needed
    // 外层循环,自旋,失败就重试
    retry:
    for (;;) { // restart on append race
        // 下面这个for循环用于控制匹配的过程
        // 同一时刻队列中只会存储一种类型的节点
        // 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
        // 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
    for (Node h = head, p = h; p != null;) { // find & match first node
        // p节点的模式
        boolean isData = p.isData;
        // p节点的值
        Object item = p.item;
        // p没有被匹配到
        if (item != p && (item != null) == isData) {
            // unmatched
            // 如果两者模式一样,则不能匹配,跳出循环后尝试入队
            if (isData == haveData) // can't match
                break;
            // 如果两者模式不一样,则尝试匹配
            // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
            if (p.casItem(item, e)) { // match
                // 匹配成功
                // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
                for (Node q = p; q != h;) {
                    // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
                    Node n = q.next;  // update by 2 unless singleton
                    // 如果head还没变,就把它更新成新的节点
                    // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
                    // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
                    // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
                    // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
                    if (head == h && casHead(h, n == null ? q : n)) {
                        h.forgetNext();
                        break;
                    }
                    // advance and retry
                    // 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
                    if ((h = head)   == null || (q = h.next) == null || !q.isMatched())
                        break; // unless slack < 2
                }
                // 唤醒p中等待的线程
                LockSupport.unpark(p.waiter);
                // 并返回匹配到的元素
                return LinkedTransferQueue.<E>cast(item);
            }
        }
        // p已经被匹配了或者尝试匹配的时候失败了
        // 也就是其它线程先一步匹配了p
        // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
        // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
        Node n = p.next;
        p = (p != n) ? n : (h = head); // Use head if p offlist
    }
        // 到这里肯定是队列中存储的节点类型和自己一样
        // 或者队列中没有元素了
        // 就入队(不管放元素还是取元素都得入队)
        // 入队又分成四种情况:
        // NOW,立即返回,没有匹配到立即返回,不做入队操作
        // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
        // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
        // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
        // 如果不是立即返回
        if (how != NOW) { // No matches available
            // 新建s节点
            if (s == null)
                s = new Node(e, haveData);
            // 尝试入队
            Node pred = tryAppend(s, haveData);
            // 入队失败,重试
            if (pred == null)
                continue retry; // lost race vs opposite mode
            // 如果不是异步(同步或者有超时)
            // 就等待被匹配
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

image.gif

大致的逻辑:

  • 消费者取数据,
  • 如果队列不为空则直接取走数据,并唤醒存放该数据的生产者线程
  • 如果队列为空,消费者线程会生成一个占位虚拟节点,节点元素信息为null,并在这个节点上等待
  • 生产者生产数据
  • 请求添加数据,从单向链表的head节点开始遍历,若发现节点为取数据请求类型(isData==false, item == null),生产者线程直接将元素赋予这个节点,并唤醒该节点等待的消费者线程,消费者取走元素; 若未发现取数据请求节点,则创建一个节点并添加到队列的末尾,然后阻塞等待,直到有消费者来取元素。

三、特点及优缺点

  • 可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体
  • 无边界以及可并发访问的队列,不支持带初始容量的队列
  • 同时支持数据的优先级排序
  • 线程安全的、适用于高并发场景
  • 不管是生产还是消费都有可能入队,
  • 从head开始比较,如果类型一样就入队,类型不一样就出队
  • 是否入队和阻塞的四种模式:NOW、ASYNC、SYNC 和 TIMED
  • 队列中元素的添加和移除必须由生产者和消费者线程共同完成,一方阻塞等待另一方的操作
  • 不使用比较重的锁,是通过 自旋+CAS来实现
  • 入队后先自旋再调用LockSupport.park()或LockSupport.parkNanos阻塞

四、作用及应用场景

  • 线程池:可以使线程池更稳定,更高效
  • 生产者消费者模式
  • 多线程调度:某个线程需要等待其他线程结束后才能执行,那么就可以使用LinkedTransferQueue来进行线程间通信
相关文章
|
1月前
|
Java Apache Maven
Java百项管理之新闻管理系统 熟悉java语法——大学生作业 有源码!!!可运行!!!
文章提供了使用Apache POI库在Java中创建和读取Excel文件的详细代码示例,包括写入数据到Excel和从Excel读取数据的方法。
56 6
Java百项管理之新闻管理系统 熟悉java语法——大学生作业 有源码!!!可运行!!!
|
8天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
79 38
|
1天前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
13 3
|
8天前
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
36 4
|
6天前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。
|
8天前
|
Prometheus 监控 Cloud Native
在 Java 中,如何使用线程池监控以及动态调整线程池?
【10月更文挑战第22天】线程池的监控和动态调整是一项重要的任务,需要我们结合具体的应用场景和需求,选择合适的方法和策略,以确保线程池始终处于最优状态,提高系统的性能和稳定性。
58 2
|
9天前
|
移动开发 前端开发 JavaScript
java家政系统成品源码的关键特点和技术应用
家政系统成品源码是已开发完成的家政服务管理软件,支持用户注册、登录、管理个人资料,家政人员信息管理,服务项目分类,订单与预约管理,支付集成,评价与反馈,地图定位等功能。适用于各种规模的家政服务公司,采用uniapp、SpringBoot、MySQL等技术栈,确保高效管理和优质用户体验。
|
11天前
|
缓存 监控 Java
java中线程池的使用
java中线程池的使用
|
1月前
|
JSON 前端开发 Java
震惊!图文并茂——Java后端如何响应不同格式的数据给前端(带源码)
文章介绍了Java后端如何使用Spring Boot框架响应不同格式的数据给前端,包括返回静态页面、数据、HTML代码片段、JSON对象、设置状态码和响应的Header。
112 1
震惊!图文并茂——Java后端如何响应不同格式的数据给前端(带源码)
|
1月前
|
存储 前端开发 Java
Java后端如何进行文件上传和下载 —— 本地版(文末配绝对能用的源码,超详细,超好用,一看就懂,博主在线解答) 文件如何预览和下载?(超简单教程)
本文详细介绍了在Java后端进行文件上传和下载的实现方法,包括文件上传保存到本地的完整流程、文件下载的代码实现,以及如何处理文件预览、下载大小限制和运行失败的问题,并提供了完整的代码示例。
236 1