1、前言
JUC包中包含了三个非常实用的工具类:CountDownLatch(倒计数器),CyclicBarrier(循环栅栏),Semaphore(信号量)。
2、倒计数器:CountDownLatch
2.1、什么是CountDownLatch
英文中Count Down意为倒计数,Latch意为门闩,所以简单称之为倒计数器。门闩的含义就是把门锁起来,不让里面的线程跑出来。因此这个工具通常用来控制线程等待。它可以让某一个线程等待直到倒计时结束,在开始执行。
来看API文档:
相应API:
2.2、使用
如何使用,JDK API文档给出来了示例用法:CountDownLatch (Java Platform SE 8 )
- 示例用法:
这是一组类,其中一组工作线程使用两个倒计时锁存器:
- 第一个是启动信号,防止任何工作人员进入,直到驾驶员准备好继续前进;
- 第二个是完成信号,允许司机等到所有的工作人员完成。
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) { } // return; } void doWork() { ... } }
内存一致性效果:直到计数调用之前达到零,在一个线程操作countDown() happen-before以下由相应的成功返回行动await()在另一个线程。
示例代码:
场景:某一个程序测试期间发现10个bug,需要将10个bug全部修复完毕,才能进行上线。
package juc.util; import cn.hutool.core.thread.ThreadUtil; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; /** * @author Shamee loop * @date 2023/5/16 */ public class CountDownLatchTest { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = ThreadUtil.newFixedExecutor(10, "countdownlatch-pool-", true); // 生成一个CountDownLatch实例,计数数量10。 // 这里表示需要10个线程完成任务后,等待在CountDownLatch上的线程才能继续执行。 CountDownLatch end = new CountDownLatch(10); for (int i = 1; i <= 10; i++) { int finalI = i; executorService.submit(() -> { try { // 这里模拟bug修复时长1s ThreadUtil.sleep(1000); System.out.println(Thread.currentThread().getName() + "第[" + finalI + "]个bug修复完成"); } finally { // 计数器减1 end.countDown(); } }); } // 等待所有任务全部完成,主线程才能继续执行 end.await(); System.out.println("bug全部修复完成,项目上线"); // 关闭线程池 executorService.shutdown(); } }
运行结果:
3、循环栅栏:CyclicBarrier
3.1、什么是CyclicBarrier
CyclicBarrier是另外一种多线程并发控制工具。和CountDownLatch类似,他也可以实现线程间的计数等待,但他的功能比CountDownLatch更强大且复杂。
Cyclic意为循环,Barrier意为障碍或栅栏。所以简单称之为循环栅栏。既然名为栅栏,顾名思义就是用来阻止线程继续执行,要求线程在栅栏外等待。既然是循环栅栏,也就是该计数器可以循环使用。如我们将计数器设置为10,那么凑齐一批10个线程后,计数器就会归零,接着凑齐下一批。
来看官方API文档:
相应API:
3.2、使用
CyclicBarrier (Java Platform SE 8 )
- 示例用法:
以下是在并行分解设计中使用障碍的示例:
public class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; Runnable barrierAction = new Runnable() { public void run() { mergeRows(...); } }; barrier = new CyclicBarrier(N, barrierAction); List<Thread> threads = new ArrayList<Thread>(N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads) thread.join(); } }
这里,每个工作线程处理矩阵的一行,然后等待屏障,直到所有行都被处理。
当处理所有行时,执行提供的Runnable屏障操作并合并行。 如果合并确定已经找到解决方案,那么done()将返回true ,并且每个工作人员将终止。
如果屏障操作不依赖于执行方暂停的各方,那么该方可以在释放任何线程时执行该操作。 为了方便这一点,每次调用await()返回该线程在屏障上的到达索引。 然后,您可以选择哪个线程应该执行屏障操作,例如:
if (barrier.await() == 0) { // log the completion of this iteration }
CyclicBarrier对失败的同步尝试使用all-or-none断裂模型:如果线程由于中断,故障或超时而过早离开障碍点,那么在该障碍点等待的所有其他线程也将通过BrokenBarrierException (或InterruptedException)异常离开如果他们也在同一时间被打断)。
内存一致性效果:线程中调用的行动之前, await() happen-before行动是屏障操作的一部分,进而发生,之前的动作之后,从相应的成功返回await()其他线程。
示例代码:
场景:某团购活动,每3人报名,则团购组团成功。
package juc.util; import cn.hutool.core.thread.ThreadUtil; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; /** * @author Shamee loop * @date 2023/5/16 */ public class CyclicBarrierTest { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = ThreadUtil.newFixedExecutor(3, "CyclicBarrier-Pool-", true); // 等待3人报名成功,打印组团成功 CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("每3个人报名,则组团成功")); // 模拟10个人报名 for (int i = 1; i <= 10; i++) { int finalI = i; executorService.submit(() -> { // 这里模拟每个人报名时长1s ThreadUtil.sleep(1000); System.out.println(Thread.currentThread().getName() + "第[" + finalI + "]个报名成功"); try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } // 关闭线程池 executorService.shutdown(); } }
执行结果:
从执行结果来看,确实满足我们的预期。但是我们发现第10个人报名的时候永远凑不齐3人组团,所以主线程会一直等待下去。当然await()的时候可以给予在屏障点的等待超时时间,如果超过时间还没等待到,那么就超时退出,避免程序卡死。
注:CyclicBarrier对失败的同步尝试使用all-or-none断裂模型:如果线程由于中断,故障或超时而过早离开障碍点,那么在该障碍点等待的所有其他线程也将通过BrokenBarrierException (或InterruptedException)异常离开如果他们也在同一时间被打断)。
4、信号量:Semaphore
4.1、什么是Semaphore
Semaphore意为信号量。为多线程写作提供了更为强大的控制方法,可以说是对锁的扩展。无论对Synchronized或ReentrantLock,一次都只允许一个线程访问一个资源,而Semaphore可以指定多个线程同时访问某一个资源。
官方API文档说明:
相关API:
4.2、使用
Semaphore (Java Platform SE 8 )
- 信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。
例如,这是一个使用信号量来控制对一个项目池的访问的类:
public class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } // Not a particularly efficient data structure; just for demo // protected Object[] items = ... whatever kinds of items being managed protected boolean[] used = new boolean[MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; // not reached } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) used[i] = false; return true; else return false; return false; } } } }
在获得项目之前,每个线程必须从信号量获取许可证,以确保某个项目可用。 当线程完成该项目后,它将返回到池中,并将许可证返回到信号量,允许另一个线程获取该项目。 请注意,当调用acquire()时,不会保持同步锁定,因为这将阻止某个项目返回到池中。 信号量封装了限制对池的访问所需的同步,与保持池本身一致性所需的任何同步分开。
关于信号量的场景,很多时候适用于单机限流中 ----即限制同时访问某资源的并发数。基本思路:让1个线程以固定的速度生产,而让多个线程消费,这样消费者线程就能以低于某个上限的速度消费资源,不会导致系统超负荷。
简单的场景如红绿灯,每次绿灯亮起只能通行3辆车,而此时5辆车正在排队。
示例代码:
package juc.util; import cn.hutool.core.thread.ThreadUtil; import java.util.concurrent.Semaphore; /** * @author Shamee loop * @date 2023/5/17 */ public class SemaphoreTest { public static void main(String[] args) { // 每次绿灯通行5辆车,即线程数量 Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 5; i++) { new Thread(() -> { try { // 得到绿灯通行凭证 semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "======> 通行"); ThreadUtil.sleep(3000); System.out.println(Thread.currentThread().getName() + "=======!结束"); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放凭证 semaphore.release(); } }, "第" + i + "辆车").start(); } } }
执行结果:
可以看到每次至多只有3辆车可以通行。达到了类似限流的目的。