正文
1、AQS是什么
AQS(AbstractQueuedSynchronizer)是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
2、LockSupport的使用
LockSupport是JUC提供的一个线程阻塞与唤醒的工具类,该工具可以让线程在任意位置阻塞和唤醒。主要方法如下
public static void park(Object blocker);//无限期阻塞当前线程,带有blocker对象,用于确定线程阻塞的原因 public static void park();//无限期阻塞线程 public static void parkNanos(long nanos);//阻塞当前线程有阻塞时间限制 public static void parkNanos(Object blocker, long nanos);//阻塞当前线程有阻塞对象,有时间限制 public static void parkUntil(Object blocker, long deadline);//阻塞当前线程,直到某个时间 public static void unpark(Thread thread);//唤醒某个被阻塞的线程,只有线程阻塞了才会唤醒,不阻塞则不执行任何操作。
LockSuport简单使用
public static void main(String[] args) { Thread t1 = new Thread(() -> { System.out.println("线程被阻塞了。。。。。。。1"); LockSupport.park(); System.out.println("线程被唤醒了。。。。。。。2"); }, "t1"); t1.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //准备唤醒线程 System.out.println("准备唤醒线程了。。。。。。。。。3"); LockSupport.unpark(t1); }
上面执行的结果为1,3,2,当线程被阻塞时只有调用unpark唤醒线程,线程才会继续执行。
LockSupport.park()与Thread.sleep()区别
Thread.sleep()没法从外部唤醒,只能等待自己醒来或者发生异常。而LockSupport.park()可以通过LockSupport.unpark()方法唤醒。
Thread.sleep()声明了InterruptedException中断异常,而LockSupport.park()方法不需要捕获异常
与Thread.sleep()相比,LockSupport.park()方法更精确、更灵活的阻塞和唤醒指定线程。
Thread.sleep()本身是一个native方法,LockSupport.park()是一个静态方法,但是底层调用了Unsafe类的native()方法。
Locksupport.park()允许设置一个Blocker对象,主要用来提供监视工具或者诊断工具确定线程受阻原因。
Thread.sleep()和Locksupport.park()都不会释放所持有的锁。
LockSupport.park()与Object.wait()区别
Object.wait()需要和synchronized关键字配合使用,而LockSuport.park()可以在任何地方执行。
Object.wait()同样需要抛出中断异常。LockSupport.park()不需要。
如果线程在没有wait()条件下执行notify()会抛出java.lang.IllegalMonitorStateException异常,而LockSupport.unpark()不会抛出任何异常。
Object.wait()会释放锁,而LockSupport.park()不会释放锁。
3、结合ReentrantLock分析AQS源码
本分析基于JDK17,与jdk8代码有一些不同,但思想是一样的。
ReentrantLock有以下几个内部类
abstract static class Sync extends AbstractQueuedSynchronizer{}//继承了AQS
static final class NonfairSync extends Sync {}//实现非公平锁的关键
static final class FairSync extends Sync {}//实现公平锁的关键
非公平加锁操作
NonfairSync类图结构如下
执行加锁操作会执行如下方法,一步一步解密加锁的过程。
initialTryLock()方法解读
1处的代码通过CAS操作将状态值state由0改为1如果修改成功,将当前线程设置为独占状态,然后返回true。因为lock()方法是(!initialTryLock())所以就直接表示获取锁成功。
2处的代码表示将当前线程设置为独占状态。
3处的代码判断当前的线程是不是已经持有了锁,如果是同一个就表示重入锁,然后将转状态值state加1,并返回true表示获取锁成功。
acquire(1)方法解读
tryAcquire(1)方法解读
这个方法说明如果没有获取到锁,并不会直接将线程阻塞,而是通过CAS再次尝试一下是否可以获取到锁,如果获取到锁就直接返回,表示获取锁成功。不然则需要将线程加入队列等待。这里需要注意用到了模板方法的设计模式。
acquire()方法解读
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // retries upon unpark of first thread boolean interrupted = false, first = false; Node pred = null; // predecessor of node when enqueued /* * Repeatedly: * Check if node now first * if so, ensure head stable, else ensure valid predecessor * if node is first or not yet enqueued, try acquiring * else if node not yet created, create it * else if not yet enqueued, try once to enqueue * else if woken from park, retry (up to postSpins times) * else if WAITING status not set, set and retry * else park and clear WAITING status, and check cancellation */ for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible); }
主要分析以下两块代码
4处代码和上面tryAcquire(1)方法解读方法一样,不再赘述。
5处代码 通过CAS获取到了锁,证明前面一个线程释放了锁,那么就将前面的节点设置为null方便GC回收,把当前的节点设置为头结点,将当前节点上等待的线程设置为null。
6处代码 新建一个节点节点的状态值status=0。
7处的代码设置node节点上的等待线程为当前线程,并把新节点设置为尾结点。如果节点上status状态为0就改为1。
8处的代码 将当前线程设置为阻塞状态。
大致过程下图
释放锁操作
tryRelease(1)方法解读
9处代码 如上面加锁过程每加一次锁state值加1,相应的每释放一次锁相应的state的值减1
10处代码,验证释放锁的是不是独占的线程,如果不是就抛异常,大白话就是谁加的锁还需要谁来释放。
11处代码,如果state值减为0,那么就将独占的线程置空,让其他线程来在重新CAS竞争。
signalNext(Node h)方法解读
此操作相对简单,就是唤醒当前节点的下一个节点上的线程。但是抛出一个问题,这样唤醒下一个节点上的线程,那听着像是公平锁啊,其实不然。此时如果有一个新的线程不是队列中的线程进来,那么这个新的线程和即将唤醒的线程就会发生竞争。下面看一下公平锁的抢锁代码。
hasQueuedThreads()方法解读
这个方法有点意思,从链表尾部开始遍历,如果状态值大于等于0返回true,可是只要在链表排队的节点status要么等于0要么等于1。这就验证了争抢资源的线程是不是即将被唤醒的线程,来实现公平锁的原理。
参考:
《JAVA高并发核心编程(卷2):多线程、锁、JMM、JUC、高并发设计》-尼恩编著