Linux中线程锁 pthread_mutex_t 、条件变量 pthread_cond_t (有关日志的实现)
在头文件#include <pthread.h>中 pthread_mutex_t与pthread_cond_t这两个都是在<pthread.h>中定义的结构,pthread_mutex_t是互斥锁,pthread_cond_t是条件变量,主要用于PV操作。1. 线程互斥量 pthread_mutex_t 的初始化: 初始化可以用/* Initialize a mute
在头文件
#include <pthread.h>
中
pthread_mutex_t 与 pthread_cond_t
这两个都是在<pthread.h>中定义的结构,pthread_mutex_t是互斥锁,pthread_cond_t是条件变量,主要用于PV操作。
1. 线程互斥量 pthread_mutex_t 的初始化:
初始化可以用
/* Initialize a mutex. */
extern int pthread_mutex_init (pthread_mutex_t *__mutex, const pthread_mutexattr_t *__mutexattr
这是动态创建方法,参数mutex传入pthread_mutex_t结构体的地址,参数attr是定义互斥锁的属性,一般为NULL。成功初始化
返回0,否则返回其他值( 错误编号)。至于后面的attr可以有4个取值的属性,现列在下方:
互斥锁的属性在创建锁的时候指定,在LinuxThreads实现中仅有一个锁类型属性,不同的锁类型在试图对一个已经被锁定的互斥锁加锁时表现不同。当前(glibc2.2.3,linuxthreads0.9)有四个值可供选择:
-
PTHREAD_MUTEX_TIMED_NP,这是缺省值,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。
-
PTHREAD_MUTEX_RECURSIVE_NP,嵌套锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。如果是不同线程请求,则在加锁线程解锁时重新竞争。
-
PTHREAD_MUTEX_ERRORCHECK_NP,检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型动作相同。这样就保证当不允许多次加锁时不会出现最简单情况下的死锁。
-
PTHREAD_MUTEX_ADAPTIVE_NP,适应锁,动作最简单的锁类型,仅等待解锁后重新竞争。
除了动态初始化,还有静态初始化方法:POSIX定义了一个宏 PTHREAD_MUTEX_INITIALIZER 来静态初始化互斥锁,方法如下:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
前面已经说过,pthread_mutex_t 是一个结构体,而PTHREAD_MUTEX_INITIALIZER是一个结构常量。
2.pthread_mutex_t的销毁:
pthread_mutex_destroy(); //用于注销一个互斥锁,API定义如下:
int pthread_mutex_destroy(pthread_mutex_t *mutex); // 返回0则销毁成功,否则销毁失败。
3.pthread_mutex_t的加锁和解锁过程:
这里介绍三种:
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
trylock与lock不同的一点在于在锁已经被占据时返回EBUSY而不是挂起等待。pthread_mutex_lock() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下任一情况,该函数将失败并返回对应的值。pthread_mutex_unlock() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下情况,该函数将失败并返回对应的值。
4.条件变量 pthread_cond_t:
pthread_cond_t是条件变量,和pthread_mutex_t类似,初始化分为静态和动态的:
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr); //或者是下面的静态初始化
//成功则返回0,否则返回其他值。注意不能由多个线程同时初始化一个条件变量。
//当需要重新初始化或释放一个条件变量时,应用程序必须保证这个条件变量未被使用。
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
除此之外还有其他函数,下面会具体讲解:
阻塞在条件变量上 pthread_cond_wait
解除在条件变量上的阻塞 pthread_cond_signal
阻塞直到指定时间 pthread_cond_timedwait
释放阻塞的所有线程 pthread_cond_broadcast
释放条件变量 pthread_cond_destroy
//首先是pthread_cond_wait:它的定义:
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex);
//返回值:函数成功返回0;任何其他返回值都表示错误
函数将解锁 mutex 参数指向的互斥锁,并使当前线程阻塞在cv参数指向的条件变量上。
被阻塞的线程可以被pthread_cond_signal函数,pthread_cond_broadcast函数唤醒,也可能在被信号中断后被唤醒。
pthread_cond_wait 函数的返回并不意味着条件的值一定发生了变化,必须重新检查条件的值。
pthread_cond_wait 函数返回时,相应的互斥锁将被当前 线程锁定,即使是函数出错返回。
基于生产者消费者模式的MQ(msg queue)实现了线程间通信,在生产者消费者模型中通常都会用到互斥锁pthread_mutex_t来保护共享内存资源,多个线程访问共享内核空间之前都会尝试获取mutex,如果有其他线程正在使用则当前线程进入锁等待状态。这样的机制难免会带来两个问题:
1:如果两个线程同时获取mutex,则两个线程会进入死锁状态
2:如果多个线程依次获取mutex,那么这些线程都会进入锁等待状态,浪费了cpu资源
因此,我们可以使用互斥锁pthread_mutex_t和pthread_cond_t配合来避免上面的问题
简单来说这种方法的核心思路:就是调用pthread_mutex_lock(mutex)获取锁时再判断一下是否满足获取资源的条件,比如队列
为空时执行get操作就是不满足条件,如果不满足条件则调用pthread_cond_wait(cond, mutex)这个函数使当前线程进入睡眠并且把
mutex互斥锁释放掉,等待获取资源的条件满足时,比如队列新加了一条msg,此时其他线程会调用pthread_cond_signal()来唤醒这
个线程并锁定mutex,这样就不会出现多个线程等待一个mutex或者死锁的情况了。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <iostream>
#include <signal.h>
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;/*初始化互斥锁*/
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;/*初始化条件变量*/
void *thread1(void* arg){
while(1){
pthread_cond_wait(&cond, &mutex);
cout << "线程一被唤醒" << endl;
}
}
void *thread2(void *arg){
while (1)
{
pthread_cond_wait(&cond, &mutex);
cout << "线程二被唤醒" << endl;
}
}
void func(int sig){
pthread_cond_signal(&cond);
// pthread_cond_broadcast(&cond);
}
int main(){
signal(15, func); //接收信号
// pthread_mutex_init(&mutex, NULL);
// pthread_cond_init(&cond, NULL);
pthread_t thid1, thid2;
pthread_create(&thid1, NULL, thread1, NULL);
pthread_create(&thid2, NULL, thread2, NULL);
pthread_join(thid1, NULL);
pthread_join(thid2, NULL);
return 0;
}
在另一个终端 输入 killall -15 a.out ,pthread_cond_signal 的时候依次被唤醒,pthread_cond_broadcast 都被唤醒。
生产者、 消费者 代码
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string>
#include <iostream>
#include <signal.h>
#include <vector>
#include <string.h>
using namespace std;
int mesgid = 1; //消息的计数器
//缓存消息的结构体
struct st_message{
int mesgid;
char message[1024];
}stmesg;
vector<struct st_message> vcache; //用vector容器做缓存
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//消费者 出队线程主函数
void *outcache(void *arg){
//增加线程清理函数、释放锁
struct st_message stmesg;
while(1){
pthread_mutex_lock(&mutex); //加锁
//如果缓存为空,等待
while(vcache.size() == 0){
pthread_cond_wait(&cond, &mutex);
}
//从缓存中获取第一条记录,然后删除该记录
memcpy(&stmesg, &vcache[0], sizeof(struct st_message));
vcache.erase(vcache.begin());
pthread_mutex_unlock(&mutex); //解锁
//以下是处理业务的代码
cout<< "phid = " << pthread_self() << " mesgid = " << stmesg.mesgid << endl;
usleep(100);
}
}
//生产者、把生产的数据放入缓存
void incache(int sig){
struct st_message stmesg;
memset(&stmesg, 0, sizeof(struct st_message));
pthread_mutex_lock(&mutex); //加锁
//生产数据 放入缓存
stmesg.mesgid = mesgid++; //这里为了避免内存拷贝,可以用链表
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
stmesg.mesgid = mesgid++;
vcache.push_back(stmesg);
pthread_mutex_unlock(&mutex); // 解锁
// pthread_cond_signal(&cond);
pthread_cond_broadcast(&cond); //触发条件 激活全部的线程
}
int main(){
signal(15, incache);
pthread_t thid1, thid2, thid3;
pthread_create(&thid1, NULL, outcache, NULL);
pthread_create(&thid2, NULL, outcache, NULL);
pthread_create(&thid3, NULL, outcache, NULL);
pthread_join(thid1, NULL);
pthread_join(thid2, NULL);
pthread_join(thid3, NULL);
return 0;
}
这里pthread_mutex_t配合条件变量 pthread_cond_t 会防止出现死锁,如果没有条件变量,会出现下面这种情况
消费者进程拿到互斥锁 --> 进入临界区 --> 发现共享资源 n 不满足继续执行的条件 --> 等待 n > 0
生产者进程占有互斥锁 --> 生产者进程无法进入临界区 --> 无法修改 n 的值 --> 生产者等待消费者释放互斥锁
分解pthread_cond_wait的动作为以下几步:
1,线程放在等待队列上,解锁
2,等待 pthread_cond_signal或者pthread_cond_broadcast信号之后去竞争锁
3,若竞争到互斥索则加锁。
持有锁的时间应该是越短越好,因为持有锁的话,就意味着等待
上面代码为了防止内存拷贝,可以用链表
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
//链表的结点
struct msg
{
int num;
struct msg *next;
};
struct msg *head = NULL; //头指针
struct msg *temp = NULL; //节点指针
//静态方式初始化互斥锁和条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t has_producer = PTHREAD_COND_INITIALIZER;
/*
用于阻塞当前线程,等待别的线程使用pthread_cond_signal()或pthread_cond_broadcast来唤醒它 pthread_cond_wait()
必须与pthread_mutex配套使用。pthread_cond_wait()函数一进入wait状态就会自动release mutex。
当其他线程通过pthread_cond_signal()或pthread_cond_broadcast,把该线程唤醒,使pthread_cond_wait()通过(返回)时,该线程又自动获得该mutex。
*/
void *producer(void *arg)
{
while (1) //线程正常不会解锁,除非收到终止信号
{
pthread_mutex_lock(&mutex); //加锁
temp = (msg*)malloc(sizeof(struct msg));
temp->num = rand() % 100 + 1;
temp->next = head;
head = temp; //头插法
printf("---producered---%d\n", temp->num);
pthread_mutex_unlock(&mutex); //解锁
pthread_cond_signal(&has_producer); //唤醒消费者线程
usleep(rand() % 3000); //为了使该线程放弃cpu,让结果看起来更加明显。
}
return NULL;
}
void *consumer(void *arg)
{
while (1) //线程正常不会解锁,除非收到终止信号
{
pthread_mutex_lock(&mutex); //加锁
while (head == NULL) //如果共享区域没有数据,则解锁并等待条件变量
{
pthread_cond_wait(&has_producer, &mutex); //我们通常在一个循环内使用该函数
}
temp = head;
head = temp->next;
printf("------------------consumer--%d\n", temp->num);
free(temp); //删除节点,头删法
temp = NULL; //防止野指针
pthread_mutex_unlock(&mutex); //解锁
usleep(rand() % 3000); //为了使该线程放弃cpu,让结果看起来更加明显。
}
return NULL;
}
int main(void)
{
pthread_t ptid, ctid;
srand(time(NULL)); //根据时间摇一个随机数种子
//创建生产者和消费者线程
pthread_create(&ptid, NULL, producer, NULL);
pthread_create(&ctid, NULL, consumer, NULL);
//主线程回收两个子线程
pthread_join(ptid, NULL);
pthread_join(ctid, NULL);
return 0;
}
信号量
信号量与互斥量的区别
1. 互斥量用于线程的互斥,信号量用于线程的同步。
这是互斥量和信号量的根本区别,也就是互斥和同步之间的区别。
互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
同步:是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源
以上区别是主要想记住的。
note:信号量可以用来实现互斥量的功能
2. 互斥量值只能为0/1,信号量值可以为非负整数。
也就是说,一个互斥量只能用于一个资源的互斥访问,它不能实现多个资源的多线程互斥问题。信号量可以实现多个同类资源
的多线程互斥和同步。当信号量为单值信号量是,也可以完成一个资源的互斥访问。
3. 互斥量的加锁和解锁必须由同一线程分别对应使用,信号量可以由一个线程释放,另一个线程得到。
sem_init() 函数是Posix信号量操作中的函数。sem_init() 初始化一个定位在 sem 的匿名信号量。value 参数指定信号量的初
始值。 pshared 参数指明信号量是由进程内线程共享,还是由进程之间共享。如果 pshared 的值为 0,那么信号量将被进程内的线程共享,
并且应该放置在这个进程的所有线程都可见的地址上(如全局变量,或者堆上动态分配的变量)。
int sem_init(sem_t *sem,int pshared,unsigned int value);
sem :指向信号量对象
pshared : 指明信号量的类型。不为0时此信号量在进程间共享,否则只能为当前进程的所有线程共享。
value : 指定信号量值的大小。
int sem_post(sem_t *sem);
sem_post是给信号量的值加上一个“1”,它是一个“原子操作”。
int sem_wait(sem_t * sem);
sem_wait也是一个原子操作,它的作用是从信号量的值减去一个“1”,但它永远会先等待该信号量为一个非零值才开始做减法。
如果对一个值为0的信号量调用sem_wait(),这个函数就会原地等待直到有其它线程增加了这个值使它不再是0为止。
如果有两个线程都在sem_wait()中等待同一个信号量变成非零值,那么当它被第三个线程增加 一个“1”时,
等待线程中只有一个能够对信号量做减法并继续执行,另一个还将处于等待状态。
为什么要有日志文件
我们不可能实时的24小时对系统进行人工监控,那么该如何定位功能丧失的原因呢?这时,对于系统日志来说就“是时候表演真
正的技术了”。日志对于运行环境中系统的监控和问题定位是至关重要的,在系统设计、开发和实现的过程中必须时刻注意着log的输出,这
将会对于日后的系统监控和异常分析起至关重要的作用!
日志等级通常分为四种:DEBUG、INFO、WARN、ERROR
DEBUG:系统调试信息,通常用于开发过程中对系统运行情况的监控,在实际运行环境中不进行输出。
INFO:系统运行的关键性信息,通常用于对系统运行情况的监控。
WARN:告警信息,系统存在潜在的问题,有可能引起运行异常,但此时并未产生异常。
ERROR:系统错误信息,需要进行及时处理和优化。
这里列出来了各种等级的日志信息,在开发过程中哪些信息需要设置为哪种等级有赖于开发者的自己判断,这里只是给个建议。
同步日志,日志写入函数与工作线程串行执行,由于涉及到I/O操作,当单条日志比较大的时候,同步模式会阻塞整个处理流程,服务器所能处理的并发能力将有所下降,尤其是在峰值的时候,写日志可能成为系统的瓶颈。
异步日志,将所写的日志内容先存入阻塞队列,写线程从阻塞队列中取出内容,写入日志。
异步写入方式,将生产者-消费者模型封装为阻塞队列,创建一个写线程,工作线程将要写的内容push进队列,写线程从队列中
取出内容,写入日志文件。日志系统大致可以分成两部分,其一是单例模式与阻塞队列的定义,其二是日志类的定义与使用。
更多推荐
所有评论(0)