【linux】多线程(七)基于环形队列的生产者消费者模型,POSIX信号量: sem_init,sem_destroy,sem_wait,sem_post
一、POSIX信号量sem_initsem_destroysem_waitsem_post二、基于环形队列的生产者消费者模型环形队列原理讲解商心慈和方源玩摆盘子游戏基于环形队列的单生产者单消费者模型三、实现环形队列的单生产者单消费者模型Task.hppRingQueue.hpp基本框架PushPopmain.c + 运行代码四、实现环形队列的多生产者多消费者模型RingQueue.hpp基本框架P
小编个人主页详情<—请点击
小编个人gitee代码仓库<—请点击
linux系列专栏<—请点击
倘若命中无此运,孤身亦可登昆仑,送给屏幕面前的读者朋友们和小编自己!
目录
前言
【linux】多线程(六)生产者消费者模型,queue模拟阻塞队列的生产消费模型——书接上文 详情请点击<——,本文会在上文的基础上进行讲解,所以对上文不了解的读者友友请点击前方的蓝字链接进行学习
本文由小编为大家介绍——【linux】多线程(七)基于环形队列的生产者消费者模型,POSIX信号量: sem_init,sem_destroy,sem_wait,sem_post
一、POSIX信号量
- 这里小编讲解的POSIX信号量的原理和SystemV版本的信号量的原理是一样的,所以读者友友学习POSIX信号量的原理看小编之前讲解的SystemV版本的信号量原理即可,在后面蓝字链接文章的四、信号量原理铺垫,五、信号量原理中进行的讲解,如果不了解的读者友友可以点击后方蓝字链接自行学习详情请点击<——
- 信号量可以用于同步与互斥,信号量的本质是一把计数器,那么这把计数器的本质就是用来描述资源数目的,把资源是否就绪放在了临界区之外,申请信号量时,其实间接的就已经在做判断了,申请信号量要使用P操作(计数器- -),释放信号量要使用V操作(计数器++),访问临界资源的操作需要在PV操作之间,即P操作之后,V操作之前,如果申请成功了信号量,那么临界资源一定有你的一份,即对临界资源的预定机制
- 那么下面我们就来学习一下POSIX信号量的接口,使用POSIX信号量的时候都需要包信号量的头文件#include <semaphore.h>
sem_init

- 使用sem_t类型定义的变量就是一个信号量,然后初始化信号量使用sem_init即可,第一个参数是信号量的地址,第二个参数pshared如果传参0那么表示线程间共享,如果传参非零,那么表示进程间共享,本文要使用的场景是线程间共享,所以第二个参数传参0
- 第三个参数是信号量的初始值,信号量是一把计数器,你想让这把计数器从多少开始,那么就在第三个参数传参多少,计数器没有负数的概念,所以你可以选择传参0,或者传参大于0的数。
- 如果第三个参数传参你选择传参0,那么进行P操作申请信号量,此时没有信号量可以被申请了,所以信号量会申请失败,后续进行了V操作之后释放了信号量,那么此时就有信号量了,可以P操作申请信号量成功,可见只有当信号量这把计数器大于0才能申请信号量成功
sem_destroy

- 那么当使用完成信号量之后,确保后续不再使用信号量了,那么使用sem_destroy接口,传入信号量的地址即可释放销毁信号量
sem_wait

- 如果你想要执行P操作(计数器- -),即申请一个信号量,那么就应该使用sem_wait接口,传入信号量的地址即可进行申请信号量
- 如果信号量的计数此时为0,那么代表此时没有信号量可以被申请了,所以P操作信号量会申请失败,后续进行了V操作之后释放了信号量,那么此时就有信号量了,可以P操作申请信号量成功,可见只有当信号量这把计数器大于0才能申请信号量成功
sem_post

- 如果你想要执行V操作(计数器++),那么使用sem_post传入信号量的地址即可释放信号量
二、基于环形队列的生产者消费者模型
环形队列原理讲解
- 既然是基于环形队列的生产者消费者模型,那么我们就要首先了解什么是环形队列,我们又该用什么手段实现这个环形队列呢?有两种方案如下

- 环形队列是一个环形的队列,那么我们可以使用一个tail作为环形队列的尾部,tail的作用是指向环形队列尾部的格子,一个head作为环形队列的头部,那么head的作用是指向要放数据的格子,最开始的时候我们让tail和head指向一个格子表示环形队列为空,即对应上图中的最左边
- 红色即代表格子中已经放了数据,白色代表格子中没有数据,那么我们先放一个数据,那么head向后移,tail不动,即上图中的中间的环形队列对应的情况
- 接下来我们向环形队列的格子中持续放数据,那么head就持续的向后移动,直到head和tail重合,此时对应上图中的右边的环形队列对应的情况,所以此时环形队列为满喽,那么问题也就来了,什么问题呢?
- 最初环形队列为空的时候head和tail重合,现在环形队列为满,那么head和tail也重合了,所以我们就无法区分环形队列究竟是满了还是没有满,如果满了继续向格子中放数据,那么就会覆盖原有格子中已经存在的数据,所以上面设计方法是不可行的,因为无法判断环形队列究竟是空还是满,那么我们该如何做呢?
- 方案一:采用一个计数器记录当前有多少个格子已经放了数据,和总的格子数目进行比对,如果相等,那么则说明环形队列满了,否则则说明环形队列没有满
- 方案二:空一个格子,原理如下

