并发编程(c++)——3.生产者消费者者模式笔记-3cpu空转问题
生产者消费者模式是并发编程的核心模式之一,核心是想要提高程序的运行效率。
这里记录一下自己的思考,使用通俗的语言,和以日志记录为例,解读生产者消费者模式,并实现生产者消费者模式。
将生产者消费者模式的核心内容划分为三个问题:阻塞问题、内存积压问题、cpu空转问题。
这里是第三章,cpu空转问题。
cpu空转问题
实际运行中,生产者和消费者的速度是不一致的,
当生产者生产速度大于消费者消费速度时,会导致数据积压,当数据过多时,还会导致内存溢出,这个问题在上一章中已经提到并解决了。
那么,生产者生产速度小于消费者消费速度时,会产生什么问题吗。
答案是,当生产者生产速度小于消费者消费速度时,会使得缓冲区数据出现短暂空缺,此时消费者没有数据消耗就一直在忙等待,这会导致cpu空转,平白消耗cpu资源。
日志场景
由于日志案例中,生产者生产速度大于消费者消费速度,所以没有体现这个问题。
但是从第一章阻塞问题中可以看到,日志生产的时间比日志消费的时间要短30ms左右,只要在生产时让线程休眠100ms,这个问题就可以体现出来。
代码实现
在生产线程中,让线程休眠100ms,这样生产速度就小于消费速度了。
在消费线程中统计while循环的次数。
在两个线程启动中间,让主线程休眠1s,保证生产线程已经启动。
// 消费者空转
void async_log_qustion_spin()
{
std::cout<<"异步解耦日志系统,消费者空转问题"<<std::endl;
std::ofstream log_file;
std::string log_path="log2.txt";
// 缓存尺寸
int buffer_size=10;
// 缓存
std::deque<std::string> log_buffer;
// 资源上锁
std::mutex log_mutex;
// 停止标志
std::atomic<bool> stop_flag(false);
// cpu内存处理数据
auto log_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag](){
int count=0;
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
while(true)
{
if(log_buffer.size()<buffer_size)
{
// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//创建数据
std::string large_data(2048, 'x'); // 2KB 数据
//时间记录
auto time = std::chrono::system_clock::now();
auto time_t = std::chrono::system_clock::to_time_t(time);
std::string content_time=std::ctime(&time_t);
// std::cout<<"生成日志:"<<content_time<<std::endl;
std::string content="["+content_time+"] "+large_data;
// 增加生成时间
std::this_thread::sleep_for(std::chrono::milliseconds(100));
//写入缓存
std::unique_lock<std::mutex> lock(log_mutex);
log_buffer.push_back(content);
lock.unlock();
// std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
count++;
}
if(stop_flag)
{
break;
}
if(count==10)
{
std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
count++;
}
};
};
std::thread thread_log_data(log_data_func);
std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待1秒,确保生产者线程启动
// 日志写入磁盘
auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag](){
//计时开始
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
int count=0;
int count_spin=0;
while(true)
{
//判断退出循环
if (count>10)
{
break;
}
if(!log_buffer.empty())
{
std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//打开文件
log_file.open(log_path,std::ios::app);
//写日志
std::unique_lock<std::mutex> lock(log_mutex);
auto data=log_buffer.front();
log_buffer.pop_front();
lock.unlock();
log_file<<data<<std::endl;
//关闭文件
log_file.close();
std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
count++;
}
else
{
count_spin++;
}
}
//通知生产者
stop_flag=true;
//计时结束
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
//缓存积压
std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
//空转次数
std::cout<<"空转次数:"<<count_spin<<std::endl;
};
std::thread thread_log_disk(log_disk_func);
thread_log_data.join();
thread_log_disk.join();
}
int main()
{
// 空转问题
std::cout<<"----------------"<<std::endl;
async_log_qustion_spin();
std::cout<<"----------------"<<std::endl;
return 0;
}
运行结果
----------------
异步解耦日志系统,消费者空转问题
生成10条日志耗时:1103416ms
io耗时:204890ms
缓存积压:0
空转次数:105404254
----------------
结果分析
可以看到,消费者实际记录日志只有10条,但是空转次数高达105404254次,导致cpu资源大量浪费。
同时消费速度加快,缓冲区内存直接不再积压。
解决cpu空转问题
cpu的核心是生产者的速度大于消费者的速度,两者速度受算法的影响一般不能直接修改,
但是退一步看,可以让消费者在没有数据的时候休眠,节省cpu资源。
实现方式就是使用条件变量
在缓冲区没有数据的时候,消费者线程等待,当生产者生产数据后,通知消费者线程可以消费数据。
当缓冲区满的时候,生产者线程等待,当消费者消费数据后,通知生产者线程可以生产数据。
代码实现
生产者生产数据时,如果缓冲区已满,则等待,直到消费者消费数据后,通知生产者可以生产数据
消费者消费数据时,如果缓冲区为空,则等待,直到生产者生产数据后,通知消费者可以消费数据
// 消费者空转
void async_log_qustion_spin()
{
std::cout<<"异步解耦日志系统,消费者空转问题"<<std::endl;
std::ofstream log_file;
std::string log_path="log2.txt";
// 缓存尺寸
int buffer_size=10;
// 缓存
std::deque<std::string> log_buffer;
// 资源上锁
std::mutex log_mutex;
// 停止标志
std::atomic<bool> stop_flag(false);
// cpu内存处理数据
auto log_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag](){
int count=0;
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
while(true)
{
if(log_buffer.size()<buffer_size)
{
// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//创建数据
std::string large_data(2048, 'x'); // 2KB 数据
//时间记录
auto time = std::chrono::system_clock::now();
auto time_t = std::chrono::system_clock::to_time_t(time);
std::string content_time=std::ctime(&time_t);
// std::cout<<"生成日志:"<<content_time<<std::endl;
std::string content="["+content_time+"] "+large_data;
// 增加生成时间
std::this_thread::sleep_for(std::chrono::milliseconds(100));
//写入缓存
std::unique_lock<std::mutex> lock(log_mutex);
log_buffer.push_back(content);
lock.unlock();
// std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
count++;
}
if(stop_flag)
{
break;
}
if(count==10)
{
std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
count++;
}
};
};
std::thread thread_log_data(log_data_func);
std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待1秒,确保生产者线程启动
// 日志写入磁盘
auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag](){
//计时开始
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
int count=0;
int count_spin=0;
while(true)
{
//判断退出循环
if (count>10)
{
break;
}
if(!log_buffer.empty())
{
std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//打开文件
log_file.open(log_path,std::ios::app);
//写日志
std::unique_lock<std::mutex> lock(log_mutex);
auto data=log_buffer.front();
log_buffer.pop_front();
lock.unlock();
log_file<<data<<std::endl;
//关闭文件
log_file.close();
std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
count++;
}
else
{
count_spin++;
}
}
//通知生产者
stop_flag=true;
//计时结束
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
//缓存积压
std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
//空转次数
std::cout<<"空转次数:"<<count_spin<<std::endl;
};
std::thread thread_log_disk(log_disk_func);
thread_log_data.join();
thread_log_disk.join();
}
// 引入条件变量,解决空转问题
void async_log_solution_spin()
{
// 打印系统说明
std::cout<<"异步解耦日志系统,消费者空转问题"<<std::endl;
std::cout<<"异步解耦日志系统,使用条件变量解决消费者空转问题"<<std::endl;
// 日志文件相关初始化
std::ofstream log_file; // 文件输出流
std::string log_path="log2.txt"; // 日志文件路径
// 缓存尺寸
int buffer_size=10;
// 缓存
std::deque<std::string> log_buffer;
// 资源上锁
std::mutex log_mutex;
// 停止标志
std::atomic<bool> stop_flag(false);
// 条件变量
std::condition_variable log_condition;
// cpu内存处理数据
auto log_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag,&log_condition](){
int count=0;
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
while(true)
{
// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//创建数据
std::string large_data(2048, 'x'); // 2KB 数据
//时间记录
auto time = std::chrono::system_clock::now();
auto time_t = std::chrono::system_clock::to_time_t(time);
std::string content_time=std::ctime(&time_t);
// std::cout<<"生成日志:"<<content_time<<std::endl;
std::string content="["+content_time+"]"+large_data;
// 增加生成时间
std::this_thread::sleep_for(std::chrono::milliseconds(100));
//不满足条件时挂起
std::unique_lock<std::mutex> lock(log_mutex);
log_condition.wait(lock,[&log_buffer,&buffer_size,&stop_flag](){
return log_buffer.size()<buffer_size||stop_flag;
});
if(stop_flag)
{
break;
}
// std::cout<<"写入缓存"<<std::endl;
log_buffer.push_back(content);
lock.unlock();
// 生成数据完成,通知消费者
log_condition.notify_one();
// std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
count++;
if(count==10)
{
std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
}
};
// std::cout<<"生成日志结束"<<std::endl;
// std::cout<<"生成日志数量:"<<count<<std::endl;
};
std::thread thread_log_data(log_data_func);
std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待1秒,确保生产者先运行
// 日志写入磁盘
auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag,&log_condition](){
//计时开始
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
int count=0;
while(true)
{
// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//打开文件
log_file.open(log_path,std::ios::app);
//写日志
// std::cout<<"写入日志"<<std::endl;
// 不满足条件时挂起
std::unique_lock<std::mutex> lock(log_mutex);
log_condition.wait(lock,[&log_buffer](){
return !log_buffer.empty();
});
auto data=log_buffer.front();
log_buffer.pop_front();
lock.unlock();
log_file<<data<<std::endl;
//关闭文件
log_file.close();
// std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
count++;
//判断退出循环
if (count>=10)
{
//通知生产者停止
stop_flag=true;
log_condition.notify_one();
break;
}
// 写入完成,通知生产者生产
log_condition.notify_one();
}
//空转次数
int count_spin=count-10;
//计时结束
std::cout<<"写入日志结束"<<std::endl;
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
//缓存积压
std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
//空转次数
std::cout<<"空转次数:"<<count_spin<<std::endl;
};
std::thread thread_log_disk(log_disk_func);
thread_log_data.join();
thread_log_disk.join();
}
int main()
{
// 空转问题
std::cout<<"----------------"<<std::endl;
async_log_qustion_spin();
std::cout<<"----------------"<<std::endl;
async_log_solution_spin();
std::cout<<"----------------"<<std::endl;
return 0;
}
运行结果:
----------------
异步解耦日志系统,消费者空转问题
生成10条日志耗时:1093046ms
io耗时:200463ms
缓存积压:0
空转次数:112735095
----------------
异步解耦日志系统,消费者空转问题
异步解耦日志系统,使用条件变量解决消费者空转问题
生成10条日志耗时:1083089ms
写入日志结束
io耗时:78324ms
缓存积压:0
空转次数:0
结果分析:
可以看到,使用条件变量调节消费线程休眠后,空转次数从112735095次降到了0次,说明消费者不再空转了。
结论:
使用条件变量可以解决消费者空转问题,节约cpu资源。
更多推荐


所有评论(0)