多线程并发工具类

简介: 参考资料《Java并发编程的艺术》《Java编程思想》

1.等待多线程完成的CountDownLatch

    CountDownLatch允许一个或多个线程等待其他线程完成操作。即他可以实现与join()方法相同的功能,而且比join的功能更多。可以在初始化CountDownLatch时传入一个int参数来设置初始计数值,任何在CountDownLatch对象上调用wait()的方法都将被阻塞,直到这个CountDownLatch对象的计数值为0。CountDownLatch被设计为只能触发一次,计数值不能被重置。

    当我们调用CountDownLatch的countDown方法时,计数值N就会减1,CountDownLatch的await方法 会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个 点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可。

    注意:计数器必须大于等于0,只是等于0时候,计数器就是零,则此时调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法发生之前,另外一个线程调用await方法。一个线程调用countDown方法并不会被阻塞,只有调用await()方法的线程才会被阻塞。

public class TestCountDownLatch {
	static CountDownLatch c=new CountDownLatch(8);
	public static void main(String[] args) throws InterruptedException {
		for(int i=1;i<=8;i++){
			Thread t=new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
						System.out.println(Thread.currentThread().getName()+"完成");
						c.countDown();//计数器减1
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			});
			t.start();
		}
		c.await();//主线程会在此处阻塞,直到CountDownLatch的计数器为0才会恢复
		System.out.println("完成所有准备任务");
		System.out.println("主程序开始执行");
	}
}

2.同步屏障CyclicBarrier

    CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

    CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)即可解除阻塞
				} catch (Exception e) {
				}
				System.out.println(1);
			}
		}).start();
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)即可解除阻塞
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}

    注意:如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),则主线程和子线程会永远等待, 因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。

    CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,保证会优先执行barrierAction,方便处理更复杂的业务场景。

public class TestCyclicBarrier1 {
	static CyclicBarrier c = new CyclicBarrier(2, new A());
	public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
		new Thread(new Runnable() {
			@Override
			public void run() {
				// TODO Auto-generated method stub
				try {
					c.await();
					System.out.println(2);
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		}).start();
		c.await();
		System.out.println(1);
	}
	static class A implements Runnable {
		@Override
		public void run() {
			System.out.println(3);
		}
	}
}
/*
 * 输出结果:
 * 3
 * 2
 * 1
 */

    因为CyclicBarrier设置了拦截线程的数量是2,所以必须等代码中的第一个线程和线程A 都执行完之后,才会继续执行主线程,所以输出结果为3 2 1。那么此时有一个问题,如果阻塞的线程数大于CyclicBarrier的计数器会怎样?

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		for(int i=1;i<=3;i++){
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		}).start();
		}
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
		} catch (Exception e) {
			
		}
		System.out.println(2);
	}

}

根据结果可以知道,CyclicBarrier可以自动重置计数器数量,当拦截线程数量为2时会把从阻塞队列中任意取出两个解除阻塞并执行,如果还有剩余的阻塞队列则会重置计数器,如果剩余阻塞队列数量小于计数器则会阻塞运行,也就是说,如果有阻塞队列数X与计数器N,X%N==0,那么所有线程都会执行,如果X%N!=0,那么会有部分线程处于阻塞状态无法执行。也可以手动调用 reset()方法来进行重置计数器。

    CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier 阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

public class TestCyclicBarrier {
	static CyclicBarrier c=new CyclicBarrier(2);
	public static void main(String[] args) {
		Thread t=new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
				} catch (Exception e) {
				}
				System.out.println(Thread.currentThread().getName());
			}
		});
		t.start();
		t.interrupt();
		try {
			c.await();//在此处阻塞,等待剩余的所有任务进入阻塞(到达屏障)
			System.out.println(2);
		} catch (BrokenBarrierException|InterruptedException e) {
			System.out.println(c.isBroken());
		}
		
	}

}
/*
true
*/

3.控制并发线程数的Semaphore

    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。换句话说,锁(Lock锁或synchronized锁)在任何时刻只允许一个任务访问被加锁的资源,而计数信号量允许n个任务同时访问这个资源,还可以将信号量看作是向外分发使用资源的“许可证”,尽管内部没有这种许可证对象。

    “许可证”的数量是有限的,所以当执有“许可证”的线程数量与“许可证”数量相同时,就会阻止其他线程对共享资源的使用,如果某一个或多个线程使用完共享资源后,就会归还“许可证”,此时Semaphore(信号量)就会将这些归还的“许可证”再次分发给阻塞中的线程。通过这种方式就实现了控制线程并发数。

    3.1 API

  • Semaphore(int permits):构造器,接受一个整型的数字,表示可用的许可证数量。
  • acquire():线程调用该方法获取一个许可证来获取使用共享资源的资格。
  • release():线程使用完共享资源之后调用方法归还许可证。
  • tryAcquire():线程调用该方法尝试获取许可证。
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方 法。

    3.2 应用

    Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程 并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这 时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连 接。这个时候,就可以使用Semaphore来做流量控制。

public class TestSemaphore {
	static Semaphore s=new Semaphore(10);
	static int threadNum=30;
	public static void main(String[] args) {
		for(int i=1;i<=30;i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						s.acquire();
						System.out.println(Thread.currentThread().getName()+"do some work");
						s.release();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
				}
			}).start();
		}
	}
}

