上一篇在模拟多线程抢票时,并发抢票出现负数问题,引出了锁的概念。本篇将深入探讨相关知识,包括原子性、线程互斥与同步的概念介绍条件变量以实现线程同步,讲解 mutex、cond、sem 系列系统接口函数的使用与简单封装。最后,利用这些同步与互斥知识,实现基于生产者 - 消费者模型的普通和环形 BlockingQueue。欢迎阅读!

博主主页☛☛☛羑悻的小杀马特.-CSDN博客 ☚☚☚  

欢迎拜访:羑悻的小杀马特.-CSDN博客

本篇主题:秒懂百科之探究Linux线程同步与互斥第一讲

制作日期:2025.06.16

隶属专栏:linux之旅

目录

一 ·线程互斥: 

1.1原子性:

1.2互斥锁(互斥量)的使用 :

 1.3互斥锁使用注意事项:

1.4互斥锁底层原理剖析:

对于硬件角度:

对于软件角度:

 1.5封装互斥锁:

 二·线程同步:

2·1条件变量概念: 

 2·2条件变量函数接口及使用:

局部条件变量:

全局条件变量:

条件变量等待:

条件变量唤醒:

2·3cond系列函数接口简单测试:

2.4封装条件变量:

2·5 cond条件变量总结:

 三·基于mutex与cond的生产者消费者模型:

3·1概念介绍:

3·2实现blockqueue: 

代码实现(附加超详细注释):

3.3加解锁及条件变量使用位置剖析:

 四·POSIX信号量:

4.1概念介绍:

4.2函数介绍及使用:

初始化信号量:

销毁信号量:

等待信号量:

发布信号量:

4.3封装信号量: 

4.4基于POSIX信号量的深度小结:

五·基于sem,cond,mutex实现环形生产消费模型:

5.1实现原理剖析:

5.2ringqueue实现: 

 5.3基于实现ringqueue后对信号量,互斥锁,条件变量的小结:

 六·本篇小结:

一 ·线程互斥: 

在理解什么是线程互斥的前提下;我们先来理解下何为原子性。

1.1原子性:

这里可以简单理解成:

要么做要么不做;只有这两种情况。

从汇编这块可以理解成只有一条的就是原子否则就不是(因为可能在当中某条指令发生线程切换)数据不一致。

首先,还是从上次遗留下的抢票问题来分析;

对于ticket--它不是个原子操作:

 对于它的汇编源码:

objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax #
600b34 <ticket>
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) #
600b34 <ticket>

对应了下面三个操作: 

load :将共享变量ticket从内存加载到寄存器中

update :更新寄存器⾥⾯的值,执⾏-1操作

store :将新值,从寄存器写回共享变量ticket的内存地址

因此在一个线程执行中有可能在中间部分被另个线程打断;保存上下文;然后等另一个线程执行完;它再接着;因此就造成了非原子性带了的数据不一致了。 

看一下大致过程理解下:

那么如何避免这非原子性带来的数据不一致问题?

就是加锁;在上一篇线程控制的时候我们使用的就是加锁来保护临界资源如果没有保护就会存在线程安全问题 。

 保护临界资源(内存空间数据等)-->保护临界区-->加锁+解锁-->(同时只允许一个线程/进程去访问)

对于cpu对线程的切换是什么时候?

当使用多线程的时候就会存在线程的并发此时就涉及到了它的切换;也就是当线程的时间片到了;或者处于sleep(放到等待队列执行其他的;直到醒来);还有IO操作等等;那么同样就是当内核态返回用户态的时候进行检查来完成。

总之,原子性就可以理解成两种情况要么做要么不做;不能存在正在做(这样可能会被打断;也就是不原子性了)。

下面我们就来分析下之前抢票问题抢到负的原因;根本不在ticket--;但是和它有关;根本就是ticket>0的判断这里。 

下面分析下:

当然;也有可能是sleep这里的原因:

比如它们几个线程的间隔时间差小于这个usleep的时间;就会票已经到1了;由于一开始的线程休眠住; ticket没有变;其他线程也进来休眠了;然后有先后顺序的执行后面的语句也就造成了票数为负的现象了。 

因此为了保证这些;Linux就引入了锁的概念:

1`代码必须要有互斥⾏为:当代码进⼊临界区执⾏时,不允许其他线程进⼊该临界区。

2`如果多个线程同时要求执⾏临界区的代码,并且临界区没有线程在执⾏,那么只能允许⼀个线程进⼊该临界区。
3`如果线程不在临界区中执⾏,那么该线程不能阻⽌其他线程进⼊临界区。

1.2互斥锁(互斥量)的使用 :

竞争申请锁,多线程都得先看到锁,锁本身就是临界资源!!申请锁的过程,必须是原子的:

这点有os保证;不可能出现一个线程申请的时候;暂停了另一个又来(类似上面的ticket) —->必须原子性。 

首先对于创建锁有两种一种全局锁一种就是局部锁:

头文件还是这个:

#include<pthread.h>

全局锁:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

使用的话只需要全局对它初始化即可;程序结束锁会自动destroy无需手动。

局部锁: 

 pthread_mutex_t restrict mutex;
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

 这里第二个参数可以默认设置成nullptr。

但是这里还是要手动销毁锁的:

   int pthread_mutex_destroy(pthread_mutex_t *mutex);

对于使用的话;全局锁和局部锁都是一样使用的:

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);//非阻塞版本;也就是在等锁的时候还会干别的事
int pthread_mutex_unlock(pthread_mutex_t *mutex);

成功,继续向后运行,访问临界区代码,访问临界资源失败:阻塞挂起申请执行流。

 因此加锁也是对临界资源的一种变相的原子保护。

可能有个疑问对应共享内存版的进程通信如何做到锁???

 我们把对应共享内存映射的首地址转化成锁大小的地址;然后定义个锁;进程在写或者读的时候都要申请锁不就好了。

下面我们就使用下这些接口:

全局锁测试代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
using namespace std;
// 创建四个进程模拟抢票:
int ticket = 1000;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void *route(void *arg)
{
    char *id = (char *)arg;
    while (1)
    {   //保护临界区
        pthread_mutex_lock(&lock);
        if (ticket > 0) // 1. 判断
        {
            usleep(1000);                               // 模拟抢票花的时间
            printf("%s sells ticket:%d\n", id, ticket); // 2. 抢到了票
            ticket--;                                   // 3. 票数--
            pthread_mutex_unlock(&lock);
        }
        else
        {
            pthread_mutex_unlock(&lock);
            //这里还需要解锁否则到最后抢完票线程加锁后有其他线程不能被join进而阻塞住

            break;
        }
    }
    return nullptr;
}
int main()
{
    pthread_t t1, t2, t3, t4;

    pthread_create(&t1, NULL, route, (void *)"thread 1");
    pthread_create(&t2, NULL, route, (void *)"thread 2");
    pthread_create(&t3, NULL, route, (void *)"thread 3");
    pthread_create(&t4, NULL, route, (void *)"thread 4");

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
}

局部锁测试代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
using namespace std;
// 创建四个进程模拟抢票:
int ticket = 1000;

void *route(void *arg)
{
    pthread_mutex_t*lk= static_cast< pthread_mutex_t*>(arg);
    while (1)
    {   //保护临界区
        pthread_mutex_lock(lk);
        if (ticket > 0) // 1. 判断
        {
            usleep(1000);                               // 模拟抢票花的时间
            printf("pthread sells ticket:%d\n",  ticket); // 2. 抢到了票
            ticket--;                                   // 3. 票数--
            pthread_mutex_unlock(lk);
        }
        else
        {
            pthread_mutex_unlock(lk);
            //这里还需要解锁否则到最后抢完票线程加锁后有其他线程不能被join进而阻塞住

            break;
        }
    }
    return nullptr;
}
int main()
{ 
    pthread_mutex_t lock;
    pthread_mutex_init(&lock,nullptr);
    pthread_t t1, t2, t3, t4;

    pthread_create(&t1, NULL, route, &lock);
    pthread_create(&t2, NULL, route, &lock);
    pthread_create(&t3, NULL, route, &lock);
    pthread_create(&t4, NULL, route, &lock);
    pthread_mutex_destroy(&lock);
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
}

当然了,C++也封装了mutex:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include<iostream>
#include<unistd.h>
#include <mutex>
using namespace std;
// 创建四个进程模拟抢票:
int ticket = 1000;
std::mutex m;
void *route(void *arg)
{
    char *id = (char *)arg;
    while (1)
    {   //保护临界区
        m.lock();
        if (ticket > 0) // 1. 判断
        {
            usleep(1000);                               // 模拟抢票花的时间
            printf("%s sells ticket:%d\n", id, ticket); // 2. 抢到了票
            ticket--;                                   // 3. 票数--
            m.unlock();

        }
        else
        {
            m.unlock();
            //这里还需要解锁否则到最后抢完票线程加锁后有其他线程不能被join进而阻塞住

            break;
        }
    }
    return nullptr;
}
int main()
{
    pthread_t t1, t2, t3, t4;

    pthread_create(&t1, NULL, route, (void *)"thread 1");
    pthread_create(&t2, NULL, route, (void *)"thread 2");
    pthread_create(&t3, NULL, route, (void *)"thread 3");
    pthread_create(&t4, NULL, route, (void *)"thread 4");

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
}

效果都是一样的:

 

 1.3互斥锁使用注意事项:

因为是加锁;故所有线程都要看到这个锁因此就是共享资源;必须是原子性的(OS自己就能保证);因此一个线程拿到锁的时候;其他线程只能等着。

也就是说;当你在访问的时候;暂停了类似的;别的线程不会再次进来干扰你从而造成数据不一致性;而只有等你访间完交锁才可以->并发性变成了线程串行。

那么,如果一个线程拿着锁到了临界区内部;允不允许线程切换:

允许;因为这个线程即使被切换了(它还是拿着锁的,后面原理会讲到) ;其他线程同样是无法进入临界区访问的;只有等它还锁;然后其他线程再去竞争锁;直到拿到锁的线程再去访问。

在哪里加锁解锁: 

给它具体化一下:

加锁与解锁一定要控制在出入临界区。 

1.4互斥锁底层原理剖析:

锁的本质:

对于硬件角度:

关闭时钟中断(比如遇到的死机)。

也就是当一个线程执行共享任务的时候;关闭;系统不再对其他线程做响应;但是一般不采取。

对于软件角度:

为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单先的数据相交换;可以暂时把锁理解成特殊整型变量;1为加锁;0为解锁。

看张图清晰了解底层:

也就是cpu调度的线程的数据会被来回切换到寄存器中;当cpu不再调度它;就会保存它的上下文方便下次记录位置开始调度;这就是cpu执行调度的方案;也就构成了加锁解锁底层原理的实现了。 

现在假设a线程进行加锁的后;然后b线程再次访问;然后a线程解锁;线程被唤醒的模拟下本质过程:

1· 首先a线程清空%al然后交换回来此时cpu调度a线程时候%al上下文保存的是a线程的;此时a拿到锁;然后退出干别的事情。

2·此时,b线程来了﹔情况a线程在%al寄存器留下的内容;然后交换;发现还是0;因此被挂起.。

3·a线程完成了任务;进行解锁;然后把物理内存的mutex变成1;此时唤醒b线程;b拿到锁........

 1.5封装互斥锁:

这里封装锁是为了后面实现生产消费模型,线程池等准备。

mutex.hpp:

#pragma once
#include<pthread.h>
//封装锁:
class mutex{
    public:
     mutex(){
       int n= pthread_mutex_init(&_mutex,nullptr);
       (void)n;
     }
      void Lock(){    pthread_mutex_lock(&_mutex);}
      void Unlock(){    pthread_mutex_unlock(&_mutex);}


     ~mutex(){
        int n= pthread_mutex_destroy(&_mutex);
        (void)n;
      }
private: 
  pthread_mutex_t _mutex;
};
//自动上锁与解锁
class mutexguard{
    public:
    //初始化为上锁;
    mutexguard(mutex &mg):_mg(mg){  _mg.Lock() ;  }//引用
    //析构为解锁:
    ~mutexguard(){_mg.Unlock() ;    }

     private:
     mutex &_mg;//注意引用:确保不同线程上锁与解锁的时候拿到同一把锁;不能是直接赋值
};

main.cc:

#include"mutex.hpp"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include<iostream>
#include<unistd.h>
using namespace std;
class thread_set{
    public:
    thread_set(mutex *plock,string name):_name(name),_plock(plock){}
    string getname(){ return _name;}
    mutex *getlock(){ return _plock;}
    ~ thread_set(){}
 private:
 std:: string _name;
 mutex *_plock;//为了都拿到同一把锁
};
// 创建2个进程模拟抢票:
int ticket = 1000;
void *route(void *arg)
{
    thread_set * ts = static_cast<thread_set*>(arg);
    while (1)
    {   //保护临界区
        {
          mutexguard md(*(ts->getlock()));//除了这个代码块自动析构:类似RAII原则
          //把锁传递给guard然后这个类内部引用给它的成员变量然后进行上锁与解锁
        if (ticket > 0) // 1. 判断
        {
            usleep(1000);                               // 模拟抢票花的时间
            printf("%s sells ticket:%d\n", ts->getname().c_str(), ticket); // 2. 抢到了票
            ticket--;                                   // 3. 票数--
          
        }
        else
        {
          
            //这里还需要解锁否则到最后抢完票线程加锁后有其他线程不能被join进而阻塞住

            break;
        }
    }
    }
    return nullptr;
}


int main()
{
    pthread_t t1,t2;
    mutex mx;
    //为了让线程拿到名字和同一把锁进行封装
    thread_set s1(&mx,"thread-1");
    thread_set s2(&mx,"thread-2");
    pthread_create(&t1, NULL, route, &s1);
    pthread_create(&t2, NULL, route, &s2);
   

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    
}

测试效果还是和之前抢票一样就不多说了;对于封装过程有些细节地方;博主在代码中都注释了;可以查看。 

 二·线程同步:

首先要先理解什么是同步:

在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题。

2·1条件变量概念: 

 那么什么能够保证同步呢?就是条件变量。

当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了;此时就需要它去条件变量等待。

说白了就是不符合访问临界资源的条件就去队列排队等待。

举个例子说明下两者关系:

例如⼀个线程访问队列时,发现队列为空,它只能等待,只到其它线程将⼀个节点添加到队列中。这种情况就需要⽤到条件变量。

 比如它是满的;一个线程进入了啥也不千;一直重复while循环而且还拿着锁;这样其他线程只能千等着;也就是其他线程饥饿 。也就是啥也干不了就去cond条件变量去排队去等待;把锁上交就可以让其他线程去访问了;如果被唤醒了这个队列的线程也是有一定顺序醒来开始争夺锁;去访问临界资源---->这样不就保证了按照顺序访问临界资源。

这里我们可以把条件变量 理解成一个头取尾插的队列。

 2·2条件变量函数接口及使用:

首先它也是分为全局和局部的;头文件和上面mutex一样。

局部条件变量:

 int pthread_cond_destroy(pthread_cond_t *cond);
 int pthread_cond_init(pthread_cond_t *restrict cond,  const pthread_condattr_t *restrict attr);

属性直接传nullptr即可;这里也是需要手动destroy的:

全局条件变量:

  pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

系统自动destroy。 

条件变量等待:

int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrictmutex,const struct timespec *restrict abstime);//定时等待自动唤醒

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

条件变量唤醒:

 int pthread_cond_signal(pthread_cond_t *cond);//唤醒队头的一个
  int pthread_cond_broadcast(pthread_cond_t *cond);//整个队列全部唤醒
//注:这里如果唤醒了全部的线程;但是一开始只有一个线程拿到锁;其他线程会继续休眠;但是无需重新唤醒;因为OS机制保证了只要那个线程还锁;其他等待的线程就会立刻去竞争---->故无需二次全唤醒

2·3cond系列函数接口简单测试:

如果所有线程都进入了cond队列等待;而我们没有对它唤醒;那么就会是全部阻塞住。

先让线程全部cond休眠;然后一个个唤醒!!! 

 测试源代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include<iostream>
#include<unistd.h>
#include<vector>
#include<pthread.h>
using namespace std;
#define N 5
pthread_mutex_t glock=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond=PTHREAD_COND_INITIALIZER;
 int cnt;
void * routine(void *arg){
   char *name=static_cast<char*>(arg);
   while(1){
   pthread_mutex_lock(&glock);
   pthread_cond_wait(&gcond,&glock);//如果所有线程都进去了;如果也没有唤醒;
   //那么就相当于阻塞住。

   //先把所有线程放入等待队列: 0 1 2 3 4;从头开始取;尾部插入;
   //因此可以看到取出的线程直接就拿到锁了;
   //本质是还和其他等待锁的线程竞争;但是无其他线程了
       cout<<name<<" accumulate cnt before:"<<cnt++
       <<" accumulate cnt after:"<<cnt<<endl;
   pthread_mutex_unlock(&glock);

   }
 return nullptr;

}
int w=0;
void* r(void*arg){
  string name=static_cast<const char*>(arg);
  while(1){
  pthread_mutex_lock(&glock);
    cout<<name<<endl;
    sleep(1);
    if(w==0){
        w++;
        pthread_cond_wait(&gcond,&glock);   
        w--;
       cout<<name<<endl;

    }
  pthread_cond_signal(&gcond);
   pthread_mutex_unlock(&glock);
   sleep(1);

  }

 return nullptr;
}
int main(){

     //俩线程轮流打印:
      //   pthread_t t1,t2;

      //  pthread_create(&t1,nullptr,r,(void*)"我是线程A");
      //   pthread_create(&t2,nullptr,r,(void*)"我是线程B");     
      //     pthread_join(t1,nullptr);
      //  pthread_join(t2,nullptr);

      
    vector<pthread_t> v;
    for(int i=0;i<N;i++){
        pthread_t tid;
        //堆:
        char *buff =new char[100];
        sprintf(buff,"thread-%d",i);
        pthread_create(&tid,nullptr,routine,buff);
        v.emplace_back(tid);
    }
//主线程对它们唤醒:
    while(1){
        sleep(1);
        cout<<"唤醒gcond下等待的线程 "<<endl;
        pthread_cond_signal(&gcond);
    }
    for(int i=0;i<v.size();i++){
     void *ret=nullptr;
        pthread_join(v[i],&ret);
    }

}

测试效果:

2.4封装条件变量:

 cond.hpp:

#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <unistd.h>
#include <vector>
#include <pthread.h>
#include "mutex.hpp"

class cond
{
public:
    cond() { pthread_cond_init(&_cond, nullptr); }
     
void Wait(mutex &mx)
{
int n = pthread_cond_wait(&_cond, mx.getmutex());
(void)n;
}
    void notify()
    {
        int n = pthread_cond_signal(&_cond);
        (void)n;
    }
    void allnotify()
    {
        int n = pthread_cond_broadcast(&_cond);
        (void)n;
    }
    ~cond() { pthread_cond_destroy(&_cond); }

private:
    pthread_cond_t _cond;
};

里面的互斥锁就是我们上面封装的mutex.hpp。 

2·5 cond条件变量总结:

我们在使用cond的wait会发现为什么每次都要把锁传给它?

这里为什么参数还要传递个锁:当一个线程不符合访问的条件去cond下排队的时候;不能造成其他线程饥饿;故把锁上交;其他线程可以抢锁;然后保证了同步性。


一个线程自己没办法满足条件就需要有一个线程去改变使得它满足;因此需要把锁放开;来借助别的线程满足它的条件类似把它从cond中释放出去访问共享资源-->交锁

因此就更加理解为什么要传锁的地址方便找对应的物理内存的位置然后改成1;即解锁。 

形象理解cond: 

调用wait其实就相当关于在此处搞了个cond等待队列;然后把线程尾插进去;然后把锁放回物理内存方便其他线程争夺去访问临界资源;这个进入cond队列的线程就相当于阻塞;然后如果被唤醒也就是队头被唤醒;然后它是没有锁的;因此从wait函数那行复活去抢夺锁;如果抢到锁就直接接着往后执行否则继续队列休眠-->伪唤醒。

 三·基于mutex与cond的生产者消费者模型:

3·1概念介绍:

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据乙石个用寺侍洞贫看火理,且仅”给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个菠严区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

如;

 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

优点:解耦,支持并发,支持忙闲不均。 

那么这样就遵循什么?

①生产者与生产者是互斥。

消费者与消费者是互斥。 

生产者与消费之是互斥与同步。

②生产者 消费者两种关系(线程模拟实现)。

③交易场所就是内存空间。

因此就记作321原则。 

那么它是怎么提高效率的;并不是在进去交易场所;而是在实际执行和获取任务的时候。

获取任务与执行任务也是需要时间的;这种模式对双方的时间利用都是有好处的(加了个bq)。

3·2实现blockqueue: 

 下面我们就基于线程和队列来实现下上面所说的生产者消费者模型:

这里先看成是也就只有两个线程一个放一个取;之后进行加锁+条件变量就变成多线程的即可。

下面就结合上面说的321原则说下整体思路:

其实就是放bq的线程不断走put操作;然后取bq的线程就直接走take操作;但是会有满空的情况;如果是满了就直接让put的线程去条件变量等待;直到take走了一个然后对它唤醒;空的话也是类似;其实就是按照“生产者消费者关系”访问队列因为是临界资源故只允许一个线程(不能同时放或者取;或者多线程放;多线程取;因此需要加锁);其他就是一些细节问题;会在实现代码注释中标明。

代码实现(附加超详细注释):

blockqueue.hpp:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include<iostream>
#include<unistd.h>
#include<vector>
#include<pthread.h>
#include<queue>
using namespace std;
const int N=5;
 template<class T>
class blockqueue{

     private:
       bool isfull(){ return _q.size()==N;}
       bool isempty(){ return _q.empty();}
   public: 

     blockqueue(int cap=N):_cap(cap),_csleep_num(0)
     ,_psleep_num(0){
         pthread_mutex_init(&_lock,nullptr);
         pthread_cond_init(&_ccond,nullptr);
         pthread_cond_init(&_pcond,nullptr);

     }
     void equeue(T &data) {
        //加锁:
        pthread_mutex_lock(&_lock);
        //临界区:
        while(isfull()){//防止多线程被唤醒;造成无full判断而q满了还在push
            cout<<"resource full : 生产者休眠"<<endl;
            _psleep_num++;
            pthread_cond_wait(&_pcond,&_lock);//当前行醒来;无锁需要继续争取;
            //如果抢不到就继续休眠--.>伪唤醒
            _psleep_num--;


        }
         _q.push(data);
        //pthread_mutex_unlock(&_lock);
        //这样也可以;不过可能有种极端情况:假设线程1解开锁了;线程2来了;此时消费者
        //休眠队列只有一个;然而线程1刚执行完要进入if;此时线程2来了被cpu调度;此时就可能
        //发生数据不一致;明明休眠队列没人了;线程1还调用了一次;
        //但是此时signal函数只是返回错误码;所以影响不大-->也就是可以在此处进行解锁
    
         if( _csleep_num>0){
            cout<<"唤醒消费者:"<<endl;
            pthread_cond_signal(&_ccond);
         }
        pthread_mutex_unlock(&_lock);//此时需要等待锁被解开;然后所有线程开始竞争
        //谁抢到锁归谁操作;可以是生产者也可以是消费者--->更建议这种解锁
     }
          
     T&Pop(){
        pthread_mutex_lock(&_lock);
            while(isempty()){//防止多线程被唤醒;造成empty判断而q空了还在pop
                cout<<"resource empty: 消费者休眠"<<endl;
            _csleep_num++;
            pthread_cond_wait(&_ccond,&_lock);
            _csleep_num--;

            }  
         T &res=_q.front();
        _q.pop();
        //也可以在这解锁;同理
        //pthread_mutex_unlock(&_lock);

        if(_psleep_num>0){
            cout<<"唤醒生产者:"<<endl;
            pthread_cond_signal(&_pcond);
        }
        pthread_mutex_unlock(&_lock);//同理
            return res;
     }
     ~blockqueue(){
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_ccond);
        pthread_cond_destroy(&_pcond);

     }
  private:
   queue<T> _q;
   int _cap;
   int _csleep_num;
   int _psleep_num;
   pthread_mutex_t _lock;
   pthread_cond_t _ccond;
   pthread_cond_t _pcond;
};

main.cc:

#include"blockqueue.hpp"
#include<functional>
using func_t=function<void()>;
void print(){
    cout<<"打印中..."<<endl;
    sleep(1);
}
void download(){
    cout<<"下载中..."<<endl;
    sleep(1);
}
void getlog(){
    cout<<"获得日志中..."<<endl;
    sleep(1);
}
void browse(){
    cout<<"浏览中..."<<endl;
    sleep(1);
}
vector<func_t>task;
int cnt=0;
void * consumer(void *arg){
    //生产者快:
   // sleep(2);

    blockqueue<func_t>* pbq=static_cast< blockqueue<func_t>*>(arg);
   while(1){
   func_t tk= pbq->Pop();
    tk();
     
   }
}  

void * producter(void *arg){
    blockqueue<func_t>* pbq=static_cast< blockqueue<func_t>*>(arg);
     while(1){
        //消费者快
        sleep(2);

        //cout<<"生产一个任务..."<<endl;
        pbq->equeue(task[cnt%task.size()]);
        //生产者快:
        cout<<"生产一个任务..."<<endl;

        cnt++;
        
     }
}

int main(){
     task.push_back(print);
     task.push_back(download);
     task.push_back(getlog);
     task.push_back(browse);
     blockqueue<func_t> bq;
    pthread_t c[2],p[3];
    pthread_create(c,nullptr,consumer,&bq);
    pthread_create(c+1,nullptr,consumer,&bq);
    pthread_create(p,nullptr,producter,&bq);
    pthread_create(p+1,nullptr,producter,&bq);
    pthread_create(p+2,nullptr,producter,&bq);
    

    pthread_join(*c,nullptr);
    pthread_join(*(c+1),nullptr);

    pthread_join(*p,nullptr);
    pthread_join(*(p+1),nullptr);
    pthread_join(*(p+2),nullptr);
 





    
}

下面测试一下: 

 

 

这里由于显示器文件并没有加锁故有点乱! 

3.3加解锁及条件变量使用位置剖析:

下面我们来探究下什么时候加锁解锁以及进入条件变量等待:

因为pop和equeue是大差不差的;因此我们以pop为例:

先说结论:

加解锁一定是在临界区边缘; cond与异步唤醒也是在临界区内。

lock 与unlock之间是临界区放wait与signal;满足条件进行异步唤醒。

这里解锁不能位于wait之前:

比如队列空了然后一个线程去等待之前解了锁假设生产者继续生成了一个然后队列不空了去发信号但是cond队列无刚才的线程;然后信号白发了之后这个第一个线程就进去了-->不合理。

因此可以总结下步骤:

 加锁-->判断条件-->wait-->异步唤醒--->解锁。

 四·POSIX信号量:

4.1概念介绍:

POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但POSIX可以⽤于线程间同步。

比如一大块共享资源可以分为很多小块;然后可以被线程共同同时一小块一小块使用;此时就可以用信号量;本质是一个计数器,是对特定资源的预订机制!

注:抢信号量之间的互斥而不是后面执行操作之间的互斥。

可以是线程也可以是进程(进程就慢了; cpu内部很多需要切换)。

主要用于线程间同步(进程也可以用);某种意义上可以理解成互斥锁(同类线程竞争之间的)+条件变量

4.2函数介绍及使用:

 对于信号量;之前偶尔提过;就是它是计数器然后存在PV操作;P就是对cnt--来申请信号量;V操作就是对cnt++来还原信号量。

初始化信号量:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
//pshared:0表⽰线程间共享,⾮零表⽰进程间共享
//value:信号量初始值

销毁信号量:

int sem_destroy(sem_t *sem);

等待信号量:

int sem_wait(sem_t *sem); //P()
//功能:等待信号量,会将信号量的值减1

发布信号量:

int sem_post(sem_t *sem);//V()
//功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1。

4.3封装信号量: 

这里使用的话;我们后面结合实现的ringqueue来看;下面还是封装下为后面实现做准备!

 sem.hpp:

#pragma once
#include <semaphore.h>
//0:线程专属使用
class Sem
{
public:
    Sem(int n)
    {
        sem_init(&_sem, 0, n);
    }
    void P()
    {
        sem_wait(&_sem);
    }

    void V()
    {
        sem_post(&_sem);
    }
    ~Sem()
    {
        sem_destroy(&_sem);
    }

private:
    sem_t _sem;
};

4.4基于POSIX信号量的深度小结:

 实现线程间通知和唤醒只有信号量和条件变量;

两者都会维护—个fifo队列(通知队头);而互斥锁只是保证一个线程访问资源;开锁后还是线程竞争

 为什么说在进程内部使用的话使用临界区会带来速度上的优势并能够减少资源占用量:

可以理解成临界区需要的只是简单的锁;而不需信号量;此时速度就更快;因此线程很快访问完-->每个线程占用的这个临界区资源就相当于少了。

临界区是一种轻量级的同步机制,它主要用于实现同一进程内多个线程对共享资源的互斥访问。与信号量等其他同步机制相比,临界区不需要在内核态和用户态之间进行频繁切换,减少了系统开销。

因此可以这么认为:信号量底层可以认为封装了特殊的锁。

五·基于sem,cond,mutex实现环形生产消费模型:

5.1实现原理剖析:

环形结构起始状态和结束状态都是⼀样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留⼀个空的位置,作为满的状态。

也就可以理解成物理追及问题;一个放;一个取;然后是个环形;它们之间肯定遵循某种关系:这里我们采用数组取模等方式来模拟这个唤醒队列(因为数组的话可以让消费者与生产者在特定条件下同时进行):

下面就是这个追及游戏的规矩:

 

其实看起来这么多规矩;以为麻烦;其实两个信号量就解决了;综上信号量就维护了这些可能性(pc移动追击问题的正确性)。 

实现的时候需要注意:维护好cc pp之间互斥(互斥锁)以及cp之间同步+互斥(信号量)

5.2ringqueue实现: 

这里实现就用到了我们上面封装的sem.hpp(作用可以认为包含了cond);mutex.hpp。

pc.hpp:

#pragma once
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <unistd.h>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
#include "mutex.hpp"
using namespace std;
const int N = 5;
template <typename T>
class singqueue
{
public:
    singqueue(int cap = N) : _sq(cap), _cap(cap)
                     , _datasem(0), _blocksem(cap),
                     _cstep(0),_pstep(0) { }
    void equeue(T& data){
        //现申请信号量再申请锁:高效率:比如人们都先买好票然后再去排队就不用
        //一个人排个队买个票重复了
       _blocksem.P();//申请空格信号量--
       {
       mutexguard md(_p);
       _sq[_pstep++]=data;
       _pstep%=_cap;
       }
       _datasem.V();//放入数据后增加数据信号量

    }
    T& pop(){
        static T out;//防止局部变量销毁
        _datasem.P();//--
        {
        mutexguard md(_c);
        out=_sq[_cstep++];
        _cstep%=_cap;
        }
        _blocksem.V();//++
        return out;
    }

    ~singqueue()   { }

private:
    vector<T> _sq;
    int _cap;
    // 共享资源中管理分块的信号量
    Sem _datasem;
    Sem _blocksem;
    // 消费者生成者之间互斥的锁
    mutex _p;
    mutex _c;
    // 标记当前访问部分资源的位置:
    int _cstep;
    int _pstep;
};

main.cc:

#include"pc.hpp"
#include<functional>
using func_t=function<void()>;
void print(){
    cout<<"打印中..."<<endl;
    sleep(1);
}
void download(){
    cout<<"下载中..."<<endl;
    sleep(1);
}
void getlog(){
    cout<<"获得日志中..."<<endl;
    sleep(1);
}
void browse(){
    cout<<"浏览中..."<<endl;
    sleep(1);
}
vector<func_t>task;
int cnt=0;

class thread_set{
    public:
     thread_set(string n, singqueue<func_t> &s):_name(n),_sq(s){}
     ~thread_set(){}
     string _name;
     singqueue<func_t> &_sq;
};

void * consumer(void *arg){
    thread_set* ts=static_cast< thread_set*>(arg);
   while(1){
    //生产者快:
    sleep(2);
   func_t tk= ts->_sq.pop();
   cout<<ts->_name<<" : ";
    tk();
     
   }
}  

void * producter(void *arg){
    thread_set* ts=static_cast< thread_set*>(arg);
     while(1){
        //消费者快
       // sleep(2);
        //cout<<"生产一个任务..."<<endl;
       ts->_sq.equeue(task[cnt%task.size()]);
        //生产者快:
        cout<<ts->_name<<" 生产一个任务..."<<endl;

        cnt++;
        
     }
}

int main(){
     task.push_back(print);
     task.push_back(download);
     task.push_back(getlog);
     task.push_back(browse);
     singqueue<func_t>sq;
     thread_set *ts1=new thread_set ("t1",sq);
     thread_set *ts2=new thread_set ("t2",sq);
     thread_set *ts3=new thread_set ("t3",sq);
     thread_set *ts4=new thread_set ("t4",sq);
     thread_set *ts5=new thread_set ("t5",sq);

 // 两个消费者三个生产者
    pthread_t c[2],p[3];
    pthread_create(c,nullptr,consumer,ts1);
    pthread_create(c+1,nullptr,consumer,ts2);
    pthread_create(p,nullptr,producter,ts3);
    pthread_create(p+1,nullptr,producter,ts4);
    pthread_create(p+2,nullptr,producter,ts5);
    

    pthread_join(*c,nullptr);
    pthread_join(*(c+1),nullptr);

    pthread_join(*p,nullptr);
    pthread_join(*(p+1),nullptr);
    pthread_join(*(p+2),nullptr);
 

    
}

测试效果:

 单线程操作:

多线程操作:

 5.3基于实现ringqueue后对信号量,互斥锁,条件变量的小结:

我们通过这次实现的ringqueue不难发现尽然没有判断空还是满;全靠信号量就解决了:

信号量本身就封装了同步的判断条件(对资源个数的统计)-->因此就不用像ful1 empty那样判断了;信号量把对临界资源是否存在/就绪等的条件,以原子性的形式,呈现在访问临界资源之前就判断了。

关于信号量等的使用:

1·如果是整体资源如之前的blockqueue-->此时只能p放c拿由于队列设计结构故不能同时进行--->二元信号量(互斥锁+cond条件变量)。

2·如果是整个资源分成小部分小部分-->也就是可以cp同时访问(如我们的vector模拟的环形队列;但是注意一些条件【可用信号量维护】)-->多元信号量(也可以变成互斥锁+cond条件变量;但是比较麻烦)。

最后我们简单理解下ringqueue:

简单理解下:就是比如之前的blockqueue队列;当线程a去访问这块资源时候发现不符合条件;因此就会等待直到有个线程b来了使得这个条件满足了;此时a才出来访问-->这样就相当于做到了线程ab之间的同步;也就是消费者于生成者之间的同步。

对于使用上的小结:

如果共享资源可以拆分就选信号量+互斥锁;如果不能拆分,是整体使用的话就选择互斥锁+条件变量。

 六·本篇小结:

在本篇;学习了mutex/cond/sem等相关概念及使用以及用它们实现的blockqueue/ringqueue等等;学习了设计时候的一些细节技巧等;以及它们底层的一些原理;注意事项等等;此外还通俗易懂对它们进行了相关总结方便记忆与使用与理解;本篇也就告以尾声;在续篇,博主将会介绍如何去实现日志策略以及用我们上面封装的那些实现线程池等以及死锁去,其他线程相关概念等等;欢迎大家继续订阅!!!

                              

Logo

纵情码海钱塘涌,杭州开发者创新动! 属于杭州的开发者社区!致力于为杭州地区的开发者提供学习、合作和成长的机会;同时也为企业交流招聘提供舞台!

更多推荐