目录

概念

实现

互斥

死锁

同步

条件变量

信号量

生产者消费者模型

 条件变量实现

信号量实现


概念

多线程之间对同一个临界资源的访问操作是安全的,访问同一个临界资源可能会造成数据二义

实现

互斥:保证执行流在同一时间对临界资源的唯一访问,保证安全

同步:通过一些判断实现对资源获取的合理序列操作

互斥

互斥锁本质:就是一个0/1的计数器,主要用于标记资源的访问状态;本身就是用来实现互斥,自己也就是一个临界资源.(同一个资源所有进程在访问的时候得加同一把锁)

自身的安全---互斥锁是一个原子操作

操作

加锁(将状态置为不可访问状态)

解锁(将状态置为可访问状态)

一个执行流在访问临界资源之前进行加锁操作,如果不能加锁则阻塞,在访问完资源之后进行解锁

1.定义互斥锁变量

pthread_mutex_t mutex;

2.初始化互斥锁

int pthread_mutex_init(pthread_mutex_t *mutex,pthread_mutexattr_t *attr);
  • mutex定义的互斥锁变量
  • attr:互斥锁属性,通常置NULL;
  • 返回值;成功返回0,失败返回错误编号

3.加锁

int pthread_mutex_lock(pthread_mutex_t *mutex);阻塞接口
int pthread_mutex_trylock(pthread_mutex_t *mutex);尝试加锁,加锁失败报错返回,非阻塞

 4.解锁

int pthread_mutex_unlock(pthread_mutex_t *mutex)

5.释放锁

int pthread_mutex_destroy(pthread_mutex_t *mutex)

注意:

  • 使用锁过程中,加锁后,在任意有可能退出的位置都要解锁
  • 锁只能保证安全操作,无法保证操作合理

死锁

程序流程无法继续推进,卡死的情况叫做死锁,这是由于对锁资源的争抢不当所导致的

产生的四个必要条件:

  1. 互斥:我加的锁,别人不能继续加
  2. 不可剥夺:我加的锁别人不能解
  3. 请求与保持:加A锁请求B锁,B锁请求不到不释放A
  4. 环路等待:加A请求B,对方加B请求A

预防:编写代码过程中破坏死锁产生的必要条件

1.加解锁顺序保持一致

2.使用非阻塞加锁,加不了锁释放已有

避免:银行家算法

思想:查看资源分配表,哪个线程邀请求哪个锁,根据已分配资源表和所有资源表判断,这个锁分配给线程是否有可能造成环路等待,有可能则不予分配

同步

通过一些条件判断,保证执行流的合理获取资源

条件变量

提供了一个pcb等待队列以及使线程阻塞和唤醒线程的两个接口

1.定义条件变量
pthread_cond_t cond;
2.初始化条件变量
int pthread_cons_init(pthread_cond_t *cond,pthread_condattr_t *attr)//属性通常置NULL
3.使线程阻塞
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);

这个接口涉及三个操作:解锁,阻塞,被唤醒后加锁

并且解锁和陷入阻塞是原子操作,一步完成不会被打断

int pthread_cond_timedwait()限制时长的阻塞等待

pthread_cond_wait使线程阻塞.修改线程状态,将线程pcb加入到cond的pcb队列里

4.唤醒阻塞的线程
int pthread_cond_signal(pthread_cond_t *cond);将cond的pcb队列中的线程至少唤醒一个
int pthread_cond_broadcast(pthread_cond_t *cond);将cond的pcb队列中的线程全部唤醒
5.释放销毁
int pthread_cond_destroy(pthread_cond_t *cond);

信号只提供了使线程阻塞和唤醒线程接口,至于什么时候阻塞,全部由用户控制

注意:

1.是否满足条件的判断应该使用循环操作

2.多种角色线程等待应该分开来等待,分开唤醒防止唤醒角色错误,需要定义多个多个条件变量

