生产者消费模型
概念
举个栗子
在学校里,超市中存放着许多商品,这些商品并不是超市生产的而是由供应商供货,通过超市这一交易场所为学生提供商品
由于超市的存在,学生与供应商之间就不需要产生联系也就是所谓的“解耦”;学生不需要到供应商那里购买商品,两者在生产和消费之间没有任何联系
在超市中,学生与学生之间可能会存在购买同一商品,且商品库存不够的情景,在线程中称作互斥关系;在供应商向超市提供商品时,同一商品的不同品牌也存在着互斥关系,只能存在其中一个
学生与供应商之间:只有货架上存在商品时,才能去购买,不能说商品边生产边消费,这也是一种互斥关系;供应商一定是生产一部分,学生消费一部分,待消费得差不多时,再进行生产,这里存在着同步的关系
概念:
生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个阻塞队列就是用来给生产者和消费者解耦的
优点
生产线程和销毁线程进行解耦
支持生产和消费的一段忙闲不均
提高效率
总结
3种关系:生产者与生产者(互斥),消费者与消费者(互斥),生产者与消费者(互斥,同步)
2种角色:生产者,消费者
1个交易场所:一段特定结构的缓冲区
不过呢,还存在一个问题:
当超市的商品已经足够时,由于供应商并不知道,所以需要对超市(共享资源)加锁,判断,解锁;学生的优先级较低,只能看着供应商一直在重复着加锁,判断,解锁的操作,条件变量就是用来解决这一问题的
基于阻塞队列的生产消费模型
在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构;当队列为空时,从队列中获取元素的操作将会被阻塞,直到队列中放入元素;当队列满时,向队列中存放元素的操作也会被阻塞,直到有元素被从队列中取出
阻塞队列实现
const int bmaxcap=5; template<class T> class BlockQueue { public: BlockQueue(const int&maxcap=bmaxcap) :_maxcap(maxcap) { pthread_mutex_init(&_mutex,nullptr); pthread_cond_init(&_pcond,nullptr); pthread_cond_init(&_ccond,nullptr); } //输入型参数 const & void push(const T&in) { pthread_mutex_lock(&_mutex); //充当条件判断的语句必须是while //可能会存在假唤醒 while(is_full()) { pthread_cond_wait(&_pcond,&_mutex); } _q.push(in); pthread_cond_signal(&_ccond); pthread_mutex_unlock(&_mutex); } //输出型参数 * void pop(T*out) { pthread_mutex_lock(&_mutex); while(is_empty()) { pthread_cond_wait(&_ccond,&_mutex); } *out=_q.front(); _q.pop(); pthread_cond_signal(&_pcond); pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_pcond); pthread_cond_destroy(&_ccond); } private: bool is_empty() { return _q.empty(); } bool is_full() { return _q.size()==_maxcap; } private: queue<T> _q; int _maxcap;//队列的上限 //互斥量以保证共享资源安全 pthread_mutex_t _mutex; //生产者对应的条件变量 pthread_cond_t _pcond; //消费者对应的条件变量 pthread_cond_t _ccond; };
生产任务代码
class Task { using func_t=function<int(int,int,char)>; public: Task() {} Task(int x,int y,char op,func_t func) :_x(x) ,_y(y) ,_op(op) ,_callback(func) {} string operator()() { int result=_callback(_x,_y,_op); char buffer[1024]; snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result); return buffer; } string toTaskString() { char buffer[1024]; snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y); return buffer; } private: int _x; int _y; char _op; func_t _callback; };
测试代码
const string oper="+-*/%"; int mymath(int x,int y,char op) { int result=0; switch(op) { case '+': result=x+y; break; case '-': result=x-y; break; case '*': result=x*y; break; case '/': if(y==0) { cout<<"div zero!"<<endl; result=-1; } else result=x/y; break; case '%': if(y==0) { cout<<"mod zero!"<<endl; result=-1; } else result=x%y; break; default: break; }; } //消费者 void*consumer(void*_bq) { BlockQueue<Task>*bq=static_cast<BlockQueue<Task>*>(_bq); while(true) { Task t; bq->pop(&t); cout<<"消费任务: "<<t()<<endl; } return nullptr; } //生产者 void*productor(void*_bq) { BlockQueue<Task>*bq=static_cast<BlockQueue<Task>*>(_bq); while(true) { int x=rand()%10+1; int y=rand()%5; int opercode=rand()%oper.size(); Task t(x,y,oper[opercode],mymath); bq->push(t); cout<<"生产任务: "<<t.toTaskString()<<endl; sleep(1); } return nullptr; } int main() { srand((unsigned long)time(nullptr)^getpid()); BlockQueue<Task>*bq=new BlockQueue<Task>(); pthread_t c,p; pthread_create(&c,nullptr,consumer,bq); pthread_create(&p,nullptr,productor,bq); pthread_join(c,nullptr); pthread_join(p,nullptr); return 0; }
运行结果
在这里可以理解生产消费模型的高效:生产者在将任务放到队列之前,获取和构建任务需要消耗大量时间;消费者从队列中取出任务之后,执行任务同样也需要时间;模型的出现使线程在任务在生产之前,消费之后并行执行,因此而变得高效
POSIX信号量
概念
在上面阻塞队列中,生产者产生任务时,队列必须有空的资源,也就是满足产生任务的条件;判断公共资源是否满足条件在没有访问之前是无法得知的,只能通过先加锁,进行检测,接着操作,最后解锁,而且默认情况下只要对资源加锁,就意味着对整个资源都使用,但事实并不是如此,也可能是使用同一资源的不同区域
信号量的引入就可以提前得知资源的情况,也可以实现访问同一资源的不同区域
概念:
信号量本质是一个计数器,用来衡量资源数量的计数器。在线程访问某一资源之前先申请信号量,如果申请成功,就说明资源满足条件,自己也就拥有资源的这一区域,如果失败说明条件不满足,只能等待;所以申请信号量的本质是对资源中特定小块资源的预定
举个栗子
临界资源就是电影院,在电影开幕之前,观众需要进行买票,买票这一操作就是预定,座位就可以看作是资源的不同区域;买票成功,就表示可以进行观影
线程想要访问临界资源中的某一区域,需要先申请信号量,申请的前提是所有线程都能看到,也就是说信号量本质也是公共资源
信号量 sem_t sem的结构能够保证其操作是原子性的
信号量初始化
int sem_init(sem_t *sem, int pshared, unsigned int value);
pshared:0表示线程间共享;!0表示进程间共享
value:信号量初值,表示资源的多少
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
int sem_wait(sem_t *sem);
等待信号量,相当于 sem--;申请资源
发布信号量
int sem_post(sem_t *sem);
发布信号量,表示资源使用完毕,相当于sem++;归还资源
基于环形队列的生产消费模型
环形队列为空或为满时,生产者和消费者可能会访问同一个位置
为空时,生产者先生产,所以消费者只能在生产者后面的位置,不可能超越
为满时,只有消费者消费一个任务,生产者才能生产任务,所以生产者在消费者的后面,不可能超越
对于生产者:productor_sem
申请成功,将生产的任务放到环形队列中,生产者信号量总数减少P(productor_sem),消费者的信号量增加V(consumer_sem)
对于消费者:consumer_sem
消费一个任务,消费者信号量减少P(consumer_ssem),生产者信号量增加V(productor_sem)
生产者和消费者的位置其实就是队列的下标
static const int gcap=5; template<class T> class Ringqueue { private: void P(sem_t &sem) { int n=sem_wait(&sem); assert(n==0); (void)n; } void V(sem_t &sem) { int n=sem_post(&sem); assert(n==0); (void)n; } public: Ringqueue(const int&cap=gcap) :_queue(cap) ,_cap(cap) { int n=sem_init(&_spaceSem,0,_cap); assert(n==0); n=sem_init(&_dataSem,0,0); assert(n==0); _productor=_consumer=0; pthread_mutex_init(&_pmutex,nullptr); pthread_mutex_init(&_cmutex,nullptr); } //生产者 void Push(const T&in) { //申请信号量-减少空间信号量 P(_spaceSem); pthread_mutex_lock(&_pmutex); _queue[_productor++]=in; _productor%=_cap; pthread_mutex_unlock(&_pmutex); //增加数据信号量 V(_dataSem); } //消费者 void Pop(T*out) { //减少数据信号量 P(_dataSem); pthread_mutex_lock(&_cmutex); *out=_queue[_consumer++]; _consumer%=_cap; pthread_mutex_unlock(&_cmutex); //减少消费者信号量 V(_spaceSem); } ~Ringqueue() { sem_destroy(&_spaceSem); sem_destroy(&_dataSem); pthread_mutex_destroy(&_pmutex); pthread_mutex_destroy(&_cmutex); } private: vector<T> _queue; //队列上限 int _cap; sem_t _spaceSem;//生产者,空间资源 sem_t _dataSem;//消费者,数据资源 int _productor;//生产者位置 int _consumer;//消费者位置 pthread_mutex_t _pmutex; pthread_mutex_t _cmutex; };
线程池
一种线程使用的模式。线程过多会带来调度开销,进而影响效率;线程池维护着多个线程,等待着分配的可并发的任务,避免了在处理短时间任务创建和销毁线程的代价
简单来说,线程池就是一批提前创建完成的线程。当有任务到来时,就去处理任务;没有任务时,线程就进行休眠,通过空间来换取时间







