AbstractQueuedSynchronizer 笔记

简介: AbstractQueuedSynchronizer 笔记

AbstractQueuedSynchronizer

AQS是一个内部维护了先进先出队列+标识(数字)的模型,标识使用CAS模式修改,作为多线程工具的基础组件

AQS队列结构.png

属性

属性名 说明
volatile Node head 为头节点会清理Node中关联的pre,thread避免GC不回收
volatile Node tail 尾节点
volatile int state 0为空闲其它组件按需使用,使用cas来赋值,
Thread exclusiveOwnerThread 持有线程

state为volatile的int,不同的业务场景按需实现,

独占模式:

ReentrantLock.Sync中state为0表示未锁定>0表示被几个线程持有

ThreadPoolExecutor.Worker中state为0表示未执行

共享模式:

CountDownLatch.Sync中state为初始化时指定,表示有多少个线程可持有,

Semaphore.Sync中state与CountDownLatch相同

混合模式:

ReentrantReadWriteLock.Sync中state为共享锁+独占锁组合 通过位运算16位来分割,最大的读锁写锁个数为65535

关键方法

独占模式

持有资源:acquire

acquire: 独占模式获取,独占模式即只有一个线程可更新state.忽略中断标识,在获取之后响应中断。

acquireInterruptibly:独占模式获取,线程标识为中断则抛出异常

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

AQS-acquire.png

tryAcquire:子类按需实现,使用独占模式更新state,增加state,成功返回true失败返回false

中断后不会正确响应park,所以需要重置线程中断标识,并在unpark之后进行中断补偿

释放资源:release

release:以独占模式释放资源

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

AQS-release.png

tryRelease:子类按需实现,使用独占模式更新state,减少state,并处理对应独占线程

共享模式

持有资源:acquireShared

  • acquireShared 共享模式获取,忽略中断线程,在获取之后相应中断。
  • acquireSharedInterruptibly 共享模式获取,响应中断,线程中断则抛出异常。
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

AQS-acquireShared.png

tryAcquireShared:子类按需实现,对返回值有如下要求:

负值:失败。 零:共享模式下的获取成功,但是没有后续共享模式下的获取可以成功。 正值: 如果共享模式下的获取成功并且后续共享模式下的获取也可能成功,则为正值,在这种情况下,后续的等待线程必须检查可用性。 (对三个不同返回值的支持使该方法可以在仅有时进行获取的情况下使用。)成功后,就已经获取了此对象。

释放资源:releaseShared

  • releaseShared:以共享模式释放资源
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

AQS-releaseShared.png

tryReleaseShared:子类按需实现,使用共享模式更新state,减少state

其它关键方法

检查并更新节点状态:shouldParkAfterFailedAcquire

在park线程之前的判断,当前置节点为取消时更新前置节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

唤醒后续节点:unparkSuccessor

唤醒后续节点,并清理取消的节点,

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享模式设置队列头:setHeadAndPropagate

共享模式下多个线程同时持有资源,头节点会频繁变化需要及时释放资源

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

Node

属性名 说明
int waitStatus 1:CANCELLED,表示当前的线程被取消;<br/>-1:SIGNAL,后续节点需要unpark;<br/>-2:CONDITION,表示当前节点在等待condition,也就是在condition队列中;<br/>-3:PROPAGATE,A releaseShared应该传播到其他节点。 在doReleaseShared中对此进行了设置(仅适用于头节点),以确保传播继续进行,即使此后进行了其他操作也是如此。 <br/> 0:表示当前节点在sync队列中,等待着获取锁。
Node prev 前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
Node next 后继节点。
Thread thread 入队列时的当前线程。
Node nextWaiter 为NULL表示为独占模式

PROPAGATE:共享模式中会通过状态是否小于0来判断是否需要唤醒后续节点,共享模式下多个线程可同时持有state变更,waitStatus会频繁从0切换为SIGNAL,区分SIGNAL增加的中间状态所以称为传播值

目录
相关文章
|
4月前
|
消息中间件 存储 监控
Java并发知识之ReentrantLock
本文深入剖析了Java中并发编程的核心概念,特别聚焦于锁的设计思想,通过分析AbstractQueuedSynchronizer(AQS)、ReentrantLock和ReentrantReadWriteLock的实现,揭示了锁的工作原理和高效并发控制策略。
Java并发知识之ReentrantLock
|
6月前
|
安全 Java
深入探索Java并发库(JUC)中的ReentrantReadWriteLock
深入探索Java并发库(JUC)中的ReentrantReadWriteLock
|
7月前
|
安全 Java 程序员
Java多线程基础-17:简单介绍一下JUC中的 ReentrantLock
ReentrantLock是Java并发包中的可重入互斥锁,与`synchronized`类似但更灵活。
61 0
|
7月前
|
Java
AQS (AbstractQueuedSynchronizer) 概述
AQS (AbstractQueuedSynchronizer) 概述
|
缓存 数据处理
JUC第十四讲:JUC锁: ReentrantReadWriteLock详解
JUC第十四讲:JUC锁: ReentrantReadWriteLock详解
|
Java Android开发
深入浅出,从 ReentrantLock 到 AQS | Java(下)
对于非Java后端同学来说,没听过倒也不是什么太过分的事,但是如果你深入学习过 Java 并发相关,那么肯定会去了解各种锁,而作为一个 有志青年 的你必然会在心里来一句,为什么加了锁就可以同步 ? 此时必然也会看到 AQS 的影子。
147 0
|
安全 Java Android开发
深入浅出,从 ReentrantLock 到 AQS | Java(上)
对于非Java后端同学来说,没听过倒也不是什么太过分的事,但是如果你深入学习过 Java 并发相关,那么肯定会去了解各种锁,而作为一个 有志青年 的你必然会在心里来一句,为什么加了锁就可以同步 ? 此时必然也会看到 AQS 的影子。
136 0
|
Java uml
JUC系列学习(六):ReentrantReadWriteLock的使用及源码解析
`ReentrantReadWriteLock`是一种读写锁,跟`ReentrantLock`一样也是实现了`Lock`,区别在于`ReentrantLock`是独占锁,同一时刻只能有一个线程持有锁,`ReentrantLock`在某些场景下可能会有并发性能的问题。而**ReentrantReadWriteLock是独占锁(写锁)、共享锁(读锁)可以同时存在的一种读写锁,在读操作远大于写操作的场景中,能实现更好的并发性**。当读锁存在时,其他线程仍然可以获取读锁并进行读操作,但是不能获得写锁进行写操作;当写锁存在时,其他线程的读锁、写锁都是不允许的。
Java并发之AbstractQueuedSynchronizer(AQS)详解
Java并发之AbstractQueuedSynchronizer(AQS)详解
Java并发之AbstractQueuedSynchronizer(AQS)详解