#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
int pots=0;//盆 1表示有饭,0表示没饭
pthread_mutex_t mutex;
pthread_cond_t cond_stu;
pthread_cond_t cond_aut;
void *student(void *arg)
{
  while(1)
  {
    pthread_mutex_lock(&mutex);
    while(pots==0)
    {

      pthread_cond_wait(&cond_stu,&mutex);
    }
    printf("nice!真好吃~~~!再来一碗-%d\n",pots);
    pots=0;
    pthread_mutex_unlock(&mutex);//解锁
    pthread_cond_signal(&cond_aut);//唤醒阿姨
  }
  return NULL;
}
void *aunt(void *arg)
{
  while(1)
  {
    //加锁
    pthread_mutex_lock(&mutex);
    while(pots==1)
    {
      //阻塞
      pthread_cond_wait(&cond_aut,&mutex);
    }
    printf("ljx你的饭好了~~~!\n");
    pots=1;
    //解锁
    pthread_mutex_unlock(&mutex);
    pthread_cond_signal(&cond_stu);
    //唤醒学生
  }
  return NULL;
}
int main()
{
  pthread_t stu_id,aunt_id;
  int ret;
  pthread_mutex_init(&mutex,NULL);
  pthread_cond_init(&cond_aut,NULL);
  pthread_cond_init(&cond_stu,NULL);
  for(int i=0;i<4;++i)
  {
    ret=pthread_create(&stu_id,NULL,student,NULL);
    if(ret!=0)
    {
      printf("create  thread error\n");
      return -1;
    }
    ret=pthread_create(&aunt_id,NULL,aunt,NULL);
    if(ret!=0)
    {
      printf("create thread error\n");
      return -1;
    }
  }
  pthread_join(stu_id,NULL);
  pthread_join(aunt_id,NULL);
  pthread_mutex_destroy(&mutex);
  pthread_cond_destroy(&cond_aut);
  pthread_cond_destroy(&cond_stu);
  return 0;
}

信号量

本质是一个计数器,用于实现进程或线程之间的同步与互斥

操作:

P操作:计数-1,判断计数是否大于等于0,成立则返回否则阻塞

V操作:计数+1,唤醒一个阻塞的进程或线程

同步的实现:

通过计数器对资源的数量进行计数,获取资源之前进行P操作,产生资源后进行V操作通过这种方式实现对资源的合理获取

互斥的实现

计数器初始值为1(资源只有一个),访问资源前进行P操作,访问完进行V操作,实现的类似于加锁和解锁的操作。

接口流程

1.定义: sem_t sem;
2.初始化: int sem_init(sem_t *sem,int pshared,int value)
  •  sem:定义的信号量
  • pshared:决定这个信号量用于进程间(!0)还是线程间(0)
  • value :信号量初值
  • 返回值:成功返回0,失败返回-1;
3.P操作: int sem_wait(sem_t *sem);--阻塞等待
         int sem_trywait(sem_t *sem);--非阻塞等待
         int sem_timedwait(sem_t *sem,struct timespec*)---时间等待
4.V操作: int sem_post(sem_t *sem);
5.销毁操作 int sem_destroy(sem_t *sem);

生产者消费者模型

一种特殊的处理方式;针对场景:有大量数据的产生以及处理的场景

  1. 解耦合
  2. 支持忙闲不均(数据缓冲队列)
  3. 支持并发(缓冲队列必须线程安全)

实现

两个线程---生产者与消费者

线程安全的任务队列上(入队和出队)

线程安全的任务队列:

class BlockQueue
{
public:
    BlockQueue(){};
    ~BlockQueue(){}
    bool Push(const int data);
    bool Pop(int *data);

private:
    std::queue<int> _queue;
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond_pro
    pthread_cond_t _cond_cum
}

 条件变量实现

#include<iostream>
#include<cstdio>
#include<cstdlib>
#include<pthread.h>
#include<unistd.h>
#include<queue>
#define MAX_QUEUE 10
class BlockQueue
{
  public:

