【JUC基础】08. 三大工具类

简介: JUC包中包含了三个非常实用的工具类:CountDownLatch(倒计数器),CyclicBarrier(循环栅栏),Semaphore(信号量)。

1、前言

JUC包中包含了三个非常实用的工具类:CountDownLatch(倒计数器),CyclicBarrier(循环栅栏),Semaphore(信号量)。

2、倒计数器:CountDownLatch

2.1、什么是CountDownLatch

英文中Count Down意为倒计数,Latch意为门闩,所以简单称之为倒计数器。门闩的含义就是把门锁起来,不让里面的线程跑出来。因此这个工具通常用来控制线程等待。它可以让某一个线程等待直到倒计时结束,在开始执行。

来看API文档:

image.png

相应API:

image.png

2.2、使用

如何使用,JDK API文档给出来了示例用法:CountDownLatch (Java Platform SE 8 )

    • 示例用法:

    这是一组类,其中一组工作线程使用两个倒计时锁存器:

      • 第一个是启动信号,防止任何工作人员进入,直到驾驶员准备好继续前进;
      • 第二个是完成信号,允许司机等到所有的工作人员完成。
      class Driver { 
          // ... 
          void main() throws InterruptedException { 
              CountDownLatch startSignal = new CountDownLatch(1); 
              CountDownLatch doneSignal = new CountDownLatch(N); 
              for (int i = 0; i < N; ++i) // create and start threads 
                  new Thread(new Worker(startSignal, doneSignal)).start(); 
              doSomethingElse(); // don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); 
              doneSignal.await(); // wait for all to finish 
           } 
      } 
      class Worker implements Runnable { 
          private final CountDownLatch startSignal; 
          private final CountDownLatch doneSignal; 
          Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { 
              this.startSignal = startSignal; 
              this.doneSignal = doneSignal; 
          } 
          public void run() { 
              try { 
                  startSignal.await(); 
                  doWork(); 
                  doneSignal.countDown(); 
              } catch (InterruptedException ex) {
              } 
              // return; 
          } 
          void doWork() { ... } 
      }

      image.gif

      内存一致性效果:直到计数调用之前达到零,在一个线程操作countDown() happen-before以下由相应的成功返回行动await()在另一个线程。

      示例代码:

      场景:某一个程序测试期间发现10个bug,需要将10个bug全部修复完毕,才能进行上线。

      package juc.util;
      import cn.hutool.core.thread.ThreadUtil;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      /**
       * @author Shamee loop
       * @date 2023/5/16
       */
      public class CountDownLatchTest {
          public static void main(String[] args) throws InterruptedException {
              ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "countdownlatch-pool-", true);
              // 生成一个CountDownLatch实例,计数数量10。
              // 这里表示需要10个线程完成任务后,等待在CountDownLatch上的线程才能继续执行。
              CountDownLatch end = new CountDownLatch(10);
              for (int i = 1; i <= 10; i++) {
                  int finalI = i;
                  executorService.submit(() -> {
                      try {
                          // 这里模拟bug修复时长1s
                          ThreadUtil.sleep(1000);
                          System.out.println(Thread.currentThread().getName() + "第[" + finalI + "]个bug修复完成");
                      } finally {
                          // 计数器减1
                          end.countDown();
                      }
                  });
              }
              // 等待所有任务全部完成,主线程才能继续执行
              end.await();
              System.out.println("bug全部修复完成,项目上线");
              // 关闭线程池
              executorService.shutdown();
          }
      }

      image.gif

      运行结果:

      image.png

      3、循环栅栏:CyclicBarrier

      3.1、什么是CyclicBarrier

      CyclicBarrier是另外一种多线程并发控制工具。和CountDownLatch类似,他也可以实现线程间的计数等待,但他的功能比CountDownLatch更强大且复杂。

      Cyclic意为循环,Barrier意为障碍或栅栏。所以简单称之为循环栅栏。既然名为栅栏,顾名思义就是用来阻止线程继续执行,要求线程在栅栏外等待。既然是循环栅栏,也就是该计数器可以循环使用。如我们将计数器设置为10,那么凑齐一批10个线程后,计数器就会归零,接着凑齐下一批。

      来看官方API文档:

      image.png

      相应API:

      image.png

      3.2、使用

      CyclicBarrier (Java Platform SE 8 )

        • 示例用法:

        以下是在并行分解设计中使用障碍的示例:

        public class Solver {
            final int N;
            final float[][] data;
            final CyclicBarrier barrier;
            class Worker implements Runnable {
                int myRow;
                Worker(int row) {
                    myRow = row; 
                }
                public void run() { 
                    while (!done()) { 
                        processRow(myRow); 
                        try { 
                            barrier.await(); 
                        } catch (InterruptedException ex) { 
                            return; 
                        } catch (BrokenBarrierException ex) { 
                            return; 
                        } 
                    } 
                } 
            } 
            public Solver(float[][] matrix) { 
                data = matrix; 
                N = matrix.length; 
                Runnable barrierAction = new Runnable() { 
                    public void run() { 
                        mergeRows(...); 
                    }
                }; 
                barrier = new CyclicBarrier(N, barrierAction); 
                List<Thread> threads = new ArrayList<Thread>(N); 
                for (int i = 0; i < N; i++) { 
                    Thread thread = new Thread(new Worker(i)); 
                    threads.add(thread); 
                    thread.start(); 
                } // wait until done 
                for (Thread thread : threads) thread.join(); 
            } 
        }

        image.gif

        这里,每个工作线程处理矩阵的一行,然后等待屏障,直到所有行都被处理。

        当处理所有行时,执行提供的Runnable屏障操作并合并行。 如果合并确定已经找到解决方案,那么done()将返回true ,并且每个工作人员将终止。

        如果屏障操作不依赖于执行方暂停的各方,那么该方可以在释放任何线程时执行该操作。 为了方便这一点,每次调用await()返回该线程在屏障上的到达索引。 然后,您可以选择哪个线程应该执行屏障操作,例如:

        if (barrier.await() == 0) { // log the completion of this iteration }

        image.gif

        CyclicBarrier对失败的同步尝试使用all-or-none断裂模型:如果线程由于中断,故障或超时而过早离开障碍点,那么在该障碍点等待的所有其他线程也将通过BrokenBarrierException (或InterruptedException)异常离开如果他们也在同一时间被打断)。

        内存一致性效果:线程中调用的行动之前, await() happen-before行动是屏障操作的一部分,进而发生,之前的动作之后,从相应的成功返回await()其他线程。

        示例代码:

        场景:某团购活动,每3人报名,则团购组团成功。

        package juc.util;
        import cn.hutool.core.thread.ThreadUtil;
        import java.util.concurrent.BrokenBarrierException;
        import java.util.concurrent.CyclicBarrier;
        import java.util.concurrent.ExecutorService;
        /**
         * @author Shamee loop
         * @date 2023/5/16
         */
        public class CyclicBarrierTest {
            public static void main(String[] args) throws InterruptedException {
                ExecutorService executorService = ThreadUtil.newFixedExecutor(3, "CyclicBarrier-Pool-", true);
                // 等待3人报名成功,打印组团成功
                CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("每3个人报名,则组团成功"));
                // 模拟10个人报名
                for (int i = 1; i <= 10; i++) {
                    int finalI = i;
                    executorService.submit(() -> {
                        // 这里模拟每个人报名时长1s
                        ThreadUtil.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + "第[" + finalI + "]个报名成功");
                        try {
                            barrier.await();
                        } catch (InterruptedException | BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    });
                }
                // 关闭线程池
                executorService.shutdown();
            }
        }

        image.gif

        执行结果:

        image.png

        从执行结果来看,确实满足我们的预期。但是我们发现第10个人报名的时候永远凑不齐3人组团,所以主线程会一直等待下去。当然await()的时候可以给予在屏障点的等待超时时间,如果超过时间还没等待到,那么就超时退出,避免程序卡死。

        注:CyclicBarrier对失败的同步尝试使用all-or-none断裂模型:如果线程由于中断,故障或超时而过早离开障碍点,那么在该障碍点等待的所有其他线程也将通过BrokenBarrierException (或InterruptedException)异常离开如果他们也在同一时间被打断)。

        4、信号量:Semaphore

        4.1、什么是Semaphore

        Semaphore意为信号量。为多线程写作提供了更为强大的控制方法,可以说是对锁的扩展。无论对Synchronized或ReentrantLock,一次都只允许一个线程访问一个资源,而Semaphore可以指定多个线程同时访问某一个资源。

        官方API文档说明:

        image.png

        相关API:

        image.png

        4.2、使用

        Semaphore (Java Platform SE 8 )

          • 信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。

          例如,这是一个使用信号量来控制对一个项目池的访问的类:

          public class Pool {
              private static final int MAX_AVAILABLE = 100;
              private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
              public Object getItem() throws InterruptedException {
                  available.acquire();
                  return getNextAvailableItem();
              }
              public void putItem(Object x) {
                  if (markAsUnused(x))
                      available.release();
              }
              //     Not a particularly efficient data structure; just for demo
          //    protected Object[] items = ... whatever kinds of items being managed 
              protected boolean[] used = new boolean[MAX_AVAILABLE];
              protected synchronized Object getNextAvailableItem() {
                  for (int i = 0; i < MAX_AVAILABLE; ++i) {
                      if (!used[i]) {
                          used[i] = true;
                          return items[i];
                      }
                  }
                  return null; // not reached 
              }
              protected synchronized boolean markAsUnused(Object item) {
                  for (int i = 0; i < MAX_AVAILABLE; ++i) {
                      if (item == items[i]) {
                          if (used[i]) 
                              used[i] = false;
                              return true;
                           else 
                              return false;
                          return false;
                      }
                  }
              }
          }

          image.gif

          在获得项目之前,每个线程必须从信号量获取许可证,以确保某个项目可用。 当线程完成该项目后,它将返回到池中,并将许可证返回到信号量,允许另一个线程获取该项目。 请注意,当调用acquire()时,不会保持同步锁定,因为这将阻止某个项目返回到池中。 信号量封装了限制对池的访问所需的同步,与保持池本身一致性所需的任何同步分开。

          关于信号量的场景,很多时候适用于单机限流中 ----即限制同时访问某资源的并发数。基本思路:让1个线程以固定的速度生产,而让多个线程消费,这样消费者线程就能以低于某个上限的速度消费资源,不会导致系统超负荷。

          简单的场景如红绿灯,每次绿灯亮起只能通行3辆车,而此时5辆车正在排队。

          示例代码:

          package juc.util;
          import cn.hutool.core.thread.ThreadUtil;
          import java.util.concurrent.Semaphore;
          /**
           * @author Shamee loop
           * @date 2023/5/17
           */
          public class SemaphoreTest {
              public static void main(String[] args) {
                  // 每次绿灯通行5辆车,即线程数量
                  Semaphore semaphore = new Semaphore(3);
                  for (int i = 1; i <= 5; i++) {
                      new Thread(() -> {
                          try {
                              // 得到绿灯通行凭证
                              semaphore.acquire();
                              System.out.println(Thread.currentThread().getName() + "======> 通行");
                              ThreadUtil.sleep(3000);
                              System.out.println(Thread.currentThread().getName() + "=======!结束");
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          } finally {
                              // 释放凭证
                              semaphore.release();
                          }
                      }, "第" + i + "辆车").start();
                  }
              }
          }

          image.gif

          执行结果:

          image.png

          可以看到每次至多只有3辆车可以通行。达到了类似限流的目的。

          相关文章
          |
          5月前
          |
          安全 算法 Java
          Java面试题:如何使用并发集合,例如ConcurrentHashMap?
          Java面试题:如何使用并发集合,例如ConcurrentHashMap?
          57 1
          |
          5月前
          |
          安全 Java 开发者
          Java中的并发工具类与线程安全实现
          Java中的并发工具类与线程安全实现
          |
          5月前
          |
          设计模式 安全 NoSQL
          Java面试题:结合单例模式与Java内存管理,设计一个线程安全的单例类?分析Java多线程工具类ExecutorService与Java并发工具包中的工具类,设计一个Java并发框架的分布式锁实现
          Java面试题:结合单例模式与Java内存管理,设计一个线程安全的单例类?分析Java多线程工具类ExecutorService与Java并发工具包中的工具类,设计一个Java并发框架的分布式锁实现
          68 0
          |
          7月前
          |
          监控 安全 Java
          【JavaEE多线程】深入解析Java并发工具类与应用实践
          【JavaEE多线程】深入解析Java并发工具类与应用实践
          92 1
          |
          存储 安全 算法
          一天一个 JUC 工具类 -- 并发集合
          使用JUC工具包中的并发集合,我们可以避免手动处理锁和同步的复杂性,从而降低出现线程安全问题的概率。这些并发集合通过内部采用高效的算法和数据结构来优化并发操作,从而提供更好的性能和扩展性。
          |
          数据采集 算法 安全
          一天一个 JUC 工具类 -- 真工具类
          CountDownLatch CyclicBarrier ForkJoin Semaphore 使用方法和注意事项
          |
          分布式计算 测试技术 开发者
          JUC基础(五)—— 并发工具类
          JUC基础(五)—— 并发工具类
          119 0
          |
          7月前
          |
          安全 Java API
          JUC的常见类
          JUC的常见类
          44 0
          |
          安全 Java 开发者
          【Java|多线程与高并发】JUC中常用的类和接口
          JUC是Java并发编程中的一个重要模块,全称为Java Util Concurrent(Java并发工具包),它提供了一组用于多线程编程的工具类和框架,帮助开发者更方便地编写线程安全的并发代码。
          |
          存储 安全 Java
          一天一个 JUC 工具类 -- AQS
          AbstractQueuedSynchronizer(AQS)是Java中用于构建锁和同步器的抽象基类。它是Java并发工具包(java.util.concurrent)中实现高级线程同步控制的关键组件之一。AQS提供了一种基于等待队列的同步器框架,允许开发者构建自定义的同步器。在这篇文章中我们将从源码分析和底层原理的角度来介绍AQS。