Java源码分析之CountDownLatch

简介:         一、CountDownLatch介绍       CountDownLatch是一种同步手段,允许一个或者更多的线程等待,直到在其他线程正在执行的一组操作完成。给定count数目后CountDownLatch被初始化。

        一、CountDownLatch介绍

       CountDownLatch是一种同步手段,允许一个或者更多的线程等待,直到在其他线程正在执行的一组操作完成。给定count数目后CountDownLatch被初始化。await()方法阻塞,直到由于调用countDown()方法,当前count值达到0,之后所有等待线程被释放,而任何后续await()方法的调用会立即返回。这个是只有一次的现场,即count值无法被重设。如果你需要一个能够重设count值的版本,不妨考虑使用CyclicBarrier。

        二、CountDownLatch应用

        CountDownLatch是一个通用的同步工具,可用于许多目的。一个用count值为1来初始化的CountDownLatch可用作一个开关或者门闩:所有的线程调用await()方法等待一个线程调用countDown()方法后把门打开。一个用count值为N来初始化的CountDownLatch可用作使得一个线程等待,直到N个线程完成各自的事情,或者一些action被完成N次等。CountDownLatch一个有用的特性是:它不需要线程调用countDown()方法等待计数达到零在继续之前,它只是阻止任何线程继续过去一个等待,直到所有线程可以通过。

        1、示例应用一

        这里有一对类,一组工作线程使用两个countdown latches的示例:

        第一个是启动信号,阻止worker线程工作直到driver准备好;

        第二个是完成信号,允许driver等到所有的workers工作完成。

       代码如下:

package com.pengli.jdk;

import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {

	class Driver {
		void main() throws InterruptedException {
			CountDownLatch startSignal = new CountDownLatch(1);
			CountDownLatch doneSignal = new CountDownLatch(20);

			for (int i = 0; i < 20; ++i) // create and start threads
				new Thread(new Worker(startSignal, doneSignal)).start();

			doSomethingElse(); // don't let run yet
			startSignal.countDown(); // let all threads proceed
			doSomethingElse();
			doneSignal.await(); // wait for all to finish
		}

		void doSomethingElse() {
			// ...
		}
	}

	class Worker implements Runnable {
		private final CountDownLatch startSignal;
		private final CountDownLatch doneSignal;

		Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
			this.startSignal = startSignal;
			this.doneSignal = doneSignal;
		}

		public void run() {
			try {
				startSignal.await();
				doWork();
				doneSignal.countDown();
			} catch (InterruptedException ex) {
			} // return;
		}

		void doWork() {
			// ...
		}
	}
}

        2、示例应用二

        另一个典型用法是将一个问题分成n份,在一个线程中定义并执行一份,并在latch中count down,然后将所有的线程放入一个队列。当所有的部分完成,协调线程将会通过await()方法,继续处理。当线程必须以这种方式反复count down时,使用CyclicBarrier。

        代码如下:

