一、生产者与消费者模型简介
生产者与消费者模型:是一种设计模式。
设计模式:针对典型的应用场景设计的解决方案。
应用场景:具有资源的产生与处理的场景。
优点:
1)解耦合
生产者与消费者通过缓冲区通信,不再是直接通信,降低耦合度。
2)支持忙闲不均
一定程度解决一方过闲,一方过忙的问题。
3)支持并发(缓冲区必须线程安全)
多线程处理。
二、生产者与消费者模型的实现
1.具有多个生产者&消费者线程
2.具备线程安全的缓冲区
线程安全:
生产者与生产者的关系:互斥
生产者与消费者的关系:同步 + 互斥
消费者与消费者的关系:互斥
实现关键:
创建一个线程安全的缓冲区,然后创建生产者与消费者
3.线程安全的缓冲区(队列)的实现
3.1基于阻塞队列、互斥锁、条件变量的实现
template <class T> class BlockQueue { private: int _capacity; std::queue<T> _queue; pthread_mutex_t _mutex; pthread_cond_t _cond_pro; pthread_cond_t _cond_con; public: BlockQueue(); ~BlockQueue(); bool Push(const T &data); bool Pop(T *data); };
3.2基于环形队列、信号量的实现
template <class T> class CircularQueue { pricate: //实现环形队列 std::vector<T> _array; int _capacity; int _front = 0; int _rear = 0; //实现同步 sem_t _sem_idle;//对队列空闲空间计数 sem_t _sem_data;//对有效数据节点计数 //实现互斥 sem_t _sem_lock;//实现互斥锁 };
三、代码实现
1.基于阻塞队列、互斥锁,条件变量的实现
#include<iostream> #include<cstdlib> #include<unistd.h> #include<pthread.h> #include<queue> #define MAX_QUEUE 5 #define PRODUCER 4 #define CONSUMER 4 template <class T> class BlockQueue{ private: int _capacity;//缓冲区容量 std::queue<T> _queue; pthread_mutex_t _mutex; pthread_cond_t _cond_pro; pthread_cond_t _cond_con; public: BlockQueue(int cap = MAX_QUEUE) : _capacity(cap) { pthread_mutex_init(&_mutex, NULL); pthread_cond_init(&_cond_pro, NULL); pthread_cond_init(&_cond_con, NULL); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond_pro); pthread_cond_destroy(&_cond_con); } bool Push(const T &data) { pthread_mutex_lock(&_mutex); while (_queue.size() == _capacity) { pthread_cond_wait(&_cond_pro, &_mutex); } _queue.push(data); pthread_cond_signal(&_cond_con); pthread_mutex_unlock(&_mutex); } bool Pop(T *data) { pthread_mutex_lock(&_mutex); while (_queue.empty()) { pthread_cond_wait(&_cond_con, &_mutex); } *data = _queue.front(); _queue.pop(); pthread_cond_signal(&_cond_pro); pthread_mutex_unlock(&_mutex); } }; void *Consumer(void *arg) { BlockQueue<int> *p = (BlockQueue<int>*)arg; while (1) { int data; p -> Pop(&data); printf("Consumer get data: %d\n", data); } } void *Producer(void *arg) { BlockQueue<int> *p = (BlockQueue<int>*)arg; int data = 1; while (1) { p -> Push(data); printf("Producer put data: %d\n", data); ++data; } } void Test() { int ret; pthread_t con_tid[CONSUMER], pro_tid[PRODUCER]; BlockQueue<int> q; //Create consumer threads for (int i = 0; i < CONSUMER; ++i) { pthread_create(&con_tid[i], NULL, Consumer, (void*)&q); if (ret != 0) { std::cout<<"Create consumer threads error!"<<std::endl; return; } } //Create producer threads for (int i = 0; i < PRODUCER; ++i) { pthread_create(&pro_tid[i], NULL, Producer, (void*)&q); if (ret != 0) { std::cout<<"Create producer threads error!"<<std::endl; return ; } } //wait threads for (int i = 0; i < CONSUMER; ++i) { pthread_join(con_tid[i], NULL); } for (int i = 0; i < PRODUCER; ++i) { pthread_join(pro_tid[i], NULL); } } int main() { Test(); return 0; }
实现效果:
2.基于环形队列、信号量的实现
基于线程安全的环形队列、信号量,实现生产者与消费者模型。