并发编程(c++)——3.生产者消费者者模式笔记-4总结
针对编程中普遍能看到的生产消费状态,为了提高在相关场景中的效率,处理模式呈现出一系列演化,这里对前面三章的内容进行总结。
问题:
1.同步耦合生产和消费,导致生产者等待消费者,消费者等待生产者,相互阻塞
2.数据生产和消耗速度不一致
2.1生产速度快导致数据积压
2.2消费者处理速度快导致cup空转
如何解决:
1.异步解耦合,分离生产消费线程。
2.1设置缓存尺寸,调节内存压力。
2.2设置条件变量,调节生产消费速度。
实现:
分离生产者和消费者在两个子线程中运行
创建共享变量作为通信,生产者和消费者能同时访问到这个变量
资源上锁,防止多个线程同时访问共享变量,导致数据混乱
设置共享变量尺寸,当共享变量达到尺寸后暂停生产数据,避免数据积压
使用条件变量控制生产者和消费者的运行和休眠,当共享变量内没有数据时消费者线程休眠,防止cpu空转
演化过程:
阶段一:同步耦合
背景:
在很多场景中有一部分需要生产数据,一部分需要消费数据
比如日志记录系统,一部分需要生产字符串,一部分记录到磁盘
解决:
生产和消费以同步方式进行
生产者生产一个数据,消费者消费一个数据,
生产者生产完一个数据后,消费者消费完一个数据后,再生产下一个数据,以此类推
阶段二:异步解耦合
2.1:异步分离生产者和消费者
问题:
相互阻塞
当消费者处理缓慢后,生产者会一直等待消费者处理完数据后才能生产下一个数据,导致生产者等待
同理,当生产者处理缓慢后,消费者会一直等待生产者生产完数据后才能消费下一个数据,导致消费者等待
解决:
把者两个功能做成两个独立的线程
两者不相互依赖,同时运行
各自只关注需要处理的数据
2.2:共享变量的引入
问题:
无协调机制
生产者和消费者无法关联起来
解决:
创建共享变量作为通信手段
生产者和消费者能同时访问到这个变量
生产者生产数据后,将数据放入缓冲区
消费者从缓冲区中取出数据,进行消费
2.3:互斥锁的引入
问题:
数据竞争
当生产者生产数据时,消费者也在消费数据,导致数据混乱
同理,当消费者消费数据时,生产者也在生产数据,导致数据混乱
解决:
引入互斥锁
当生产者生产数据时,消费者无法访问缓冲区
当消费者消费数据时,生产者无法访问缓冲区
阶段三:有界缓冲区的引入
问题:
数据积压
当生产者生产速度大于消费者消费速度时,会导致数据积压,当数据过多时,会导致内存溢出
解决:
对共享变量进行尺寸限制,当共享变量达到尺寸后暂停生产数据,避免数据积压
阶段四:条件变量
问题:
忙等待
消费者缓冲区空依旧在工作,这个时候是在空循环,会导致cpu空转
解决:
引入条件变量
消费者消费数据时,如果缓冲区为空,则等待,直到生产者生产数据后,通知消费者可以消费数据
生产者生产数据时,如果缓冲区已满,则等待,直到消费者消费数据后,通知生产者可以生产数据
案例
日志系统
对比最初的同步耦合和最终消费者生产者模式
代码实现
这里再生产和消费数据中各加了100ms的延时,拉长数据处理的时间
因为当处理时间短时,生产者消费者模式优势不明显,甚至会显得比同步耦合更慢
// 基础实现,同步耦合
void sync_log()
{
std::cout<<"同步耦合日志系统"<<std::endl;
std::ofstream log_file;
std::string log_path="log1.txt";
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
for(int i=0;i<10;i++)
{
log_file.open(log_path,std::ios::app);
//cpu内存处理数据
// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//大量数据
std::string large_data(2048, 'x'); // 2KB 数据
//时间记录
auto time = std::chrono::system_clock::now();
auto time_t = std::chrono::system_clock::to_time_t(time);
std::string content_time=std::ctime(&time_t);
// std::cout<<"日志记录时刻:"<<content_time;
std::string content="["+content_time+"] "+large_data;
// 延时
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
//用户缓存区写日志
log_file<<content<<std::endl;
//写磁盘
log_file.close();
// 延时
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// std::chrono::high_resolution_clock::time_point t3 = std::chrono::high_resolution_clock::now();
// std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
// std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t3-t2).count()<<"微秒"<<std::endl<<std::endl;
}
// 写入磁盘
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::cout<<"生成日志和io相互阻塞"<<std::endl;
std::cout<<"耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
}
//最终实现,异步解耦,条件变量
void async_log_solution_all()
{
// 打印系统说明
std::cout<<"异步解耦日志系统,生产者消费者模式"<<std::endl;
// 日志文件相关初始化
std::ofstream log_file; // 文件输出流
std::string log_path="log2.txt"; // 日志文件路径
// 缓存尺寸
int buffer_size=10;
// 缓存
std::deque<std::string> log_buffer;
// 资源上锁
std::mutex log_mutex;
// 停止标志
std::atomic<bool> stop_flag(false);
// 条件变量
std::condition_variable log_condition;
// cpu内存处理数据
auto log_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag,&log_condition](){
int count=0;
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
while(true)
{
// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//创建数据
std::string large_data(2048, 'x'); // 2KB 数据
//时间记录
auto time = std::chrono::system_clock::now();
auto time_t = std::chrono::system_clock::to_time_t(time);
std::string content_time=std::ctime(&time_t);
// std::cout<<"生成日志:"<<content_time<<std::endl;
std::string content="["+content_time+"]"+large_data;
// 增加生成时间
std::this_thread::sleep_for(std::chrono::milliseconds(100));
//不满足条件时挂起
std::unique_lock<std::mutex> lock(log_mutex);
log_condition.wait(lock,[&log_buffer,&buffer_size,&stop_flag](){
return log_buffer.size()<buffer_size||stop_flag;
});
// std::cout<<"写入缓存"<<std::endl;
log_buffer.push_back(content);
count++;
if(stop_flag)
{
break;
}
lock.unlock();
// 生成数据完成,通知消费者
log_condition.notify_one();
// std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
if(count==10)
{
std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
}
};
// std::cout<<"生成日志结束"<<std::endl;
// std::cout<<"生成日志数量:"<<count<<std::endl;
};
std::thread thread_log_data(log_data_func);
std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待1秒,确保生产者先运行
// 日志写入磁盘
auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag,&log_condition](){
//计时开始
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
int count=0;
while(true)
{
// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//打开文件
log_file.open(log_path,std::ios::app);
//写日志
// std::cout<<"写入日志"<<std::endl;
// 不满足条件时挂起
std::unique_lock<std::mutex> lock(log_mutex);
log_condition.wait(lock,[&log_buffer](){
return !log_buffer.empty();
});
auto data=log_buffer.front();
log_buffer.pop_front();
lock.unlock();
log_file<<data<<std::endl;
//关闭文件
log_file.close();
//增加写入时间
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
count++;
//判断退出循环
if (count>=10)
{
//通知生产者停止
stop_flag=true;
log_condition.notify_one();
break;
}
// 写入完成,通知生产者生产
log_condition.notify_one();
}
//空转次数
int count_spin=count-10;
//计时结束
std::cout<<"写入日志结束"<<std::endl;
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
//缓存积压
std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
//空转次数
std::cout<<"空转次数:"<<count_spin<<std::endl;
};
std::thread thread_log_disk(log_disk_func);
thread_log_data.join();
thread_log_disk.join();
}
运行结果
----------------
同步耦合日志系统
生成日志和io相互阻塞
耗时:2184746ms
----------------
异步解耦日志系统,生产者消费者模式
生成10条日志耗时:1085299ms
写入日志结束
io耗时:1090205ms
缓存积压:9
空转次数:0
----------------
总结
当满足模式需求时,生产者消费者模式效率远高于常用的同步阻塞模式。
更多推荐
所有评论(0)