CountDownLatch介绍
CountDownLatch 是 java1.5 之后被引入的,是 java.util.concurrent
包下的一个同步工具类,它允许一个或多个线程等待,直到在其他线程中一组操作执行完成。
CountDownLatch 主要有 countDown
方法和 await
方法。CountDownLatch 在初始化时,需要指定一个整数作为计数器。当调用 countDown 方法时,计数器会被减1;当调用await方法时,如果计数器大于0时,线程会被阻塞,直到计数器被 countDown 方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器被减到0时,调用await方法都会直接返回。
调用 countDown 方法的线程可以继续执行,不需要等待计数器被减到0,只有调用await方法的线程才需要等待。
源码分析
// CountDownLatch有一个内部类叫做Sync,
// 它继承了AbstractQueuedSynchronizer类,
// 其中维护了一个整数state,并且保证了修改state的可见性和原子性。
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
* 内部类
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
// 持有内部类
private final Sync sync;
// 创建CountDownLatch实例时,也会创建一个Sync的实例,同时把计数器的值传给Sync实例
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// await方法中调用了Sync实例的acquireSharedInterruptibly方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 在countDown方法中,只调用了Sync实例的releaseShared方法
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
}
/**
* The synchronization state.
* 这是 AbstractQueuedSynchronizer 类中的一个整数变量 state,
* 并且保证了修改state的可见性和原子性。
*/
private volatile int state;
// 其中的releaseShared方法,先对计数器进行减1操作,如果减1后的计数器为0,唤醒被await方法阻塞的所有线程
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //对计数器进行减一操作
doReleaseShared(); //如果计数器为0,唤醒被await方法阻塞的所有线程
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {//死循环,如果CAS操作失败就会不断继续尝试。
int c = getState();//获取当前计数器的值。
if (c == 0)// 计数器为0时,就直接返回。
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))// 使用CAS方法对计数器进行减1操作
return nextc == 0;//如果操作成功,返回计数器是否为0
}
}
// 判断计数器是否为0,如果不为0则阻塞当前线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 其中tryAcquireShared方法,是AbstractQueuedSynchronizer中的一个模板方法,其具体实现在Sync类中,其主要是判断计数器是否为零,如果为零则返回1,如果不为零则返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
实践
模拟一个业务场景,某公司需要导出公司所有部门,每个部门的加班时长统计数据,假设每个部门的数据量很大,这时可以每个部门开一个线程去统计,等所有线程统计完成,再把统计数据填充到excel导出。
package com.nobody.domain;
import java.util.concurrent.CountDownLatch;
/**
* 演示 CountDownLatch
*
* @author Μr.ηobοdy
*
* @date 2020-05-12
*
*/
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
// 模拟统计A部门人员加班时长
new Thread(() -> {
System.out.println("开始统计A部门人员加班时长情况...");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束统计A部门人员加班时长情况...");
latch.countDown();
}).start();
// 模拟统计B部门人员加班时长
new Thread(() -> {
System.out.println("开始统计B部门人员加班时长情况...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束统计B部门人员加班时长情况...");
latch.countDown();
}).start();
Thread.sleep(100);
System.out.println("等待所有部门统计...");
latch.await();
System.out.println("所有部门统计结束,导出数据...");
}
}
输出结果
开始统计A部门人员加班时长情况...
开始统计B部门人员加班时长情况...
等待所有部门统计...
结束统计B部门人员加班时长情况...
结束统计A部门人员加班时长情况...
所有部门统计结束,导出数据...
如果某些线程因为数据量大,或者服务调用链时间长,统计很久还没结果?难道要一直等待? 那可以将latch.await();
换为await(long timeout, TimeUnit unit)
方法,即超过设定的时间就不再阻塞等待。
System.out.println("等待所有部门统计...");
latch.await(3000, TimeUnit.MILLISECONDS);
System.out.println("所有部门统计结束,导出数据...");
输入结果
开始统计A部门人员加班时长情况...
开始统计B部门人员加班时长情况...
等待所有部门统计...
结束统计B部门人员加班时长情况...
所有部门统计结束,导出数据...
结束统计A部门人员加班时长情况...