生产者消费者模式是并发编程的核心模式之一,核心是想要提高程序的运行效率。
这里记录一下自己的思考,使用通俗的语言,和以日志记录为例,解读生产者消费者模式,并实现生产者消费者模式。
将生产者消费者模式的核心内容划分为三个问题:阻塞问题、内存积压问题、cpu空转问题。
这里是第二章,内存积压问题。

内存积压问题

异步解耦合解决了生产者和消费者相互阻塞的问题,能有效提高程序的运行效率。
此时消费者和生产者的分离,理想情况是两者速度一致,生产者生产一份数据,消费者就进行消费,程序处于一个动态平衡状态。
但是实际运行中,生产者和消费者的速度是不一致的,
当生产者生产速度大于消费者消费速度时,会导致数据积压,当数据过多时,还会导致内存溢出。

日志场景

在日志记录的案例中,生产者和消费者的速度是明显不一致的
因为日志生产在cpu和内存中,而日志消费在io硬盘中,cpu和内存的速度远大于io硬盘的速度。

代码实现

以第一章中的异步解耦合日志系统为例。

void async_log()
{
    std::cout<<"异步解耦日志系统,不再相互阻塞"<<std::endl;
    std::ofstream log_file;
    std::string log_path="log2.txt";
    // 缓存
    std::deque<std::string> log_buffer;
    // 停止标志
    std::atomic<bool> stop_flag(false);
    // 加锁,避免数据竞争
    std::mutex log_mutex;

    // cpu内存处理数据
    auto log_data_func=[&log_buffer,&stop_flag,&log_mutex](){
        int count=1;
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        while(true)
        {
            // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
            //创建数据
            std::string large_data(2048, 'x');  // 4KB 数据
            // std::this_thread::sleep_for(std::chrono::milliseconds(100));
            //时间记录
            auto time = std::chrono::system_clock::now();
            auto time_t = std::chrono::system_clock::to_time_t(time);
            std::string content_time=std::ctime(&time_t);
            // std::cout<<"生成日志:"<<content_time<<std::endl;
            std::string content="["+content_time+"] "+large_data;
            //写入缓存
            std::unique_lock<std::mutex> lock(log_mutex);
            log_buffer.push_back(content);
            lock.unlock();
            // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
            // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
            count++;
            if(count==10)
            {
                std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
                std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
            }
            if (stop_flag)
            {
                break;
            }
            
        };
        std::cout<<"生成日志结束"<<std::endl;
        std::cout<<"总共生成数据量:"<<count<<"条"<<std::endl;
    };
    std::thread thread_log_data(log_data_func);
    // 日志写入磁盘
    auto log_disk_func=[&log_buffer,&log_file,&log_path,&stop_flag,&log_mutex](){
        //计时开始
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        int count=1;
        while(true)
        {
            //判断退出循环
            if (count>10)
            {
                break;
            }
            if(!log_buffer.empty())
            {
                std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
                //打开文件
                log_file.open(log_path,std::ios::app);
                //写日志
                std::unique_lock<std::mutex> lock(log_mutex);
                auto data=log_buffer.front();
                log_buffer.pop_front();
                lock.unlock();
                log_file<<data<<std::endl;
                //关闭文件
                log_file.close();
                std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
                // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
                count++;
            }
        }
        //通知生产进程结束
        stop_flag = true;
        //计时结束
        std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
        std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
    };
    std::thread thread_log_disk(log_disk_func);
    thread_log_data.join();
    thread_log_disk.join();

}

int main()
{
    // 同步阻塞问题
    std::cout<<"----------------"<<std::endl;
    async_log();

    return 0;
}

运行结果:

----------------
异步解耦日志系统,不再相互阻塞
生成10条日志耗时:46ms
io耗时:3864ms
生成日志结束
总共生成数据量:600条

结果分析:
可以看到,生成10条日志耗时46ms,而写日志耗时3864ms,说明io硬盘的写入速度远小于cpu和内存的速度。
当10条日志写入磁盘完成后,内存中生成的日志数据量已经达到了600条,说明内存中已经积压了大量的日志数据。

解决内存积压

内存积压的核心是生产者的速度大于消费者的速度,这个受算法的影响一般不能直接修改,但是次一级的原因是分配的共享内存没有进行限制。
所以,我们可以对共享内存进行限制,当共享内存达到一定大小的时候,生产者需要等待消费者消费完共享内存中的数据。

代码实现:

void async_log_buffer_size_question()
{
    std::cout<<"异步解耦日志系统,io硬盘不再阻塞,缓存积压"<<std::endl;
    std::ofstream log_file;
    std::string log_path="log2.txt";
    // 缓存
    std::deque<std::string> log_buffer;
    // 资源上锁
    std::mutex log_mutex;
    // 停止标志
    std::atomic<bool> stop_flag(false);


    // 生成数据
    auto log_data_func=[&log_buffer,&log_mutex,&stop_flag](){
        int count=1;
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        while(true)
        {
            // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
            //创建数据
            std::string large_data(2048, 'x');  // 4KB 数据
            //时间记录
            auto time = std::chrono::system_clock::now();
            auto time_t = std::chrono::system_clock::to_time_t(time);
            std::string content_time=std::ctime(&time_t);
            // std::cout<<"生成日志:"<<content_time<<std::endl;
            std::string content="["+content_time+"] "+large_data;
            //写入缓存
            std::unique_lock<std::mutex> lock(log_mutex);
            log_buffer.push_back(content);
            lock.unlock();
            // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
            // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
            count++;

            if(stop_flag)
            {
                break;
            }
            if(count==10)
            {
                std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
                std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
            }
        };
    };
    std::thread thread_log_data(log_data_func);
    // 日志写入磁盘
    auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag](){
        //计时开始
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        int count=1;
        while(true)
        {
            //判断退出循环
            if (count>10)
            {
                break;
            }
            if(!log_buffer.empty())
            {
                // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
                //打开文件
                log_file.open(log_path,std::ios::app);
                //写日志
                std::unique_lock<std::mutex> lock(log_mutex);
                auto data=log_buffer.front();
                log_buffer.pop_front();
                lock.unlock();
                log_file<<data<<std::endl;
                //关闭文件
                log_file.close();
                // std::this_thread::sleep_for(std::chrono::milliseconds(100));
                std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
                // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
                count++;
            }
        }
        //关闭生产者
        stop_flag=true;
        //计时结束
        std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
        std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
        //缓存积压
        std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
    };
    std::thread thread_log_disk(log_disk_func);
    thread_log_data.join();
    thread_log_disk.join();

}


void async_log_buffer_size_solution()
{
    std::cout<<"异步解耦日志系统,io硬盘不再阻塞,缓存积压"<<std::endl;
    std::cout<<"限制缓存,缓存尺寸10"<<std::endl;
    std::ofstream log_file;
    std::string log_path="log2.txt";
    // 缓存尺寸
    int buffer_size=10;
    // 缓存
    std::deque<std::string> log_buffer;
    // 资源上锁
    std::mutex log_mutex;
    // 停止标志
    std::atomic<bool> stop_flag(false);


    // cpu内存处理数据
    auto log_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag](){
        int count=1;
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        while(true)
        {
            if(log_buffer.size()<buffer_size)
            {
                // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
                //创建数据
                std::string large_data(2048, 'x');  // 4KB 数据
                //时间记录
                auto time = std::chrono::system_clock::now();
                auto time_t = std::chrono::system_clock::to_time_t(time);
                std::string content_time=std::ctime(&time_t);
                // std::cout<<"生成日志:"<<content_time<<std::endl;
                std::string content="["+content_time+"] "+large_data;
                //写入缓存
                std::unique_lock<std::mutex> lock(log_mutex);
                log_buffer.push_back(content);
                lock.unlock();
                // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
                // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
                count++;
            }
            if(stop_flag)
            {
                break;
            }
            if(count==10)
            {
                std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
                std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
            }
        };
    };
    std::thread thread_log_data(log_data_func);
    // 日志写入磁盘
    auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag](){
        //计时开始
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        int count=1;
        while(true)
        {
            //判断退出循环
            if (count>10)
            {
                break;
            }
            if(!log_buffer.empty())
            {
                std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
                //打开文件
                log_file.open(log_path,std::ios::app);
                //写日志
                std::unique_lock<std::mutex> lock(log_mutex);
                auto data=log_buffer.front();
                log_buffer.pop_front();
                lock.unlock();
                log_file<<data<<std::endl;
                //关闭文件
                log_file.close();
                // std::this_thread::sleep_for(std::chrono::milliseconds(100));
                std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
                // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
                count++;
            }
        }
        //通知生产者
        stop_flag=true;
        //计时结束
        std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
        std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
        //缓存积压
        std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
    };
    std::thread thread_log_disk(log_disk_func);
    thread_log_data.join();
    thread_log_disk.join();
}

int main()
{
    // 缓存积压问题
    std::cout<<"----------------"<<std::endl;
    async_log_buffer_size_question();
    std::cout<<"----------------"<<std::endl;
    async_log_buffer_size_solution();
    return 0;
}


结果:

----------------
异步解耦日志系统,异步解耦合,缓存积压
生成10条日志耗时:587ms
io耗时:3489ms
缓存积压:514
----------------
异步解耦日志系统,异步解耦合,缓存积压解决
限制缓存,缓存尺寸10
生成10条日志耗时:46ms
io耗时:3122ms
缓存积压:10

结果分析:
限制共享内存和没限制内存,两者记录相同数量的日志,消耗时间基本一致
没有限制内存,会生成514条数据在缓冲区,每条数据2kB,内存占用1MB左右,如果生成时间拉长,会造成更大的内存占用,直到内存溢出。
限制内存,最多生成10条数据在缓冲区,每条数据2kB,内存占用20kB左右,不会出现内存占用堆积的问题。

结论:
在异步解耦日志系统中,通过限制共享内存的大小,成功防止内存大量占用。

更多推荐