线程池

线程池是并发编程核心的模式之一,覆盖了大多数需要并发的场景,这篇文章记录了对线程池的理解,使用简单的语言对线程池的概念进行阐述,主要包括线程池的使用场景,面临的问题,线程池的核心思想和如何使用c++代码实现,最后用一个io密集的案例进行使用示范。

目录

概念

目的:在保证系统的稳定性下,尽量提高系统的性能。
思想:预先创建若干线程,当有任务时,直接从池中取出线程执行任务,任务完成后线程归还给池。
方法:线程池通过限制线程数量 + 复用线程 + 任务队列缓冲 + future 异步回执。

场景

存在大量并发任务的场景
1.io密集型任务,如大量读写文件网络和数据库请求
2.cpu密集任务,如科学计算、大数据处理

存在问题

1.系统对线程数量存在上限,无限制创建可能导致系统不稳定甚至崩溃
2.线程创建过多会消耗大量内存资源,存在 OOM 风险
3.线程分散创建,生命周期、异常与资源回收难以统一管理
4.频繁创建与销毁线程,系统开销大
5.线程数量过多,上下文切换成本高,降低整体吞吐

解决方案

池化思想,预先创建若干线程,当有任务时,直接从池中取出线程执行任务,任务完成后线程归还给池。
池统一管理线程,创建和销毁,线程复用,线程数量可控,上下文切换开销小。
1.线程池限制了线程数量,防止系统崩溃
2.线程池限制了线程数量,防止内存溢出
3.线程池统一创建销毁,管理简单
4.线程复用,减少创建/销毁成本
5.控制线程数量,降低上下文切换

线程池实现

核心流程:
1.生成线程池,创建若干工作线程,使其进入阻塞等待任务状态
2.提交任务,将任务封装后放入任务队列,通过条件变量唤醒线程执行,并返回任务回执(future)
3.等待结果,调用方通过 future 等待任务执行完成并获取结果

实现要素:
1.线程队列,存放工作线程,线程死循环,默认阻塞状态,无任务时阻塞,有任务时唤醒
2.任务队列,存放任务
3.条件变量,通信任务和线程(实现线程与任务间的同步),存放任务时唤醒线程
4.future/promise, 接收线程执行结果(异步获取执行结果),future等待线程执行结果

线程池类

class ThreadPool {
    private:
        // 线程队列
        std::vector<std::thread> workers;
        // 任务队列
        std::queue<std::function<void()>> tasks;
        // 资源锁,保护任务队列
        std::mutex mtx;
        // 条件变量
        std::condition_variable cv;
        // 管理线程池是否停止
        bool stop;
    
    public:
        // 构造函数,生成线程池
        ThreadPool(size_t threads);
        // 析构函数,销毁线程池
        ~ThreadPool();
        // 提交任务
        template<class F>
        auto enqueue(F&&f)->std::future<decltype(f())>;
};

生成线程池

输入线程数量size_t threads
管理线程池停止变量stop初始化为false
循环生成threads个线程,线程函数为lambda表达式,lambda表达式内为线程函数逻辑
线程函数内,定义任务task,用于接收任务队列中的任务
线程函数内,进行死循环
线程函数内,任务队列tasks为空时,线程阻塞,等待条件变量cv,任务队列tasks非空时,线程唤醒,从任务队列tasks中取出一个任务,执行任务
线程函数内,任务队列tasks为空且stop为true时,线程退出循环,线程结束

ThreadPool::ThreadPool(size_t threads):stop(false) {
    for(size_t i=0;i<threads;++i) {
        workers.emplace_back([this]
        {
            // 任务
            std::function<void()> task;
            while(true) 
            {
                {
                    std::unique_lock<std::mutex> lock(mtx);
                    // 当任务队列为空时,等待
                    cv.wait(lock,[this]{return stop || !tasks.empty();});
                    // 如果线程池已经停止,则退出循环
                    if(stop && tasks.empty()) {
                        return;
                    }
                    // 从任务队列中取出一个任务
                    task = std::move(tasks.front());
                    tasks.pop();
                }
                // 执行任务
                task();
            }

        });
    }
    
}

提交任务

输入任务f
使用完美转发,将任务f封装为std::packaged_task
使用future获取任务f回执
将封装的任务f放入任务队列tasks中
通知条件变量cv,唤醒线程执行任务
返回任务回执

template<class F>
auto ThreadPool::enqueue(F&&f)->std::future<decltype(f())> {
    // 函数类型
    using return_type = decltype(f());
    auto task = std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));
    // 获取任务的结果
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(mtx);
        if(stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        // 将任务放入队列中
        tasks.emplace([task](){(*task)();});
    }
    cv.notify_one();
    return res;
}

销毁线程池

销毁时触发
将stop设置为true
通知所有线程,线程池已经停止
等待所有线程结束

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(mtx);
        stop = true;
    }
    cv.notify_all();
    for(std::thread& worker:workers) {
        worker.join();
    }
}

案例

磁盘io密集型任务,如大量读写文件
存在大量文件,文件中存储了大量数据,需要读取文件中的数据,进行计算处理
每一个任务为读取一个文件,处理文件中的数据
生成100个文件,每个文件中存储100个1000,计算所有文件中数字的和,结果为10000000

生成数据

生成100个文件,每个文件中随机存储100个数据

// 创建txt文件
void create_file() {
    std::vector<std::string> files;
    std::string filename = "data/file";
    // 生成100个文件
    for (int i = 1; i <= 100; ++i) {
        files.push_back(filename + std::to_string(i) + ".txt");
    }
    for (const auto& file : files) {
        std::ofstream f(file);
        //写入100行随机4位数
        for (int i = 0; i < 100; ++i) {
            f << rand() % 10000 << std::endl;
        }
        f.close();
    }
}

数据处理任务

计算文件中所有数字的和

// 计算文件数据和
int count_files(const std::string& filename) {
    std::ifstream file(filename);
    if (!file) {
        throw std::runtime_error("cannot open " + filename);
    }
    std::string line;
    // 计算每行数字和
    size_t sum = 0;
    while (std::getline(file, line)) {
        // 将字符串转换为数字
        size_t num = std::stoul(line);
        sum += num;
    }
    return sum;
}

线程池计调用

1.生成文件
2.根据cpu核心数,生成线程池
3.遍历文件列表,每个任务读取一个文件,提交到线程池,获取回执
4.遍历回执,获取每个文件的数据和
5.求和

int main()
{
    // 创建文件
    create_file();
    // 初始化线程池
    size_t cpu_cores = std::thread::hardware_concurrency();
    int thread_counts = cpu_cores * 2;
    std::cout << "cpu cores: " << cpu_cores << std::endl;
    ThreadPool pool(thread_counts);

    // 文件列表
    std::vector<std::string> files;
    std::string filename = "data/file";
    for (int i = 1; i <= 100; ++i) {
        files.push_back(filename + std::to_string(i) + ".txt");
    }

    // 提交任务到线程池
    std::vector<std::future<int>> futures;
    for (const auto& file : files) 
    {
        futures.emplace_back
        (
            pool.enqueue(
                [file]{return count_files(file);}
            )
        );
    }

    // 获取结果
    std::vector<int> results;
    for (auto& f : futures) {
        try {
            results.push_back(f.get());
        } catch (const std::exception& e) {
            std::cerr << e.what() << std::endl;
        }
    }

    // 求和
    int data_sum = 0;
    for (const auto& result : results)
    {
        std::cout << filename << ": \n";
        std::cout << "sum: " << result << std::endl;
        data_sum += result;
    }
    std::cout << "data sum: " << data_sum << std::endl;
    return 0;
}

结果

sum: 100000
data/file: 
sum: 100000
data/file: 
sum: 100000
data/file: 
sum: 100000
data/file: 
sum: 100000
data/file: 
sum: 100000
data/file: 
sum: 100000
data sum: 10000000

结果和预期一致,为1000×100×100=10000000

更多推荐