- 首先我们判断环形队列的代价是浪费一个空间,所以也就注定了会有一个格子始终是不存放数据的,那么开始为空的时候,我们让head和tail指向同一个位置,即对应上图的最左边
- 接下来我们开始放一个数据,那么head位置放数据之后,head向后移动一个格子,那么对应上图中间的情况
- 那么为满的情况是head的下一个位置是tail,所以接下来我们开始持续放数据,每次放数据前都要判断head的下一个位置是是否是tail,如果不是那么说明此时环形队列还没有满,所以就向格子中放数据,于是格子就持续被放数据直到head的下一个位置是tail,那么此时代表环形队列在浪费一个格子的情况下已经被放满了,即对应上图的最右侧的情况,那么如何出数据呢?如下

- 那么出数据的时候要时刻判断tail是否等于head,如果不等于,那么说明此时环形队列不为空,可以出格子中的数据,所以此时从最左侧的情况开始从格子中出数据,判断tail是否等于head,不等于,所以出格子中的数据,此时将tail位置对应格子的数据弹出,然后tail向后走一个格子,此时对应上图的中间的情况
- 那么继续出格子中的数据,判断tail是否等于head,不等于,那么就可以持续将tail位置对应格子的数据弹出,然后tail向后走一个格子,直到tail等于head,那么说明此时环形队列为空,所以停止出格子中的数据
- 所以tail等于head代表环形队列为空,head的下一个为tail代表环形队列在浪费一个格子的情况下为满
- 所以我们应该如何模拟这个环形队列呢?数组模拟,逻辑如下,但是今天由于POSIX信号量的存在,所以我们并不需要使用如下的判断逻辑以及空出一个空间的代价,具体原因后面解释

商心慈和方源玩摆盘子游戏
- 那么下面小编引进一个场景,蛊界的商心慈和方源很无聊,于是他们想玩摆盘子游戏,有一桌盘子,盘子为白色则为没有西瓜,盘子为绿色则为有西瓜,方源负责放西瓜,商心慈负责收西瓜,并且他俩约定一个盘子上只能放一个西瓜

- 左上角是人物简介,于是在最开始的时候,方源和商心慈站在同一个位置,上图的最左侧即为初始位置,那么开始的时候,方源和商心慈站在同一个位置,那么谁先走呀?
- 由于方源负责放西瓜,商心慈负责收西瓜,所以方源先走,于是方源就在开始的位置放了一个西瓜,然后向后走了一步,即上图的中间对应的图,这时候商心慈说:“方源哥哥,你先放吧,我不急着收西瓜”,于是方源就逐个逐个的在每个盘子上放置西瓜,到最后,方源和商心慈相遇了,即对应上图的最右侧

- 所以此时方源和商心慈处于上图最左侧的这种情况,那么此时当桌子上的盘子中西瓜全部为满的时候,那么此时方源还能继续放西瓜,继续走吗?不能了,如果继续再走,那么将会违背事先约定好的一个盘子上只能放一个西瓜的约定,所以应该谁先走呢?只能是商心慈先走,所以说方源不能将商心慈套一个圈,所以商心慈就先收起了一个西瓜,走了一步,对应上图中的中间,此时方源说:“你收西瓜吧,我等你把西瓜收完”
- 所以商心慈就逐个将西瓜一个一个的收起来了,到最后,商心慈和方源相遇了,即对应上图中的最右侧的情况。所以我们可以归纳出,当盘子全部为空的时候,方源先走。当盘子全部放上了西瓜之后,只能商心慈先走。什么时候方源和商心慈会相遇,当所有的盘子中为空或者为满的情况会相遇
- 同样的道理,如果方源放了西瓜之后,商心慈可以收西瓜吗?当然可以,只要盘子上有西瓜,商心慈就可以收西瓜,即商心慈和方源可以同时的工作,并且在这个过程中,他们两个访问的是桌子上的不同位置
- 什么时候方源和商心慈才会相遇,当所有盘子中为全部为空或者未满的时候才会相遇
- 所以当所有盘子全部不为空或者全部不为满的时候,方源和商心慈一定是在不同位置,方源和商心慈可以同时访问
- 在这个游戏中,方源负责生产西瓜到盘子上,方源代表的是生产者,商心慈负责消费西瓜,所以商心慈代表的是消费者,并且商心慈和方源玩这个摆盘子游戏,那么就要满足三个条件
(1)当指向同一个位置的时候,只能一个人访问。当为所有盘子为空的时候,只能方源(生产者)来访问。当所有盘子为满的时候,只能商心慈(消费者)来访问。
(2)商心慈(消费者)不能超过方源(生产者)
(3)方源(生产者)不能把商心慈(消费者)一个圈 - 在这个游戏中,方源(生产者)关注的是有多少个空盘子,即剩余空间。商心慈(消费者)关注的是有多少个盘子上被放了数据,即剩余数据
基于环形队列的单生产者单消费者模型
- 所以有了上面方源和商心慈的玩游戏的场景例子之后,此时小编就可以简单的从单生产者单消费者模型入手开始进行讲解了

- 那么我们使用C代表消费者,使用P代表生产者。那么上图的格子一共有N个,即一共有N个空间,那么最初的时候,C生产者和P消费者都指向同一个位置,对应上图的最左侧,那么最开始的时候C生产者和P消费者都指向同一个位置,此时所有的格子的数据为空,应该生产者先走,即生产者先向格子中生产数据,然后向后走一步,对应上图的中间图
- 接下来,假设消费者不动,那么生产者一直生产数据到格子中,那么此时生产者就将所有的格子全部生产满了,所以此时生产者就和消费者相遇了,此时所有的格子的数据为满,应该消费者先走

