一. 什么是POSIX信号量?
POSIX和System V都是可移植的操作系统接口标准,它们都定义了操作系统应该为应用程序提供的接口标准。
POSIX信号量和System V信号量作用相同,都是用于同步和互斥操作,以达到无冲突的访问共享资源目的。
System V版本的信号量只适用于实现进程间的通信,而POSIX版本的信号量主要用于实现线程之间的通信。
信号量(信号灯)本质是一个是用来对临界资源进行更细粒度地描述和管理的计数器。
二. 为什么要有POSIX信号量?
POSIX信号量主要用于实现线程间的同步。
三. POSIX信号量实现原理
信号量的结构如下:
count:记录还有多少小块的临界资源未被使用。
queue:当count为0时,其它未申请到信号量的线程的task_struct地址会被放到信号量等待队列中阻塞挂起。
信号量的PV操作
P操作:我们把申请信号量得操作称为P操作,申请信号量的本质就是申请获得整块临界资源中某小块资源的使用权限,当申请成功时临界资源中小块资源的数目应该减一,因此P操作的本质就是让count- -。
V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让count++。
申请不到信号量的线程被阻塞挂起
当count为0时,表示不允许其它线程再访问临界资源,这时其它申请信号量的线程会被阻塞到该信号量的等待队列中,直到其它线程释放信号量。
四. POSIX信号量接口函数
1. 创建、初始化信号量
信号量的类型是sem_t,我们可以根据这个类型自己定义信号量对象:
定义出信号量对象之后,必须用sem_init()函数来初始化这个信号量:
参数说明:
sem:信号量对象的地址。
pshared:0表示线程间共享,非零表示进程间共享。
value:信号量初始值,即count的大小。
函数说明:该函数主要用于设置信号量对象的基本属性。
2. 销毁信号量
函数说明:只需传入信号量对象的地址即可销毁该信号量。
3. 等待(申请)信号量
函数说明:传入信号量对象的地址用于申请该信号量,调用成功返回0,count- -;失败返回-1,count值不变。
4. 发布(释放)信号量
函数说明:传入信号量对象的地址用于释放该信号量,调用成功返回0,count++;失败返回-1,count值不变。
五. 信号量的应用
1. 二元信号量模拟互斥锁
当count = 1时,说明整块临界资源作为一个整体使用而没有被切分管理,那么这个信号量对象就相当于是一把互斥锁,称为二元信号量。
下面我们用二元信号量模拟互斥锁完成黄牛抢票代码:
在主线程中创建4个新线程去抢10张票。
此时票是临界资源,我们用二元信号量对其进行保护。
每个新线程抢票之前都要先申请二元信号量,没有申请到线程被阻塞挂起。
#include <iostream> #include <unistd.h> #include <pthread.h> #include <semaphore.h> using namespace std; // 封装一个自己的信号量类 class MySem { public: // 构造函数,用于初始化信号量对象,默认线程间通信,只需传入需要设置的count得值即可 MySem(size_t num) { sem_init(&_sem, 0, num); } // 析构函数,销毁信号量对象 ~MySem() { sem_destroy(&_sem); } // 申请信号量 void P() { sem_wait(&_sem); } // 释放信号量 void V() { sem_post(&_sem); } private: // 成员变量是一个信号量对象 sem_t _sem; }; // 定义的全局对象 int count = 10;// 票数设为10张 MySem sem(1);// 一元信号量 // 新线程执行的抢票逻辑 void* GetTickets(void* arg) { while(true) { size_t id = (size_t)arg; sem.P(); if(count > 0) { usleep(1000); cout<<'['<<"thread "<<id<<']'<<" get ticket No."<<count--<<endl; sem.V(); } else { sem.V(); break; } } return nullptr; } int main() { // 创建4个新线程 pthread_t tids[4]; for(size_t i = 0; i < 4; ++i) { pthread_create(&tids[i], nullptr, GetTickets, (void*)(i+1)); } // 等待4个新线程 for(size_t i = 0; i < 4; ++i) { pthread_join(tids[i], nullptr); } return 0; }
编译运行,由于我们没有实现同步所以都是第一个创建的1号线程申请到信号量,但是最终票的结果是对的,说明互斥是实现了的:
在这里插入图片描述
2. 基于环形队列的生产者消费者模型
2.1 基本规则
生产者只关心是否还有格子用来生产数据。
消费者只关心环形队列中是否还有数据。
一开始没有数据,生产者和消费者指向同一个位置,这时生产者要先执行生产操作,消费者阻塞挂起;数据满时,生产者和消费者也指向同一个位置,这时消费者先执行消费操作再轮到生产者生产。
生产者和消费者不能同时访问队列中的同一个位置。
生产者和消费者可以并发访问环形队列中的不同位置。
2.2 环形队列的实现
成员变量说明:
这里用一个数组来模拟环形队列,因为生产者和消费者要并发执行且不能同时操作相同位置的数据,刚好数组可以通过下标随机访问数据,所以这里我们选用数组。
定义了两个无符号整型对象_proPos和_cusPos分别指向生产者要生产数据的格子下标和消费者要拿取数据的位置下标。
还定义了_proSem和_cusSem两个信号量对象,分别记录着环形队列中格子数量和以生产数据个数。
最后还有必要记录环形队列的容量大小,可以用它来取模更新_proPos和_cusPos的值。
#pragma once #include <vector> #include <time.h> #include <iostream> #include <unistd.h> #include <semaphore.h> using namespace std; // 环形队列容量缺省值 const size_t NUM = 8; // 环形队列主体 template<class T> class RingQueue { public: RingQueue(size_t num = NUM) :_v(num) ,_cusPos(0) ,_proPos(0) ,_capacity(num) { sem_init(&_cusSem, 0, 0); sem_init(&_proSem, 0, num); } ~RingQueue() { sem_destroy(&_cusSem); sem_destroy(&_proSem); } // 生产者生产数据 void Push(const T& inData) { P(_proSem); _v[_proPos] = inData; V(_cusSem); ++_proPos; _proPos %= _capacity; } // 消费者消费数据 void Pop(T& outData) { P(_cusSem); outData = _v[_cusPos]; V(_proSem); ++_cusPos; _cusPos %= _capacity; } private: // 申请信号量 void P(sem_t& s) { sem_wait(&s); } // 释放信号量 void V(sem_t& s) { sem_post(&s); } sem_t _cusSem; // 记录队列中空格数量的信号量 sem_t _proSem; // 记录队列中数据数量的信号量 size_t _cusPos; // 记录当前空格所在下标 size_t _proPos; // 记录当前数据所在下标 vector<T> _v; // 用数组模拟环形队列 size_t _capacity;// 记录环形队列容量 };
成员函数说明:
这里特意封装了信号量的PV操作,只需把信号量对象作为参数传入就能完成信号量的申请、释放操作。
生产者执行Push()操作生产数据时,需要先申请(减一)_proSem信号量,生产完成后释放(加一)_cusPos信号量,让消费者来消费。反之亦然
2.3 单生产者单消费者
在主线程中创建两个新线程分别代表生产者和消费者,消费者每隔一秒地从环形队列中拿取数据,生产者每隔一秒生产一个数据:
// 基于环形队列的单生产者单消费者模型 #include "RingQueue.h" // 消费者线程执行的操作 void* Customer(void* arg) { RingQueue<int>* q = (RingQueue<int>*)arg; while(true) { sleep(1); int getData; q->Pop(getData); cout<<"[Customer] pop data:"<<getData<<endl; } } // 生产者线程执行的操作 void* Producer(void* arg) { RingQueue<int>* q = (RingQueue<int>*)arg; while(true) { sleep(1); int putData = (rand()%100) + 1; q->Push(putData); cout<<"[Producer] push data:"<<putData<<endl; } } int main() { // 1、制造随机数种子,作为生产者push到环形队列当中的数据 srand((size_t)time(nullptr)); // 2、new一个环形队列 RingQueue<int>* q = new RingQueue<int>; // 3、分别创建、等待一个生产者和一个消费者 pthread_t tid1, tid2; pthread_create(&tid1, nullptr, Customer, (void*)q); pthread_create(&tid2, nullptr, Producer, (void*)q); pthread_join(tid1, nullptr); pthread_join(tid2, nullptr); // 4、最后delete环形队列 delete q; return 0; }
编译运行,由于_proSem初始值为0,一开始没有数据生产者线程要挂起等待,消费者生产一个数据,生产者就拿取一个数据:
接下来我们让生产者生产得快,消费者消费的慢:
编译运行,发现生产者生产的数据瞬间把队列填满了,接下来消费者拿走一个数据,生产者再生产一个数据,二者串行执行:
如果消费者消费得快,生产者生产得慢的话,可以推测结果是生产者生产完一个数据,消费者马上就拿走,然后继续等待生产者生产数据,这个就不在做演示了。
2.4 多生产者多消费者
这次我们在主线程中分别新建三个生产者线程、三个消费者线程。生产者之间竞争proLock这把锁,消费者之间竞争cusLock这把锁,竞争到锁的线程才能去生产或拿取数据,它们完成一次操作后释放锁,然后重新内部竞争:
// 基于环形队列的多生产者多消费者模型 #include "RingQueue.h" // 构造两个全局互斥锁对象,分别用于所有生产者和所有消费者线程 pthread_mutex_t cusLock; pthread_mutex_t proLock; // new一个存储整数的全局环形队列 RingQueue<int>* q = new RingQueue<int>; // 消费者线程执行的操作 void* Customer(void* arg) { while(true) { size_t id = (size_t)arg; int getData; pthread_mutex_lock(&cusLock); q->Pop(getData); pthread_mutex_unlock(&cusLock); cout<<'['<<"Customer "<<id<<']'<<" Pop data:"<<getData<<endl; sleep(1); } } // 生产者线程执行的操作 void* Producer(void* arg) { size_t id = (size_t)arg; while(true) { int putData = (rand()%100) + 1; pthread_mutex_lock(&proLock); q->Push(putData); pthread_mutex_unlock(&proLock); cout<<'['<<"Producer "<<id<<']'<<" push data "<<putData<<endl; sleep(1); } } int main() { // 1、初始化两把全局互斥锁 pthread_mutex_init(&cusLock, nullptr); pthread_mutex_init(&proLock, nullptr); // 2、创造种子,用于生产随机数据插入到环形队列中 srand((size_t)time(nullptr)); // 3、分别新建三个生产者、消费者线程 pthread_t cusTids[3]; pthread_t proTids[3]; for(size_t i = 0; i < 3; ++i) { pthread_create(&cusTids[i], nullptr, Customer, (void*)(i+1)); } for(size_t i = 0; i < 3; ++i) { pthread_create(&proTids[i], nullptr, Producer, (void*)(i+1)); } // 4、分别等待三个生产者、消费者线程 for(size_t i = 0; i < 3; ++i) { pthread_join(cusTids[i], nullptr); } for(size_t i = 0; i < 3; ++i) { pthread_join(proTids[i], nullptr); } // 5、等待完成后delete环形队列并销毁互斥锁对象 delete q; pthread_mutex_destroy(&cusLock); pthread_mutex_destroy(&proLock); return 0; }
编译运行,生产和消费操作并发执行:
六. 信号量和条件变量的区别
信号量既可以实现同步还可以实现互斥,而条件变量只能实现同步;条件变量需要搭配互斥锁使用,而信号量通过自身计数器实现同步的条件判断,不需要搭配互斥锁使用。