同步机制之生产者和消费者问题
信号量:
信号量是进程间通信的一种机制,也可以解决同一进程不同线程之间的通信问题。它是用来保证两个或多个关键代码段不被并发调用,防止多个进程同时对共享资源进行操作。
原理:
在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。
形象理解:
以一个停车场的运作为例。假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆直接进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入外面的一辆进去,如果又离开两辆,则又可以放入两辆,如此往复。 在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。
两种操作:
抽象的来讲,信号量的特性如下:信号量是一个非负整数(车位数),所有通过它的线程/进程(车辆)都会将该整数减一(通过它当然是为了使用资源),当该整数值为零时,所有试图通过它的线程都将处于等待状态。
Wait(等待,或叫P操作) 当一个线程调用Wait操作时,它要么得到资源然后将信号量减一,要么一直等下去(指放入阻塞队列),直到信号量大于等于一时。
signal(释放,或叫V操作) 实际上是在信号量上执行加一操作,对应于车辆离开停车场,该操作之所以叫做“释放”是因为释放了由信号量守护的资源。
Linux POSIX API中对应的两个函数:
1.sem_wait函数
函数原型 :int sem_wait(sem_t * sem);
它的作用是从信号量的值减去一个“1”,但它永远会先等待该信号量为一个非零值才开始做减法。
2.sem_post函数
函数原型 :int sem_post(sem_t *sem);
作用是给信号量的值加上一个“1”。 当有线程阻塞在这个信号量上时,调用这个函数会使其中一个线程不再阻塞,选择机制是由线程的调度策略决定的。
生产者和消费者问题
生产者-消费者问题是一个经典的进程同步问题,已经属于化石级别的了。该问题最早由Dijkstra提出,用以演示他提出的信号量机制。要求设计在同一个进程地址空间内执行的两个线程。生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。
要求设计并实现一个进程,该进程拥有一个生产者线程和一个消费者线程,它们使用N个不同的缓冲区(N为一个自定义的确定的数值,例如N=32)。需要使用如下信号量:
一个互斥信号量,用以阻止生产者线程和消费者线程同时操作缓冲区列表;
一个信号量,当生产者线程生产出一个物品时可以用它向消费者线程发出信号;
一个信号量,消费者线程释放出一个空缓冲区时可以用它向生产者线程发出信号;
可参考 pv操作经典问题-生产者与消费者问题
实验要求
- 运行例子中的代码,请观察运行结果,并说明生产者和消费者在并发执行过程中,互斥信号量和同步信号量的作用。
- 这是一个单生产者和单消费者的问题,改进程序,使其成为多生产者和消费者问题。
- 实验过程中遇到哪些问题,有哪些收获,越详细越具体越好。
单生产者与单消费者【源程序】
代码
1.c
//pv操作:生产者与消费者经典问题 //author:leaf #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #define M 32 /*缓冲数目*/ #define P(x) sem_wait(&x) #define V(x) sem_post(&x) int in = 0; /*生产者放置产品的位置*/ int out = 0; /*消费者取产品的位置*/ int buff[M] = {0}; /*缓冲初始化为0, 开始时没有产品*/ sem_t empty_sem; /*同步信号量,当满了时阻止生产者放产品*/ sem_t full_sem; /*同步信号量,当没产品时阻止消费者消费*/ pthread_mutex_t mutex; /*互斥信号量, 一次只有一个线程访问缓冲*/ /* *output the buffer */ void print() { int i; for(i = 0; i < M; i++) printf("%d ", buff[i]); printf("\n"); } /* *producer */ void *producer() { for(;;) { sleep(1); P(empty_sem); pthread_mutex_lock(&mutex); in = in % M; printf("(+)produce a product. buffer:"); buff[in] = 1; print(); ++in; pthread_mutex_unlock(&mutex); V(full_sem); } } /* *consumer */ void *consumer() { for(;;) { sleep(2); P(full_sem); pthread_mutex_lock(&mutex); out = out % M; printf("(-)consume a product. buffer:"); buff[out] = 0; print(); ++out; pthread_mutex_unlock(&mutex); V(empty_sem); } } void sem_mutex_init() { /* *semaphore initialize */ int init1 = sem_init(&empty_sem, 0, M); int init2 = sem_init(&full_sem, 0, 0); if( (init1 != 0) && (init2 != 0)) { printf("sem init failed \n"); exit(1); } /* *mutex initialize */ int init3 = pthread_mutex_init(&mutex, NULL); if(init3 != 0) { printf("mutex init failed \n"); exit(1); } } int main() { pthread_t id1; pthread_t id2; int i; int ret; sem_mutex_init(); /*create the producer thread*/ ret = pthread_create(&id1, NULL, producer, NULL); if(ret != 0) { printf("producer creation failed \n"); exit(1); } /*create the consumer thread*/ ret = pthread_create(&id2, NULL, consumer, NULL); if(ret != 0) { printf("consumer creation failed \n"); exit(1); } pthread_join(id1,NULL); pthread_join(id2,NULL); exit(0); }
结果
ctrl+c强制结束
多生产者和多消费者【程序改进】
代码
最简单的改法
//增加 void *producer2() void *consumer2() ret = pthread_create(&id3, NULL, producer2, NULL); ret = pthread_create(&id4, NULL, consumer2, NULL);
2.c
//pv操作:生产者与消费者经典问题 //author:leaf #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #define M 32 /*缓冲数目*/ #define P(x) sem_wait(&x) #define V(x) sem_post(&x) int in = 0; /*生产者放置产品的位置*/ int out = 0; /*消费者取产品的位置*/ int buff[M] = {0}; /*缓冲初始化为0, 开始时没有产品*/ sem_t empty_sem; /*同步信号量,当满了时阻止生产者放产品*/ sem_t full_sem; /*同步信号量,当没产品时阻止消费者消费*/ pthread_mutex_t mutex; /*互斥信号量, 一次只有一个线程访问缓冲*/ /* *output the buffer */ void print() { int i; for(i = 0; i < M; i++) printf("%d ", buff[i]); printf("\n"); } /* *producer */ void *producer() { for(;;) { sleep(1); P(empty_sem); pthread_mutex_lock(&mutex); in = in % M; printf("(+)produce a product. buffer:"); buff[in] = 1; print(); ++in; pthread_mutex_unlock(&mutex); V(full_sem); } } /* *consumer */ void *consumer() { for(;;) { sleep(2); P(full_sem); pthread_mutex_lock(&mutex); out = out % M; printf("(-)consume a product. buffer:"); buff[out] = 0; print(); ++out; pthread_mutex_unlock(&mutex); V(empty_sem); } } /* *producer2 */ void *producer2() { for(;;) { sleep(1); P(empty_sem); pthread_mutex_lock(&mutex); in = in % M; printf("(+)produce2 a product. buffer:"); buff[in] = 1; print(); ++in; pthread_mutex_unlock(&mutex); V(full_sem); } } /* *consumer2 */ void *consumer2() { for(;;) { sleep(2); P(full_sem); pthread_mutex_lock(&mutex); out = out % M; printf("(-)consume2 a product. buffer:"); buff[out] = 0; print(); ++out; pthread_mutex_unlock(&mutex); V(empty_sem); } } void sem_mutex_init() { /* *semaphore initialize */ int init1 = sem_init(&empty_sem, 0, M); int init2 = sem_init(&full_sem, 0, 0); if( (init1 != 0) && (init2 != 0)) { printf("sem init failed \n"); exit(1); } /* *mutex initialize */ int init3 = pthread_mutex_init(&mutex, NULL); if(init3 != 0) { printf("mutex init failed \n"); exit(1); } } int main() { pthread_t id1; pthread_t id2; pthread_t id3; pthread_t id4; int i; int ret; sem_mutex_init(); /*create the producer thread*/ ret = pthread_create(&id1, NULL, producer, NULL); if(ret != 0) { printf("producer creation failed \n"); exit(1); } /*create the consumer thread*/ ret = pthread_create(&id2, NULL, consumer, NULL); if(ret != 0) { printf("consumer creation failed \n"); exit(1); } /*create the producer2 thread*/ ret = pthread_create(&id3, NULL, producer2, NULL); if(ret != 0) { printf("producer2 creation failed \n"); exit(1); } /*create the consumer2 thread*/ ret = pthread_create(&id4, NULL, consumer2, NULL); if(ret != 0) { printf("consumer2 creation failed \n"); exit(1); } pthread_join(id1,NULL); pthread_join(id2,NULL); pthread_join(id3,NULL); pthread_join(id4,NULL); exit(0); }
结果
产生死锁的代码【程序改进】
产生死锁
把void *consumer()
中的
P(full_sem); pthread_mutex_lock(&mutex);
调换顺序 改为
pthread_mutex_lock(&mutex); P(full_sem);
更明显
并且为了更快的产生死锁
把void *producer()
中的
sleep(1);
改为
sleep(10);
代码
3.c
//pv操作:生产者与消费者经典问题 //author:leaf #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #define M 32 /*缓冲数目*/ #define P(x) sem_wait(&x) #define V(x) sem_post(&x) int in = 0; /*生产者放置产品的位置*/ int out = 0; /*消费者取产品的位置*/ int buff[M] = {0}; /*缓冲初始化为0, 开始时没有产品*/ sem_t empty_sem; /*同步信号量,当满了时阻止生产者放产品*/ sem_t full_sem; /*同步信号量,当没产品时阻止消费者消费*/ pthread_mutex_t mutex; /*互斥信号量, 一次只有一个线程访问缓冲*/ /* *output the buffer */ void print() { int i; for(i = 0; i < M; i++) printf("%d ", buff[i]); printf("\n"); } /* *producer */ void *producer() { for(;;) { sleep(10); P(empty_sem); pthread_mutex_lock(&mutex); in = in % M; printf("(+)produce a product. buffer:"); buff[in] = 1; print(); ++in; pthread_mutex_unlock(&mutex); V(full_sem); } } /* *consumer */ void *consumer() { for(;;) { sleep(2); pthread_mutex_lock(&mutex); P(full_sem); out = out % M; printf("(-)consume a product. buffer:"); buff[out] = 0; print(); ++out; pthread_mutex_unlock(&mutex); V(empty_sem); } } void sem_mutex_init() { /* *semaphore initialize */ int init1 = sem_init(&empty_sem, 0, M); int init2 = sem_init(&full_sem, 0, 0); if( (init1 != 0) && (init2 != 0)) { printf("sem init failed \n"); exit(1); } /* *mutex initialize */ int init3 = pthread_mutex_init(&mutex, NULL); if(init3 != 0) { printf("mutex init failed \n"); exit(1); } } int main() { pthread_t id1; pthread_t id2; int i; int ret; sem_mutex_init(); /*create the producer thread*/ ret = pthread_create(&id1, NULL, producer, NULL); if(ret != 0) { printf("producer creation failed \n"); exit(1); } /*create the consumer thread*/ ret = pthread_create(&id2, NULL, consumer, NULL); if(ret != 0) { printf("consumer creation failed \n"); exit(1); } pthread_join(id1,NULL); pthread_join(id2,NULL); exit(0); }
结果
分析产生死锁的原因
略