4. 线程间交换数据的Exchanger

    Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。所以由此可见,Exchanger将会与 生产者-消费者模型相关。

    其应用场景有:Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需 要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行 录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致。如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发 生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。

public class TestExchanger {
	static Exchanger<String> ex=new Exchanger<String>();
	static ExecutorService service=Executors.newFixedThreadPool(2);
	public static void main(String[] args) {
		service.execute(new Runnable() {
			@Override
			public void run() {
				String a="A";
				try {
					ex.exchange(a);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.execute(new Runnable() {
			@Override
			public void run() {
				String b="B";
				try {
					String a=ex.exchange(b);
					System.out.println("a录入的是"+a);
					System.out.println("b录入的是"+b);
					System.out.println("a与b是否一致:"+a.equalsIgnoreCase(b));
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		service.shutdown();
	}
}

 

相关文章
|
1月前
|
Java 开发者
解锁并发编程新姿势!深度揭秘AQS独占锁&ReentrantLock重入锁奥秘,Condition条件变量让你玩转线程协作,秒变并发大神!
【8月更文挑战第4天】AQS是Java并发编程的核心框架,为锁和同步器提供基础结构。ReentrantLock基于AQS实现可重入互斥锁,比`synchronized`更灵活,支持可中断锁获取及超时控制。通过维护计数器实现锁的重入性。Condition接口允许ReentrantLock创建多个条件变量,支持细粒度线程协作,超越了传统`wait`/`notify`机制,助力开发者构建高效可靠的并发应用。
72 0
|
17天前
|
网络协议 C语言
C语言 网络编程(十四)并发的TCP服务端-以线程完成功能
这段代码实现了一个基于TCP协议的多线程服务器和客户端程序,服务器端通过为每个客户端创建独立的线程来处理并发请求,解决了粘包问题并支持不定长数据传输。服务器监听在IP地址`172.17.140.183`的`8080`端口上,接收客户端发来的数据,并将接收到的消息添加“-回传”后返回给客户端。客户端则可以循环输入并发送数据,同时接收服务器回传的信息。当输入“exit”时,客户端会结束与服务器的通信并关闭连接。
|
1月前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
17天前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
21天前
|
Rust 并行计算 安全
揭秘Rust并发奇技!线程与消息传递背后的秘密,让程序性能飙升的终极奥义!
【8月更文挑战第31天】Rust 以其安全性和高性能著称,其并发模型在现代软件开发中至关重要。通过 `std::thread` 模块,Rust 支持高效的线程管理和数据共享,同时确保内存和线程安全。本文探讨 Rust 的线程与消息传递机制,并通过示例代码展示其应用。例如,使用 `Mutex` 实现线程同步,通过通道(channel)实现线程间安全通信。Rust 的并发模型结合了线程和消息传递的优势,确保了高效且安全的并行执行,适用于高性能和高并发场景。
31 0
|
30天前
|
Java 开发者
【编程高手必备】Java多线程编程实战揭秘:解锁高效并发的秘密武器!
【8月更文挑战第22天】Java多线程编程是提升软件性能的关键技术,可通过继承`Thread`类或实现`Runnable`接口创建线程。为确保数据一致性,可采用`synchronized`关键字或`ReentrantLock`进行线程同步。此外,利用`wait()`和`notify()`方法实现线程间通信。预防死锁策略包括避免嵌套锁定、固定锁顺序及设置获取锁的超时。掌握这些技巧能有效增强程序的并发处理能力。
19 2
|
21天前
|
开发框架 Android开发 iOS开发
跨平台开发的双重奏:Xamarin在不同规模项目中的实战表现与成功故事解析
【8月更文挑战第31天】在移动应用开发领域,选择合适的开发框架至关重要。Xamarin作为一款基于.NET的跨平台解决方案,凭借其独特的代码共享和快速迭代能力,赢得了广泛青睐。本文通过两个案例对比展示Xamarin的优势:一是初创公司利用Xamarin.Forms快速开发出适用于Android和iOS的应用;二是大型企业借助Xamarin实现高性能的原生应用体验及稳定的后端支持。无论是资源有限的小型企业还是需求复杂的大公司,Xamarin均能提供高效灵活的解决方案,彰显其在跨平台开发领域的强大实力。
26 0
|
2月前
|
Java 开发者
Java中的多线程与并发控制
【7月更文挑战第31天】在Java的世界中,多线程是提升程序性能和响应能力的关键。本文将通过实际案例,深入探讨Java多线程的创建、同步机制以及并发包的使用,旨在帮助读者理解并掌握如何在Java中高效地实现多线程编程。
40 3
|
1月前
|
存储 缓存 安全
聊一聊高效并发之线程安全
该文章主要探讨了高效并发中的线程安全问题,包括线程安全的定义、线程安全的类别划分以及实现线程安全的一些方法。
|
1月前
使用通义灵码写了一个多线程工具类,通义灵码处于什么水平
当方法间无依赖需提升执行效率时,可采用并行执行。示例通过`MultiThreadTaskExecutor`类实现多线程并发,其中`executeParallelDynamicMethods`方法接收一系列`Callable`任务并返回所有任务的结果列表。测试显示,四个耗时方法并行执行仅需4秒,相较于串行执行的12秒显著提升效率。该代码展示了良好的多线程编程实践。
39 0