一天一个 JUC 工具类 -- AQS

简介: AbstractQueuedSynchronizer(AQS)是Java中用于构建锁和同步器的抽象基类。它是Java并发工具包(java.util.concurrent)中实现高级线程同步控制的关键组件之一。AQS提供了一种基于等待队列的同步器框架,允许开发者构建自定义的同步器。在这篇文章中我们将从源码分析和底层原理的角度来介绍AQS。

AQS

AbstractQueuedSynchronizer(AQS)是Java中用于构建锁和同步器的抽象基类。它是Java并发工具包(java.util.concurrent)中实现高级线程同步控制的关键组件之一。AQS提供了一种基于等待队列的同步器框架,允许开发者构建自定义的同步器。在这篇文章中我们将从源码分析和底层原理的角度来介绍AQS。

源码分析

       private transient volatile Node head;

   private transient volatile Node tail;
   static final class Node {
    

       static final Node SHARED = new Node();

       static final Node EXCLUSIVE = null;


       static final int CANCELLED =  1;

       static final int SIGNAL    = -1;

       static final int CONDITION = -2;

       static final int PROPAGATE = -3;


       volatile int waitStatus;

       volatile Node prev;

       volatile Node next;

       volatile Thread thread;

       Node nextWaiter;

       /**
        * Returns true if node is waiting in shared mode.
        */
       final boolean isShared() {
    
           return nextWaiter == SHARED;
       }

       final Node predecessor() throws NullPointerException {
    
           Node p = prev;
           if (p == null)
               throw new NullPointerException();
           else
               return p;
       }

       Node() {
        // Used to establish initial head or SHARED marker
       }

       Node(Thread thread, Node mode) {
         // Used by addWaiter
           this.nextWaiter = mode;
           this.thread = thread;
       }

       Node(Thread thread, int waitStatus) {
     // Used by Condition
           this.waitStatus = waitStatus;
           this.thread = thread;
       }
   }

AQS的核心数据结构是一个双向链表(变量 head 和 tail),称为等待队列 。我们来看一下AQS 内部定义的 Node 节点里面都有哪些东西

常量 SHAREDEXCLUSIVE 是区分等待队列中的两种类型:独占节点(exclusive node)和共享节点(shared node)。独占节点用于独占式同步,共享节点用于共享式同步(例如Semaphore、CountDownLatch等)。接下来是几个状态量来表明当前 node 的状态:


               static final int CANCELLED =  1;

       static final int SIGNAL    = -1;

       static final int CONDITION = -2;

       static final int PROPAGATE = -3;

               volatile int waitStatus;

这几个量都是 waitStatus 可能的值

  1. SIGNAL:当前节点的后继节点被阻塞(通过 park),因此当前节点在释放或取消时必须唤醒它的后继节点。为避免竞争,获取操作必须首先指示它们需要一个信号,然后重试原子获取操作,在失败时进行阻塞。
  2. CANCELLED:该节点由于超时或中断而被取消。节点一旦处于该状态,将永远不会再次阻塞。特别地,拥有被取消节点的线程将不再阻塞。
  3. CONDITION:该节点当前位于条件队列中。在被传输之前,它不会被用作同步队列节点。一旦被传输,该节点的状态将被设置为 0。(在这里使用该值与字段的其他用途无关,但简化了机制。)
  4. PROPAGATE:应该将 releaseShared 操作传播给其他节点。在 doReleaseShared 方法中,对于头节点,设置此值以确保传播继续,即使此后有其他操作介入。
  5. 0:上述情况均不满足。对于非负数值,表示节点无需发出信号。因此,大多数代码无需检查特定的值,只需检查 SIGNAL 即可。该字段对于普通同步节点初始化为 0,对于条件节点初始化为 CONDITION。可以使用 CAS(或在可能时,无条件的 volatile 写入)来修改该字段的值。

对于独占节点(exclusive node)和共享节点(shared node)的解释:

独占节点(exclusive node)和共享节点(shared node)。这两种节点分别用于实现独占式同步和共享式同步。在多线程环境下,AQS利用这些节点实现对锁和资源的安全管理和控制。

独占节点(exclusive node):

独占节点用于独占式同步,如ReentrantLock等可重入锁。在独占模式下,同一时刻只允许一个线程获取锁,其他线程需要等待,直到获取锁的线程释放锁。独占节点继承自AQS的内部类Node。

共享节点(shared node):

共享节点用于共享式同步,如CountDownLatch、Semaphore等。在共享模式下,允许多个线程同时获取资源,一般用于计数器等场景。共享节点同样继承自AQS的内部类Node。


