1 如何使用
构造函数接收一个int类型的参数作为计数器,表示需等待N个点完成。
调用countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。
java
代码解读
复制代码
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
// do something
latch.countDown();
}).start();
new Thread(() -> {
// do something
latch.countDown();
}).start();
new Thread(() -> {
// do something
latch.countDown();
}).start();
// 主线程阻塞
latch.await();
// 主线程继续执行
log.debug("wait end...");
2 类比join()
使用Thread.join()
时,主线程需等待另一个线程终结后,才能继续执行。有以下不足:
- 不能在另一个线程执行中途,就让主线程结束等待开始执行;
- 主线程必须持有其他线程的引用,才能调用
join()
。在使用线程池如ThreadPoolExecutor时,在主线程中没法拿到线程池中线程引用,无法调用join。
CountDownLatch避免了以上不足,countDown()
可以在任何地方调用,且调用await()
只需等到计数为0,而不是等其他线程终结。
java
代码解读
复制代码
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService service = Executors.newFixedThreadPool(2);
service.submit(() -> {
try {
TimeUnit.SECONDS.sleep(1);
latch.countDown();
log.debug("end...{}", latch.getCount());
} catch (InterruptedException e) {
}
});
service.submit(() -> {
try {
TimeUnit.SECONDS.sleep(1);
latch.countDown();
TimeUnit.SECONDS.sleep(1);
// 在一个线程任务的不同阶段,可以多次countDown()
latch.countDown();
TimeUnit.SECONDS.sleep(1);
log.debug("end...{}", latch.getCount());
} catch (InterruptedException e) {
}
});
service.submit(() -> {
try {
log.debug("waiting...");
latch.await();
log.debug("wait end...");
} catch (InterruptedException e) {
}
});
service.shutdown();
}
}
3 原理
内部类Sync,是AbstractQueuedSynchronizer的子类,是AQS的共享模式。CountDownLatch对同步状态的所有操作,都转交给Sync实例。
3.1 初始化
主线程需要等待几个操作(或线程)先完成,count就取几。
3.2 countDown方法
调用countDown()
,会将state减1。 如果调用countDown()
之前,state已经为0了,将不做任何操作(返回false)。 如果调用countDown()
之后,state变为了0,将返回true,AQS的releaseShared(int arg)
将唤醒同步队列中第一个阻塞的线程。
3.3 await()方法
调用await()
时,转交给Sync处理。 如果state==0,await()
方法将返回1(大于0),将直接退出。否则返回-1,当前线程进入同步队列等待。
总结:
- 当某个
countDown()
将state减为0时,唤醒同步队列中第一个调用await()
而阻塞的线程A。 - 线程A被unpark后,因为state==0,tryAcquireShared(arg)必然成功;
- 如果线程A的next节点是SHARED模式,将会调用
doReleaseShared()
,将唤醒同步队列中下一个阻塞线程。然后线程A退出; - 最终,同步队列中所有等待线程被依次唤醒了。
可见,AQS的共享模式下,releaseShared()会级联唤醒同步队列中的阻塞线程。
4 使用
来看一个游戏加载场景:假设每局游戏有10个玩家,等所有玩家都加载完毕时,游戏开始。
使用CountDownLatch来实现,代码如下:
java
代码解读
复制代码
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException, ExecutionException {
AtomicInteger num = new AtomicInteger(1);
ExecutorService service = Executors.newFixedThreadPool(10,
(r) -> new Thread(r, "player-" + num.getAndIncrement()));
CountDownLatch latch = new CountDownLatch(10);
String[] all = new String[10];
Random r = new Random();
for (int j = 0; j < 10; j++) {
final int index = j;
service.submit(() -> {
String name = Thread.currentThread().getName();
for (int i = 0; i <= 100; i++) {
try {
// 模拟出不同加载进度
Thread.sleep(r.nextInt(100));
} catch (InterruptedException e) {
}
all[index] = name + "= " + i + "%";
// 不换行覆盖输出
System.out.print("\r" + Arrays.toString(all));
}
latch.countDown();
});
}
latch.await();
System.out.println("\n" + "欢迎来到王者峡谷......");
service.shutdown();
}
}