AbstractQueuedSynchronizer理解(CountDownLatch)

简介: AbstractQueuedSynchronizer理解(CountDownLatch)

本文分析一下CountDownLatch是如何运用AQS的

CountDownLatch是什么

CountDownLatch顾名思义它是一个Latch(门闩),它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当state为0时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。

CountDownLatch小栗子

public static void main(String[] args) throws InterruptedException{
    int threadSize = 3;
    CountDownLatch doneSignal = new CountDownLatch(threadSize);

    for (int i = 1; i <= threadSize; i++) {
        final int threadNum = i;
        new Thread(() -> {
            System.out.println("thread" + threadNum + ":start");

            try {
                Thread.sleep(1000 * threadNum);
            } catch (InterruptedException e) {
                System.out.println("thread" + threadNum + ":exception");
            }

            doneSignal.countDown();
            System.out.println("thread" + threadNum + ":complete");
        }).start();
    }

    System.out.println("main thread:await");
    doneSignal.await();
    System.out.println("main thread:go on");
}

例子中主线程启动了三条子线程,睡眠一段时间,此时主线程在等待所有子线程结束后才会继续执行下去;
看一下输出结果:

main thread:await
thread1:start
thread2:start
thread3:start
thread1:complete
thread2:complete
thread3:complete
main thread:go on

Process finished with exit code 0

CountDownLatch原理分析

既然CountDownLatch也是AQS的一种使用方式,我们看一下它的内部类Syc是怎么实现AQS的:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;
    
    //构造函数,初始化同步状态state的值,即线程个数
    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    //这里重写了方法,在共享模式下,告诉调用者是否可以抢占state锁了,正数代表可以,负数代表否定;当state为0时返回正数
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    //共享模式下释放锁
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            //state为0时说明没有什么可释放
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                //CAS对state操作成功后返回state值是否为0,为0则释放成功
                return nextc == 0;
        }
    }
}

看完了重写的AQS同步器后,我们了解了CountDownLatch对state锁的描述。接下来先看主线程调用的await方法,在await方法里调用了AQS的acquireSharedInterruptibly:

//在共享模式下尝试抢占锁
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //线程中断抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //尝试抢占前先查询一下是否可以抢占,如果返回值大于0程序往下执行,小于0则等待
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //在Reentrant解析中我们看过,往队列中新增node(共享模式)
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //如果当前node的前继时head,马上尝试抢占锁
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //如果state==0即允许往下执行,重新设置head并往下传播信号
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    //得到往下执行的允许
                    return;
                }
            }
            //以下都跟Reentrant一样
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    //将当前node设置为head,清空node的thread、prev
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    //如果propagate大于0,或者原来head的等待状态小于0或者现在head的等待状态小于0
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //准备唤醒下一个节点
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                //如果head的状态为SIGNAL,更改状态为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //唤醒后继节点
                unparkSuccessor(h);
            }
            //如果head状态为0,更改状态为PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //如果head没有改变,结束当前loop,如果遇到head被别的线程改变,继续loop
        if (h == head)                   // loop if head changed
            break;
    }
}

释放锁的信号一直向后传播,直到所有node被唤醒并继续执行,那第一个信号时何时发起的呢?我们来看一下CountDownLatch的countDown方法,该方法调用了sync的releaseShared方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        //如果同步状态state为0时,调用doReleaseShared,在这里就发出了第一个唤醒所有等待node的信号,然后信号自动往后传播
        doReleaseShared();
        return true;
    }
    return false;
}

总结

CountDownLatch在调用await的时候判断state释放为0,如果大于0则阻塞当前线程,将当前线程的node添加到队列中等待;在调用countDown时当遇到state减到0时,发出释放共享锁的信号,从头节点的后记节点开始往后传递信号,将队列等待的线程逐个唤醒并继续往下执行;
在这里state跟Reentrant的state独占锁含义不同,state的含义是由AQS的子类去描述的。

相关文章
|
负载均衡 Java API
Spring Cloud Gateway 详解:构建高效的API网关解决方案
Spring Cloud Gateway 详解:构建高效的API网关解决方案
793 0
|
XML JSON Java
Spring Boot 返回 XML 数据,一分钟搞定!
Spring核心技术 67 篇文章13 订阅 订阅专栏 Spring Boot 返回 XML 数据,前提必须已经搭建了 Spring Boot 项目,所以这一块代码就不贴了,可以点击查看之前分享的 Spring Boot 返回 JSON 数据,一分钟搞定!。
Spring Boot 返回 XML 数据,一分钟搞定!
|
索引 Python
【Pandas】Pandas Dataframe 常用用法
Pandas DataFrame的常用操作示例,包括筛选数据、索引操作、合并DataFrame、设置和排序索引、文本处理、列重命名、处理缺失值、排序以及删除满足特定条件的行等技巧。
594 0
|
弹性计算 网络协议 安全
阿里云ECS云服务器安全组配置开放端口方法教程
阿里云ECS云服务器安全组配置开放端口方法教程,阿里云服务器端口怎么打开?云服务器ECS端口在安全组中开启,轻量应用服务器端口在防火墙中打开,阿里云服务器网以80端口为例,来详细说下阿里云服务器端口开放图文教程,其他的端口如8080、3306、443、1433也是同样的方法进行开启端口:
1674 0
|
前端开发
怎么使用async-validator快速校验表单
怎么使用async-validator快速校验表单
675 0
|
监控 Java Shell
监控堆外第三方监控工具Zabbix
监控堆外第三方监控工具Zabbix
406 5
|
SQL 分布式计算 Oracle
使用Sqoop从Oracle数据库导入数据
使用Sqoop从Oracle数据库导入数据
使用Sqoop从Oracle数据库导入数据
|
关系型数据库 MySQL 数据库
MySQL - 查看 / 修改配置参数(Global Variables)
MySQL - 查看 / 修改配置参数(Global Variables)
1362 0
|
存储 消息中间件 Apache
Apache Pulsar简介
Apache Pulsar What is Pulsar "Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API." Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。
3682 0

热门文章

最新文章

下一篇
开通oss服务