线程间的协作(2)——生产者与消费者模式

本文涉及的产品
私网连接 PrivateLink,5万GB流量 1.5万小时实例时长
简介: 参考资料《Java并发编程的艺术》《Java编程思想》

1.何为生产者与消费者

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。


import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName:Restraurant
 * @Description:何为生产者与消费者
 * @author: 
 * @date:2018年5月3日
 */
public class Restraurant {
	Meal m=null;
	Chef chef=new Chef(this);
	WaitPerson wait=new WaitPerson(this);
	ExecutorService service=Executors.newCachedThreadPool();
	public Restraurant() {
		service.execute(chef);
		service.execute(wait);
	}
	public static void main(String[] args) {
		new Restraurant();
	}
}
/**
 * @ClassName:Meal
 * @Description:生产者生成的数据
 * @author: 
 * @date:2018年5月3日
 */
class Meal{
	private final int orderNum;//食物订单编号
	public Meal(int num){
		orderNum=num;
	}
	public String toString(){
		return "Meal"+orderNum;
	}
}
/**
 * @ClassName:Chef
 * @Description:厨师类,及生产者
 * @author: 
 * @date:2018年5月3日
 */
class Chef implements Runnable{
	Restraurant r;
	int count=0;
	public Chef(Restraurant r) {
		this.r=r;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				synchronized (this) {
					while(r.m!=null){
						System.out.println("厨师等待中");
						wait();//等待服务员取餐
					}
				}
				if(count++==10){
					System.out.println("今日已售完");
					r.service.shutdownNow();
				}
				System.out.println("订单完成,服务员取餐");
				synchronized (r.wait) {
					r.m=new Meal(count);
					r.wait.notifyAll();
					
				}
				TimeUnit.SECONDS.sleep(1);
			}
		}catch (InterruptedException e) {
			System.out.println("生产者线程强制中断");
		}
		
	}
}
/**
 * @ClassName:WaitPerson
 * @Description:服务员类,即消费者
 * @author: 
 * @date:2018年5月3日
 */
class WaitPerson implements Runnable{
	Restraurant r;
	public WaitPerson(Restraurant r) {
		this.r=r;
	}
	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				synchronized (this) {
					while (r.m == null) {
						System.out.println("服务员等待中");
						wait();// 等待厨师生成食物
					}
				}

				System.out.println("服务员以取餐" + r.m);
				synchronized (r.chef) {
					r.m = null;
					r.chef.notifyAll();
				}
			}
		} catch (InterruptedException e) {
			System.out.println("消费者线程强制中断");
		}
		
	}
	
}

2.生产者与消费者模式

    1)产生原因:在多线程开发 中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理 完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须 等待生产者。wait与notify方法以一种非常低级的方式解决了任务互相通知的问题,即每次交互都要进行一次握手,极大影响的效率以及性能,为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。

    2)原理:生产者和消费者模式是通过一个容器(比如同步阻塞队列)来解决生产者和消费者的强耦合问题。生产者和消 费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用 等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取, 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 这个阻塞队列就是用来给生产者和消费者解耦的。java.util.concurrent.BlockingQueue接口提供了这个队列,通常使用其实现子类ArrayBlockingQueue,LinkedBlockingQueue。当消费者任务试图从同步队列中获取对象,如果队列为空时,那么队列则会挂起消费者任务,并且当拥有足够多的元素可用时才会恢复消费者任务。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class UseBlockingQueue {
	public static void main(String[] args) throws InterruptedException {
		LinkedBlockingQueue<Toast> dry=new LinkedBlockingQueue<Toast>(),
				butter=new LinkedBlockingQueue<Toast>(),
				jam=new LinkedBlockingQueue<Toast>(),
				con=new LinkedBlockingQueue<Toast>();
		ExecutorService exec=Executors.newCachedThreadPool();
		exec.execute(new MakeToast(dry));//制作初始吐司任务
		exec.execute(new Butter(dry,butter));//吐司抹黄油任务
		exec.execute(new Jam(butter,jam));//吐司抹果酱任务
		exec.execute(new Consumer(jam));//消费者任务,食用吐司
		TimeUnit.SECONDS.sleep(5);
		exec.shutdownNow();
	}
}
class Toast{
	private int status;//吐司状态:0代表制作吐司,1代表抹黄油,2代表向抹了黄油的吐司抹果酱
	private final int id;
	public Toast(int id1) {
		id=id1;
	}
	public void butter(){
		status=1;
	};
	public void jam(){
		status=2;
	}
	public int getStatus(){
		return status;
	}
	public int getId(){
		return id;
	}
	public String toString(){
		return "toast "+id+":"+status;
	}
}
/**
 * @Description:制作初始吐司
 */
class MakeToast implements Runnable{
	private LinkedBlockingQueue<Toast> queue=new LinkedBlockingQueue<Toast>();
	private int count=0;
	public MakeToast(LinkedBlockingQueue<Toast> q) {
		queue=q;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Thread.sleep(1000);//制作时间
				Toast t=new Toast(count);
				System.out.println(t);
				queue.put(t);//添加到同步队列
				count++;
			}
		}catch (InterruptedException e) {
			System.out.println("make process interrupted");
		}
		System.out.println("make process off");
	}
}
/**
 * @Description:涂抹黄油
 */