acquire()release() 是 AQS 中的两个核心方法,用于实现同步器的获取和释放操作。它们是用来实现独占式同步的关键方法。

1. acquire() 方法:

public final void acquire(int arg) {
   if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
       selfInterrupt();
   }
}
  • arg:表示获取同步状态的参数,具体含义由具体的同步器实现类决定。

acquire() 方法分为两个步骤:tryAcquire()acquireQueued()

  • tryAcquire():尝试以独占模式获取同步状态。该方法由具体的同步器实现类重写,用于尝试获取同步状态。如果成功获取了同步状态(即返回 true),则该方法直接返回。如果没有成功获取同步状态(即返回 false),则继续执行后续步骤。
  • acquireQueued():将当前线程加入等待队列,并以独占模式获取同步状态。该方法会在等待队列中排队,并尝试获取同步状态,直到成功获取为止。它是一个自旋操作,即不断尝试获取锁直到成功。在等待队列中排队的过程中,如果线程被中断,则会退出自旋,并清除中断状态。
  • addWaiter(Node mode):将当前线程以指定模式(独占模式或共享模式)加入等待队列。该方法会创建一个新的节点,并将其插入到等待队列的尾部。
  • selfInterrupt():如果当前线程被中断,则自我中断。在等待队列中自旋的过程中,如果发现当前线程被中断,则会调用该方法中断自己,以响应外部的中断请求。

2. release() 方法:

public final boolean release(int arg) {
   if (tryRelease(arg)) {
       Node h = head;
       if (h != null && h.waitStatus != 0) {
           unparkSuccessor(h);
       }
       return true;
   }
   return false;
}
  • arg:表示释放同步状态的参数,具体含义由具体的同步器实现类决定。

  • release() 方法分为两个步骤:tryRelease()unparkSuccessor()

  • tryRelease():尝试以独占模式释放同步状态。该方法由具体的同步器实现类重写,用于尝试释放同步状态。如果成功释放了同步状态(即返回 true),则继续执行后续步骤。如果没有成功释放同步状态(即返回 false),则不进行后续操作。

  • unparkSuccessor(Node node):唤醒后继节点。当线程释放锁时,它会调用 unparkSuccessor() 方法来唤醒等待队列中的后继节点,使其有机会继续尝试获取同步状态。该方法会将后继节点的等待状态设为 0(Node.SIGNAL)并尝试唤醒后继节点。

release() 方法通常在释放锁或资源时调用,它会首先尝试释放同步状态,如果成功释放,则唤醒等待队列中的后继节点,使其有机会获取同步状态。

这些方法的实现会依赖具体的同步器类型,因为不同的同步器有不同的获取和释放逻辑。在使用 AQS 时,开发者需要根据具体的业务需求来实现这些方法,以实现线程的同步和协作。

自己手写一个共享锁

基于 AQS 实现一个共享式的同步锁,我们创建一个新的类 SharedLock。SharedLock 继承自 AQS 并实现tryAcquireShared()、tryReleaseShared() 方法

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SharedLock extends AbstractQueuedSynchronizer {
    
   // 构造方法,指定共享资源数量
   public SharedLock(int resources) {
    
       setState(resources);
   }

   // 获取共享资源
   public void acquireShared() {
    
       acquireShared(1);
   }

   // 释放共享资源
   public void releaseShared() {
    
       releaseShared(1);
   }

   @Override
   protected int tryAcquireShared(int arg) {
    
       // 获取当前同步状态
       int currentState = getState();

       // 如果当前资源数量为0,或者请求资源数量大于当前资源数量,则返回负值,表示获取失败
       if (currentState == 0 || arg > currentState) {
    
           return -1;
       }

       // 尝试更新同步状态,获取资源
       int newState = currentState - arg;
       if (compareAndSetState(currentState, newState)) {
    
           return newState;
       }

       return -1; // 获取资源失败
   }

   @Override
   protected boolean tryReleaseShared(int arg) {
    
       // 释放共享资源,直接增加同步状态
       for (;;) {
    
           int currentState = getState();
           int newState = currentState + arg;
           if (compareAndSetState(currentState, newState)) {
    
               return true;
           }
       }
   }
}
  • 构造方法 SharedLock(int resources):创建一个共享锁,并指定共享资源的数量。
  • acquireShared() 方法:获取共享资源。如果当前没有可用的资源,则线程会进入等待状态。
  • releaseShared() 方法:释放共享资源。
  • tryAcquireShared(int arg) 方法:尝试获取共享资源。arg 表示请求的资源数量。如果当前可用的资源数量不足以满足请求,则返回负值表示获取失败,否则更新同步状态并返回新的资源数量。
  • tryReleaseShared(int arg) 方法:尝试释放共享资源。arg 表示释放的资源数量。直接增加同步状态,表示释放资源。

