JUC并发编程——AQS源码解读

简介: JUC并发编程——AQS源码解读

正文


1、AQS是什么


       AQS(AbstractQueuedSynchronizer)是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。


       AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。


       CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。


2、LockSupport的使用


LockSupport是JUC提供的一个线程阻塞与唤醒的工具类,该工具可以让线程在任意位置阻塞和唤醒。主要方法如下


public static void park(Object blocker);//无限期阻塞当前线程,带有blocker对象,用于确定线程阻塞的原因
public static void park();//无限期阻塞线程
public static void parkNanos(long nanos);//阻塞当前线程有阻塞时间限制
public static void parkNanos(Object blocker, long nanos);//阻塞当前线程有阻塞对象,有时间限制
public static void parkUntil(Object blocker, long deadline);//阻塞当前线程,直到某个时间
public static void unpark(Thread thread);//唤醒某个被阻塞的线程,只有线程阻塞了才会唤醒,不阻塞则不执行任何操作。


LockSuport简单使用


 public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            System.out.println("线程被阻塞了。。。。。。。1");
            LockSupport.park();
            System.out.println("线程被唤醒了。。。。。。。2");
        }, "t1");
        t1.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //准备唤醒线程
        System.out.println("准备唤醒线程了。。。。。。。。。3");
        LockSupport.unpark(t1);
    }


上面执行的结果为1,3,2,当线程被阻塞时只有调用unpark唤醒线程,线程才会继续执行。


LockSupport.park()与Thread.sleep()区别


Thread.sleep()没法从外部唤醒,只能等待自己醒来或者发生异常。而LockSupport.park()可以通过LockSupport.unpark()方法唤醒。

Thread.sleep()声明了InterruptedException中断异常,而LockSupport.park()方法不需要捕获异常

与Thread.sleep()相比,LockSupport.park()方法更精确、更灵活的阻塞和唤醒指定线程。

Thread.sleep()本身是一个native方法,LockSupport.park()是一个静态方法,但是底层调用了Unsafe类的native()方法。

Locksupport.park()允许设置一个Blocker对象,主要用来提供监视工具或者诊断工具确定线程受阻原因。

Thread.sleep()和Locksupport.park()都不会释放所持有的锁。


LockSupport.park()与Object.wait()区别


Object.wait()需要和synchronized关键字配合使用,而LockSuport.park()可以在任何地方执行。

Object.wait()同样需要抛出中断异常。LockSupport.park()不需要。

如果线程在没有wait()条件下执行notify()会抛出java.lang.IllegalMonitorStateException异常,而LockSupport.unpark()不会抛出任何异常。

Object.wait()会释放锁,而LockSupport.park()不会释放锁。


3、结合ReentrantLock分析AQS源码


本分析基于JDK17,与jdk8代码有一些不同,但思想是一样的。


ReentrantLock有以下几个内部类


abstract static class Sync extends AbstractQueuedSynchronizer{}//继承了AQS


static final class NonfairSync extends Sync {}//实现非公平锁的关键


static final class FairSync extends Sync {}//实现公平锁的关键


非公平加锁操作


NonfairSync类图结构如下


222.png


执行加锁操作会执行如下方法,一步一步解密加锁的过程。


111.png


initialTryLock()方法解读


333.png


1处的代码通过CAS操作将状态值state由0改为1如果修改成功,将当前线程设置为独占状态,然后返回true。因为lock()方法是(!initialTryLock())所以就直接表示获取锁成功。

2处的代码表示将当前线程设置为独占状态。

3处的代码判断当前的线程是不是已经持有了锁,如果是同一个就表示重入锁,然后将转状态值state加1,并返回true表示获取锁成功。


acquire(1)方法解读


tryAcquire(1)方法解读


111.png


这个方法说明如果没有获取到锁,并不会直接将线程阻塞,而是通过CAS再次尝试一下是否可以获取到锁,如果获取到锁就直接返回,表示获取锁成功。不然则需要将线程加入队列等待。这里需要注意用到了模板方法的设计模式。


