Java并发编程笔记之CyclicBarrier源码分析

简介: JUC 中 回环屏障 CyclicBarrier 的使用与分析,它也可以实现像 CountDownLatch 一样让一组线程全部到达一个状态后再全部同时执行,但是 CyclicBarrier 可以被复用。

JUC 中 回环屏障 CyclicBarrier 的使用与分析,它也可以实现像 CountDownLatch 一样让一组线程全部到达一个状态后再全部同时执行,但是 CyclicBarrier 可以被复用。那么 CyclicBarrier 内部的实现与 CountDownLatch 有何不同那?

  CounDownLatch在解决多个线程同步方面相对于调用线程的 join 已经提供了不少改进,但是CountDownLatch的计数器是一次性的,也就是等到计数器变为0后,再调用CountDownLatch的await ()和countDown()方法都会立刻返回,这就起不到线程同步的效果了。CyclicBarrier类的功能不限于CountDownLatch所提供的功能,从字面意思理解CyclicBarrier是回环屏障的意思,它可以实现让一组线程全部达到一个状态后再全部同时执行。这里之所以叫做回环是因为当所有等待线程执行完毕之后,重置CyclicBarrier的状态后可以被重用。下图演示了这一过程。

一.CyclicBarrier的实现原理

  为了能一览CyclicBarrier的架构设计,下面先看下CyclicBarrier的类图,如下图:

如上面类图,可以知道CyclicBarrier 内部并不是直接使用AQS实现,而是使用了独占锁ReentrantLock来实现的同步;parties用来记录线程个数,用来表示需要多少线程先调用await后,所有线程才会冲破屏障继续往下运行;而 count 一开始等一parties,每当线程调用await方法后就递减 1 ,当为 0 的时候就表示所有线程都到了屏障点,另外你可能会疑惑为何维护parties 和 count 这两个变量,只有count 不就行了吗?别忘了CyclicBarries是可以被复用的,使用两个变量原因是用parties始终来记录总的线程个数,当count计数器变为 0 后,会使用parties 赋值给count,已达到复用的作用。这两个变量是在构造CyclicBarries对象的时候传递的,源码如下:

这里还有一个变量barrierConmmand也通过构造函数传递而来,这是一个任务,这个任务的执行时机是当所有线程都达到屏障点后。另外CyclicBarrier内部使用独占锁Lock来保证同时只有一个线程调用await方法时候才可以返回,使用lock首先保证了更新计数器count 的原子性,另外使用lock的条件变量 trip 支持了 线程间使用 notify,await 操作进行同步。

最后变量generation内部就一个变量broken用来记录当前屏障是否被打破,另外注意这里broken并没有被声明为volatile ,这是因为锁内使用变量不需要。源码如下:


 private static class Generation {
        boolean broken = false;
 }


 

接下来重点看一下CyclicBarrier的几个重要的函数,如下:

  1.int await() 当前线程调用 CyclicBarrier 的该方法时候,当前线程会被阻塞,知道满足下面条件之一才会返回:(1)parties 个线程都调用了 await()方法,也就是线程都到了屏障点。(2)其他线程调用了当前线程的interrupt()方法中断了当前线程,则当前线程会抛出InterruptedException 异常返回。(3)当前屏障点关联的Generation对象的broken标志被设置为true的时候,会抛出 BrokenBarrierException 异常。源码如下:


public int await() throws InterruptedException, BrokenBarrierException {
   try {
       return dowait(false, 0L);
   } catch (TimeoutException toe) {
       throw new Error(toe); // cannot happen
   }
}


正如上面代码可以知道内部调用了dowait 方法,第一个参数false说明不设置超时时间,这时候第二个参数没有意义。

 

  2.boolean await(long timeout, TimeUnit unit) 当前线程调用 CyclicBarrier 的该方法时候当前线程会被阻塞,直到满足下面条件之一才会返回: (1) parties 个线程都调用了 await() 函数,也就是线程都到了屏障点,这时候返回 true。 (2) 当设置的超时时间到了后返回 false (3) 其它线程调用了当前线程的 interrupt()方法中断了当前线程,则当前线程会抛出 InterruptedException 异常返回。 (4) 当前屏障点关联的 Generation 对象的 broken 标志被设置为 true 时候,会抛出 BrokenBarrierException 异常。源码如下:


public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
}


正如上面代码可以知道内部调用了dowait 方法,第一个参数true说明设置超时时间,这时候第二个参数是超时时间。

 

  3.int dowait(boolean timed, long nanos) 该方法是实现 CyclicBarrier 的核心功能,源码如下:


private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
             ...

            //(1)如果index==0说明所有线程都到到了屏障点,则执行初始化时候传递的任务
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //(2)执行任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //(3)激活其它调用await而被阻塞的线程,并重置CyclicBarrier
                    nextGeneration();
                    //返回
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // (4)如果index!=0
            for (;;) {
                try {
                     //(5)没有设置超时时间,
                     if (!timed)
                        trip.await();
                    //(6)设置了超时时间
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    ...
                }
                    ...
            }
        } finally {
            lock.unlock();
        }
  }

   private void nextGeneration() {
       //(7)唤醒条件队列里面阻塞线程
       trip.signalAll();
       //(8) 重置CyclicBarrier
       count = parties;
       generation = new Generation();
    }


上面代码是dowait方法的主干代码,当一个线程调用了dowait方法后首先会获取独占锁lock,如果创建CyclicBarrier的时候传递的参数为 10 ,那么后面 9 个调用线程会被阻塞;然后当前获取线程对计数器count进行递减操作,递减后的count = index = 9 ,因为 index != 0 ,所以当前线程会执行代码(4)。如果是无参数的当前线程调用的是无参数的await()方法,则这里 timed = false,所以当前线程会被放入条件变量trip的阻塞队列,当前线程会被挂起并释放获取的Lock锁;如果调用的有参数的await 方法 则timed = true,则当前线程也会被放入条件变量阻塞队列并释放锁的资源,但是不同的是当前线程会在指定时间超时后自动激活。

当第一个获取锁的线程由于被阻塞释放锁后,被阻塞的 9 个线程中有一个会竞争到lock锁,然后执行第一个线程同样的操作,直到最后一个线程获取到lock的时候,已经有 9 个线程被放入了Lock 的条件队列里面,最后一个线程 count 递减后,count = index 等于 0 ,所以执行代码(2),如果创建CyclicBarrier的时候传递了任务,则在其他线程被唤醒前先执行任务,任务执行完毕后再执行代码(3),唤醒其他 9 个线程,并重置CyclicBarrier,然后这 10个线程就可以继续向下执行了。

 

到目前位置理解了CyclicBarrier的原理后,接下来用几个例子来加深对CyclicBarrier的理解,下面例子我们要实现的是使用两个线程去执行一个被分解的任务 A,当两个线程把自己的任务都执行完毕后在对它们的结果进行汇总处理。例子如下:


package com.hjc;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by cong on 2018/7/7.
 */
public class CyclicBarrierTest1 {

    // 创建一个CyclicBarrier实例,添加一个所有子线程全部到达屏障后执行的一个任务
    private static volatile CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
        public void run() {
            System.out.println(Thread.currentThread() + " task1 merge result");
        }
    });

    public static void main(String[] args) throws InterruptedException {

        //创建一个线程个数固定为2的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 加入线程A到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() + " task1-1");

                    System.out.println(Thread.currentThread() + " enter in barrier");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + " enter out barrier");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 加入线程B到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + " task1-2");

                    System.out.println(Thread.currentThread() + " enter in barrier");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + " enter out barrier");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 关闭线程池
        executorService.shutdown();
    }
}


运行结果如下:

如上代码创建了一个 CyclicBarrier 对象,第一个参数为计数器初始值,第二个参数 Runable 是指当计数器为 0 时候需要执行的任务。main 函数里面首先创建了固定大小为 2 的线程池,然后添加两个子任务到线程池,每个子任务在执行完自己的逻辑后会调用 await 方法。

一开始计数器为 2,当第一个线程调用 await 方法时候,计数器会递减为 1,由于计数器不为 0,所以当前线程就到了屏障点会被阻塞,然后第二个线程调用 await 时候,会进入屏障,计数器也会递减现在计数器为 0,就会去执行在 CyclicBarrier 构造时候的任务,执行完毕后就会退出屏障点,并且会唤醒被阻塞的第一个线程,这时候第一个线程也会退出屏障点继续向下运行。