- 那么此时消费者就先走了,将当前格子的数据消费,然后向后走一步,对应上图中的最左侧的情况,接下来全部格子的数据不全部为满并且不全部为空,所以我P生产者也可以走,但是P生产者同样也可以选择不走,所以P生产者先选择不走,然后C消费者走,即消费者继续消费格子中的数据,消费者向后走一步,那么P生产者不走,那么对应上图的中间位置
- 再接下来全部格子的数据不全部为满并且不全部为空,P生产者也可以走,但是P生产者同样也可以选择不走,所以P生产者选择走,于此同时C消费者也在走,所以P生产者生产数据到当前格子,然后向后走一步,C消费者消费当前格子的数据,然后向后走一步,即对应上图的最右侧的情况
- 所以我们可以思考当所有格子的数据为空或者未满的时候,生产者消费者模型表现出了一种顺序性!即当所有格子的数据为空的时候,只能是生产者先走,当所有格子的数据为满的时候,只能是消费者先走,所以就需要用到信号量了

- 那么P生产者关注的是什么资源呢?即还有多少剩余资源,所以我们就可以给生产者定义一个信号量SpaceSem,那么上图最左侧的格子数目一共有N个,那么最初的时候,我们定义生产者的信号量的时候需要传参,那么需要传参多少呢?没错,传参N,因为最初的时候,空间资源,即格子都没有被放数据,所以P生产者最开始的空间资源的数目即为格子的数目N,所以传参N
- 那么C消费者关注的是什么资源呢?即还有多少剩余数据,所以我么就可以给消费者定义一个信号量DataSem,那么最开始的时候,我们定义消费者的信号量的时候需要传参,那么传参多少呢?没错,传参0,因为最初的时候生产者并没有生产数据到格子中,所以最初的时候格子中并没有数据,即剩余数据为0,所以传参0
- 所以此时我们再对应一下生产者与消费者各自的PV操作,P操作是申请信号量(计数- -),V操作是释放信号量(计数++),如下