acquire()方法解读


final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;                // predecessor of node when enqueued
        /*
         * Repeatedly:
         *  Check if node now first
         *    if so, ensure head stable, else ensure valid predecessor
         *  if node is first or not yet enqueued, try acquiring
         *  else if node not yet created, create it
         *  else if not yet enqueued, try once to enqueue
         *  else if woken from park, retry (up to postSpins times)
         *  else if WAITING status not set, set and retry
         *  else park and clear WAITING status, and check cancellation
         */
        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            if (node == null) {                 // allocate; retry before enqueue
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode();
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    tryInitializeHead();
                else if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }


主要分析以下两块代码


333.png

444.png


4处代码和上面tryAcquire(1)方法解读方法一样,不再赘述。


5处代码 通过CAS获取到了锁,证明前面一个线程释放了锁,那么就将前面的节点设置为null方便GC回收,把当前的节点设置为头结点,将当前节点上等待的线程设置为null。


6处代码 新建一个节点节点的状态值status=0。


7处的代码设置node节点上的等待线程为当前线程,并把新节点设置为尾结点。如果节点上status状态为0就改为1。


8处的代码 将当前线程设置为阻塞状态。


大致过程下图


8074245061964fd5a3b4850740109991.png


释放锁操作


770259d1a417490293a94d9fc2494375.png


tryRelease(1)方法解读


444.png


9处代码 如上面加锁过程每加一次锁state值加1,相应的每释放一次锁相应的state的值减1


10处代码,验证释放锁的是不是独占的线程,如果不是就抛异常,大白话就是谁加的锁还需要谁来释放。


11处代码,如果state值减为0,那么就将独占的线程置空,让其他线程来在重新CAS竞争。


signalNext(Node h)方法解读


333.png


此操作相对简单,就是唤醒当前节点的下一个节点上的线程。但是抛出一个问题,这样唤醒下一个节点上的线程,那听着像是公平锁啊,其实不然。此时如果有一个新的线程不是队列中的线程进来,那么这个新的线程和即将唤醒的线程就会发生竞争。下面看一下公平锁的抢锁代码。


222.png


hasQueuedThreads()方法解读


111.png


这个方法有点意思,从链表尾部开始遍历,如果状态值大于等于0返回true,可是只要在链表排队的节点status要么等于0要么等于1。这就验证了争抢资源的线程是不是即将被唤醒的线程,来实现公平锁的原理。


参考:


Java 并发 - 理论基础 | Java 全栈知识体系


《JAVA高并发核心编程(卷2):多线程、锁、JMM、JUC、高并发设计》-尼恩编著

相关文章
|
7月前
|
安全 Java 编译器
高并发编程之什么是 JUC
高并发编程之什么是 JUC
57 1
|
7月前
|
Java
深入理解Java中的AbstractQueuedSynchronizer(AQS):并发编程的核心组件
深入理解Java中的AbstractQueuedSynchronizer(AQS):并发编程的核心组件
|
6月前
|
安全 算法 Java
|
7月前
|
安全 Java 程序员
Java多线程基础-17:简单介绍一下JUC中的 ReentrantLock
ReentrantLock是Java并发包中的可重入互斥锁,与`synchronized`类似但更灵活。
57 0
|
安全 Java 调度
JUC并发编程(上)
JUC并发编程(上)
73 0
|
并行计算 Java 应用服务中间件
JUC并发编程超详细详解篇(一)
JUC并发编程超详细详解篇
1648 1
JUC并发编程超详细详解篇(一)
|
存储 缓存 监控
JUC并发编程(下)
JUC并发编程(下)
42 0
AQS源码解读之一
AQS源码解读之一
48 0
|
SpringCloudAlibaba 安全 Java
JUC并发编程(二):线程相关知识点
实现编发编程的主要手段就是多线程。线程是操作系统里的一个概念。接下来先说说两者的定义、联系与区别。
80 0
|
安全 Java 编译器
JUC 并发编程
JUC 并发编程