下面使用一下我们自定义的共享锁

我们创建了一个包含 5 个共享资源的 SharedLock 对象,并创建了 10 个线程来获取和释放共享资源。每个线程会尝试获取一个共享资源,然后执行一段休眠时间后再释放该资源。

public class Main {
    
   public static void main(String[] args) {
    
       int totalResources = 5;
       SharedLock sharedLock = new SharedLock(totalResources);

       Runnable runnable = () -> {
    
           sharedLock.acquireShared();
           System.out.println(Thread.currentThread().getName() + " acquired the shared resource.");
           try {
    
               Thread.sleep(1000);
           } catch (InterruptedException e) {
    
               e.printStackTrace();
           }
           sharedLock.releaseShared();
           System.out.println(Thread.currentThread().getName() + " released the shared resource.");
       };

       for (int i = 0; i < 10; i++) {
    
           Thread thread = new Thread(runnable);
           thread.start();
       }
   }
}

注意: 共享锁允许多个线程同时获取资源,只要可用资源数量足够。当可用资源数量不足时,线程将进入等待状态,直到有足够的资源为止

总结

AQS的底层原理:

  1. 基于CAS(Compare and Swap): AQS主要利用了CAS操作(即compareAndSet()方法)来实现对共享变量state的原子更新。CAS是一种乐观锁机制,通过比较当前值与期望值是否相等,若相等则执行更新操作,否则重新尝试。
  2. 等待队列: 等待队列是AQS实现线程同步的关键数据结构,它采用双向链表来存储等待线程节点。当线程需要获取锁时,如果锁被其他线程持有,则该线程会进入等待队列中,并挂起。当锁释放时,AQS会从等待队列中唤醒某个线程,使其重新尝试获取锁。
  3. 独占模式和共享模式: AQS支持两种同步模式:独占模式和共享模式。独占模式适用于只允许一个线程访问资源的场景,如ReentrantLock。共享模式适用于允许多个线程同时访问资源的场景,如Semaphore和CountDownLatch。
相关文章
|
8月前
|
安全 Java API
JavaEE初阶 CAS,JUC的一些简单理解,包含concurrent, ReentrantLock,Semaphore以及ConcurrentHashMap
JavaEE初阶 CAS,JUC的一些简单理解,包含concurrent, ReentrantLock,Semaphore以及ConcurrentHashMap
55 0
|
消息中间件 监控 Java
JUC第二十六讲:JUC工具类: CountDownLatch详解
JUC第二十六讲:JUC工具类: CountDownLatch详解
|
4月前
|
Java 数据库
JUC工具类: Semaphore详解
信号量Semaphore是并发编程中的一种高级同步机制,它可以在复杂的资源共享场景中发挥重要作用。理解它的工作原理及正确的使用方法对实现高效且健壮的并发控制至关重要。
49 1
|
5月前
|
Java
JUC工具类: CountDownLatch详解
`CountDownLatch`是并发编程实践中的一个重要工具,它能简化多线程协调执行的复杂性,特别是在当一个操作需要等待一个或多个事件完成才能继续执行时。使用 `CountDownLatch`可以编写简洁的并行代码,确保在执行操作之前,所有的必要步骤都已经准备就绪。
54 1
|
4月前
|
并行计算 Java 开发者
JUC工具类: CyclicBarrier详解
`CyclicBarrier`是并发编程领域一个非常实用的同步辅助类,适用于并行任务场景,它提供了一种简便的线程同步机制。正确地理解和使用这个工具,对开发者来说,可以大大简化并行处理逻辑的复杂度,增强代码的健壮性与可维护性。
38 0
|
7月前
|
存储 安全 Java
Java多线程编程--JUC
Java多线程编程
|
7月前
|
Java 数据库
深入解析Java并发包(JUC)中的Semaphore
深入解析Java并发包(JUC)中的Semaphore
|
存储 安全 算法
一天一个 JUC 工具类 -- 并发集合
使用JUC工具包中的并发集合,我们可以避免手动处理锁和同步的复杂性,从而降低出现线程安全问题的概率。这些并发集合通过内部采用高效的算法和数据结构来优化并发操作,从而提供更好的性能和扩展性。
|
8月前
|
安全 Java API
JUC的常见类
JUC的常见类
56 0
JUC第二十八讲:JUC工具类: Semaphore详解
JUC第二十八讲:JUC工具类: Semaphore详解