@[toc]
概述
CountDownLatch也是利用的AQS队列,关于AQS队列的讲述请参考前面两篇文章:
AQS类是一个模板类,我们可以根据根据具体的需求通过重写以下几个方法来自定义实现同步器
- tryAcquire (排它锁获取)
- tryRelease (排它锁释放)
- tryAcquireShared (共享锁获取)
- tryReleasedShared (共享锁释放)
我们看一下官方文档中的代码案例:
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
class Driver {
public static int N = 5;
public static void main(String[] args) throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
System.out.println(System.currentTimeMillis() + "done");
}
private static void doSomethingElse() throws InterruptedException {
TimeUnit.SECONDS.sleep(2);
System.out.println(System.currentTimeMillis() + Thread.currentThread().getName() + " 执行任务");
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
}
void doWork() throws InterruptedException {
TimeUnit.SECONDS.sleep(new Random().nextInt(10) + 2);
System.out.println(System.currentTimeMillis() + Thread.currentThread().getName() + " 执行任务");
}
}
输出结果:
1561552734980main 执行任务
1561552736980main 执行任务
1561552738980Thread-0 执行任务
1561552739980Thread-1 执行任务
1561552740981Thread-2 执行任务
1561552742981Thread-4 执行任务
1561552743981Thread-3 执行任务
1561552743981done
可以看到,CountDownLatch不仅可以实现一个倒计时器,计数器,还可以实现线程间的通信协调。
CountDownLatch初始化
public CountDownLatch(int count) {
if (count < 0) thrownew IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch初始化时会初始化state的值,这里代表线程数。
private static final class Sync extends AbstractQueuedSynchronizer {
privatestatic final long serialVersionUID = 4982264981922014374L;
Sync(int count) {//初始化state的值
setState(count);
}
int getCount() {//countDownLatch可以获取当前剩余的state的数量
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// 递减state的值
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
从代码中可以看到,尝试获取资源(tryAcquireShared),state等于0时,返回1,表示可以获取锁。其他同步器也是这样的,只要tryAcquireShared返回>=0的值,表示可以获取资源,CountDownLatch的使用具体后面会讲。
释放资源时(tryReleaseShared),每次都是递减1,当state等于0时,返回true。
await
接下来看看主线程await做了什么事情
//当前线程会一直等到直到计数器减到0,或者当前线程中断
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();//如果中断,抛出异常
if (tryAcquireShared(arg) < 0)//tryAcquireShared小于0 说明state的值还没有递减到0
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly在父类AQS中:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);//把当前等待的线程,以共享模式加入到AQS队列中
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {//前面已经降到state减到0时tryAcquireShared返回1,表示当前线程可以获取锁了,也就是当前被CountDownLatch阻塞的线程获取锁成功可以继续执行了。
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
整体代码逻辑与之前文章中讲述的基本一致。这里只说明一下CountDownLatch的特殊之处:
在设置了CountDownLatch以后,AQS中的state会有初始值,表示线程数。由于主线程(或者当前线程)会等待state的值减到0,否则一直等待。由于是使用的AQS队列,所以主线程的等待过程体现在将主线程以共享模式添加到AQS队列中,挂起,等待被唤醒。唤醒以后查看state的值为0,说明获取到了共享锁,可以进入临界区执行任务了。
countDown
countDown方法会调用AQS类的releaseShared方法sync.releaseShared(1);
:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
其中doReleaseShared与jdk11源码--ReentrantReadWriteLock源码分析的一样,就不在赘述。
CountDownLatch中tryReleaseShared的实现,其实上面已经讲到了,tryReleaseShared只是将state的值递减1而已。
当减到0时返回true,就会执行doReleaseShared方法了,doReleaseShared回去唤醒后继节点。如此一来,被阻塞的主线程就会被唤醒了。
end.