- 所以最开始当格子的数据全部为空的时候,P生产者的信号量的计数是N,C消费者的信号量的计数是0,所以这时候格子为空应该先让生产者先跑,即生产数据到格子。但是如果此时C消费者先跑执行代码逻辑呢?那么消费者就会先申请信号量,可是此时信号量的计数为0,所以消费者就会申请信号量失败,那么申请信号量失败就不会运行,即运行失败。
- 与此同时,生产者的信号量的计数是N,那么生产者申请信号量就会申请成功,那么此时生产者就会生产数据到格子中,所以此时格子空间少了一个,所以生产者进行的P操作会让SpaceSem信号量的计数- -,但是生产者经过生产之后,格子中的数据多了一份,所以说剩余属于此时变成了1,所以生产者就可以执行V操作,将Data信号量的计数++。所以尽管消费者先跑,但是由于有信号量的存在不怕,消费者一定会申请信号量失败导致无法运行,可以保障当最开始格子为空的时候,生产者和消费者指向同一个位置,信号量一定可以保证生产者先跑,即生产数据到格子中
- 那么当格子的数据为满的时候,生产者和消费者一定是处于同一个位置,并且生产者的SpaceSem的信号量的计数是0,所以此时生产者想要运行,要运行就要先申请信号量,那么由于信号量此时为0,所以生产者就会P操作申请信号量失败,所以生产者无法运行。
- 所以消费者此时的DataSem信号量的计数为N,所以消费者此时想要运行,那么就会先P操作申请信号量,此时消费者申请信号量就会成功,所以消费者就可以消费数据了
- 所以所以所以,当格子的数据为满的时候,生产者和消费者一定是处于同一个位置,由于信号量的存在,一定可以保障消费者先运行
- 格子的数据是为全部为空或者全部为满的时候,生产者和消费者一定处于同一个位置,由于信号量的存在,可以保证一定的顺序性执行,即生产者与消费者的同步,并且仅仅有一个角色可以执行,即生产者与消费者的互斥,所以通过信号量的存在,我们就可以保证生产者和消费者之间的同步,互斥关系
- 并且由于大部分情况下格子的数据都是不全部为空也不全部未满的,所以生产者和消费者一定处于不同位置,所以虽然环形队列我们将它拆解成N份资源,生产者申请信号量成功访问的是N份资源中的一个位置,消费者申请信号量成功,访问的一定是N份资源中和生产者的资源不同的那一个位置,所以生产者和消费者大部分时间都可以并发访问这个环形队列的不同位置
- 生产者消费者模型需要满足321原则
3种关系:(1)生产者与生产者:互斥(2)消费者与消费者:互斥(3)生产者与消费者:互斥,同步。
2种角色:(1)生产者(2)消费者。
1个场所:(1)特定结构的内存空间 - 此时3种关系中的(1)(2)我们已经满足,因为我们目前讨论的单生产者单消费者的模型,所以天然没有多个生产者或多个消费者,即不需要维护多个生产者之间的互斥关系,不需要维护多个消费者之间的互斥关系,(3)我们已经通过信号量维护了生产者与消费者的互斥与同步关系
- 2种角色我们也已经满足,1个场所我们采用的是数组模拟环形队列,也已经满足。所以说此时关于基于环形队列的单生产者单消费者模型的理论性的准备工作我们已经做好了,下面我们就可以基于理论去实际实现这个基于环形队列的单生产者单消费者模型
三、实现环形队列的单生产者单消费者模型
- 我们在上面讲的是生产者将数据生产到环形队列中,消费者从环形队列中拿数据,可是仅仅传输诸如,int,string等类型的数据好像意义不是很大,在更多的应用场景中则是传输任务,即将任务封装在对象中放到环形队列中进行传输,下面我们就下来看一下模拟实现计算任务
Task.hpp
- 小编在之前的文章中已经讲解了如何模拟实现计算任务,所以这里小编就将任务直接拿来用,关于讲解后面的蓝字链接对应的文章中的第二点queue模拟阻塞队列的生产消费模型中的第一点Tash.hpp即计算任务的模拟实现的讲解,请点击阅读<——
#pragma once
#include <iostream>
#include <string>
std::string opers = "+-*/%";
enum Err
{
DivZero = 1,
ModZero,
Unknown
};
class Task
{
public:
Task(int data1, int data2, char op)
: data1_(data1), data2_(data2), oper_(op), result_(0), exitcode_(0)
{}
void operator()()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if (data2_ == 0)
exitcode_ = DivZero;
else
result_ = data1_ / data2_;
}
break;
case '%':
{
if (data2_ == 0)
exitcode_ = ModZero;
else
result_ = data1_ % data2_;
}
break;
default:
exitcode_ = Unknown;
break;
}
}
std::string GetTask()
{
std::string r = (std::to_string(data1_));
r += ' ';
r += oper_;
r += ' ';
r += std::to_string(data2_);
r += " = ?";
return r;
}
std::string GetResult()
{
std::string r = (std::to_string(data1_));
r += ' ';
r += oper_;
r += ' ';
r += std::to_string(data2_);
r += " = ";
r += std::to_string(result_);
r += ", [";
r += "exitcode: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
RingQueue.hpp
基本框架
- 所以接下来我们就编写基于使用vector模拟环形队列的单生产者单消费者模型,由于作为编写者我们并不知道环形队列中要放什么类型的任务,所以我们将环形队列定义为模板类
- 下面我们来思考一下环形队列RingQueue的私有成员变量应该有什么?环形队列是采用vector模拟的,所以要有一个vector<T>类型的成员变量ringqueue_,还应该有生产者的信号量cdata_sem_,消费者的信号量pspace_sem_,生产者的位置c_step_,消费者的位置p_step_,还有环形队列的最大容量cap_,为什么要有最大容量cap_?
- 因为虽然我们使用的vector来模拟的环形队列,vector可以支持自动扩容,但是如果不采用有限的空间,那么我们就无法确定出环形队列的大小,进而生产者的信号量初始化需要的最大空间的数目也就无法传入,所以也就无法构造了,所以这里的最大容量我们还是需要的,那么我们就应该有一个最大容量的默认值,这里我们定义一个静态全局变量defaultnum初始化为5作为cap_的默认值
- 我们希望对信号量的申请与释放进行封装,所以申请信号量的接口sem_wait对应P操作,那么我们封装一个参数为信号量类型的P函数,P函数传入信号量的地址给sem_wait即可,释放信号量的接口sem_post对应V操作,那么我们封装一个参数为信号量类型的V函数,V函数内传入信号量的地址给sem_post即可,并且将这两个函数设置为私有成员函数保证一定的封装性,不对外提供
- 所以接下来我们就可以编写RingQueue环形队列的构造函数,所以参数设置为容量cap,缺省参数为defaultnum即可,然后在初始化列表中对ringqueue_进行扩容为cap大小,这样可以减少频繁扩容的次数提高效率,并且这样就可以直接对空间的数据进行访问修改了,容量cap_设置为cap即可,如果用户不传入,那么使用缺省值即可,如果用户显式传入,那么采用用户传入的即可
- 之后将生产者消费者的位置都设置为0,因为最开始环形队列创建的时候,数组中的所有下标位置并没有放有效数据,所以环形队列为空,生产者消费者的位置相同,即生产者消费者的位置都设置为0即可

- 由于我们使用了信号量,那么在构造函数这里需要使用sem_init对信号量进行初始化,所以如上图的N对应当前的cap_,那么由于最开始数组内没有数据,所以剩余可用空间资源为cap_,那么生产者的信号量的计数设置为cap即可,由于最开始数组内的最开始并没有数据,所以剩余数据为0,那么消费者的信号量的计数设置为0即可
- ringqueue的析构函数则使用sem_destroy传入信号量的地址,销毁信号量即可
#pragma once
#include <vector>
#include <semaphore.h>
static int defaultnum = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t& sem)
{
sem_wait(&sem);
}
void V(sem_t& sem)
{
sem_post(&sem);
}
public:
RingQueue(int cap = defaultnum):ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
sem_init(&pspace_sem_, 0, cap_);
sem_init(&cdata_sem_, 0, 0);
}
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
}
private:
std::vector<T> ringqueue_;
int cap_; //容量
int c_step_; //消费者的位置
int p_step_; //生产者的位置
sem_t cdata_sem_; //消费者所需要的剩余数据信号量
sem_t pspace_sem_; //生产者所需要的剩余空间信号量
};
Push
- 那么接下来我们开始编写Push这个成员函数,push就是向环形队列中入数据,那么此时这个push就是生产者要调用的接口,生产数据到环形队列

