生产者消费者模式是并发编程的核心模式之一,核心是想要提高程序的运行效率。
这里记录一下自己的思考,使用通俗易懂的语言,和以日志记录为例,解读生产者消费者模式,并实现生产者消费者模式。
将生产者消费者模式的核心内容划分为三个问题:阻塞问题、内存积压问题、cpu空转问题。
这里是第一章,阻塞问题。

阻塞问题

生产者消费者模式首先就是要解决相互阻塞的问题。
在编程过程中循环是普遍存在的,循环内部普遍来说是顺序执行,顺序结构可以分为上下两个部分。
程序上方的输出数据往往是程序下方的输入,从数据的角度看,程序上方就是生产过程,称为生产者,程序下方就是消费过程,称为消费者。
这种结构下生产者和消费者之间必然相互等待,如果两个两者耗时较长,那么就会导致程序运行效率降低,这就称之为程序之间相互阻塞。

实际场景

日志记录系统就是一个典型的生产者消费者模式。
项目运行时,需要循环记录日志,在循环内部,程序上方是生成日志,程序下方是写入日志文件。
所以生产者就是生成字符串,消费者是写入日志文件。

代码实现

程序在同一个线程中循环执行,循环体中使用顺序结构,生产一个字符串,紧接着在磁盘中写入一个字符串。
这就是同步耦合实现,这样会导致生产者生产和消费者相互等待,即相互阻塞。

void sync_log()
{
    std::cout<<"同步耦合日志系统"<<std::endl;

    std::ofstream log_file;
    std::string log_path="log1.txt";


    std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
    for(int i=0;i<10;i++)
    {
        log_file.open(log_path,std::ios::app);
        //cpu内存处理数据
        std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
        //大量数据
        std::string large_data(2048, 'x');  // 2KB 数据
        //时间记录
        auto time = std::chrono::system_clock::now();
        auto time_t = std::chrono::system_clock::to_time_t(time);
        std::string content_time=std::ctime(&time_t);
        std::cout<<"日志记录时刻:"<<content_time;
        std::string content="["+content_time+"] "+large_data;
        std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
        //用户缓存区写日志
        log_file<<content<<std::endl;
        //写磁盘
        log_file.close();
        std::chrono::high_resolution_clock::time_point t3 = std::chrono::high_resolution_clock::now();
        std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
        std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t3-t2).count()<<"微秒"<<std::endl<<std::endl;
    }
    // 写入磁盘
    std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
    std::cout<<"生成日志和io相互阻塞"<<std::endl;
    std::cout<<"耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
}
int main()
{
    sync_log();
    return 0;
}

运行结果:


同步耦合日志系统
日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:2101微秒
写日志耗时:141微秒

日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:68微秒
写日志耗时:112微秒

日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:155微秒
写日志耗时:87微秒

日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:86微秒
写日志耗时:111微秒

日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:78微秒
写日志耗时:100微秒

日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:56微秒
写日志耗时:95微秒

日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:59微秒
写日志耗时:68微秒

日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:45微秒
写日志耗时:85微秒

日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:52微秒
写日志耗时:88微秒

日志记录时刻:Fri May 15 10:20:42 2026
处理业务耗时:66微秒
写日志耗时:107微秒

生成日志和io相互阻塞
耗时:8398ms

结果分析

每一次循环都需要生成一个字符串,除了在第一次循环,其他每一次都需要等待写入日志完成后才能生成字符串,
同理,写入磁盘也类似,包括第一次循环,每一次都需要等待生成字符串完成后才能写入磁盘。
这就是生产者和消费者速度相互阻塞,生成字符串是生产者,写入磁盘是消费者,两者相互等待,导致程序整体效率低。

解决阻塞

生产者和消费者相互阻塞如何解决?

同步耦合

深入分析,阻塞问题的根本原因在于同步耦合,
即调用方必须等待被调用方完成,才能继续执行的模式。
在生产者消费者视角下看,
同步耦合来自于生产者和消费者在同一个线程、处于同一个顺序结构中上下游的位置,当循环时前后者就会相互制约。

异步解耦合

既然阻塞时同步耦合导致的,那么解决就需要异步解耦合。
即调用方不需要等待被调用方完成,就可以继续执行的模式。
在生产者消费者视角下看,
就需要将生产者和消费者放在两个独立的线程中运行,这样两者不相互依赖,同时运行。

代码实现

分离生产者和消费者线程
生产者只需要生成日志字符串,不需要关心写入日志文件
消费者只需要写入日志文件,不需要关心生成日志字符串

但是这样又会产生新的问题,生产者生产数据,消费者消费数据,两者如何关联起来呢?

为了解决这些问题,我们可以创建共享变量作为通信方式
生产者和消费者能同时访问到这个变量
生产者生产数据后,将数据放入共享变量
消费者消费数据时,从共享变量中取出数据,进行消费

由于是多线程访问共享变量,所以需要引入互斥锁
当生产者生产数据时,消费者无法访问共享变量
当消费者消费数据时,生产者无法访问共享变量
避免了数据竞争,保证了数据安全

这样生产者和消费者就互不依赖,同时运行,解决了阻塞问题
这样就是异步解耦合,程序完成相同的功能,但是效率更高。