package com.pengli.jdk;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class TestCountDownLatch2 {

	class Driver2 { // ...
		void main() throws InterruptedException {
			CountDownLatch doneSignal = new CountDownLatch(20);
			Executor e = Executors.newFixedThreadPool(20);

			for (int i = 0; i < 20; ++i) // create and start threads
				e.execute(new WorkerRunnable(doneSignal, i));

			doneSignal.await(); // wait for all to finish
		}
	}

	class WorkerRunnable implements Runnable {
		private final CountDownLatch doneSignal;
		private final int i;

		WorkerRunnable(CountDownLatch doneSignal, int i) {
			this.doneSignal = doneSignal;
			this.i = i;
		}

		public void run() {
			doWork(i);
			doneSignal.countDown();
		}

		void doWork(int i) {
			// ...
		}
	}
}
        三、CountDownLatch实现分析

        1、Sync

        在CountDownLatch内部,有一个Sync的同步器,它继承自java.util.concurrent包中各种同步工具共用的AbstractQueuedSynchronizer,其实现如下:

    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
        关于AbstractQueuedSynchronizer,有其它的文章进行专门的介绍。这里只分析下Sync的实现。其有一个需要入参int count的构造函数,设置AbstractQueuedSynchronizer的state。并覆写了tryAcquireShared()和tryReleaseShared()方法,其中tryReleaseShared()方法用于CountDownLatch的countDown()方法,这个tryReleaseShared()方法的逻辑如下:

        在一个for循环内,首先通过getState()获取state值,如果为0,直接返回false,否则取state-1,并尝试CAS操作,修改state状态,并且state等于0,返回true,否则返回false。

        tryAcquireShared()方法更简单,判断state(即count值),如果等于0,返回1,否则返回-1.

        2、countDown()

        countDown()方法的实现很简单,如下:

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }
        其核心处理就是减少latch中count值,如果cout值为0,释放所有的等待线程。它调用的是sync的releaseShared()方法,而这个方法是在AbstractQueuedSynchronizer中实现的,如下:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
        先调用tryReleaseShared()方法,即上述Sync的同名方法,并且如果返回true的话,继续调用doReleaseShared()方法,返回true,否则返回false。即如果修改后state(即count)值为正,不做其他处理,否则调用doReleaseShared()方法。

        3、await()

         await()方法的核心作用是,让当前线程阻塞,直到latch的count值更改为0,或者当前线程被interrupted。如果count值为0,则await()方法直接返回。代码如下:

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
        还是借助的sync,调用的其acquireSharedInterruptibly()方法,这个方法是在Sync的父类中实现的,代码如下:

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
        先调用Sync的tryAcquireShared()方法,如果返回值为负值,则调用doAcquireSharedInterruptibly()方法。上面讲到了,如果state(即count)值为0,则返回1,方法直接返回,否则进入doAcquireSharedInterruptibly()方法,实现阻塞。

        doReleaseShared()和doAcquireSharedInterruptibly()方法的介绍参见AbstractQueuedSynchronizer的分析文章。






相关文章
|
1月前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
71 7
|
2月前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
46 4
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
110 2
|
3月前
|
Java Apache Maven
Java百项管理之新闻管理系统 熟悉java语法——大学生作业 有源码!!!可运行!!!
文章提供了使用Apache POI库在Java中创建和读取Excel文件的详细代码示例,包括写入数据到Excel和从Excel读取数据的方法。
72 6
Java百项管理之新闻管理系统 熟悉java语法——大学生作业 有源码!!!可运行!!!
|
4月前
|
数据采集 运维 前端开发
【Java】全套云HIS源码包含EMR、LIS (医院信息化建设)
系统技术特点:采用前后端分离架构,前端由Angular、JavaScript开发;后端使用Java语言开发。
139 5
|
13天前
|
监控 JavaScript 数据可视化
建筑施工一体化信息管理平台源码,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
智慧工地云平台是专为建筑施工领域打造的一体化信息管理平台,利用大数据、云计算、物联网等技术,实现施工区域各系统数据汇总与可视化管理。平台涵盖人员、设备、物料、环境等关键因素的实时监控与数据分析,提供远程指挥、决策支持等功能,提升工作效率,促进产业信息化发展。系统由PC端、APP移动端及项目、监管、数据屏三大平台组成,支持微服务架构,采用Java、Spring Cloud、Vue等技术开发。
|
1月前
|
存储 JavaScript 前端开发
基于 SpringBoot 和 Vue 开发校园点餐订餐外卖跑腿Java源码
一个非常实用的校园外卖系统,基于 SpringBoot 和 Vue 的开发。这一系统源于黑马的外卖案例项目 经过站长的进一步改进和优化,提供了更丰富的功能和更高的可用性。 这个项目的架构设计非常有趣。虽然它采用了SpringBoot和Vue的组合,但并不是一个完全分离的项目。 前端视图通过JS的方式引入了Vue和Element UI,既能利用Vue的快速开发优势,
129 13
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
65 12
|
2月前
|
Java
Java之CountDownLatch原理浅析
本文介绍了Java并发工具类`CountDownLatch`的使用方法、原理及其与`Thread.join()`的区别。`CountDownLatch`通过构造函数接收一个整数参数作为计数器,调用`countDown`方法减少计数,`await`方法会阻塞当前线程,直到计数为零。文章还详细解析了其内部机制,包括初始化、`countDown`和`await`方法的工作原理,并给出了一个游戏加载场景的示例代码。
Java之CountDownLatch原理浅析
|
1月前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。