生产者-消费者模型

1、条件变量

条件变量是线程之间的一种通知机制,当某个共享数据达到某个条件时,唤醒等待这个条件的线程。

通过线程间共享的全局变量进行同步

  • 一个线程等待 “条件变量条件成立” 而阻塞
  • 另一个线程使 “条件成立”

使用时总是和互斥锁结合在一起

基础API

  • pthread_cond_init函数,用于初始化条件变量
  • pthread_cond_destory函数,销毁条件变量
  • pthread_cond_broadcast函数,以广播的方式唤醒所有等待目标条件变量的线程
  • pthread_cond_wait函数,用于等待目标条件变量。该函数调用时需要传入 mutex参数(加锁的互斥锁) ,函数执行时,先把调用线程放入条件变量的请求队列,然后将互斥锁mutex解锁,当函数成功返回为0时,表示重新抢到了互斥锁,互斥锁会再次被锁上, 也就是说函数内部会有一次解锁和加锁操作.
pthread_cond_wait 使用:
pthread_mutex_lock(&mutex);

while(线程执行的条件是否成立){
	pthread_cond_wait(&cond, &mutex);
}
线程执行
pthread_mutex_unlock(&mutex);

pthread_cond_wait执行后的内部操作分为以下几步:

  • 将线程放在条件变量的请求队列后,内部解锁阻塞后自动释放锁
  • 线程等待pthread_cond_broadcast信号唤醒或者pthread_cond_signal信号唤醒,唤醒后去竞争锁
  • 竞争到互斥锁,内部再次加锁

注:线程执行的条件用 while 而不是 if

【问题1】为什么要加锁?

线程执行的条件以及线程执行时,要访问共享资源,避免引起资源竞争

【问题2】为什么要在放在请求队列后才释放锁?

反例:(若在放在请求队列前就释放锁)

假设线程A 的执行顺序

  1. 线程A由于条件不满足 阻塞
  2. 释放锁
  3. 线程A放入请求队列

若线程B在A释放后拿到锁,访问资源,并且使得A的条件成立

  1. 线程A阻塞
  2. 线程A释放互斥锁
  3. 线程B拿到互斥锁
  4. 线程B执行
  5. 线程B使得条件满足,唤醒请求队列线程(不包括A
  6. 线程A放在请求队列中

A在条件满足时,也没有机会被唤醒执行

【问题3】为什么使用while?何时能使用 if ?
pthread_cond_wait(&qready, &qlock); 
/*block-->unlock-->lock-->wait() return*/ 

重新锁定互斥锁后,wait() 将直接返回

若不使用while 反例:

  1. 线程A 等待条件 阻塞,释放互斥锁
  2. 线程B 得到互斥锁,执行,使得条件成立,signal/broadcast 将等待条件的线程唤醒
  3. 此时,若线程A、C都被唤醒
  4. 线程C先拿到互斥锁,wait 返回,线程执行,条件不成立,释放锁
  5. 线程A再拿到互斥锁,因为在 if判断中,不再判断是否成立,wait 返回,直接执行,但资源已经被C消耗掉,A访问了不存在的资源,出错

当只有一个消费者时,可以使用if

2、生产者-消费者模型

  • 生产者和消费者对缓冲区访问互斥
  • 又是协作同步关系,只有当生产者生产之后,消费者才能消费
#include <pthread.h>
struct msg {
	struct msg *next;
	/* value...*/
}; // 消息队列

struct msg * workq;// 消息队列
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

// 消费者
void process_msg() {
    struct msg * mp;
    for(;;) {
        pthread_mutex_lock(&qlock);
        while(workq == NULL) {
            pthread_cond_wait(&qready, &qlock);
        }
        // 消费
        mp = workq;
        workq = workq->next;
        pthread_mutex_unlock(&qlock);
    }
}

// 生产者
void enqueue_msg(struct msg *mp) {
    pthread_mutex_lock(&qlock);
    mp->next = workq;
    workq = mp;
    pthread_mutex_unlock(&qlock);
    pthread_cond_signal(&qready);
}
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