进程同步问题是一个非常重要且相当有趣的问题,因而吸引了很多学者对他进行研究。本文就选取其中较为代表性的生产者-消费者问题来进行学习,以帮助我们更好的理解进程同步的概念及实现方法。

一、问题描述

有一群生产者进程在生产产品,并将这些产品提供给消费者进程进行消费。为了使生产这边进程与消费进程能够并发的执行,在两者之间设置了一个具有n 个缓冲区的缓冲池,生产者进程将其他生产的产品放到一个缓冲区中,消费者进程可以从一个缓冲区中取走产品去消费。

需要注意的是,尽管所有的生产者和消费者都是以异步的方式运行的,但是他们之间必须保持同步,

既不允许消费者进程在缓冲区为空时去取产品,也不允许生产者进程在缓冲区已满且产品尚未被取走时向缓冲区投放产品。

https://i-blog.csdnimg.cn/blog_migrate/b26b8ea56b0d36e0dbf923f029c733a4.png

下图是一个生产者与消费者进程执行的流程图,比图中我们可以很清晰的看到上述的三个进程间的关系,其中生产者和消费者中操作缓冲区都需要先进行申请,也就是进入区,操作完成后要执行释放,也就是退出区,通过这样来实现对缓冲池的互斥访问。

二、问题分析

  • 缓冲池一次只能有一个进程访问。
  • 只要缓冲池未满,生产者就可以把产品放入缓冲区。
  • 只要缓冲池未空,消费者就要可以从缓冲区中取走产品。

通过图中的贯通两个进程的虚线来实现生产者和消费者的同步关系。

https://i-blog.csdnimg.cn/blog_migrate/1f2e3de6383effc5cf6c6196e297115f.png

三、信号量设置

由于有界缓冲池是一个临界资源,必须互斥使用,这时可以利用互斥信号量 mutex 来实现诸进程对缓冲池的互斥使用。因为是互斥信号量,所以 mutex 初值为 1。

另外,可以设置两个同步信号量:一个表示缓冲池中空缓冲区的数目,用empty 表示,初值 为缓冲池的大小 n;另一个表示已满缓冲区的数目,即缓冲池中消息的数量,用 full 表示,初值为 0;

除了信号量外,我们可以使用循环链表来表示有界缓冲池,假设缓冲池的大小为 n,我们用 buffer[n] 来表示,另外还有两个队首指针in 和 队尾指针 out ,其初值都为 0。

四、记录型信号量解决生产者-消费者问题

首先我们使用记录型信号量来解决生产者-消费者问题,根据上面的分析,我们先给出伪代码:

int in=0, out=0;

// item 为消息的结构体
item buffer[n];
semaphore mutex=1, empty=n, full=0;// 初始化信号量
void producer(){
	do{
// 生产者产生一条消息
		producer an item next p;
		......
// 判断缓冲池中是否仍有空闲的缓冲区P(empty);
// 判断是否可以进入临界区(操作缓冲池)P(mutex);
// 向缓冲池中投放消息
		buffer[in] = nextp;
// 移动入队指针
		in = (in+1) % n;
//退出临界区,允许别的进程操作缓冲池V(mutex);
// 缓冲池中数量的增加,可以唤醒等待的消费者进程。V(full);
	}while(true);
}

void consumer(){
	do{
//判断缓冲池中是否有非空的缓冲区(消息)P(full);
// 判断是否可以进入临界区(操作缓冲池)P(mutex);
		nextc = buffer[out];
// 移动出队指针
		out = (out+1) % n;
// 退出临界区,允许别的进程操作缓冲池V(mutex);
// 缓冲池中空缓冲区数量加1, 可以唤醒等待的生产者进程V(empty);
// 消费消息
		consumer the item in next c;
	}while(true);
}

通过上面的伪代码,我们可以看到,在每个程序中用于实现互斥的 P(mutex) 和 V(mutex) 必须成对的出现,并且要出现在同一个程序中;对于用于控制进程同步的信号量 full 和 empty,其P/V 操作也必须成对的出现,但他们分别处于不同的程序之中。

还有比较重要的就是,每个程序中多个P操作顺序不能颠倒,比如,应先执行对资源信号量的P操作-P(empty), 再执行对互斥信号量的P操作- P(mutex), 否则可能会因为持有了互斥锁,但是没有空闲的缓冲区而导致生产者进程阻塞,但是别的进程又无法进入临界区,导致发生死锁。

上代码!!!