void sync_log()
{
    std::cout<<"同步耦合日志系统"<<std::endl;

    std::ofstream log_file;
    std::string log_path="log1.txt";


    std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
    for(int i=0;i<10;i++)
    {
        log_file.open(log_path,std::ios::app);
        //cpu内存处理数据
        // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
        //大量数据
        std::string large_data(2048, 'x');  // 2KB 数据
        //时间记录
        auto time = std::chrono::system_clock::now();
        auto time_t = std::chrono::system_clock::to_time_t(time);
        std::string content_time=std::ctime(&time_t);
        // std::cout<<"日志记录时刻:"<<content_time;
        std::string content="["+content_time+"] "+large_data;
        std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
        //用户缓存区写日志
        log_file<<content<<std::endl;
        //写磁盘
        log_file.close();
        // std::chrono::high_resolution_clock::time_point t3 = std::chrono::high_resolution_clock::now();
        // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
        // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t3-t2).count()<<"微秒"<<std::endl<<std::endl;
    }
    // 写入磁盘
    std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
    std::cout<<"生成日志和io相互阻塞"<<std::endl;
    std::cout<<"耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
}

void async_log()
{
    std::cout<<"异步解耦日志系统,不再相互阻塞"<<std::endl;
    std::ofstream log_file;
    std::string log_path="log2.txt";
    // 缓存
    std::deque<std::string> log_buffer;
    // 停止标志
    std::atomic<bool> stop_flag(false);
    // 加锁,避免数据竞争
    std::mutex log_mutex;

    // cpu内存处理数据
    auto log_data_func=[&log_buffer,&stop_flag,&log_mutex](){
        int count=1;
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        while(true)
        {
            // std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
            //创建数据
            std::string large_data(2048, 'x');  // 4KB 数据
            // std::this_thread::sleep_for(std::chrono::milliseconds(100));
            //时间记录
            auto time = std::chrono::system_clock::now();
            auto time_t = std::chrono::system_clock::to_time_t(time);
            std::string content_time=std::ctime(&time_t);
            // std::cout<<"生成日志:"<<content_time<<std::endl;
            std::string content="["+content_time+"] "+large_data;
            //写入缓存
            std::unique_lock<std::mutex> lock(log_mutex);
            log_buffer.push_back(content);
            lock.unlock();
            // std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
            // std::cout<<"处理业务耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl<<std::endl;
            count++;
            if(count==10)
            {
                std::chrono::high_resolution_clock::time_point t_10 = std::chrono::high_resolution_clock::now();
                std::cout<<"生成10条日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t_10-start).count()<<"ms"<<std::endl;
            }
            if (stop_flag)
            {
                break;
            }
            
        };
        std::cout<<"生成日志结束"<<std::endl;
        std::cout<<"总共生成数据量:"<<count<<"条"<<std::endl;
    };
    std::thread thread_log_data(log_data_func);
    // 日志写入磁盘
    auto log_disk_func=[&log_buffer,&log_file,&log_path,&stop_flag,&log_mutex](){
        //计时开始
        std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
        int count=1;
        while(true)
        {
            //判断退出循环
            if (count>10)
            {
                break;
            }
            if(!log_buffer.empty())
            {
                std::chrono::high_resolution_clock::time_point t1 = std::chrono::high_resolution_clock::now();
                //打开文件
                log_file.open(log_path,std::ios::app);
                //写日志
                std::unique_lock<std::mutex> lock(log_mutex);
                auto data=log_buffer.front();
                log_buffer.pop_front();
                lock.unlock();
                log_file<<data<<std::endl;
                //关闭文件
                log_file.close();
                std::chrono::high_resolution_clock::time_point t2 = std::chrono::high_resolution_clock::now();
                // std::cout<<"写日志耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(t2-t1).count()<<"微秒"<<std::endl;
                count++;
            }
        }
        //通知生产进程结束
        stop_flag = true;
        //计时结束
        std::chrono::high_resolution_clock::time_point end = std::chrono::high_resolution_clock::now();
        std::cout<<"io耗时:"<<std::chrono::duration_cast<std::chrono::microseconds>(end-start).count()<<"ms"<<std::endl;
    };
    std::thread thread_log_disk(log_disk_func);
    thread_log_data.join();
    thread_log_disk.join();

}

int main()
{
    // 同步阻塞问题
    sync_log();
    std::cout<<"----------------"<<std::endl;
    async_log();

    return 0;
}

运行结果:

同步耦合日志系统
生成日志和io相互阻塞
耗时:4978ms
----------------
异步解耦日志系统,不再相互阻塞
生成10条日志耗时:46ms
io耗时:3864ms
生成日志结束
总共生成数据量:600条

结果分析:

同步耦合日志系统,生成日志和io相互阻塞,记录10条日志到文件,共耗时4978ms
异步解耦日志系统,生成日志和io不再相互阻塞,记录10条日志到文件,共耗时3864ms
异步解耦合日志系统,当10条日志记录结束时,总共生成数据量都已经生成600条日志了

同样是记录10条日志,相互阻塞的方式耗时4978ms,非阻塞的方式耗时3864ms,非阻塞的方式耗时明显更短
同样是记录10条日志,相互阻塞的方式只生成了10条日志,而非阻塞的方式生成了600条,非阻塞的方式生成了更多的数据

结论

异步解耦合的方式以更短的时间生成了达到同样的记录效果,并且生成了更多的数据,说明异步解耦合的方式更高效

更多推荐