针对编程中普遍能看到的生产消费状态,为了提高在相关场景中的效率,处理模式呈现出一系列演化,这里对前面三章的内容进行总结。

问题:

1.同步耦合生产和消费,导致生产者等待消费者,消费者等待生产者,相互阻塞
2.数据生产和消耗速度不一致
2.1生产速度快导致数据积压
2.2消费者处理速度快导致cup空转

如何解决:

1.异步解耦合,分离生产消费线程。
2.1设置缓存尺寸,调节内存压力。
2.2设置条件变量,调节生产消费速度。

实现:

分离生产者和消费者在两个子线程中运行
创建共享变量作为通信,生产者和消费者能同时访问到这个变量
资源上锁,防止多个线程同时访问共享变量,导致数据混乱
设置共享变量尺寸,当共享变量达到尺寸后暂停生产数据,避免数据积压
使用条件变量控制生产者和消费者的运行和休眠,当共享变量内没有数据时消费者线程休眠,防止cpu空转

演化过程:

阶段一:同步耦合

背景:
在很多场景中有一部分需要生产数据,一部分需要消费数据
比如日志记录系统,一部分需要生产字符串,一部分记录到磁盘
解决:
生产和消费以同步方式进行
生产者生产一个数据,消费者消费一个数据,
生产者生产完一个数据后,消费者消费完一个数据后,再生产下一个数据,以此类推

阶段二:异步解耦合

2.1:异步分离生产者和消费者
问题:
相互阻塞
当消费者处理缓慢后,生产者会一直等待消费者处理完数据后才能生产下一个数据,导致生产者等待
同理,当生产者处理缓慢后,消费者会一直等待生产者生产完数据后才能消费下一个数据,导致消费者等待
解决:
把者两个功能做成两个独立的线程
两者不相互依赖,同时运行
各自只关注需要处理的数据

2.2:共享变量的引入
问题:
无协调机制
生产者和消费者无法关联起来
解决:
创建共享变量作为通信手段
生产者和消费者能同时访问到这个变量
生产者生产数据后,将数据放入缓冲区
消费者从缓冲区中取出数据,进行消费

2.3:互斥锁的引入
问题:
数据竞争
当生产者生产数据时,消费者也在消费数据,导致数据混乱
同理,当消费者消费数据时,生产者也在生产数据,导致数据混乱
解决:
引入互斥锁
当生产者生产数据时,消费者无法访问缓冲区
当消费者消费数据时,生产者无法访问缓冲区

阶段三:有界缓冲区的引入

问题:
数据积压
当生产者生产速度大于消费者消费速度时,会导致数据积压,当数据过多时,会导致内存溢出
解决:
对共享变量进行尺寸限制,当共享变量达到尺寸后暂停生产数据,避免数据积压

阶段四:条件变量

问题:
忙等待
消费者缓冲区空依旧在工作,这个时候是在空循环,会导致cpu空转
解决:
引入条件变量
消费者消费数据时,如果缓冲区为空,则等待,直到生产者生产数据后,通知消费者可以消费数据
生产者生产数据时,如果缓冲区已满,则等待,直到消费者消费数据后,通知生产者可以生产数据

案例

日志系统
对比最初的同步耦合和最终消费者生产者模式

代码实现

这里再生产和消费数据中各加了100ms的延时,拉长数据处理的时间
因为当处理时间短时,生产者消费者模式优势不明显,甚至会显得比同步耦合更慢


// 基础实现,同步耦合
void sync_log()
{
    std::cout<<"同步耦合日志系统"<<std::endl;

    std::ofstream log_file;
    std::string log_path="log1.txt";


    std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
    for(int i=0;i<10;i++)
    {
        log_file.open(log_path,std::ios::app);
        //cpu内存处理数据
        // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
        //大量数据
        std::string large_data(2048, 'x');  // 2KB 数据
        //时间记录
        auto time = std::chrono::system_clock::now();
        auto time_t = std::chrono::system_clock::to_time_t(time);
        std::string content_time=std::ctime(&time_t);
        // std::cout<<"日志记录时刻:"<<content_time;
        std::string content="["+content_time+"] "+large_data;
        // 延时
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
        //用户缓存区写日志
        log_file<<content<<std::endl;
        //写磁盘
        log_file.close();
        // 延时
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        // std::chrono::high_resolution_clock::time_point t3 = std::chrono::high_resolution_clock::now();
        // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
        // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t3-t2).count()<<"微秒"<<std::endl<<std::endl;
    }
    // 写入磁盘
    std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
    std::cout<<"生成日志和io相互阻塞"<<std::endl;
    std::cout<<"耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
}

