一. 什么是生产者消费者模型
1. 基本概念
生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过容器来进行通讯,即生产者生产完数据之后不用等待消费者处理,直接扔给容器;消费者不找生产者要数据,而是直接从容器里取。
2. 三种关系
实际中,生产者可能有多个,消费者也可能有多个,它们彼此之间要应该满足什么关系呢?
假设下面的情景:
每次一个生产者一次只能生产一个数据,
每次一个消费者一次只能消费一个数据
唯一的容器容器每次只容许一个生产者push数据或一个消费者pop数据。
在满足上面的情景下,可以推测生产者、消费者彼此之间的关系:
[生产者和生产者]:互斥与同步关系。互斥体现在所有生产者竞争,只有一个能去容器pop数据。同步的话要保证每一个生产者都有机会到容器中pop数据。
[消费者和消费者]:互斥与同步关系。互斥体现在所有消费者竞争,只有一个能去容器push数据。同步要求每一个消费者都有机会去容器中push数据。
[生产者和消费者]:互斥与同步关系。互斥体现在二者只有一个能先访问容器,这时另外一个只能阻塞等待。同步体现在容器不能永远只是生产者在push或消费者在pop,生产者生产了一些数据后要告知消费者来消费,反之亦然。
3. 再次理解生产者消费者模型
生产者消费者模型的核心思想在于:众多的生产者和众多的消费者通过唯一的容器进行数据交互,在交互的同时必须维护好彼此之间的互斥与同步的关系。
二. 生产者消费者模型的优点
容器就相当于一个缓冲区,平衡了生产者和消费者的数据处理能力。这个容器就是用来给生产者和消费者解耦的。假如只是一对一的生产和消费,快的那方必须等待慢的那方才能完成一次交易,然后继续下一组;而如果它们之间有一个容器可以存储数据,其中一个生产者把数据push到容器后不用等消费者,下一个生产者继续往容器里push数据,也就是说在容器满之前生产者可以一直连续的生产数据,消费者也是一样的道理。
即通过容器使生产者和消费者解耦,提高了它们数据交互的效率。
三. 基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构,它有如下如下几个特点:
众多生产者中先内部竞争出一个生产者,去阻塞队列中生产一个数据,完成之后重新内部竞争。
众多消费者中也是内部竞争出一个消费者,去阻塞队列里拿取一个数据,拿到后重新内部竞争。
每次只能有一个线程操作队列,要么是消费者pop,要么是生产push。
当队列为空时,消费者通知生产者来生产数据,然后自己会被阻塞等待,直到合适的时候生产者把它唤醒过来提醒它消费;当队列满时,生产者通知消费者过来拿取数据,然后自己被阻塞等待,直到消费者把它唤醒,叫它继续生产。
1. 准备工作
从最简单的开始设计,只有一个生产者和一个消费者,创建两个线程代表它们,后续它们将在自己的控制流中完成相应的生产和消费任务;至于它们进行数据交互的容器,使用STL的容器适配器queue即可,交互的数据类型为整数。
在主线程中创建好生产者、消费者线程还有阻塞队列:
int main() { srand((unsigned int)time(nullptr)); // 1、new一个阻塞队列 BlockQueue<int>* p = new BlockQueue<int>; // 2、创建两个新线程,分别代表生产者和消费者 pthread_t pro, con; pthread_create(&pro, nullptr, ProducerAction, p); pthread_create(&con, nullptr, ConsumerAction, p); // 3、主线程等待它们完成任务后负责销毁阻塞队列 pthread_join(pro, nullptr); pthread_join(pro, nullptr); delete p; return 0; }
2. 阻塞队列实现
基本框架
阻塞队列中包含4个成员变量:
_q,一个普通队列,用来存储数据。
_capacity,阻塞队列的容量,默认可以存5个数据。
full,一个条件变量。当阻塞队列满时生产者在该条件下等待。
empty,一个条件变量。当阻塞队列空时消费者在该条件下等待。
mutex,一把互斥锁。保证所有时间内只有一个线程能操作队列。
构造函数负责初始化两个条件变量和锁,析构函数负责销毁它们:
template<class T> class BlockQueue { public: // 构造函数 BlockQueue(size_t capcity = 5) :_capacity(capcity) { pthread_cond_init(&full, nullptr); pthread_cond_init(&empty, nullptr); pthread_mutex_init(&mutex, nullptr); } // 析构函数 ~BlockQueue() { pthread_cond_destroy(&full); pthread_cond_destroy(&empty); pthread_mutex_destroy(&mutex); } // 生产者插入数据 void PushData(T data){}; // 消费者删除数据 void PopData(T& data); private: // 判断阻塞队列是否为空 bool IsFull() { return _q.size() >= _capacity; } // 判断阻塞队列是否为满 bool IsEmpty() { return _q.empty(); } queue<T> _q; size_t _capacity; pthread_cond_t full; pthread_cond_t empty; pthread_mutex_t mutex; };
生产者生产数据
成员函数void PushData(T data)由生产者调用,功能是插入一个数据到阻塞队列中,下面是该函数的几点说明:
该函数一进来就要申请锁,最后插入完成释放锁。
插入数据之前要检查阻塞队列是否满了,如果满了就要需要通知消费者来消费,然后自己在full条件下等待。
void PushData(T data) { pthread_mutex_lock(&mutex); while(IsFull()) { cout<<"queue is full"<<endl; pthread_cond_signal(&empty); pthread_cond_wait(&full, &mutex); } _q.push(data); pthread_mutex_unlock(&mutex); }
消费者拿取数据
消费者可以调用阻塞队列里的成员函数void PopData(T& data)拿走一个阻塞队列里的数据,下面是该函数的几点说明:
消费者调用时需要传入一个输出型参数。阻塞队列会把队头数据内容写入到输出型参数的内存空间中。
进来的第一步先申请锁,拿走数据后释放锁。
拿取数据之前要检查阻塞队列是否为空,为空的话要通知生产者进行生产,然后自己在empty条件下等待。
void PopData(T& data) { pthread_mutex_lock(&mutex); while(IsEmpty()) { cout<<"queue is empty"<<endl; pthread_cond_signal(&full); pthread_cond_wait(&empty, &mutex); } data = _q.front(); _q.pop(); pthread_mutex_unlock(&mutex); }
关于阻塞队列生产、拿取数据操作的几个问题
问题一:判断阻塞队列空满时为什么要用while循环,而不用if判断语句?
拿生产者来说,它在插入前队列已经满了,如果用if判断语句的话,在if里面要执行pthread_cond_wait()等待条件full满足,当这个生产者被唤醒后执行if外面的push插入数据。但是如果pthread_cond_wait()等待出错了,直接退出if语句会继续往下执行push操作,导致本来已经满了的队列多插入了一个数据;如果我们用while循环的话,即使等待出错了,这时还会重新回去判断队列是否满了,这样可以避免队列数据出错的问题。
问题二:判空和判满逻辑中,能不能先等待再唤醒?
答案是不行的,首先对于访问阻塞队列的锁mutex,生产者和消费者是共同竞争的,如果这个线程先等待的话锁被释放了,但是它不会继续往下执行唤醒另一个线程的操作了(因为这个线程自己也在等待被对方唤醒),最后导致锁没人申请,线程都等待各自的条件下死等待。
正确的逻辑是先唤醒对方,然后自己在对应的条件变量下等待;后面等到条件成熟时对方把自己唤醒。即我们在设计条件变量时要注意:条件变量在等待被唤醒时需要重新对条件进行判断,是否条件满足。
3. 测试阻塞队列
下面是生产者线程的控制流,由于只有一个生产者所以不用在其控制流中加锁和引入条件变量来维护生产者和生产者之间的同步与互斥关系。
我们让生产者每隔一秒生产一个数据:
void* ProducerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = rand()%100+1; p->PushData(data); cout<<"[producer] push data:"<<data<<endl; sleep(1); } }
消费者每隔一秒拿取一个数据:
void* ConsumerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = 0; p->PopData(data); cout<<"[consumer] get data:"<<data<<endl; sleep(1); } }
下面是main.cpp的全部代码:
// 包含所有需要的头文件和阻塞队列的定义 #include "blockqueue.h" // 生产者线程控制流 void* ProducerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = rand()%100+1; p->PushData(data); cout<<"[producer] push data:"<<data<<endl; } } // 消费者线程控制流 void* ConsumerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = 0; p->PopData(data); cout<<"[consumer] get data:"<<data<<endl; sleep(2); } } int main() { srand((unsigned int)time(nullptr)); // 1、new一个阻塞队列 BlockQueue<int>* p = new BlockQueue<int>; // 2、创建两个新线程,分别代表生产者和消费者 pthread_t pro, con; pthread_create(&pro, nullptr, ProducerAction, p); pthread_create(&con, nullptr, ConsumerAction, p); // 3、主线程等待它们完成任务后负责销毁阻塞队列 pthread_join(pro, nullptr); pthread_join(pro, nullptr); delete p; return 0; }
编译运行,发现每生产一个数据马上又被消费者拿走了,这种情况队列永远都不会满:
另外由于我们是先创建生产者线程,再创建消费者线程。所以是生产者先生产,消费者后消费。
如果我们先创建消费者线程的话,消费者线程先拿到队列锁,正欲拿取数据时发现队列为空,然后自己会在条件empty
下阻塞挂起并且释放操作队列的锁mutex(注意,如果有多个消费者的话,它们是没有机会抢这把锁的,因为它们在抢操作队列的这个锁之前必须要获得内部竞争的锁);等到生产者线程轮流生产完所有数据之后,最后一个生产者发现队列已经满了就会唤醒被一开始被阻塞挂起的消费者来消费;在所有消费者线程拿走完队列数据之前,这个生产者需要一直阻塞等待:
我们先创建消费者线程,消费者发现队列为空后输出“queue is empty”,然后阻塞挂起等待生产者生产完所有数据后唤醒这个消费者线程:
4. 阻塞队列完整代码
分两个文件:
头文件blockqueue.h
里包含阻塞队列的声明。
main.cpp:
负责创建生产者、消费者线程并声明它们的执行逻辑。
blockqueue.h
#pragma once #include <queue> #include <unistd.h> #include <stdlib.h> #include <iostream> #include <pthread.h> using namespace std; template<class T> class BlockQueue { public: BlockQueue(size_t capcity = 5) :_capacity(capcity) { pthread_cond_init(&full, nullptr); pthread_cond_init(&empty, nullptr); pthread_mutex_init(&mutex, nullptr); } ~BlockQueue() { pthread_cond_destroy(&full); pthread_cond_destroy(&empty); pthread_mutex_destroy(&mutex); } void PushData(T data) { pthread_mutex_lock(&mutex); while(IsFull()) { cout<<"queue is full"<<endl; pthread_cond_signal(&empty); pthread_cond_wait(&full, &mutex); } _q.push(data); pthread_mutex_unlock(&mutex); } void PopData(T& data) { pthread_mutex_lock(&mutex); while(IsEmpty()) { cout<<"queue is empty"<<endl; pthread_cond_signal(&full); pthread_cond_wait(&empty, &mutex); } data = _q.front(); _q.pop(); pthread_mutex_unlock(&mutex); } private: bool IsFull() { return _q.size() >= _capacity; } bool IsEmpty() { return _q.empty(); } queue<T> _q; size_t _capacity; pthread_cond_t full; pthread_cond_t empty; pthread_mutex_t mutex; };
main.cpp
#include "blockqueue.h" void* ProducerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = rand()%100+1; p->PushData(data); cout<<"[producer] push data:"<<data<<endl; sleep(1); } } void* ConsumerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = 0; p->PopData(data); cout<<"[consumer] get data:"<<data<<endl; sleep(1); } } int main() { srand((unsigned int)time(nullptr)); // 1、new一个阻塞队列 BlockQueue<int>* p = new BlockQueue<int>; // 2、创建两个新线程,分别代表生产者和消费者 pthread_t pro, con; pthread_create(&pro, nullptr, ProducerAction, p); pthread_create(&con, nullptr, ConsumerAction, p); // 3、主线程等待它们完成任务后负责销毁阻塞队列 pthread_join(pro, nullptr); pthread_join(pro, nullptr); delete p; return 0; }
5. 关于改进阻塞队列的几点补充
5.1 多生产者多消费者的设计
只有一个生产者和只有一个消费者的情况,只需在阻塞队列push和pop时维护生产者和消费者的同步与互斥关系即可。如果有多个生产者和消费者的话需要在它们各自的控制流中加不同锁和不同的条件变量,确保每次只有一个消费者和一个生产者能去操作队列。
5.2 阻塞队列所存储数据可以是更复杂的任务
阻塞队列不仅仅可以存简单的整型数字,还可以是复杂任务的结构体指针,这样生产者派发任务,消费者拿到后解决里面的任务。比如生产者派发用户输入的账号密码,消费者拿到后负责把账号密码传输到数据库中。
四. 基于环形队列的生产者消费者模型
1. 基本规则
- 生产者只关心是否还有格子用来生产数据。
- 消费者只关心环形队列中是否还有数据。
- 一开始没有数据,生产者和消费者指向同一个位置,这时生产者要先执行生产操作,消费者阻塞挂起;数据满时,生产者和消费者也指向同一个位置,这时消费者先执行消费操作再轮到生产者生产。
- 生产者和消费者不能同时访问队列中的同一个位置。
- 生产者和消费者可以并发访问环形队列中的不同位置。
2. 环形队列的实现
成员变量说明:
- 这里用一个数组来模拟环形队列,因为生产者和消费者要并发执行且不能同时操作相同位置的数据,刚好数组可以通过下标随机访问数据,所以这里我们选用数组。
- 定义了两个无符号整型对象_proPos和_cusPos分别指向生产者要生产数据的格子下标和消费者要拿取数据的位置下标。
- 还定义了_proSem和_cusSem两个信号量对象,分别记录着环形队列中格子数量和以生产数据个数。
- 最后还有必要记录环形队列的容量大小,可以用它来取模更新_proPos和_cusPos的值。
#pragma once #include <vector> #include <time.h> #include <iostream> #include <unistd.h> #include <semaphore.h> using namespace std; // 环形队列容量缺省值 const size_t NUM = 8; // 环形队列主体 template<class T> class RingQueue { public: RingQueue(size_t num = NUM) :_v(num) ,_cusPos(0) ,_proPos(0) ,_capacity(num) { sem_init(&_cusSem, 0, 0); sem_init(&_proSem, 0, num); } ~RingQueue() { sem_destroy(&_cusSem); sem_destroy(&_proSem); } // 生产者生产数据 void Push(const T& inData) { P(_proSem); _v[_proPos] = inData; V(_cusSem); ++_proPos; _proPos %= _capacity; } // 消费者消费数据 void Pop(T& outData) { P(_cusSem); outData = _v[_cusPos]; V(_proSem); ++_cusPos; _cusPos %= _capacity; } private: // 申请信号量 void P(sem_t& s) { sem_wait(&s); } // 释放信号量 void V(sem_t& s) { sem_post(&s); } sem_t _cusSem; // 记录队列中空格数量的信号量 sem_t _proSem; // 记录队列中数据数量的信号量 size_t _cusPos; // 记录当前空格所在下标 size_t _proPos; // 记录当前数据所在下标 vector<T> _v; // 用数组模拟环形队列 size_t _capacity;// 记录环形队列容量 };
成员函数说明:
这里特意封装了信号量的PV操作,只需把信号量对象作为参数传入就能完成信号量的申请、释放操作。
生产者执行Push()操作生产数据时,需要先申请(减一)_proSem信号量,生产完成后释放(加一)_cusPos信号量,让消费者来消费。反之亦然
2.3 单生产者单消费者
在主线程中创建两个新线程分别代表生产者和消费者,消费者每隔一秒地从环形队列中拿取数据,生产者每隔一秒生产一个数据:
// 基于环形队列的单生产者单消费者模型 #include "RingQueue.h" // 消费者线程执行的操作 void* Customer(void* arg) { RingQueue<int>* q = (RingQueue<int>*)arg; while(true) { sleep(1); int getData; q->Pop(getData); cout<<"[Customer] pop data:"<<getData<<endl; } } // 生产者线程执行的操作 void* Producer(void* arg) { RingQueue<int>* q = (RingQueue<int>*)arg; while(true) { sleep(1); int putData = (rand()%100) + 1; q->Push(putData); cout<<"[Producer] push data:"<<putData<<endl; } } int main() { // 1、制造随机数种子,作为生产者push到环形队列当中的数据 srand((size_t)time(nullptr)); // 2、new一个环形队列 RingQueue<int>* q = new RingQueue<int>; // 3、分别创建、等待一个生产者和一个消费者 pthread_t tid1, tid2; pthread_create(&tid1, nullptr, Customer, (void*)q); pthread_create(&tid2, nullptr, Producer, (void*)q); pthread_join(tid1, nullptr); pthread_join(tid2, nullptr); // 4、最后delete环形队列 delete q; return 0; }
编译运行,由于_proSem初始值为0,一开始没有数据生产者线程要挂起等待,消费者生产一个数据,生产者就拿取一个数据:
接下来我们让生产者生产得快,消费者消费的慢:
编译运行,发现生产者生产的数据瞬间把队列填满了,接下来消费者拿走一个数据,生产者再生产一个数据,二者串行执行:
如果消费者消费得快,生产者生产得慢的话,可以推测结果是生产者生产完一个数据,消费者马上就拿走,然后继续等待生产者生产数据,这个就不在做演示了。
2.4 多生产者多消费者
这次我们在主线程中分别新建三个生产者线程、三个消费者线程。生产者之间竞争proLock这把锁,消费者之间竞争cusLock这把锁,竞争到锁的线程才能去生产或拿取数据,它们完成一次操作后释放锁,然后重新内部竞争:
// 基于环形队列的多生产者多消费者模型 #include "RingQueue.h" // 构造两个全局互斥锁对象,分别用于所有生产者和所有消费者线程 pthread_mutex_t cusLock; pthread_mutex_t proLock; // new一个存储整数的全局环形队列 RingQueue<int>* q = new RingQueue<int>; // 消费者线程执行的操作 void* Customer(void* arg) { while(true) { size_t id = (size_t)arg; int getData; pthread_mutex_lock(&cusLock); q->Pop(getData); pthread_mutex_unlock(&cusLock); cout<<'['<<"Customer "<<id<<']'<<" Pop data:"<<getData<<endl; sleep(1); } } // 生产者线程执行的操作 void* Producer(void* arg) { size_t id = (size_t)arg; while(true) { int putData = (rand()%100) + 1; pthread_mutex_lock(&proLock); q->Push(putData); pthread_mutex_unlock(&proLock); cout<<'['<<"Producer "<<id<<']'<<" push data "<<putData<<endl; sleep(1); } } int main() { // 1、初始化两把全局互斥锁 pthread_mutex_init(&cusLock, nullptr); pthread_mutex_init(&proLock, nullptr); // 2、创造种子,用于生产随机数据插入到环形队列中 srand((size_t)time(nullptr)); // 3、分别新建三个生产者、消费者线程 pthread_t cusTids[3]; pthread_t proTids[3]; for(size_t i = 0; i < 3; ++i) { pthread_create(&cusTids[i], nullptr, Customer, (void*)(i+1)); } for(size_t i = 0; i < 3; ++i) { pthread_create(&proTids[i], nullptr, Producer, (void*)(i+1)); } // 4、分别等待三个生产者、消费者线程 for(size_t i = 0; i < 3; ++i) { pthread_join(cusTids[i], nullptr); } for(size_t i = 0; i < 3; ++i) { pthread_join(proTids[i], nullptr); } // 5、等待完成后delete环形队列并销毁互斥锁对象 delete q; pthread_mutex_destroy(&cusLock); pthread_mutex_destroy(&proLock); return 0; }
编译运行,生产和消费操作并发执行: