并发编程(c++)——3.生产者消费者者模式笔记-1阻塞问题
生产者消费者模式是并发编程的核心模式之一,核心是想要提高程序的运行效率。
这里记录一下自己的思考,使用通俗易懂的语言,和以日志记录为例,解读生产者消费者模式,并实现生产者消费者模式。
将生产者消费者模式的核心内容划分为三个问题:阻塞问题、内存积压问题、cpu空转问题。
这里是第一章,阻塞问题。
阻塞问题
生产者消费者模式首先就是要解决相互阻塞的问题。
在编程过程中循环是普遍存在的,循环内部普遍来说是顺序执行,顺序结构可以分为上下两个部分。
程序上方的输出数据往往是程序下方的输入,从数据的角度看,程序上方就是生产过程,称为生产者,程序下方就是消费过程,称为消费者。
这种结构下生产者和消费者之间必然相互等待,如果两个两者耗时较长,那么就会导致程序运行效率降低,这就称之为程序之间相互阻塞。
实际场景
日志记录系统就是一个典型的生产者消费者模式。
项目运行时,需要循环记录日志,在循环内部,程序上方是生成日志,程序下方是写入日志文件。
所以生产者就是生成字符串,消费者是写入日志文件。
代码实现
程序在同一个线程中循环执行,循环体中使用顺序结构,生产一个字符串,紧接着在磁盘中写入一个字符串。
这就是同步耦合实现,这样会导致生产者生产和消费者相互等待,即相互阻塞。
void sync_log()
{
std::cout<<"同步耦合日志系统"<<std::endl;
std::ofstream log_file;
std::string log_path="log1.txt";
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
for(int i=0;i<10;i++)
{
log_file.open(log_path,std::ios::app);
//cpu内存处理数据
std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//大量数据
std::string large_data(2048, 'x'); // 2KB 数据
//时间记录
auto time = std::chrono::system_clock::now();
auto time_t = std::chrono::system_clock::to_time_t(time);
std::string content_time=std::ctime(&time_t);
std::cout<<"日志记录时刻:"<<content_time;
std::string content="["+content_time+"] "+large_data;
std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
//用户缓存区写日志
log_file<<content<<std::endl;
//写磁盘
log_file.close();
std::chrono::high_resolution_clock::time_point t3 = std::chrono::high_resolution_clock::now();
std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t3-t2).count()<<"微秒"<<std::endl<<std::endl;
}
// 写入磁盘
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::cout<<"生成日志和io相互阻塞"<<std::endl;
std::cout<<"耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
}
int main()
{
sync_log();
return 0;
}
运行结果:
同步耦合日志系统
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:2101微秒
写日志耗时:141微秒
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:68微秒
写日志耗时:112微秒
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:155微秒
写日志耗时:87微秒
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:86微秒
写日志耗时:111微秒
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:78微秒
写日志耗时:100微秒
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:56微秒
写日志耗时:95微秒
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:59微秒
写日志耗时:68微秒
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:45微秒
写日志耗时:85微秒
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:52微秒
写日志耗时:88微秒
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:66微秒
写日志耗时:107微秒
生成日志和io相互阻塞
耗时:8398ms
结果分析
每一次循环都需要生成一个字符串,除了在第一次循环,其他每一次都需要等待写入日志完成后才能生成字符串,
同理,写入磁盘也类似,包括第一次循环,每一次都需要等待生成字符串完成后才能写入磁盘。
这就是生产者和消费者速度相互阻塞,生成字符串是生产者,写入磁盘是消费者,两者相互等待,导致程序整体效率低。
解决阻塞
生产者和消费者相互阻塞如何解决?
同步耦合
深入分析,阻塞问题的根本原因在于同步耦合,
即调用方必须等待被调用方完成,才能继续执行的模式。
在生产者消费者视角下看,
同步耦合来自于生产者和消费者在同一个线程、处于同一个顺序结构中上下游的位置,当循环时前后者就会相互制约。
异步解耦合
既然阻塞时同步耦合导致的,那么解决就需要异步解耦合。
即调用方不需要等待被调用方完成,就可以继续执行的模式。
在生产者消费者视角下看,
就需要将生产者和消费者放在两个独立的线程中运行,这样两者不相互依赖,同时运行。
代码实现
分离生产者和消费者线程
生产者只需要生成日志字符串,不需要关心写入日志文件
消费者只需要写入日志文件,不需要关心生成日志字符串
但是这样又会产生新的问题,生产者生产数据,消费者消费数据,两者如何关联起来呢?
为了解决这些问题,我们可以创建共享变量作为通信方式
生产者和消费者能同时访问到这个变量
生产者生产数据后,将数据放入共享变量
消费者消费数据时,从共享变量中取出数据,进行消费
由于是多线程访问共享变量,所以需要引入互斥锁
当生产者生产数据时,消费者无法访问共享变量
当消费者消费数据时,生产者无法访问共享变量
避免了数据竞争,保证了数据安全
这样生产者和消费者就互不依赖,同时运行,解决了阻塞问题
这样就是异步解耦合,程序完成相同的功能,但是效率更高。
void sync_log()
{
std::cout<<"同步耦合日志系统"<<std::endl;
std::ofstream log_file;
std::string log_path="log1.txt";
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
for(int i=0;i<10;i++)
{
log_file.open(log_path,std::ios::app);
//cpu内存处理数据
// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//大量数据
std::string large_data(2048, 'x'); // 2KB 数据
//时间记录
auto time = std::chrono::system_clock::now();
auto time_t = std::chrono::system_clock::to_time_t(time);
std::string content_time=std::ctime(&time_t);
// std::cout<<"日志记录时刻:"<<content_time;
std::string content="["+content_time+"] "+large_data;
std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
//用户缓存区写日志
log_file<<content<<std::endl;
//写磁盘
log_file.close();
// std::chrono::high_resolution_clock::time_point t3 = std::chrono::high_resolution_clock::now();
// std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
// std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t3-t2).count()<<"微秒"<<std::endl<<std::endl;
}
// 写入磁盘
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::cout<<"生成日志和io相互阻塞"<<std::endl;
std::cout<<"耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
}
void async_log()
{
std::cout<<"异步解耦日志系统,不再相互阻塞"<<std::endl;
std::ofstream log_file;
std::string log_path="log2.txt";
// 缓存
std::deque<std::string> log_buffer;
// 停止标志
std::atomic<bool> stop_flag(false);
// 加锁,避免数据竞争
std::mutex log_mutex;
// cpu内存处理数据
auto log_data_func=[&log_buffer,&stop_flag,&log_mutex](){
int count=1;
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
while(true)
{
// std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//创建数据
std::string large_data(2048, 'x'); // 4KB 数据
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
//时间记录
auto time = std::chrono::system_clock::now();
auto time_t = std::chrono::system_clock::to_time_t(time);
std::string content_time=std::ctime(&time_t);
// std::cout<<"生成日志:"<<content_time<<std::endl;
std::string content="["+content_time+"] "+large_data;
//写入缓存
std::unique_lock<std::mutex> lock(log_mutex);
log_buffer.push_back(content);
lock.unlock();
// std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
count++;
if(count==10)
{
std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
}
if (stop_flag)
{
break;
}
};
std::cout<<"生成日志结束"<<std::endl;
std::cout<<"总共生成数据量:"<<count<<"条"<<std::endl;
};
std::thread thread_log_data(log_data_func);
// 日志写入磁盘
auto log_disk_func=[&log_buffer,&log_file,&log_path,&stop_flag,&log_mutex](){
//计时开始
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
int count=1;
while(true)
{
//判断退出循环
if (count>10)
{
break;
}
if(!log_buffer.empty())
{
std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
//打开文件
log_file.open(log_path,std::ios::app);
//写日志
std::unique_lock<std::mutex> lock(log_mutex);
auto data=log_buffer.front();
log_buffer.pop_front();
lock.unlock();
log_file<<data<<std::endl;
//关闭文件
log_file.close();
std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
// std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
count++;
}
}
//通知生产进程结束
stop_flag = true;
//计时结束
std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
};
std::thread thread_log_disk(log_disk_func);
thread_log_data.join();
thread_log_disk.join();
}
int main()
{
// 同步阻塞问题
sync_log();
std::cout<<"----------------"<<std::endl;
async_log();
return 0;
}
运行结果:
同步耦合日志系统
生成日志和io相互阻塞
耗时:4978ms
----------------
异步解耦日志系统,不再相互阻塞
生成10条日志耗时:46ms
io耗时:3864ms
生成日志结束
总共生成数据量:600条
结果分析:
同步耦合日志系统,生成日志和io相互阻塞,记录10条日志到文件,共耗时4978ms
异步解耦日志系统,生成日志和io不再相互阻塞,记录10条日志到文件,共耗时3864ms
异步解耦合日志系统,当10条日志记录结束时,总共生成数据量都已经生成600条日志了
同样是记录10条日志,相互阻塞的方式耗时4978ms,非阻塞的方式耗时3864ms,非阻塞的方式耗时明显更短
同样是记录10条日志,相互阻塞的方式只生成了10条日志,而非阻塞的方式生成了600条,非阻塞的方式生成了更多的数据
结论
异步解耦合的方式以更短的时间生成了达到同样的记录效果,并且生成了更多的数据,说明异步解耦合的方式更高效
更多推荐

所有评论(0)