生产者消费者模式是并发编程的核心模式之一,核心是想要提高程序的运行效率。
这里记录一下自己的思考,使用通俗的语言,和以日志记录为例,解读生产者消费者模式,并实现生产者消费者模式。
将生产者消费者模式的核心内容划分为三个问题:阻塞问题、内存积压问题、cpu空转问题。
这里是第三章,cpu空转问题。

cpu空转问题

实际运行中,生产者和消费者的速度是不一致的,
当生产者生产速度大于消费者消费速度时,会导致数据积压,当数据过多时,还会导致内存溢出,这个问题在上一章中已经提到并解决了。
那么,生产者生产速度小于消费者消费速度时,会产生什么问题吗。
答案是,当生产者生产速度小于消费者消费速度时,会使得缓冲区数据出现短暂空缺,此时消费者没有数据消耗就一直在忙等待,这会导致cpu空转,平白消耗cpu资源。

日志场景

由于日志案例中,生产者生产速度大于消费者消费速度,所以没有体现这个问题。
但是从第一章阻塞问题中可以看到,日志生产的时间比日志消费的时间要短30ms左右,只要在生产时让线程休眠100ms,这个问题就可以体现出来。

代码实现

在生产线程中,让线程休眠100ms,这样生产速度就小于消费速度了。
在消费线程中统计while循环的次数。
在两个线程启动中间,让主线程休眠1s,保证生产线程已经启动。

// 消费者空转
void async_log_qustion_spin()
{
    std::cout<<"异步解耦日志系统,消费者空转问题"<<std::endl;
    std::ofstream log_file;
    std::string log_path="log2.txt";
    // 缓存尺寸
    int buffer_size=10;
    // 缓存
    std::deque<std::string> log_buffer;
    // 资源上锁
    std::mutex log_mutex;
    // 停止标志
    std::atomic<bool> stop_flag(false);
    // cpu内存处理数据
    auto log_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag](){
        int count=0;
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        while(true)
        {
            if(log_buffer.size()<buffer_size)
            {
                // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
                //创建数据
                std::string large_data(2048, 'x');  // 2KB 数据
                //时间记录
                auto time = std::chrono::system_clock::now();
                auto time_t = std::chrono::system_clock::to_time_t(time);
                std::string content_time=std::ctime(&time_t);
                // std::cout<<"生成日志:"<<content_time<<std::endl;
                std::string content="["+content_time+"] "+large_data;
                // 增加生成时间
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                //写入缓存
                std::unique_lock<std::mutex> lock(log_mutex);
                log_buffer.push_back(content);
                lock.unlock();
                // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
                // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
                count++;
            }
            if(stop_flag)
            {
                break;
            }
            if(count==10)
            {
                std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
                std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
                count++;
            }
        };
    };
    std::thread thread_log_data(log_data_func);
    std::this_thread::sleep_for(std::chrono::seconds(1));    // 等待1秒,确保生产者线程启动
    // 日志写入磁盘
    auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag](){
        //计时开始
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        int count=0;
        int count_spin=0;
        while(true)
        {
            //判断退出循环
            if (count>10)
            {
                break;
            }
            if(!log_buffer.empty())
            {
                std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
                //打开文件
                log_file.open(log_path,std::ios::app);
                //写日志
                std::unique_lock<std::mutex> lock(log_mutex);
                auto data=log_buffer.front();
                log_buffer.pop_front();
                lock.unlock();
                log_file<<data<<std::endl;
                //关闭文件
                log_file.close();
                std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
                // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
                count++;
            }
            else
            {
                count_spin++;
            }
        }
        //通知生产者
        stop_flag=true;
        //计时结束
        std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
        std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
        //缓存积压
        std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
        //空转次数
        std::cout<<"空转次数:"<<count_spin<<std::endl;
    };
    std::thread thread_log_disk(log_disk_func);
    thread_log_data.join();
    thread_log_disk.join();
}

int main()
{
    // 空转问题
    std::cout<<"----------------"<<std::endl;
    async_log_qustion_spin();
    std::cout<<"----------------"<<std::endl;
    return 0;
}

运行结果

----------------
异步解耦日志系统,消费者空转问题
生成10条日志耗时:1103416ms
io耗时:204890ms
缓存积压:0
空转次数:105404254
----------------

结果分析

可以看到,消费者实际记录日志只有10条,但是空转次数高达105404254次,导致cpu资源大量浪费。
同时消费速度加快,缓冲区内存直接不再积压。

解决cpu空转问题

cpu的核心是生产者的速度大于消费者的速度,两者速度受算法的影响一般不能直接修改,
但是退一步看,可以让消费者在没有数据的时候休眠,节省cpu资源。
实现方式就是使用条件变量
在缓冲区没有数据的时候,消费者线程等待,当生产者生产数据后,通知消费者线程可以消费数据。
当缓冲区满的时候,生产者线程等待,当消费者消费数据后,通知生产者线程可以生产数据。

代码实现

生产者生产数据时,如果缓冲区已满,则等待,直到消费者消费数据后,通知生产者可以生产数据
消费者消费数据时,如果缓冲区为空,则等待,直到生产者生产数据后,通知消费者可以消费数据

// 消费者空转
void async_log_qustion_spin()
{
    std::cout<<"异步解耦日志系统,消费者空转问题"<<std::endl;
    std::ofstream log_file;
    std::string log_path="log2.txt";
    // 缓存尺寸
    int buffer_size=10;
    // 缓存
    std::deque<std::string> log_buffer;
    // 资源上锁
    std::mutex log_mutex;
    // 停止标志
    std::atomic<bool> stop_flag(false);
    // cpu内存处理数据
    auto log_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag](){
        int count=0;
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        while(true)
        {
            if(log_buffer.size()<buffer_size)
            {
                // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
                //创建数据
                std::string large_data(2048, 'x');  // 2KB 数据
                //时间记录
                auto time = std::chrono::system_clock::now();
                auto time_t = std::chrono::system_clock::to_time_t(time);
                std::string content_time=std::ctime(&time_t);
                // std::cout<<"生成日志:"<<content_time<<std::endl;
                std::string content="["+content_time+"] "+large_data;
                // 增加生成时间
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                //写入缓存
                std::unique_lock<std::mutex> lock(log_mutex);
                log_buffer.push_back(content);
                lock.unlock();
                // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
                // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
                count++;
            }
            if(stop_flag)
            {
                break;
            }
            if(count==10)
            {
                std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
                std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
                count++;
            }
        };
    };
    std::thread thread_log_data(log_data_func);
    std::this_thread::sleep_for(std::chrono::seconds(1));    // 等待1秒,确保生产者线程启动
    // 日志写入磁盘
    auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag](){
        //计时开始
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        int count=0;
        int count_spin=0;
        while(true)
        {
            //判断退出循环
            if (count>10)
            {
                break;
            }
            if(!log_buffer.empty())
            {
                std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
                //打开文件
                log_file.open(log_path,std::ios::app);
                //写日志
                std::unique_lock<std::mutex> lock(log_mutex);
                auto data=log_buffer.front();
                log_buffer.pop_front();
                lock.unlock();
                log_file<<data<<std::endl;
                //关闭文件
                log_file.close();
                std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
                // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
                count++;
            }
            else
            {
                count_spin++;
            }
        }
        //通知生产者
        stop_flag=true;
        //计时结束
        std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
        std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
        //缓存积压
        std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
        //空转次数
        std::cout<<"空转次数:"<<count_spin<<std::endl;
    };
    std::thread thread_log_disk(log_disk_func);
    thread_log_data.join();
    thread_log_disk.join();
}