class Butter implements Runnable{
	private LinkedBlockingQueue<Toast> queue1,queue2;//未加料吐司队列,抹黄油后吐司队列
	public Butter(LinkedBlockingQueue<Toast> q1,LinkedBlockingQueue<Toast>q2) {
		queue1=q1;
		queue2=q2;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
				t.butter();
				System.out.println(t);
				queue2.put(t);
			}
		}catch (InterruptedException e) {
			System.out.println("butter process interrupted");
		}
		System.out.println("butter process off");
	}
}
/**
 * @Description:涂抹果酱
 */
class Jam implements Runnable{
	private LinkedBlockingQueue<Toast> queue1,queue2;//抹黄油后吐司队列,抹果酱吐司队列
	public Jam(LinkedBlockingQueue<Toast> q1,LinkedBlockingQueue<Toast>q2) {
		queue1=q1;
		queue2=q2;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
				t.jam();
				System.out.println(t);
				queue2.put(t);
			}
		}catch (InterruptedException e) {
			System.out.println("jam process interrupted");
		}
		System.out.println("jam process off");
	}
}
/**
 * @Description:被食用
 */
class Consumer implements Runnable{
	private LinkedBlockingQueue<Toast> finished;//抹黄油后吐司队列,抹果酱吐司队列
	int count=0;
	public Consumer(LinkedBlockingQueue<Toast> q) {
		finished=q;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Toast t=finished.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
				if(t.getId()!=count++||t.getStatus()!=2){
					System.out.println("过程出现错误");
					return;
				}else{
					System.out.println("所有过程正确实现"+"toast "+t.getId()+"被食用");
				}
			}
		}catch (InterruptedException e) {
			System.out.println("eat process interrupted");
		}
		System.out.println("eat process off");
	}
}

 

相关文章
|
1月前
|
Java 开发者
解锁并发编程新姿势!深度揭秘AQS独占锁&ReentrantLock重入锁奥秘,Condition条件变量让你玩转线程协作,秒变并发大神!
【8月更文挑战第4天】AQS是Java并发编程的核心框架,为锁和同步器提供基础结构。ReentrantLock基于AQS实现可重入互斥锁,比`synchronized`更灵活,支持可中断锁获取及超时控制。通过维护计数器实现锁的重入性。Condition接口允许ReentrantLock创建多个条件变量,支持细粒度线程协作,超越了传统`wait`/`notify`机制,助力开发者构建高效可靠的并发应用。
72 0
|
1月前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
30天前
|
消息中间件 设计模式 安全
多线程魔法:揭秘一个JVM中如何同时运行多个消费者
【8月更文挑战第22天】在Java虚拟机(JVM)中探索多消费者模式,此模式解耦生产与消费过程,提升系统性能。通过`ExecutorService`和`BlockingQueue`构建含2个生产者及4个消费者的系统,实现实时消息处理。多消费者模式虽增强处理能力,但也引入线程安全与资源竞争等挑战,需谨慎设计以确保高效稳定运行。
59 2
|
1月前
|
安全 Java
Java模拟生产者-消费者问题。生产者不断的往仓库中存放产品,消费者从仓库中消费产品。其中生产者和消费者都可以有若干个。在这里,生产者是一个线程,消费者是一个线程。仓库容量有限,只有库满时生产者不能存
该博客文章通过Java代码示例演示了生产者-消费者问题,其中生产者在仓库未满时生产产品,消费者在仓库有产品时消费产品,通过同步机制确保多线程环境下的线程安全和有效通信。
|
2月前
|
安全 Java API
Java并发编程的艺术:解锁多线程同步与协作的秘密
【7月更文挑战第28天】在Java的世界中,并发编程如同一场精心编排的交响乐,每一个线程都是乐团中的乐手,而同步机制则是那指挥棒,确保旋律的和谐与统一。本文将深入探讨Java并发编程的核心概念,包括线程的创建、同步机制、以及线程间的通信方式,旨在帮助读者解锁Java多线程编程的秘密,提升程序的性能和响应性。
35 3
|
1月前
|
NoSQL Redis
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
|
1月前
|
NoSQL 关系型数据库 MySQL
简述redis的单线程模式
简述redis的单线程模式
|
2月前
|
Prometheus 监控 数据可视化
通用快照方案问题之Hystrix进行指标监控如何解决
通用快照方案问题之Hystrix进行指标监控如何解决
36 0
|
25天前
|
存储 监控 Java
Java多线程优化:提高线程池性能的技巧与实践
Java多线程优化:提高线程池性能的技巧与实践
49 1
|
8天前
|
Java 数据库 Android开发
一个Android App最少有几个线程?实现多线程的方式有哪些?
本文介绍了Android多线程编程的重要性及其实现方法,涵盖了基本概念、常见线程类型(如主线程、工作线程)以及多种多线程实现方式(如`Thread`、`HandlerThread`、`Executors`、Kotlin协程等)。通过合理的多线程管理,可大幅提升应用性能和用户体验。
25 15
一个Android App最少有几个线程?实现多线程的方式有哪些?