Linux多线程理解---(线程安全(互斥锁,信号量,条件变量)#/)
线程安全概念多线程之间对同一个临界资源的访问操作是安全的,访问同一个临界资源可能会造成数据二义实现互斥:保证执行流在同一时间对临界资源的唯一访问,保证安全同步:通过一些判断实现对资源获取的合理序列操作互斥互斥锁本质:就是一个0/1的计数器,主要用于标记资源的访问状态;本身就是用来实现互斥,自己也就是一个临界资源.(同一个资源所有进程在访问的时候得加同一把锁)自身的安全---互斥锁是一个原子操作操作
目录
概念
多线程之间对同一个临界资源的访问操作是安全的,访问同一个临界资源可能会造成数据二义
实现
互斥:保证执行流在同一时间对临界资源的唯一访问,保证安全
同步:通过一些判断实现对资源获取的合理序列操作
互斥
互斥锁本质:就是一个0/1的计数器,主要用于标记资源的访问状态;本身就是用来实现互斥,自己也就是一个临界资源.(同一个资源所有进程在访问的时候得加同一把锁)
自身的安全---互斥锁是一个原子操作
操作
加锁(将状态置为不可访问状态)
解锁(将状态置为可访问状态)
一个执行流在访问临界资源之前进行加锁操作,如果不能加锁则阻塞,在访问完资源之后进行解锁
1.定义互斥锁变量
pthread_mutex_t mutex;
2.初始化互斥锁
int pthread_mutex_init(pthread_mutex_t *mutex,pthread_mutexattr_t *attr);
- mutex定义的互斥锁变量
- attr:互斥锁属性,通常置NULL;
- 返回值;成功返回0,失败返回错误编号
3.加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);阻塞接口
int pthread_mutex_trylock(pthread_mutex_t *mutex);尝试加锁,加锁失败报错返回,非阻塞
4.解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex)
5.释放锁
int pthread_mutex_destroy(pthread_mutex_t *mutex)
注意:
- 使用锁过程中,加锁后,在任意有可能退出的位置都要解锁
- 锁只能保证安全操作,无法保证操作合理
死锁
程序流程无法继续推进,卡死的情况叫做死锁,这是由于对锁资源的争抢不当所导致的
产生的四个必要条件:
- 互斥:我加的锁,别人不能继续加
- 不可剥夺:我加的锁别人不能解
- 请求与保持:加A锁请求B锁,B锁请求不到不释放A
- 环路等待:加A请求B,对方加B请求A
预防:编写代码过程中破坏死锁产生的必要条件
1.加解锁顺序保持一致
2.使用非阻塞加锁,加不了锁释放已有
避免:银行家算法
思想:查看资源分配表,哪个线程邀请求哪个锁,根据已分配资源表和所有资源表判断,这个锁分配给线程是否有可能造成环路等待,有可能则不予分配
同步
通过一些条件判断,保证执行流的合理获取资源
条件变量
提供了一个pcb等待队列以及使线程阻塞和唤醒线程的两个接口
1.定义条件变量
pthread_cond_t cond;
2.初始化条件变量
int pthread_cons_init(pthread_cond_t *cond,pthread_condattr_t *attr)//属性通常置NULL
3.使线程阻塞
int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
这个接口涉及三个操作:解锁,阻塞,被唤醒后加锁
并且解锁和陷入阻塞是原子操作,一步完成不会被打断
int pthread_cond_timedwait()限制时长的阻塞等待
pthread_cond_wait使线程阻塞.修改线程状态,将线程pcb加入到cond的pcb队列里
4.唤醒阻塞的线程
int pthread_cond_signal(pthread_cond_t *cond);将cond的pcb队列中的线程至少唤醒一个
int pthread_cond_broadcast(pthread_cond_t *cond);将cond的pcb队列中的线程全部唤醒
5.释放销毁
int pthread_cond_destroy(pthread_cond_t *cond);
信号只提供了使线程阻塞和唤醒线程接口,至于什么时候阻塞,全部由用户控制
注意:
1.是否满足条件的判断应该使用循环操作
2.多种角色线程等待应该分开来等待,分开唤醒防止唤醒角色错误,需要定义多个多个条件变量
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
int pots=0;//盆 1表示有饭,0表示没饭
pthread_mutex_t mutex;
pthread_cond_t cond_stu;
pthread_cond_t cond_aut;
void *student(void *arg)
{
while(1)
{
pthread_mutex_lock(&mutex);
while(pots==0)
{
pthread_cond_wait(&cond_stu,&mutex);
}
printf("nice!真好吃~~~!再来一碗-%d\n",pots);
pots=0;
pthread_mutex_unlock(&mutex);//解锁
pthread_cond_signal(&cond_aut);//唤醒阿姨
}
return NULL;
}
void *aunt(void *arg)
{
while(1)
{
//加锁
pthread_mutex_lock(&mutex);
while(pots==1)
{
//阻塞
pthread_cond_wait(&cond_aut,&mutex);
}
printf("ljx你的饭好了~~~!\n");
pots=1;
//解锁
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond_stu);
//唤醒学生
}
return NULL;
}
int main()
{
pthread_t stu_id,aunt_id;
int ret;
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&cond_aut,NULL);
pthread_cond_init(&cond_stu,NULL);
for(int i=0;i<4;++i)
{
ret=pthread_create(&stu_id,NULL,student,NULL);
if(ret!=0)
{
printf("create thread error\n");
return -1;
}
ret=pthread_create(&aunt_id,NULL,aunt,NULL);
if(ret!=0)
{
printf("create thread error\n");
return -1;
}
}
pthread_join(stu_id,NULL);
pthread_join(aunt_id,NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond_aut);
pthread_cond_destroy(&cond_stu);
return 0;
}
信号量
本质是一个计数器,用于实现进程或线程之间的同步与互斥
操作:
P操作:计数-1,判断计数是否大于等于0,成立则返回否则阻塞
V操作:计数+1,唤醒一个阻塞的进程或线程
同步的实现:
通过计数器对资源的数量进行计数,获取资源之前进行P操作,产生资源后进行V操作通过这种方式实现对资源的合理获取
互斥的实现
计数器初始值为1(资源只有一个),访问资源前进行P操作,访问完进行V操作,实现的类似于加锁和解锁的操作。
接口流程
1.定义: sem_t sem;
2.初始化: int sem_init(sem_t *sem,int pshared,int value)
- sem:定义的信号量
- pshared:决定这个信号量用于进程间(!0)还是线程间(0)
- value :信号量初值
- 返回值:成功返回0,失败返回-1;
3.P操作: int sem_wait(sem_t *sem);--阻塞等待
int sem_trywait(sem_t *sem);--非阻塞等待
int sem_timedwait(sem_t *sem,struct timespec*)---时间等待
4.V操作: int sem_post(sem_t *sem);
5.销毁操作 int sem_destroy(sem_t *sem);
生产者消费者模型
一种特殊的处理方式;针对场景:有大量数据的产生以及处理的场景
- 解耦合
- 支持忙闲不均(数据缓冲队列)
- 支持并发(缓冲队列必须线程安全)
实现
两个线程---生产者与消费者
线程安全的任务队列上(入队和出队)
线程安全的任务队列:
class BlockQueue
{
public:
BlockQueue(){};
~BlockQueue(){}
bool Push(const int data);
bool Pop(int *data);
private:
std::queue<int> _queue;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _cond_pro
pthread_cond_t _cond_cum
}
条件变量实现
#include<iostream>
#include<cstdio>
#include<cstdlib>
#include<pthread.h>
#include<unistd.h>
#include<queue>
#define MAX_QUEUE 10
class BlockQueue
{
public:
BlockQueue(int maxq=MAX_QUEUE)
:_capacity(maxq)
{
pthread_mutex_init(&_mutex,NULL);
pthread_cond_init(&_cond_pro,NULL);
pthread_cond_init(&_cond_cum,NULL);
}
~BlockQueue(){
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_pro);
pthread_cond_destroy(&_cond_cum);
}
bool Push(int data)
{
pthread_mutex_lock(&_mutex);
while(_queue.size()==_capacity)
{
pthread_cond_wait(&_cond_pro,&_mutex);
}
_queue.push(data);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond_cum);
return true;
}
bool Pop(int *data)
{
pthread_mutex_lock(&_mutex);
while(_queue.empty())
{
pthread_cond_wait(&_cond_cum,&_mutex);
}
_queue.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_cond_pro);
return true;
}
private:
std::queue<int> _queue;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _cond_pro;
pthread_cond_t _cond_cum;
};
void *cums(void *arg)
{
BlockQueue *q=(BlockQueue*)arg;
int data;
while(1)
{
q->Pop(&data);
printf("%p- pop data:%d\n",pthread_self(),data);
}
return NULL;
}
void *pros(void *arg)
{
BlockQueue *q=(BlockQueue*)arg;
int i=0;
while(1)
{
q->Push(i);
printf("%p - Push data:%d\n",pthread_self(),i);
}
return NULL;
}
int main()
{
pthread_t ctid[4],ptid[4];
int ret;
BlockQueue q;
for(int i=0;i<4;++i)
{
ret=pthread_create(&ctid[i],NULL,cums,(void*)&q);
if(ret!=0)
{
printf("thread create error!\n");
return -1;
}
ret=pthread_create(&ptid[i],NULL,pros,(void*)&q);
if(ret!=0)
{
printf("thread create error!\n");
return -1;
}
}
for(int i=0;i<4;++i)
{
pthread_join(ctid[i],NULL);
pthread_join(ptid[i],NULL);
}
return 0;
}
信号量实现
实现线程安全的数据队列
class RingQueue//环形队列
{
std::vector<int> _array;
int _capacity;
int write_step;
int read_step;
sem_t sem_idle;//空闲空间计数
sem_t sme_data;//数据计数
};
#include<iostream>
#include<unistd.h>
#include<vector>
#include<pthread.h>
#include<cstdio>
#include<cstdlib>
#include<semaphore.h>
#define MAXQ 8
class RingQueue
{
public:
RingQueue(int maxq=MAXQ)
:_array(maxq)
,_capacity(maxq)
,write_step(0)
,read_step(0)
{
sem_init(&sem_idle,0,maxq);
sem_init(&sem_data,0,0);
sem_init(&sem_lock,0,1);
}
~RingQueue()
{
sem_destroy(&sem_idle);
sem_destroy(&sem_data);
sem_destroy(&sem_lock);
}
bool Push(const int data)
{
sem_wait(&sem_idle);//空闲空间-1
sem_wait(&sem_lock);
_array[write_step]=data;
write_step=(write_step+1)%_capacity;
sem_post(&sem_lock);
sem_post(&sem_data);//资源+1
return true;
}
bool Pop(int *data)
{
sem_wait(&sem_data);//资源-1
sem_wait(&sem_lock);
*data=_array[read_step];
read_step=(read_step+1)%_capacity;
sem_post(&sem_lock);
sem_post(&sem_idle);//空闲空间+1
return true;
}
private:
std::vector<int> _array;
int _capacity;
int write_step;
int read_step;
sem_t sem_idle;
sem_t sem_data;
sem_t sem_lock;
};
void *cums(void *arg)
{
RingQueue *q=(RingQueue*)arg;
int data;
while(1)
{
q->Pop(&data);
printf("%p- pop data:%d\n",pthread_self(),data);
}
return NULL;
}
void *pros(void *arg)
{
RingQueue *q=(RingQueue*)arg;
int i=0;
while(1)
{
q->Push(i);
printf("%p - Push data:%d\n",pthread_self(),i);
}
return NULL;
}
int main()
{
pthread_t ctid[4],ptid[4];
int ret;
RingQueue q;
for(int i=0;i<4;++i)
{
ret=pthread_create(&ctid[i],NULL,cums,(void*)&q);
if(ret!=0)
{
printf("thread create error!\n");
return -1;
}
ret=pthread_create(&ptid[i],NULL,pros,(void*)&q);
if(ret!=0)
{
printf("thread create error!\n");
return -1;
}
}
for(int i=0;i<4;++i)
{
pthread_join(ctid[i],NULL);
pthread_join(ptid[i],NULL);
}
return 0;
}
更多推荐
所有评论(0)