// 引入条件变量,解决空转问题
void async_log_solution_spin()
{
    // 打印系统说明
    std::cout<<"异步解耦日志系统,消费者空转问题"<<std::endl;
    std::cout<<"异步解耦日志系统,使用条件变量解决消费者空转问题"<<std::endl;
    // 日志文件相关初始化
    std::ofstream log_file;  // 文件输出流
    std::string log_path="log2.txt";  // 日志文件路径
    // 缓存尺寸
    int buffer_size=10;
    // 缓存
    std::deque<std::string> log_buffer;
    // 资源上锁
    std::mutex log_mutex;
    // 停止标志
    std::atomic<bool> stop_flag(false);
    // 条件变量
    std::condition_variable log_condition;


    // cpu内存处理数据
    auto log_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag,&log_condition](){
        int count=0;
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        while(true)
        {
            // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
            //创建数据
            std::string large_data(2048, 'x');  // 2KB 数据
            //时间记录
            auto time = std::chrono::system_clock::now();
            auto time_t = std::chrono::system_clock::to_time_t(time);
            std::string content_time=std::ctime(&time_t);
            // std::cout<<"生成日志:"<<content_time<<std::endl;
            std::string content="["+content_time+"]"+large_data;
            // 增加生成时间
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            //不满足条件时挂起
            std::unique_lock<std::mutex> lock(log_mutex);
            log_condition.wait(lock,[&log_buffer,&buffer_size,&stop_flag](){
                return log_buffer.size()<buffer_size||stop_flag;
            });
            if(stop_flag)
            {
                break;
            }
            // std::cout<<"写入缓存"<<std::endl;
            log_buffer.push_back(content);
            lock.unlock();
            // 生成数据完成,通知消费者
            log_condition.notify_one();
            // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
            // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
            count++;

            if(count==10)
            {
                std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
                std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
            }
        };
        // std::cout<<"生成日志结束"<<std::endl;
        // std::cout<<"生成日志数量:"<<count<<std::endl;
    };
    std::thread thread_log_data(log_data_func);
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待1秒,确保生产者先运行
    // 日志写入磁盘
    auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag,&log_condition](){
        //计时开始
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        int count=0;
        while(true)
        {
            // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
            //打开文件
            log_file.open(log_path,std::ios::app);
            //写日志
            // std::cout<<"写入日志"<<std::endl;
            // 不满足条件时挂起
            std::unique_lock<std::mutex> lock(log_mutex);
            log_condition.wait(lock,[&log_buffer](){
                return !log_buffer.empty();
            });
            auto data=log_buffer.front();
            log_buffer.pop_front();
            lock.unlock();
            log_file<<data<<std::endl;
            //关闭文件
            log_file.close();
            // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
            // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
            count++;
            //判断退出循环
            if (count>=10)
            {
                //通知生产者停止
                stop_flag=true;
                log_condition.notify_one();
                break;
            }
            // 写入完成,通知生产者生产
            log_condition.notify_one();   
        }
        //空转次数
        int count_spin=count-10;
        //计时结束
        std::cout<<"写入日志结束"<<std::endl;
        std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
        std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
        //缓存积压
        std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
        //空转次数
        std::cout<<"空转次数:"<<count_spin<<std::endl;
    };
    std::thread thread_log_disk(log_disk_func);
    thread_log_data.join();
    thread_log_disk.join();
}

int main()
{
    // 空转问题
    std::cout<<"----------------"<<std::endl;
    async_log_qustion_spin();
    std::cout<<"----------------"<<std::endl;
    async_log_solution_spin();
    std::cout<<"----------------"<<std::endl;
    return 0;
}

运行结果:

----------------
异步解耦日志系统,消费者空转问题
生成10条日志耗时:1093046ms
io耗时:200463ms
缓存积压:0
空转次数:112735095
----------------
异步解耦日志系统,消费者空转问题
异步解耦日志系统,使用条件变量解决消费者空转问题
生成10条日志耗时:1083089ms
写入日志结束
io耗时:78324ms
缓存积压:0
空转次数:0

结果分析:

可以看到,使用条件变量调节消费线程休眠后,空转次数从112735095次降到了0次,说明消费者不再空转了。

结论:

使用条件变量可以解决消费者空转问题,节约cpu资源。

更多推荐