一、继承实现关系图
二、底层数据存储结构
2.1 重要属性说明
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable { static final class Node implements ForkJoinPool.ManagedBlocker { final boolean isData; // false if this is a request node volatile Object item; // initially non-null if isData; CASed to match volatile Node next; volatile Thread waiter; ... } // 头节点 transient volatile Node head; // 尾节点 private transient volatile Node tail; // 放取元素的几种方式: // 立即返回,用于非超时的poll()和tryTransfer()方法中 private static final int NOW = 0; // for untimed poll, tryTransfer // 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程 private static final int ASYNC = 1; // for offer, put, add // 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止 private static final int SYNC = 2; // for transfer, take // 超时,用于有超时的poll()和tryTransfer()方法中 private static final int TIMED = 3; // for timed poll, tryTransfer }
2.2 重要方法解析xfer
// 是否需要入队及阻塞有四种情况: // NOW,立即返回,没有匹配到立即返回,不做入队操作 private static final int NOW = 0; // for untimed poll, tryTransfer // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队) private static final int ASYNC = 1; // for offer, put, add // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到 private static final int SYNC = 2; // for transfer, take // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身 private static final int TIMED = 3; // for timed poll, tryTransfer private E xfer(E e, boolean haveData, int how, long nanos) { // 不允许放入空元素 if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed // 外层循环,自旋,失败就重试 retry: for (;;) { // restart on append race // 下面这个for循环用于控制匹配的过程 // 同一时刻队列中只会存储一种类型的节点 // 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了 // 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止 for (Node h = head, p = h; p != null;) { // find & match first node // p节点的模式 boolean isData = p.isData; // p节点的值 Object item = p.item; // p没有被匹配到 if (item != p && (item != null) == isData) { // unmatched // 如果两者模式一样,则不能匹配,跳出循环后尝试入队 if (isData == haveData) // can't match break; // 如果两者模式不一样,则尝试匹配 // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值) if (p.casItem(item, e)) { // match // 匹配成功 // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的 for (Node q = p; q != h;) { // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点 Node n = q.next; // update by 2 unless singleton // 如果head还没变,就把它更新成新的节点 // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了) // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了 // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了 // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了 if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry // 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试 if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // 唤醒p中等待的线程 LockSupport.unpark(p.waiter); // 并返回匹配到的元素 return LinkedTransferQueue.<E>cast(item); } } // p已经被匹配了或者尝试匹配的时候失败了 // 也就是其它线程先一步匹配了p // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己 // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } // 到这里肯定是队列中存储的节点类型和自己一样 // 或者队列中没有元素了 // 就入队(不管放元素还是取元素都得入队) // 入队又分成四种情况: // NOW,立即返回,没有匹配到立即返回,不做入队操作 // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队) // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到 // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身 // 如果不是立即返回 if (how != NOW) { // No matches available // 新建s节点 if (s == null) s = new Node(e, haveData); // 尝试入队 Node pred = tryAppend(s, haveData); // 入队失败,重试 if (pred == null) continue retry; // lost race vs opposite mode // 如果不是异步(同步或者有超时) // 就等待被匹配 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } }
大致的逻辑:
- 消费者取数据,
- 如果队列不为空则直接取走数据,并唤醒存放该数据的生产者线程
- 如果队列为空,消费者线程会生成一个占位虚拟节点,节点元素信息为null,并在这个节点上等待
- 生产者生产数据
- 请求添加数据,从单向链表的head节点开始遍历,若发现节点为取数据请求类型(isData==false, item == null),生产者线程直接将元素赋予这个节点,并唤醒该节点等待的消费者线程,消费者取走元素; 若未发现取数据请求节点,则创建一个节点并添加到队列的末尾,然后阻塞等待,直到有消费者来取元素。
三、特点及优缺点
- 可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体
- 无边界以及可并发访问的队列,不支持带初始容量的队列
- 同时支持数据的优先级排序
- 线程安全的、适用于高并发场景
- 不管是生产还是消费都有可能入队,
- 从head开始比较,如果类型一样就入队,类型不一样就出队
- 是否入队和阻塞的四种模式:NOW、ASYNC、SYNC 和 TIMED
- 队列中元素的添加和移除必须由生产者和消费者线程共同完成,一方阻塞等待另一方的操作
- 不使用比较重的锁,是通过 自旋+CAS来实现
- 入队后先自旋再调用LockSupport.park()或LockSupport.parkNanos阻塞
四、作用及应用场景
- 线程池:可以使线程池更稳定,更高效
- 生产者消费者模式
- 多线程调度:某个线程需要等待其他线程结束后才能执行,那么就可以使用LinkedTransferQueue来进行线程间通信