一、信号量简介
1.信号量
本质:内核中的一个计数器+等待队列
操作:PV操作
P操作:判断计数器:
大于0,则返回,返回前计数器-1;
小于等于0则阻塞。
V操作:计数器计数+1,唤醒一个阻塞的执行流
作用:实现进程或线程间的同步与互斥
同步实现:计数器对资源进行计数
获取资源前,进行P操作;
产生一个资源,进行V操作;
互斥实现:计数器置1,表示资源只有一个
访问资源前,进行P操作;
访问资源完毕后,进行V操作。
2.信号量与条件变量的区别
相同点:
信号量与条件变量都可以实现同步
区别:
信号量本身带有计数器,自身提供了资源获取条件判断的功能;
条件变量,条件判断需要用户自己实现。
二、信号量标准接口POSIX
1.定义信号量
sem_t;
2.初始化信号量
int sem_init(sem_t *sem, int pshared, int val);
sem:信号量变量;
pshared: 0用于线程间;非0用于进程间;
val:信号量的初始值;
返回值:
成功,返回0;失败,返回-1。
3.P操作
int sem_wait(sem_t *sem);阻塞 int sem_trywait(sem_t *sem);非阻塞 int sem_timedwait(sem_t *sem);有时长限制的阻塞。
4.V操作
int sem_post(sem_t *sem);
5.释放信号量
int sem_destroy(sem_t *sem);
三、信号量实现生产者与消费者模型
1.信号量实现线程安全的环形队列
template <class T> class CircularQueue { pricate: //实现环形队列 std::vector<T> _array; int _capacity; int _front = 0; int _rear = 0; //实现同步 sem_t _sem_idle;//对队列空闲空间计数 sem_t _sem_data;//对有效数据节点计数 //实现互斥 sem_t _sem_lock;//实现互斥锁 };
2.完整代码
#include<iostream> #include<cstdlib> #include<vector> #include<semaphore.h> #include<pthread.h> #define MAX_QUEUE 5 #define PRODUCER 4 #define CONSUMER 4 template <class T> class CircularQueue { private: std::vector<T> _array; int _front; int _rear; int _capacity; sem_t _sem_idle;//对空闲空间计数 sem_t _sem_data;//对数据空间计数 sem_t _sem_lock;//实现互斥锁 public: CircularQueue(int cap = MAX_QUEUE) : _capacity(cap) ,_front(0) ,_rear(0) ,_array(cap) { sem_init(&_sem_idle, 0, cap); sem_init(&_sem_data, 0, 0); sem_init(&_sem_lock, 0, 1); } ~CircularQueue() { sem_destroy(&_sem_idle); sem_destroy(&_sem_data); sem_destroy(&_sem_lock); } bool Push(const T data) { //1.P操作,对空闲空间计数进行判断 sem_wait(&_sem_idle); //2.获取锁 sem_wait(&_sem_lock); //3.放入数据 _array[_front] = data; _front = (_front + 1) % _capacity; //4.解锁 sem_post(&_sem_lock); //5.V操作,对数据空间进行计数+1 sem_post(&_sem_data); } bool Pop(T *data) { //1.P操作,对数据空间计数进行判断 sem_wait(&_sem_data); //2.获取锁 sem_wait(&_sem_lock); //3.获取数据 *data = _array[_rear]; _rear = (_rear + 1) % _capacity; //4.解锁 sem_post(&_sem_lock); //5.V操作,对空闲空间计数+1 sem_post(&_sem_idle); } }; void *Consumer(void *arg) { CircularQueue<int> *p = (CircularQueue<int>*)arg; while (1) { int data; p -> Pop(&data); printf("Consumer get data: %d\n", data); } } void *Producer(void *arg) { CircularQueue<int> *p = (CircularQueue<int>*)arg; int data = 1; while (1) { p -> Push(data); printf("Producer put data: %d\n", data); ++data; } } void Test() { int ret; pthread_t con_tid[CONSUMER], pro_tid[PRODUCER]; CircularQueue<int> q; //Create consumer threads for (int i = 0; i < CONSUMER; ++i) { pthread_create(&con_tid[i], NULL, Consumer, (void*)&q); if (ret != 0) { std::cout<<"Create consumer threads error!"<<std::endl; return; } } //Create producer threads for (int i = 0; i < PRODUCER; ++i) { pthread_create(&pro_tid[i], NULL, Producer, (void*)&q); if (ret != 0) { std::cout<<"Create producer threads error!"<<std::endl; return ; } } //wait threads for (int i = 0; i < CONSUMER; ++i) { pthread_join(con_tid[i], NULL); } for (int i = 0; i < PRODUCER; ++i) { pthread_join(pro_tid[i], NULL); } } int main () { Test(); return 0; }
实现效果: