硬核干货:5W字17张高清图理解同步器框架AbstractQueuedSynchronizer(下)

简介: Doug Lea大神编写AQS是有严谨的理论基础的,他的个人博客上有一篇论文《The java.util.concurrent Synchronizer Framewor》,可以在互联网找到相应的译文《JUC同步器框架》,如果想要深入研究AQS必须要理解一下该论文的内容,然后结合论文内容详细分析一下AQS的源码实现。本文在阅读AQS源码的时候选用的JDK版本是JDK11。

独占模式与共享模式



前文提及到,同步器涉及到独占模型和共享模式。下面就针对这两种模式详细分析一下AQS的具体实现源码。


独占模式


AQS同步器如果使用独占(EXCLUSIVE)模式,那么意味着同一个时刻,只有唯一的一个节点所在线程获取(acuqire)原子状态status成功,此时该线程可以从阻塞状态解除继续运行,而同步等待队列中的其他节点持有的线程依然处于阻塞状态。独占模式同步器的功能主要由下面的四个方法提供:


  • acquire(int arg):申请获取arg个原子状态status(申请成功可以简单理解为status = status - arg)。
  • acquireInterruptibly(int arg):申请获取arg个原子状态status,响应线程中断。
  • tryAcquireNanos(int arg, long nanosTimeout):申请获取arg个原子状态status,带超时的版本。
  • release(int arg):释放arg个原子状态status(释放成功可以简单理解为status = status + arg)。


独占模式下,AQS同步器实例初始化时候传入的status值,可以简单理解为"允许申请的资源数量的上限值",下面的acquire类型的方法暂时称为"获取资源",而release方法暂时称为"释放资源"。接着我们分析前面提到的四个方法的源码,先看acquire(int arg)


public final void acquire(int arg) {
    // 获取资源成功或者新增一个独占类型节点到同步等待队列成功则直接返回,否则中断当前线程
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// 此方法必须又子类覆盖,用于决定是否获取资源成功
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// 中断当前线程
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
// 不可中断的独占模式下,同步等待队列中的线程获取资源的方法
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            // 获取新入队节点的前驱节点
            final Node p = node.predecessor();
            // 前驱节点为头节点并且尝试获取资源成功,也就是每一轮循环都会调用tryAcquire尝试获取资源,除非阻塞或者跳出循环
            if (p == head && tryAcquire(arg)) {
                // 设置新入队节点为头节点,原来的节点会从队列中断开
                setHead(node);
                p.next = null; // help GC
                return interrupted;   // <== 注意,这个位置是跳出死循环的唯一位置
            }
            // 判断是否需要阻塞当前获取资源失败的节点中持有的线程
            if (shouldParkAfterFailedAcquire(p, node))
                // 阻塞当前线程,如果被唤醒则返回并清空线程的中断标记
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
/**
 * 检查并且更新获取资源失败的节点的状态,返回值决定线程是否需要被阻塞。
 * 这个方法是所有循环获取资源方法中信号控制的主要方法
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 这里记住ws是当前处理节点的前驱节点的等待状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驱节点状态设置成Node.SIGNAL成功,等待被release调用释放,后继节点可以安全地进入阻塞状态
        return true;
    if (ws > 0) {
        // ws大于0只有一种情况Node.CANCELLED,说明前驱节点已经取消获取资源,
        // 这个时候会把所有这类型取消的前驱节点移除,找到一个非取消的节点重新通过next引用连接当前节点
        do {
           node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 其他等待状态直接修改前驱节点等待状态为Node.SIGNAL
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}
// 阻塞当前线程,获取并且重置线程的中断标记位
private final boolean parkAndCheckInterrupt() {
    // 这个就是阻塞线程的实现,依赖Unsafe的API
    LockSupport.park(this);
    return Thread.interrupted();
}
复制代码


上面的代码虽然看起来能基本理解,但是最好用图推敲一下"空间上的变化":


微信截图_20220513161521.png

微信截图_20220513161528.png


接着分析一下release(int arg)的实现:


// 释放资源
public final boolean release(int arg) {
    // 尝试释放资源
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// 尝试释放资源,独占模式下,尝试通过重新设置status的值从而实现释放资源的功能
// 这个方法必须由子类实现
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
// 解除传入节点(一般是头节点)的第一个后继节点的阻塞状态,当前处理节点的等待状态会被CAS更新为0
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 当前处理的节点(一般是头节点)状态小于0则直接CAS更新为0
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 如果节点的第一个后继节点为null或者等待状态大于0(取消),则从等待队列的尾节点向前遍历,
        // 找到最后一个不为null,并且等待状态小于等于0的节点
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    // 解除上面的搜索到的节点的阻塞状态
    if (s != null)
        LockSupport.unpark(s.thread);
}
复制代码


接着用上面的图:


微信截图_20220513161537.png


上面图中thread-2晋升为头节点的第一个后继节点,等待下一个release()释放资源唤醒之就能晋升为头节点,一旦晋升为头节点也就是意味着可以解除阻塞继续运行。接着我们可以看acquire()的响应中断版本和带超时的版本。先看acquireInterruptibly(int arg)


public final void acquireInterruptibly(int arg)
            throws InterruptedException {
    // 获取并且清空线程中断标记位,如果是中断状态则直接抛InterruptedException异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 如果获取资源失败
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
// 独占模式下响应中断的获取资源方法
private void doAcquireInterruptibly(int arg) throws InterruptedException {
    // 基于当前线程新增一个独占的Node节点进入同步等待队列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            // 获取资源失败进入阻塞状态
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    // 解除阻塞后直接抛出InterruptedException异常
                    throw new InterruptedException();
            }
         } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
    }
}
复制代码


doAcquireInterruptibly(int arg)方法和acquire(int arg)类似,最大的不同点在于阻塞线程解除阻塞后并不是正常继续运行,而是直接抛出InterruptedException异常。最后看tryAcquireNanos(int arg, long nanosTimeout)的实现:


// 独占模式下尝试在指定超时时间内获取资源,响应线程中断
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
// 独占模式下带超时时间限制的获取资源方法
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    // 超时期限小于0纳秒,快速失败
    if (nanosTimeout <= 0L)
        return false;
    // 超时的最终期限是当前系统时钟纳秒+外部指定的nanosTimeout增量
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return true;
            }
            // 计算出剩余的超时时间
            nanosTimeout = deadline - System.nanoTime();
            // 剩余超时时间小于0说明已经超时则取消获取
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 这里会判断剩余超时时间大于1000纳秒的时候才会进行带超时期限的线程阻塞,否则会进入下一轮获取尝试
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
            }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码


tryAcquireNanos(int arg, long nanosTimeout)其实和doAcquireInterruptibly(int arg)类似,它们都响应线程中断,不过tryAcquireNanos()在获取资源的每一轮循环尝试都会计算剩余可用的超时时间,只有同时满足获取失败需要阻塞并且剩余超时时间大于SPIN_FOR_TIMEOUT_THRESHOLD(1000纳秒)的情况下才会进行阻塞。


独占模式的同步器的一个显著特点就是:头节点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头节点,原来所在节点会移除出同步等待队列,原来的队列长度就会减少1,然后头结点的第一个有效的后继节点继续开始竞争资源。


微信截图_20220513161546.png


使用独占模式同步器的主要类库有:

  • 可重入锁ReentrantLock
  • 读写锁ReentrantReadWriteLock中的写锁WriteLock


共享模式


共享(SHARED)模式中的"共享"的含义是:同一个时刻,如果有一个节点所在线程获取(acuqire)原子状态status成功,那么它会解除阻塞被唤醒,并且会把唤醒状态传播到所有有效的后继节点(换言之就是唤醒整个同步等待队列中的所有有效的节点)。共享模式同步器的功能主要由下面的四个方法提供:


  • acquireShared(int arg):申请获取arg个原子状态status(申请成功可以简单理解为status = status - arg)。
  • acquireSharedInterruptibly(int arg):申请获取arg个原子状态status,响应线程中断。
  • tryAcquireSharedNanos(int arg, long nanosTimeout):申请获取arg个原子状态status,带超时的版本。
  • releaseShared(int arg):释放arg个原子状态status(释放成功可以简单理解为status = status + arg)。


先看acquireShared(int arg)的源码:


// 共享模式下获取资源
public final void acquireShared(int arg) {
    // 注意tryAcquireShared方法值为整型,只有小于0的时候才会加入同步等待队列
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
// 共享模式下尝试获取资源,此方法需要由子类覆盖
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
// 共享模式下获取资源和处理同步等待队列的方法
private void doAcquireShared(int arg) {
    // 基于当前线程新建一个标记为共享的新节点
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 如果当前节点的前驱节点是头节点
            if (p == head) {
                // 每一轮循环都会调用tryAcquireShared尝试获取资源,除非阻塞或者跳出循环
                int r = tryAcquireShared(arg);
                if (r >= 0) {  // <= tryAcquireShared方法>=0说明直资源获取成功
                    // 设置头结点,并且传播获取资源成功的状态,这个方法的作用是确保唤醒状态传播到所有的后继节点
                    // 然后任意一个节点晋升为头节点都会唤醒其第一个有效的后继节点,起到一个链式释放和解除阻塞的动作
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 判断获取资源失败是否需要阻塞,这里会把前驱节点的等待状态CAS更新为Node.SIGNAL
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}
// 设置同步等待队列的头节点,判断当前处理的节点的后继节点是否共享模式的节点,如果共享模式的节点,
// propagate大于0或者节点的waitStatus为PROPAGATE则进行共享模式下的释放资源
private void setHeadAndPropagate(Node node, int propagate) {
    // h为头节点的中间变量
    Node h = head;
    // 设置当前处理节点为头节点
    setHead(node);
    // 这个判断条件比较复杂:入参propagate大于0 || 头节点为null || 头节点的状态为非取消 || 再次获取头节点为null || 再次获取头节点不为取消
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 当前节点(其实已经成为头节点)的第一个后继节点为null或者是共享模式的节点
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
// Release action for shared mode:共享模式下的释放资源动作
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 头节点不为null并且不为尾节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果头节点等待状态为SIGNAL(-1)则CAS更新它为0,更新成功后唤醒和解除其后继节点的阻塞
            if (ws == Node.SIGNAL) {
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;
                // 唤醒头节点的后继节点
                unparkSuccessor(h);
            }
            // 如果头节点的等待状态为0,则CAS更新它为PROPAGATE(-3)
            else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;
            }
        // 头节点没有变更,则跳出循环
        if (h == head)
            break;
    }
}
复制代码


其实代码的实现和独占模式有很多类似的地方,一个很大的不同点是:共享模式同步器当节点获取资源成功晋升为头节点之后,它会把自身的等待状态通过CAS更新为Node.PROPAGATE,下一个加入等待队列的新节点会把头节点的等待状态值更新回Node.SIGNAL,标记后继节点处于可以被唤醒的状态,如果遇上资源释放,那么这个阻塞的节点就能被唤醒从而解除阻塞。我们还是画图理解一下,先假设tryAcquireShared(int arg)总是返回小于0的值,入队两个阻塞的线程thread-1thread-2,然后进行资源释放确保tryAcquireShared(int arg)总是返回大于0的值:


微信截图_20220513161556.png


看起来和独占模式下的同步等待队列差不多,实际上真正不同的地方在于有节点解除阻塞和晋升为头节点的过程。因此我们可以先看releaseShared(int arg)的源码:


// 共享模式下释放资源
public final boolean releaseShared(int arg) {
    // 尝试释放资源成功则调用前面分析过的doReleaseShared以传播唤醒状态和unpark头节点的后继节点
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
// 共享模式下尝试释放资源,必须由子类覆盖
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
复制代码


releaseShared(int arg)就是在tryReleaseShared(int arg)调用返回true的情况下主动调用一次doReleaseShared()从而基于头节点传播唤醒状态和unpark头节点的后继节点。接着之前的图:


微信截图_20220513161605.png


微信截图_20220513161614.png


接着看acquireSharedInterruptibly(int arg)的源码实现:


// 共享模式下获取资源的方法,响应线程中断
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 和非响应线程中断的acquireShared方法类似,不过这里解除阻塞之后直接抛出异常InterruptedException
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码


最后看tryAcquireSharedNanos(int arg, long nanosTimeout)的源码实现:


// 共享模式下获取资源的方法,带超时时间版本
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 注意这里只要tryAcquireShared >= 0或者doAcquireSharedNanos返回true都认为获取资源成功
        return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 计算超时的最终期限    
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            //重新计算剩余的超时时间 
            nanosTimeout = deadline - System.nanoTime();
            // 超时的情况下直接取消获取
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 满足阻塞状态并且剩余的超时时间大于阀值1000纳秒则通过LockSupport.parkNanos()阻塞线程
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            // 解除阻塞后判断线程的中断标记并且清空标记位,如果是处于中断状态则抛出InterruptedException 
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
复制代码


共享模式的同步器的一个显著特点就是:头节点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头节点,原来所在节点会移除出同步等待队列,原来的队列长度就会减少1,重新设置头节点的过程会传播唤醒的状态,简单来说就是唤醒一个有效的后继节点,只要一个节点可以晋升为头节点,它的后继节点就能被唤醒,以此类推。节点的唤醒顺序遵循类似于FIFO的原则,通俗说就是先阻塞或者阻塞时间最长则先被唤醒


微信截图_20220513161623.png


使用共享模式同步器的主要类库有:

  • 信号量Semaphore
  • 倒数栅栏CountDownLatch


Condition的实现



Condition实例的建立是在Lock接口的newCondition()方法,它是锁条件等待的实现,基于作用或者语义可以见Condition接口的相关API注释:


Condition是对象监视器锁方法Object#wait()、Object#notify()和Object#notifyAll()的替代实现,对象监视器锁实现锁的时候作用的效果是每个锁对象必须使用多个wait-set(JVM内置的等待队列),通过Object提供的方法和监视器锁结合使用就能达到Lock的实现效果。如果替换synchronized方法和语句并且结合使用Lock和Condition,就能替换并且达到对象监视器锁的效果。


Condition必须固有地绑定在一个Lock的实现类上,也就是要通过Lock的实例建立Condition实例,而且Condition的方法调用使用必须在Lock的"锁定代码块"中,这一点和synchronized关键字以及Object的相关JNI方法使用的情况十分相似。

前文介绍过Condition接口提供的方法以及Condition队列,也就是条件等待队列,通过画图简单介绍了它的队列节点组成。实际上,条件等待队列需要结合同步等待队列使用,这也刚好对应于前面提到的Condition的方法调用使用必须在Lock的锁定代码块中。听起来很懵逼,我们慢慢分析一下ConditionObject的方法源码就能知道具体的原因。


先看ConditionObject#await()方法:


// 退出等待后主动进行中断当前线程
private static final int REINTERRUPT = 1;
// 退出等待后抛出InterruptedException异常
private static final int THROW_IE   = -1;
/** 
 * 可中断的条件等待实现
 * 1、当前线程处于中断状态则抛出InterruptedException
 * 2、保存getState返回的锁状态,并且使用此锁状态调用release释放所有的阻塞线程
 * 3、线程加入等待队列进行阻塞,直到signall或者中断
 * 4、通过保存getState返回的锁状态调用acquire方法
 * 5、第4步中阻塞过程中中断则抛出InterruptedException
 */
public final void await() throws InterruptedException {
    // 如果线程是中断状态则清空中断标记位并且抛出InterruptedException
    if (Thread.interrupted())
        throw new InterruptedException();
    // 当前线程所在的新节点加入条件等待队列
    Node node = addConditionWaiter();
    // 释放当前AQS中的所有资源返回资源的status保存值,也就是基于status的值调用release(status) - 其实这一步是解锁操作
    int savedState = fullyRelease(node);
    // 初始化中断模式
    int interruptMode = 0;
    // 如果节点新建的节点不位于同步队列中(理论上应该是一定不存在),则对节点所在线程进行阻塞,第二轮循环理论上节点一定在同步等待队列中
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 处理节点所在线程中断的转换操作
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 节点所在线程被唤醒后,如果节点所在线程没有处于中断状态,则以独占模式进行头节点竞争
    // 注意这里使用的status是前面释放资源时候返回的保存下来的status
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 下一个等待节点不空,则从等待队列中移除所有取消的等待节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // interruptMode不为0则按照中断模式进行不同的处理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
// 释放当前AQS中的所有资源,其实也就是基于status的值调用release(status)
// 这一步对于锁实现来说,就是一个解锁操作
final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        // 释放失败则标记等待状态为取消
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}
// 传入的节点是否在同步队列中
final boolean isOnSyncQueue(Node node) {
    // 节点等待您状态为CONDITION或者前驱节点为null则返回false
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 因为等待队列是通过nextWaiter连接,next引用存在说明节点位于同步队列
    if (node.next != null)
        return true;
    // 从同步队列的尾部向前遍历是否存在传入的节点实例
    return findNodeFromTail(node);
}
// 从同步队列的尾部向前遍历是否存在传入的节点实例
private boolean findNodeFromTail(Node node) {
    for (Node p = tail;;) {
        if (p == node)
            return true;
        if (p == null)
            return false;
        p = p.prev;
    }
}
// 这是一个很复杂的判断,用了两个三目表达式,作用是如果新建的等待节点所在线程中断,
// 则把节点的状态由CONDITION更新为0,并且加入到同步等待队列中,返回THROW_IE中断状态,如果加入同步队列失败,返回REINTERRUPT
// 如果新建的等待节点所在线程没有中断,返回0,也就是初始状态的interruptMode
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
// 节点线程中断取消等待后的转换操作
final boolean transferAfterCancelledWait(Node node) {
    // CAS更新节点的状态由CONDITION更改为0
    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        // 节点加入同步等待队列
        enq(node);
        return true;
    }
    // 这里尝试自旋,直到节点加入同步等待队列成功
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
// 等待完毕后报告中断处理,前边的逻辑得到的interruptMode如果为THROW_IE则抛出InterruptedException,如果为REINTERRUPT则中断当前线程
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
复制代码


其实上面的await()逻辑并不复杂,前提是理解了对象监视器锁那套等待和唤醒的机制(由JVM实现,C语言学得好的可以去看下源码),这里只是通过算法和数据结构重新进行了一次实现。await()主要使用了两个队列:同步等待队列和条件等待队列。我们先假设有两个线程thread-1thread-2调用了下面的代码中的process()方法:


ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void process(){
    try{
        lock.lock();
        condition.await();
        // 省略其他逻辑...
    }finally{
        lock.unlock();
    }
}
复制代码


ReentrantLock使用的是AQS独占模式的实现,因此在调用lock()方法的时候,同步等待队列的一个瞬时快照(假设线程thread-1先加入同步等待队列)可能如下:


微信截图_20220513161632.png


接着,线程thread-1所在节点是头节点的后继节点,获取锁成功,它解除阻塞后可以调用await()方法,这个时候会释放同步等待队列中的所有等待节点,也就是线程thread-2所在的节点也被释放,因此线程thread-2也会调用await()方法:


微信截图_20220513161638.png


只要有线程能够到达await()方法,那么原来的同步器中的同步等待队列就会释放所有阻塞节点,表现为释放锁,然后这些释放掉的节点会加入到条件等待队列中,条件等待队列中的节点也是阻塞的,这个时候只有通过signal()或者signalAll()进行队列元素转移才有机会唤醒阻塞的线程。因此接着看signal()signalAll()的源码实现:


// 从等待队列中移动一个等待时间最长的线程(如果过存在的话)到锁同步等待队列中
public final void signal() {
    // 判断当前线程是否和独占线程一致,其实就是此操作需要在锁代码块中执行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
// 基于第一个等待节点进行Signal操作
private void doSignal(Node first) {
    do {
        // 首节点的下一个等待节点为空,说明只剩下一个等待节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 当前处理节点从链表从移除    
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// 唤醒的转换操作
final boolean transferForSignal(Node node) {
    // CAS更新节点状态由CONDITION到0,更新失败则返回false不唤醒
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
    // 节点作为新节点重新加入到同步等待队列
    Node p = enq(node);
    int ws = p.waitStatus;
    // 取消或者更新节点等待状态为SIGNAL的节点需要解除阻塞进行重新同步,这里的操作只针对取消和状态异常的节点
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
// 从等待队列中移动所有等待时间最长的线程(如果过存在的话)到锁同步等待队列中
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
        if (first != null)
        doSignalAll(first);
}
// 基于第一个等待节点进行SignalAll操作
private void doSignalAll(Node first) {
    // 置空lastWaiter和firstWaiter
    lastWaiter = firstWaiter = null;
    do {
        // 获取下一个等待节点
        Node next = first.nextWaiter;
        // 当前处理节点从链表从移除
        first.nextWaiter = null;
        // 处理当前节点
        transferForSignal(first);
        // 更新中间引用
        first = next;
    } while (first != null);
}
复制代码


其实signal()或者signalAll()会对取消的节点或者短暂中间状态的节点进行解除阻塞,但是正常情况下,它们的操作结果是把阻塞等待时间最长的一个或者所有节点重新加入到AQS的同步等待队列中。例如,上面的例子调用signal()方法后如下:


微信截图_20220513161647.png


这样子,相当于线程thread-1重新加入到AQS同步等待队列中(从条件等待队列中移动到同步等待队列中),并且开始竞争头节点,一旦竞争成功,就能够解除阻塞。这个时候从逻辑上看,signal()方法最终解除了对线程thread-1的阻塞。await()的其他变体方法的原理是类似的,这里因为篇幅原因不再展开。这里小结一下Condition的显著特点:


  • 1、同时依赖两个同步等待队列,一个是AQS提供,另一个是ConditionObject提供的。
  • 2、await()方法会释放AQS同步等待队列中的阻塞节点,这些节点会加入到条件等待队列中进行阻塞。
  • 3、signal()或者signalAll()会把条件等待队列中的节点重新加入AQS同步等待队列中,并不解除正常节点的阻塞状态。
  • 4、接第3步,这些进入到AQS同步等待队列的节点会重新竞争成为头节点,接下来的步骤其实也就是前面分析过的独占模式下的AQS的运作原理。


取消获取资源(cancelAcquire)



新节点加入等待队列失败导致任何类型的异常或者带超时版本的API调用的时候剩余超时时间小于等于零的时候,就会调用cancelAcquire()方法,用于取消该节点对应节点获取资源的操作。


// 取消节点获取资源的操作
private void cancelAcquire(Node node) {
    // 节点为null直接返回
    if (node == null)
        return;
    // 置空节点持有的线程,因为此时节点线程已经发生中断
    node.thread = null;
    Node pred = node.prev;
    // 这个循环是为了获取当前节点的上一个不为取消状态的节点,也就是中间如果发生了取消的节点都直接断开
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 保存当前节点的上一个不为取消状态的节点的后继节点    
    Node predNext = pred.next;
    // 当前节点等待状态更新为CANCELLED
    node.waitStatus = Node.CANCELLED;
    // 如果当前节点为尾节点,则直接更新尾节点为当前节点的上一个不为取消状态的节点
    if (node == tail && compareAndSetTail(node, pred)) {
         // 然后更新该节点的后继节点为null,因为它已经成为新的尾节点
         pred.compareAndSetNext(predNext, null);
    } else {
        int ws;
        // 当前节点的上一个不为取消状态的节点已经不是头节点的情况,需要把当前取消的节点从AQS同步等待队列中断开
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                pred.compareAndSetNext(predNext, next);
        } else {
            // 当前节点的上一个不为取消状态的节点已经是头节点,相当于头节点之后的节点都是取消,需要唤醒当前节点的后继节点
            unparkSuccessor(node);
        }
        // 节点后继节点设置为自身,那么就不会影响后继节点
        node.next = node;
    }
}
复制代码


cancelAcquire()方法有多处调用,主要包括下面的情况:


  • 1、节点线程在阻塞过程中主动中断的情况下会调用。
  • 2、acquire的处理过程发生任何异常的情况下都会调用,包括tryAcquire()tryAcquireShared()等。
  • 3、新节点加入等待队列失败导致任何类型的异常或者带超时版本的API调用的时候剩余超时时间小于等于零的时候。


cancelAcquire()主要作用是把取消的节点移出同步等待队列,必须时候需要进行后继节点的唤醒。


实战篇



AQS是一个抽象的同步器基础框架,其实我们也可以直接使用它实现一些高级的并发框架。下面基于AQS实现一些非内建的功能,这两个例子来自于AQS的注释中。


metux


大学C语言课程中经常提及到的只有一个资源的metux(互斥区),也就是说,同一个时刻,只能有一个线程获取到资源,其他获取资源的线程需要阻塞等待到前一个线程释放资源。


public class Metux implements Lock, Serializable {
    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            assert 1 == arg;
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        @Override
        protected boolean tryRelease(int arg) {
            assert 1 == arg;
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        public Condition newCondition() {
            return new ConditionObject();
        }
        public boolean isLocked() {
            return getState() != 0;
        }
        @Override
        public boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0);
        }
    }
    private final Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    public boolean isLocked() {
        return sync.isLocked();
    }
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    @Override
    public void unlock() {
        sync.release(1);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
    public static void main(String[] args) throws Exception {
        final Metux metux = new Metux();
        new Thread(() -> {
            metux.lock();
            System.out.println(String.format("%s-thread-1获取锁成功休眠3秒...", LocalDateTime.now()));
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                //ignore
            }
            metux.unlock();
            System.out.println(String.format("%s-thread-1获解锁成功...", LocalDateTime.now()));
            return;
        }, "thread-1").start();
        new Thread(() -> {
            metux.lock();
            System.out.println(String.format("%s-thread-2获取锁成功...",LocalDateTime.now()));
            return;
        }, "thread-2").start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}
复制代码


某个时间的某次运行结果如下:


2019-04-07T11:49:27.858791200-thread-1获取锁成功休眠3秒...
2019-04-07T11:49:30.876567-thread-2获取锁成功...
2019-04-07T11:49:30.876567-thread-1获解锁成功...
复制代码


二元栅栏


二元栅栏是CountDownLatch的简化版,只允许一个线程阻塞,由另一个线程负责唤醒。


public class BooleanLatch {
    private static class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }
        @Override
        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }
        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }
    private final Sync sync = new Sync();
    public boolean isSignalled() {
        return sync.isSignalled();
    }
    public void signal() {
        sync.releaseShared(1);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public static void main(String[] args) throws Exception {
        BooleanLatch latch = new BooleanLatch();
        new Thread(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                //ignore
            }
            latch.signal();
        }).start();
        System.out.println(String.format("[%s]-主线程进入阻塞...", LocalDateTime.now()));
        latch.await();
        System.out.println(String.format("[%s]-主线程进被唤醒...", LocalDateTime.now()));
    }
}
复制代码


某个时间的某次运行结果如下:


[2019-04-07T11:55:12.647816200]-主线程进入阻塞...
[2019-04-07T11:55:15.632088]-主线程进被唤醒...
复制代码


小结



JUC的重要并发类库或者容器中,AQS起到了基础框架的作用,理解同步器的实现原理,有助于理解和分析其他并发相关类库的实现。这篇文章前后耗费了接近1个月时间编写,DEBUG过程最好使用多线程断点,否则很难模拟真实的情况。AQS里面的逻辑是相对复杂的,很敬佩并发大师Doug Lea如此精巧的类库设计,此所谓巨人的肩膀。


参考资料:


(本文完 c-30-d e-a-20190407 r-a-20200723 ProcessOn重新修订所有插图,强迫症发作修正病句和错字)


相关文章
|
设计模式 Java Linux
110.【十万字带你深入学习23种设计模式】(十五)
110.【十万字带你深入学习23种设计模式】
69 0
|
Java 测试技术 Maven
看到一个魔改线程池,面试素材加一!(中)
看到一个魔改线程池,面试素材加一!(中)
480 0
看到一个魔改线程池,面试素材加一!(中)
|
5月前
|
Java 开发者
奇迹时刻!探索 Java 多线程的奇幻之旅:Thread 类和 Runnable 接口的惊人对决
【8月更文挑战第13天】Java的多线程特性能显著提升程序性能与响应性。本文通过示例代码详细解析了两种核心实现方式:Thread类与Runnable接口。Thread类适用于简单场景,直接定义线程行为;Runnable接口则更适合复杂的项目结构,尤其在需要继承其他类时,能保持代码的清晰与模块化。理解两者差异有助于开发者在实际应用中做出合理选择,构建高效稳定的多线程程序。
70 7
|
算法 C++
ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)
ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)
ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)(下)
|
存储 缓存 监控
|
编解码 负载均衡 网络协议
两万字详解!Netty经典32连问! 2
两万字详解!Netty经典32连问! 2
|
消息中间件 网络协议 JavaScript
两万字详解!Netty经典32连问!
两万字详解!Netty经典32连问!
|
存储 消息中间件 缓存
四万字爆肝总结java多线程所有知识点(史上最全总结)
全文从多线程的实现方式、线程的状态、线程的方法、线程的同步、线程的通讯、等角度对多线程的基础知识进行总结
547 1
四万字爆肝总结java多线程所有知识点(史上最全总结)
|
存储 监控 Java
7000字+24张图带你彻底弄懂线程池(2)
7000字+24张图带你彻底弄懂线程池(2)
|
消息中间件 缓存 监控
7000字+24张图带你彻底弄懂线程池(1)
7000字+24张图带你彻底弄懂线程池(1)