```c
#include <stdio.h>   
#include <unistd.h>//  sleep的头文件
#include <pthread.h>    //pthread的头文件
#include <semaphore.h>
#include <stdlib.h>  //包含exit的头文件
#define N 100
#define true 1
#define producerNum  3   //生产者数目
#define consumerNum  3   //消费者数目
//线程方法pthread  gcc -o main.c main -pthread  需要pthread头文件,但pthread不是linux的头文件需要链接
typedef int semaphore;    //类型定义 int型 信号量
typedef int item;        
// item 为消息的结构体
item buffer[N] = {0};
int in = 0;//队首指针in 和 队尾指针 out ,其初值都为 0
int out = 0;
int nextp = 0;//产品数目
// 初始化信号量
semaphore mutex = 1, empty = N, full = 0;
/*用互斥信号量 mutex 来实现诸进程对缓冲池的互斥使用。因为是互斥信号量,所以 mutex 初值为 1
由于有界缓冲池是一个临界资源,必须互斥使用,这时可以利用互斥信号量 mutex 来实现诸进程对缓冲池的互斥使用。
因为是互斥信号量,所以 mutex 初值为 1。
另外,可以设置两个同步信号量:一个表示缓冲池中空缓冲区的数目,用empty 表示,初值 为缓冲池的大小 n;
另一个表示已满缓冲区的数目,即缓冲池中消息的数量,用 full 表示,初值为 0;
除了信号量外,我们可以使用循环链表来表示有界缓冲池,假设缓冲池的大小为 n,
我们用 buffer[n] 来表示,另外还有两个队首指针in 和 队尾指针 out ,其初值都为 0。*/
void * producer(void * a){
    do{
        // 生产者产生一条消息
        nextp++;
        // 判断缓冲池中是否仍有空闲的缓冲区P(empty);
        while(empty <= 0){
            printf("缓冲区已满!\n");
        }  //如果满了 ,阻塞
        sleep(1);// 等待唤醒
        // 判断是否可以进入临界区(操作缓冲池)P(mutex);
        while(mutex <= 0);
        empty--;
        mutex--;              
        printf("生产一个产品ID%d, 缓冲区位置为%d\n",nextp,in);
        // 向缓冲池中投放消息  
        buffer[in] = nextp;
        // 移动入队指针
        in = (in + 1) % N;
        //退出临界区,允许别的进程操作缓冲池V(mutex);
        mutex++;
        // 缓冲池中数量的增加,可以唤醒等待的消费者进程。V(full);
        full++; 
        
    } while(true);
}

void * consumer(void *b){
    do{
        //判断缓冲池中是否有非空的缓冲区(消息)P(full);  
        while(full <= 0){
            printf("缓冲区为空!\n"); //如果空,阻塞
        }     
        sleep(1);//等待唤醒       
        // 判断是否可以进入临界区(操作缓冲池)P(mutex);   如果缓冲区是没有产品就没必要进入临界区
        while(mutex <= 0);
        mutex--;      
        full--; 
        // 从池中取出信息
        int nextc = buffer[out];
        // 移动出队指针
        out = (out + 1) % N;
        // 退出临界区,允许别的进程操作缓冲池V(mutex);
        // 缓冲池中空缓冲区数量加1, 可以唤醒等待的生产者进程V(empty);
        mutex++;
        empty++;         
       
        printf("\t\t\t\t消费一个产品ID%d,缓冲区位置为%d\n", nextc,out);
        // 消费消息
    }while(true);
}
//主函数部分的pthread没有搞明白。
int main()
{

    pthread_t threadPool[producerNum+consumerNum];
    int i;
    for(i = 0; i < producerNum; i++)
    {
        pthread_t temp;
        if(pthread_create(&temp, NULL, producer, NULL) == -1)
        {
            printf("ERROR, fail to create producer%d\n", i);
            exit(1);
        }
        threadPool[i] = temp;
    }//创建生产者进程放入线程池


    for(i = 0; i < consumerNum; i++)
    {
        pthread_t temp;
        if(pthread_create(&temp, NULL, consumer, NULL) == -1)
        {
            printf("ERROR, fail to create consumer%d\n", i);
            exit(1);
        }
        threadPool[i+producerNum] = temp;
    }//创建消费者进程放入线程池


    void * result;
    for(i = 0; i < producerNum+consumerNum; i++){
        if(pthread_join(threadPool[i], &result) == -1){
            printf("fail to recollect\n");
            exit(1);
        }
    }//运行线程池
    return 0;
}
可以说是最偷懒的行为了,直接++--比较粗暴!!!
部分代码参考: @Reacubeth

Logo

更多推荐