//最终实现,异步解耦,条件变量
void async_log_solution_all()
{
    // 打印系统说明
    std::cout<<"异步解耦日志系统,生产者消费者模式"<<std::endl;
    // 日志文件相关初始化
    std::ofstream log_file;  // 文件输出流
    std::string log_path="log2.txt";  // 日志文件路径
    // 缓存尺寸
    int buffer_size=10;
    // 缓存
    std::deque<std::string> log_buffer;
    // 资源上锁
    std::mutex log_mutex;
    // 停止标志
    std::atomic<bool> stop_flag(false);
    // 条件变量
    std::condition_variable log_condition;

    // cpu内存处理数据
    auto log_data_func=[&log_buffer,&log_mutex,&buffer_size,&stop_flag,&log_condition](){
        int count=0;
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        while(true)
        {
            // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
            //创建数据
            std::string large_data(2048, 'x');  // 2KB 数据
            //时间记录
            auto time = std::chrono::system_clock::now();
            auto time_t = std::chrono::system_clock::to_time_t(time);
            std::string content_time=std::ctime(&time_t);
            // std::cout<<"生成日志:"<<content_time<<std::endl;
            std::string content="["+content_time+"]"+large_data;
            // 增加生成时间
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            //不满足条件时挂起
            std::unique_lock<std::mutex> lock(log_mutex);
            log_condition.wait(lock,[&log_buffer,&buffer_size,&stop_flag](){
                return log_buffer.size()<buffer_size||stop_flag;
            });
            // std::cout<<"写入缓存"<<std::endl;
            log_buffer.push_back(content);
            count++;
            if(stop_flag)
            {
                break;
            }
            lock.unlock();
            // 生成数据完成,通知消费者
            log_condition.notify_one();
            // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
            // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
            if(count==10)
            {
                std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
                std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
            }
        };
        // std::cout<<"生成日志结束"<<std::endl;
        // std::cout<<"生成日志数量:"<<count<<std::endl;
    };
    std::thread thread_log_data(log_data_func);
    std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待1秒,确保生产者先运行
    // 日志写入磁盘
    auto log_disk_func=[&log_buffer,&log_file,&log_path,&log_mutex,&stop_flag,&log_condition](){
        //计时开始
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        int count=0;
        while(true)
        {
            // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
            //打开文件
            log_file.open(log_path,std::ios::app);
            //写日志
            // std::cout<<"写入日志"<<std::endl;
            // 不满足条件时挂起
            std::unique_lock<std::mutex> lock(log_mutex);
            log_condition.wait(lock,[&log_buffer](){
                return !log_buffer.empty();
            });
            auto data=log_buffer.front();
            log_buffer.pop_front();
            lock.unlock();
            log_file<<data<<std::endl;
            //关闭文件
            log_file.close();
            //增加写入时间
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
            // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
            count++;
            //判断退出循环
            if (count>=10)
            {
                //通知生产者停止
                stop_flag=true;
                log_condition.notify_one();
                break;
            }
            // 写入完成,通知生产者生产
            log_condition.notify_one();   
        }
        //空转次数
        int count_spin=count-10;
        //计时结束
        std::cout<<"写入日志结束"<<std::endl;
        std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
        std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
        //缓存积压
        std::cout<<"缓存积压:"<<log_buffer.size()<<std::endl;
        //空转次数
        std::cout<<"空转次数:"<<count_spin<<std::endl;
    };
    std::thread thread_log_disk(log_disk_func);
    thread_log_data.join();
    thread_log_disk.join();
}

运行结果

----------------
同步耦合日志系统
生成日志和io相互阻塞
耗时:2184746ms
----------------
异步解耦日志系统,生产者消费者模式
生成10条日志耗时:1085299ms
写入日志结束
io耗时:1090205ms
缓存积压:9
空转次数:0
----------------

总结

当满足模式需求时,生产者消费者模式效率远高于常用的同步阻塞模式。

更多推荐