- 所以我们就按照上图左侧申请信号量P操作申请空间信号量以及释放信号量V操作释放数据信号量即可,那么在P操作之后,V操作之前,那么就应该是生产数据到环形队列的操作
- 由于是采用的vector模拟的环形队列,生产者即将要访问的位置是p_step_,所以我们使用[ ]+下标直接访问对应位置,将对应位置赋值成数据in即可,那么这个下标位置已经使用,所以要++,之后为了避免超过最大容量cap_,所以还应该取模cap_
void Push(const T& in)
{
P(pspace_sem_);
ringqueue_[p_step_] = in;
p_step_++;
p_step_ %= cap_;
V(cdata_sem_);
}
Pop
- 那么接下来我们开始编写Pop这个成员函数,pop就是从环形队列中弹出数据,那么此时这个pop就是消费者要调用的接口,消费数据

- 所以我们就按照上图左侧申请信号量P操作申请空间信号量以及释放信号量V操作释放数据信号量即可,那么在P操作之后,V操作之前,那么就是消费者消费数据的过程
- 将数据弹出,那么我们设置一个T*的指针作为输出型参数,c_step_是消费者要访问的位置,所以我们就使用[ ]+下标,拿到对应的数据,然后赋值给解引用out即可,然后此时外部就拿到了数据,接下来由于c_step_已经使用,所以我们接下来就++到下一个位置,然后为了放置越界,然后取模cap_即可
void Pop(T* out)
{
P(cdata_sem_);
*out = ringqueue_[c_step_];
c_step_++;
c_step_ %= cap_;
V(pspace_sem_);
}
main.c + 运行代码
- 那么我们在main函数这个文件中,就要创建出生产者线程和消费者线程,然后这两个线程要执行对应的线程函数,生产者线程要执行生产者线程函数Producter,消费者线程要执行消费者线程函数Consumer,这两个线程函数都需要拿到环形队列才能进行操作,所以参数要传入环形队列,两个线程函数的开始都要先将线程类型为void*的形参转换为环形队列的指针才继续进行后续的操作
- 生产者线程函数Producter,就要先获取数据,在实际的场景中这个数据应该是从用户,网络等进行获取,那么这里我们采用随机数的方式获取数据,左操作数和的值我们控制在0到9之间,所以应该采用随机数取模10,即取模获取左操作数,usleep休眠
- 这个休眠是必须的,因为随机数是基于时间生成的,如果代码跑的太快,那么左操作数的值概率可能和右操作数相同,由于休眠了,大概率右操作数的值大概率可能和左操作数不同,右操作数的值我们期望是0到4,所以随机数取模5,仅仅是0到4,右操作数据范围较小,这样有较大的可能得到0,那么这样就可以较大概率看到异常结果,让右操作数为0,发生除零错误,发生取模零错误,我们期望看到异常对应的错误码
- 计算运算符字符串opers的长度,由于opers = "+-*/%"所以我们计算出这个字符串的长度为5,然后让随机数取模5,这样就可以在0到4之间随机,然后我们采用下标+[ ]的方式拿操作符op即可
- 接下来采用上面的构建好的数据传入Task类构造出对象t,我们再打印一下任务回显给用户即可,然后将t生产push到环形队列中即可,最后小编让生产者线程函数休眠1秒,即生产者生产慢,期望看到同步现象,即当环形队列中没有节点数据的时候,生产者先跑先运行,上述过程要放到while死循环中,我们期望生产者线程源源不断的生产节点到环形队列,一旦环形队列满了,那么消费者线程先跑先运行
- 那么下面小编讲解消费者线程Consumer,那么消费者线程就要取出环形队列的节点,调用仿函数进行运算,最后回显打印结果即可,同样的上述过程放到死循环中,即消费者线程源源不断的从环形队列中拿节点,即当环形队列中没有数据的时候,此时生产者线程先跑先运行
- 接下来是main函数中的逻辑,main函数创建一个随机数种子,主线程new一个环形队列便于传参到线程函数中,然后创建一个消费者线程,创建一个生产者线程,然后主线程pthread_join等待两个线程退出即可,最后delete释放环形队列放置内存泄漏
- 所以当下面的进程运行,我们期望看到生产者线程先运行向环形队列生产节点数据,因为如果消费者线程先运行,那么由于环形队列中没有数据,那么消费者线程就会申请信号量失败,但是由于是while循环式的执行,所以消费者线程会持续申请信号量,直到环形队列中有数据,所以生产者线程向环形队列生产节点数据,一旦生产数据成功,那么此时消费者线程会立即申请信号量成功去消费数据,并且在生产者sleep一秒期间,,消费者没有休眠,所以消费者消费很快,生产者由于执行了sleep,所以生产者生产很慢,所以消费者当已经把环形队列的数据消费完了之后,由于环形队列没有数据,所以消费者就会申请信号量失败,所以消费者不会执行,即我们期望观察到间隔1秒,生产了数据之后立马数据就被消费
#include <iostream>
#include <ctime>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "RingQueue.hpp"
void* Productor(void* args)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);
int len = opers.size();
while(true)
{
int data1 = rand() % 10; //[0, 9]
usleep(10);
int data2 = rand() % 5; //[0, 4]
char op = opers[rand() % len];
Task t(data1, data2, op);
rq->Push(t);
std::cout << "Productor task done, task is: " << t.GetTask() << std::endl;
sleep(1);
}
return nullptr;
}
void* Consumer(void* args)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);
while(true)
{
Task t;
rq->Pop(&t);
t();
std::cout << "Consumer task done, task is: " << t.GetTask() << ", result: " << t.GetResult() << std::endl;
// sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr) ^ pthread_self());
RingQueue<Task>* rq = new RingQueue<Task>();
pthread_t c, p;
pthread_create(&c, nullptr, Consumer, (void*)rq);
pthread_create(&p, nullptr, Productor, (void*)rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete rq;
return 0;
}
运行结果如下
小编,小编居然无法编译的嘞,怎么回事呢?Task t无法实例化,这是为什么呢?那么我们看一下Task类模板的构造函数就知道答案了
因为Task类模板的构造函数我们已经显式写了构造函数,所以编译器不会生成默认的无参的构造函数,而我们写的构造函数又没有缺省参数,所以使用Task进行实例化定义对象必须传入参数,而我们的定义方式是Task t,没有传入参数,所以报错是必然的,那么我们给Task类模板提供一个无参的构造即可
所以此时我们再编译就可以了
运行结果如下,无误
四、实现环形队列的多生产者多消费者模型
- 生产者消费者模型需要满足321原则
3种关系:(1)生产者与生产者:互斥(2)消费者与消费者:互斥(3)生产者与消费者:互斥,同步。
2种角色:(1)生产者(2)消费者。
1个场所:(1)特定结构的内存空间 - 之前我们实现的是单生产单消费者模型,所以对于上面的3中关系的(1)(2)我们不需要维系,但是现在我们要编写的是多生产多消费模型,所以对于上面的3中关系的(1)(2)我们必须要维系,即维系生产者与生产者的互斥关系,如何维系?给所有的生产者之间使用一把互斥锁即可,如何维系消费者与消费者的互斥关系?给所以消费者与消费者之间使用一把互斥锁即可,所以一共就需要有两把互斥锁喽,那么问题来了,为什么生产者和生产者,消费者和消费者不使用同一把互斥锁呢?
- 因为如果连生产者和消费者都是用了同一把锁,那么就注定了同一时间只能有一个角色访问环形队列,那么当环形队列的数据不全部是空,或者不全部是满的情况下,生产者和消费者不在同一个位置的并发访问环形队列的场景也就没有了,那么将会极大的降低效率
- 所以我们需要给生产者和生产者之间使用一把锁,消费者和消费者之间使用一把锁,这样可以保证互斥,并且可以保证环形队列的并发访问,即同一时间当环形队列的数据不全部是空,或者不全部是满的情况下,生产者和消费者不在同一个位置,可以允许一个生产者,一个消费者并发访问环形队列,即并发访问提高效率
- 所以我们就需要修改RingQueue环形队列了
RingQueue.hpp
基本框架
- 所以我们在环形队列RingQueue原有的私有成员变量的基础上添加一把生产者的互斥锁c_mutex_,添加一把消费者的互斥锁p_mutex_
- 那么关于锁的申请与释放的接口,同样的,我们不期望暴露出来,所以申请锁封装pthread_mutex_lock为Lock即可,释放锁封装pthread_mutex_unlock为Unlock即可
- 那么由于添加了锁所以要在构造函数对锁进行初始化,在析构函数对锁进行销毁
#pragma once
#include <vector>
#include <semaphore.h>
static int defaultnum = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t& sem)
{
sem_wait(&sem);
}
void V(sem_t& sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t& mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t& mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = defaultnum):ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
sem_init(&pspace_sem_, 0, cap_);
sem_init(&cdata_sem_, 0, 0);
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
std::vector<T> ringqueue_;
int cap_; //容量
int c_step_; //消费者的位置
int p_step_; //生产者的位置
sem_t cdata_sem_; //消费者所需要的剩余数据信号量
sem_t pspace_sem_; //生产者所需要的剩余空间信号量
pthread_mutex_t c_mutex_; //消费者的锁
pthread_mutex_t p_mutex_; //生产者的锁
};
Push Pop
- 为了多线程的并发性考虑,申请锁释放锁要在申请信号量P操作以及释放信号量V操作之间,原因有二,如下
- 第一点:P操作申请信号量以及V操作释放信号量,本身就是原子的,不需要互斥锁的保护,互斥锁的申请与释放保护的代码区域称之为临界区,锁的中间不包含P操作,V操作可以减少临界区的代码行数,进而多线程访问临界区的代码的比重减少,由于临界区的代码是被锁保护起来的,多个线程要想访问临界区必须先申请锁,即对于临界区,同一时间只允许一个执行流(线程)访问,所以也就注定了线程必须是串行执行临界区的代码,临界区的代码少,线程串行执行的时间就会减少,进而线程并发的场景增多,所以就会提高效率
- 第二点:信号量的本质是对资源的预定机制,但是预定也需要花费时间的,所以P操作,V操作也就需要时间,没毛病,那么如果申请锁之后再P操作申请信号量,那么同一时间多个线程只能有一个线程申请到锁,然后再申请信号量,那么此时就耗费时间了,并且此时其它线程由于申请锁失败,自然也就无法执行P操作申请信号量,信号量可以提前申请,因为信号量的本质就是对资源的预定
- 所以如果先执行P操作申请信号量,再申请锁,那么此时如果有一个线程成功申请到信号量,并且成功申请了锁,正在执行临界区的代码,我们知道执行临界区的代码也是要花费时间的,那么于此同时其它线程此时可能也在申请信号量,申请信号量也是要花费时间的,所以一个线程执行临界区的代码,其它线程与此同时可能再申请信号量,并发执行代码,所以时间消耗也就被极大的减少了,当一个线程执行完临界区的代码的时候,其它线程此时可能已经申请成功信号量了,那么就会直接申请锁,所以我们可以在一定程度上让申请成功,执行临界区的代码的时间消耗和申请信号量的时间消耗并发消耗,而不是串行消耗,同样的道理对于释放锁,释放信号量也同样使用,所以申请锁释放锁要在申请信号量P操作以及释放信号量V操作之间可以提高效率
- 所以Push是生产者执行的操作,为了保证生产者之间的互斥性,那么对于生产者对应的锁p_mutex_,那么我们就把申请锁释放锁要在申请信号量P操作以及释放信号量V操作之间即可
- 所以Pop是消费者执行的操作,为了保证消费者之间的互斥性,那么对于消费者对应的锁c_mutex_,那么我们就把申请锁释放锁要在申请信号量P操作以及释放信号量V操作之间即可
void Push(const T& in)
{
P(pspace_sem_);
Lock(p_mutex_);
ringqueue_[p_step_] = in;
p_step_++;
p_step_ %= cap_;
Unlock(p_mutex_);
V(cdata_sem_);
}
void Pop(T* out)
{
P(cdata_sem_);
Lock(c_mutex_);
*out = ringqueue_[c_step_];
c_step_++;
c_step_ %= cap_;
Unlock(c_mutex_);
V(pspace_sem_);
}
main.cc
- 那么在main函数中定义消费者线程的数组以及生产者线程的数组,那么使用for循环分别进行pthread_create创建多个对应的新线程,最后主线程pthead_join等待对应的多个新线程即可,这里创建3个消费者线程,创建3个生产者线程
- 那么既然有多个消费者线程以及多个生产者线程了,所以我们期望每一个线程都有自己的名字,所以我们定义一个线程数据的struct类ThreadData,那么其中的成员变量我们设置为环形队列的指针,以及线程名字,所以我们就可以在for循环内new这个ThreadData类得到这个类型的一个指针td,将环形队列的指针传入,然后构造线程名字,序号则采用 i 即可,如果是生产者线程,那么名字格式就是 “Productor” + 序号i,如果是消费者线程,那么名字格子就是 “Consumer” + 序号i,那么将td指针传入pthread_create即可
- 那么对于生产者对应的线程函数Productor以及消费者对应的线程函数Consumer,那么我们将线程函数的形参args进行类型转换为ThreadData即可,于是我们就得到了类型为ThreadData的td,然后定义线程名字name,以及环形队列的指针rq,利用td对name和rq进行初始化即可
- 那么对于生产者对应的线程函数Productor,我们将sleep注释掉,所以一旦程序运行,那么生产者将会不断运行,生产数据到环形队列中,直到将环形队列打满,打印语句中将对应的name线程名称也进行打印
- 那么对于消费者对应的线程函数Consumer,我们进行sleep,打印语句中将对应的name线程名称也进行打印,所以说现在是生产者生产的快,消费者消费的满
- 所以当程序运行的一瞬间,生产者将会直接将环形队列打满,为了观察生产者生产现象更明显,所以我们在main函数中环形队列new的时候我们传参20,即让最开始的时候环形队列中有20个格子可以被生产者生产数据到格子中,所以程序运行的一瞬间我们将会观察20个生产者打印的生产任务的语句,并且由于消费者一直在执行while循环,并且开始的时候唤醒队列中并没有数据,所以消费者会一直申请信号量,一直申请信号量失败,直到生产者生产数据到环形队列中,一旦生产者生产数据到环形队列中,那么消费者就会直接从环形队列中取出数据进行消费,然后加工处理数据,然后执行sleep语句,由于sleep语句的存在,因此消费者消费慢,而生产者生产快,所以在3个消费者线程执行sleep的时候,它们就可以将环形队列生产满,所以我们可以观察到最开始的时候生产者先运行生产数据,然后3个消费者将会依次消费数据,然后多个生产者持续生产数据到环形队列,直到环形队列满
#include <iostream>
#include <ctime>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "RingQueue.hpp"
struct ThreadData
{
RingQueue<Task>* rq = nullptr;
std::string threadname = "";
};
void* Productor(void* args)
{
ThreadData* td = static_cast<ThreadData*>(args);
RingQueue<Task>* rq = td->rq;
std::string name = td->threadname;
int len = opers.size();
while(true)
{
int data1 = rand() % 10; //[0, 9]
usleep(10);
int data2 = rand() % 5; //[0, 4]
char op = opers[rand() % len];
Task t(data1, data2, op);
rq->Push(t);
std::cout << "Productor task done, task is: " << t.GetTask() << ", who: " << name << std::endl;
// sleep(1);
}
delete td;
return nullptr;
}
void* Consumer(void* args)
{
ThreadData* td = static_cast<ThreadData*>(args);
RingQueue<Task>* rq = td->rq;
std::string name = td->threadname;
while(true)
{
Task t;
rq->Pop(&t);
t();
std::cout << "Consumer task done, task is: " << t.GetTask() << ", result: " << t.GetResult()
<<", who: " << name << std::endl;
sleep(1);
}
delete td;
return nullptr;
}
int main()
{
srand(time(nullptr) ^ pthread_self());
RingQueue<Task>* rq = new RingQueue<Task>(20);
pthread_t c[3], p[3];
for(int i = 0; i < 3; i++)
{
ThreadData* td = new ThreadData();
td->rq = rq;
td->threadname = "Consumer-" + std::to_string(i);
pthread_create(c + i, nullptr, Consumer, (void*)td);
}
for(int i = 0; i < 3; i++)
{
ThreadData* td = new ThreadData();
td->rq = rq;
td->threadname = "Productor-" + std::to_string(i);
pthread_create(p + i, nullptr, Productor, (void*)td);
}
for(int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for(int i = 0; i < 3; i++)
{
pthread_join(p[i], nullptr);
}
delete rq;
return 0;
}
运行结果如下,无误
接下来由于环形队列的容量是有限的cap_,那么生产者将环形队列打满之后,就会申请信号量失败,然后消费者申请信号量成功就开始消费数据,数据被消费了,消费者就sleep一秒去了,由于生产者生产快,消费者消费慢,所以生产者就会立即将数据生产到环形队列中,所以当生产者将环形队列打满之后,生产者和消费者继续向后运行,就会出现间隔1秒生产者等待消费者消费的现象
五、源代码
main.cc
#include <iostream>
#include <ctime>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
#include "RingQueue.hpp"
struct ThreadData
{
RingQueue<Task>* rq = nullptr;
std::string threadname = "";
};
void* Productor(void* args)
{
ThreadData* td = static_cast<ThreadData*>(args);
RingQueue<Task>* rq = td->rq;
std::string name = td->threadname;
int len = opers.size();
while(true)
{
int data1 = rand() % 10; //[0, 9]
usleep(10);
int data2 = rand() % 5; //[0, 4]
char op = opers[rand() % len];
Task t(data1, data2, op);
rq->Push(t);
std::cout << "Productor task done, task is: " << t.GetTask() << ", who: " << name << std::endl;
// sleep(1);
}
delete td;
return nullptr;
}
void* Consumer(void* args)
{
ThreadData* td = static_cast<ThreadData*>(args);
RingQueue<Task>* rq = td->rq;
std::string name = td->threadname;
while(true)
{
Task t;
rq->Pop(&t);
t();
std::cout << "Consumer task done, task is: " << t.GetTask() << ", result: " << t.GetResult()
<<", who: " << name << std::endl;
sleep(1);
}
delete td;
return nullptr;
}
int main()
{
srand(time(nullptr) ^ pthread_self());
RingQueue<Task>* rq = new RingQueue<Task>(20);
pthread_t c[3], p[3];
for(int i = 0; i < 3; i++)
{
ThreadData* td = new ThreadData();
td->rq = rq;
td->threadname = "Consumer-" + std::to_string(i);
pthread_create(c + i, nullptr, Consumer, (void*)td);
}
for(int i = 0; i < 3; i++)
{
ThreadData* td = new ThreadData();
td->rq = rq;
td->threadname = "Productor-" + std::to_string(i);
pthread_create(p + i, nullptr, Productor, (void*)td);
}
for(int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for(int i = 0; i < 3; i++)
{
pthread_join(p[i], nullptr);
}
// pthread_t c, p;
// pthread_create(&c, nullptr, Consumer, (void*)rq);
// pthread_create(&p, nullptr, Productor, (void*)rq);
// pthread_join(c, nullptr);
// pthread_join(p, nullptr);
delete rq;
return 0;
}
RingQueue.hpp
#pragma once
#include <vector>
#include <semaphore.h>
static int defaultnum = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t& sem)
{
sem_wait(&sem);
}
void V(sem_t& sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t& mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t& mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = defaultnum):ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
sem_init(&pspace_sem_, 0, cap_);
sem_init(&cdata_sem_, 0, 0);
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
void Push(const T& in)
{
P(pspace_sem_);
Lock(p_mutex_);
ringqueue_[p_step_] = in;
p_step_++;
p_step_ %= cap_;
Unlock(p_mutex_);
V(cdata_sem_);
}
void Pop(T* out)
{
P(cdata_sem_);
Lock(c_mutex_);
*out = ringqueue_[c_step_];
c_step_++;
c_step_ %= cap_;
Unlock(c_mutex_);
V(pspace_sem_);
}
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
std::vector<T> ringqueue_;
int cap_; //容量
int c_step_; //消费者的位置
int p_step_; //生产者的位置
sem_t cdata_sem_; //消费者所需要的剩余数据信号量
sem_t pspace_sem_; //生产者所需要的剩余空间信号量
pthread_mutex_t c_mutex_; //消费者锁
pthread_mutex_t p_mutex_; //生产者锁
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
std::string opers = "+-*/%";
enum Err
{
DivZero = 1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int data1, int data2, char op)
: data1_(data1), data2_(data2), oper_(op), result_(0), exitcode_(0)
{}
void operator()()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if (data2_ == 0)
exitcode_ = DivZero;
else
result_ = data1_ / data2_;
}
break;
case '%':
{
if (data2_ == 0)
exitcode_ = ModZero;
else
result_ = data1_ % data2_;
}
break;
default:
exitcode_ = Unknown;
break;
}
}
std::string GetTask()
{
std::string r = (std::to_string(data1_));
r += ' ';
r += oper_;
r += ' ';
r += std::to_string(data2_);
r += " = ?";
return r;
}
std::string GetResult()
{
std::string r = (std::to_string(data1_));
r += ' ';
r += oper_;
r += ' ';
r += std::to_string(data2_);
r += " = ";
r += std::to_string(result_);
r += ", [";
r += "exitcode: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
makefile
RingQueueTest:main.cc
g++ -o $@ $^ -lpthread -std=c++11
.PHONY:clean
clean:
rm -f RingQueueTest
总结
以上就是今天的博客内容啦,希望对读者朋友们有帮助
水滴石穿,坚持就是胜利,读者朋友们可以点个关注
点赞收藏加关注,找到小编不迷路!
更多推荐












所有评论(0)