  BlockQueue(int maxq=MAX_QUEUE)
    :_capacity(maxq)
  {
    pthread_mutex_init(&_mutex,NULL);
    pthread_cond_init(&_cond_pro,NULL);
    pthread_cond_init(&_cond_cum,NULL);
  }
  ~BlockQueue(){
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_cond_pro);
    pthread_cond_destroy(&_cond_cum);
  }
  bool Push(int data)
  {
    pthread_mutex_lock(&_mutex);
    while(_queue.size()==_capacity)
    {
      pthread_cond_wait(&_cond_pro,&_mutex);
    }
    _queue.push(data);
    pthread_mutex_unlock(&_mutex);
    pthread_cond_signal(&_cond_cum);
    return true;
  }
  bool Pop(int *data)
  {
    pthread_mutex_lock(&_mutex);
    while(_queue.empty())
    {
      pthread_cond_wait(&_cond_cum,&_mutex);
    }
    _queue.pop();
    pthread_mutex_unlock(&_mutex);
    pthread_cond_signal(&_cond_pro);
    return true;
  }
  private:
  std::queue<int> _queue;
  int _capacity;
  pthread_mutex_t _mutex;
  pthread_cond_t _cond_pro;
  pthread_cond_t _cond_cum;
};
void *cums(void *arg)
{
  BlockQueue *q=(BlockQueue*)arg;
  int data;
  while(1)
  {
    q->Pop(&data);
    printf("%p- pop data:%d\n",pthread_self(),data);
  }
  return NULL;
}
void *pros(void *arg)
{
  BlockQueue *q=(BlockQueue*)arg;
  int i=0;
  while(1)
  {
    q->Push(i);
    printf("%p - Push data:%d\n",pthread_self(),i);
  }
  return NULL;
}
int main()
{
  pthread_t ctid[4],ptid[4];
  int ret;
  BlockQueue q;
  for(int i=0;i<4;++i)
  {
    ret=pthread_create(&ctid[i],NULL,cums,(void*)&q);
    if(ret!=0)
    {
      printf("thread create error!\n");
      return -1;
    } 
    ret=pthread_create(&ptid[i],NULL,pros,(void*)&q);
    if(ret!=0)
    {
      printf("thread create error!\n");
      return -1;
    }
  }
  for(int i=0;i<4;++i)
  {
    pthread_join(ctid[i],NULL);
    pthread_join(ptid[i],NULL);
  }
  return 0;
}

信号量实现

实现线程安全的数据队列

class RingQueue//环形队列
{
    std::vector<int> _array;
    int _capacity;
    int write_step;
    int read_step;
    sem_t sem_idle;//空闲空间计数
    sem_t sme_data;//数据计数
};
#include<iostream>
#include<unistd.h>
#include<vector>
#include<pthread.h>
#include<cstdio>
#include<cstdlib>
#include<semaphore.h>
#define MAXQ 8
class RingQueue
{
  public:
    RingQueue(int maxq=MAXQ)
      :_array(maxq)
      ,_capacity(maxq)
       ,write_step(0)
       ,read_step(0)
  {
    sem_init(&sem_idle,0,maxq);
    sem_init(&sem_data,0,0);
    sem_init(&sem_lock,0,1);
  }
    ~RingQueue()
    {
      sem_destroy(&sem_idle);
      sem_destroy(&sem_data);
      sem_destroy(&sem_lock);
    }
    bool Push(const int data)
    {
      sem_wait(&sem_idle);//空闲空间-1
      sem_wait(&sem_lock);
      _array[write_step]=data;
      write_step=(write_step+1)%_capacity;
      sem_post(&sem_lock);
      sem_post(&sem_data);//资源+1
      return true;
    }
    bool Pop(int *data)
    {
      sem_wait(&sem_data);//资源-1
      sem_wait(&sem_lock);
      *data=_array[read_step];
      read_step=(read_step+1)%_capacity;
      sem_post(&sem_lock);
      sem_post(&sem_idle);//空闲空间+1
      return true;
    }
  private:
    std::vector<int> _array;
    int _capacity;
    int write_step;
    int read_step;
    sem_t sem_idle;
    sem_t sem_data;
    sem_t sem_lock;
};

void *cums(void *arg)
{
  RingQueue *q=(RingQueue*)arg;
  int data;
  while(1)
  {
    q->Pop(&data);
    printf("%p- pop data:%d\n",pthread_self(),data);
  }
  return NULL;
}
void *pros(void *arg)
{
  RingQueue *q=(RingQueue*)arg;
  int i=0;
  while(1)
  {
    q->Push(i);
    printf("%p - Push data:%d\n",pthread_self(),i);
  }
  return NULL;
}
int main()
{
  pthread_t ctid[4],ptid[4];
  int ret;
  RingQueue q;
  for(int i=0;i<4;++i)
  {
    ret=pthread_create(&ctid[i],NULL,cums,(void*)&q);
    if(ret!=0)
    {
      printf("thread create error!\n");
      return -1;
    } 
    ret=pthread_create(&ptid[i],NULL,pros,(void*)&q);
    if(ret!=0)
    {
      printf("thread create error!\n");
      return -1;
    }
  }
  for(int i=0;i<4;++i)
  {
    pthread_join(ctid[i],NULL);
    pthread_join(ptid[i],NULL);
  }
  return 0;


}

Logo

更多推荐