上面的例子说明了多个线程之间是相互等待的,假如计数器为 N,那么调用 await 方法的前面 N-1 的线程都会因为到达屏障点被阻塞,当第 N 个线程调用 await 后,计数器为 0 了,这时候第 N 个线程才会发出通知唤醒前面的 N-1 个线程。也就是全部线程达到屏障点时候才能一块继续向下执行,对与这个例子来说使用 CountDownLatch 也可以达到类似输出结果。

 

下面在放个例子来说明 CyclicBarrier 的可复用性。

假设一个任务由阶段 1、阶段 2、阶段 3 组成,每个线程要串行的执行阶段 1 和 2 和 3,多个线程执行该任务时候,必须要保证所有线程的阶段 1 全部完成后才能进行阶段 2 执行,所有线程的阶段 2 全部完成后才能进行阶段 3 执行,下面使用 CyclicBarrier 来完成这个需求。例子如下:


package com.hjc;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by cong on 2018/7/7.
 */
public class CyclicBarrierTest2 {

    // 创建一个CyclicBarrier实例
    private static volatile CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 加入线程A到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {

                    System.out.println(Thread.currentThread() +  " step1");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() +  " step2");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() +  " step3");

                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });

        // 加入线程B到线程池
        executorService.submit(new Runnable() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread() +  " step1");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() +  " step2");
                    cyclicBarrier.await();

                    System.out.println(Thread.currentThread() +  " step3");

                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });

        //关闭线程池
        executorService.shutdown();
    }
}


运行结果如下:

如上代码,在每个子线程执行完 step1 后都调用了 await 方法,所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程完成了 step1 后才会开始执行 step2,然后在 step2 后面调用了 await 方法,这保证了所有线程的 step2 完成后,线程才能开始 step3 的执行,这个功能使用单个 CountDownLatch 是无法完成的。

目录
相关文章
|
7天前
|
安全 Java 程序员
深入理解Java内存模型与并发编程####
本文旨在探讨Java内存模型(JMM)的复杂性及其对并发编程的影响,不同于传统的摘要形式,本文将以一个实际案例为引子,逐步揭示JMM的核心概念,包括原子性、可见性、有序性,以及这些特性在多线程环境下的具体表现。通过对比分析不同并发工具类的应用,如synchronized、volatile关键字、Lock接口及其实现等,本文将展示如何在实践中有效利用JMM来设计高效且安全的并发程序。最后,还将简要介绍Java 8及更高版本中引入的新特性,如StampedLock,以及它们如何进一步优化多线程编程模型。 ####
14 0
|
9天前
|
Java 程序员
Java编程中的异常处理:从基础到高级
在Java的世界中,异常处理是代码健壮性的守护神。本文将带你从异常的基本概念出发,逐步深入到高级用法,探索如何优雅地处理程序中的错误和异常情况。通过实际案例,我们将一起学习如何编写更可靠、更易于维护的Java代码。准备好了吗?让我们一起踏上这段旅程,解锁Java异常处理的秘密!
|
6天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
9天前
|
安全 Java 编译器
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
Kotlin教程笔记(27) -Kotlin 与 Java 共存(二)
|
6天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
23 3
|
9天前
|
Java 开发工具 Android开发
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)
Kotlin教程笔记(26) -Kotlin 与 Java 共存(一)
|
11天前
|
开发框架 安全 Java
Java 反射机制:动态编程的强大利器
Java反射机制允许程序在运行时检查类、接口、字段和方法的信息,并能操作对象。它提供了一种动态编程的方式,使得代码更加灵活,能够适应未知的或变化的需求,是开发框架和库的重要工具。
31 2
|
12天前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
46 1
|
8天前
|
Java 数据库连接 编译器
Kotlin教程笔记(29) -Kotlin 兼容 Java 遇到的最大的“坑”
Kotlin教程笔记(29) -Kotlin 兼容 Java 遇到的最大的“坑”
26 0
|
25天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin