并发编程(c++)——2.线程池模式
线程池
线程池是并发编程核心的模式之一,覆盖了大多数需要并发的场景,这篇文章记录了对线程池的理解,使用简单的语言对线程池的概念进行阐述,主要包括线程池的使用场景,面临的问题,线程池的核心思想和如何使用c++代码实现,最后用一个io密集的案例进行使用示范。
目录
概念
目的:在保证系统的稳定性下,尽量提高系统的性能。
思想:预先创建若干线程,当有任务时,直接从池中取出线程执行任务,任务完成后线程归还给池。
方法:线程池通过限制线程数量 + 复用线程 + 任务队列缓冲 + future 异步回执。
场景
存在大量并发任务的场景
1.io密集型任务,如大量读写文件网络和数据库请求
2.cpu密集任务,如科学计算、大数据处理
存在问题
1.系统对线程数量存在上限,无限制创建可能导致系统不稳定甚至崩溃
2.线程创建过多会消耗大量内存资源,存在 OOM 风险
3.线程分散创建,生命周期、异常与资源回收难以统一管理
4.频繁创建与销毁线程,系统开销大
5.线程数量过多,上下文切换成本高,降低整体吞吐
解决方案
池化思想,预先创建若干线程,当有任务时,直接从池中取出线程执行任务,任务完成后线程归还给池。
池统一管理线程,创建和销毁,线程复用,线程数量可控,上下文切换开销小。
1.线程池限制了线程数量,防止系统崩溃
2.线程池限制了线程数量,防止内存溢出
3.线程池统一创建销毁,管理简单
4.线程复用,减少创建/销毁成本
5.控制线程数量,降低上下文切换
线程池实现
核心流程:
1.生成线程池,创建若干工作线程,使其进入阻塞等待任务状态
2.提交任务,将任务封装后放入任务队列,通过条件变量唤醒线程执行,并返回任务回执(future)
3.等待结果,调用方通过 future 等待任务执行完成并获取结果
实现要素:
1.线程队列,存放工作线程,线程死循环,默认阻塞状态,无任务时阻塞,有任务时唤醒
2.任务队列,存放任务
3.条件变量,通信任务和线程(实现线程与任务间的同步),存放任务时唤醒线程
4.future/promise, 接收线程执行结果(异步获取执行结果),future等待线程执行结果
线程池类
class ThreadPool {
private:
// 线程队列
std::vector<std::thread> workers;
// 任务队列
std::queue<std::function<void()>> tasks;
// 资源锁,保护任务队列
std::mutex mtx;
// 条件变量
std::condition_variable cv;
// 管理线程池是否停止
bool stop;
public:
// 构造函数,生成线程池
ThreadPool(size_t threads);
// 析构函数,销毁线程池
~ThreadPool();
// 提交任务
template<class F>
auto enqueue(F&&f)->std::future<decltype(f())>;
};
生成线程池
输入线程数量size_t threads
管理线程池停止变量stop初始化为false
循环生成threads个线程,线程函数为lambda表达式,lambda表达式内为线程函数逻辑
线程函数内,定义任务task,用于接收任务队列中的任务
线程函数内,进行死循环
线程函数内,任务队列tasks为空时,线程阻塞,等待条件变量cv,任务队列tasks非空时,线程唤醒,从任务队列tasks中取出一个任务,执行任务
线程函数内,任务队列tasks为空且stop为true时,线程退出循环,线程结束
ThreadPool::ThreadPool(size_t threads):stop(false) {
for(size_t i=0;i<threads;++i) {
workers.emplace_back([this]
{
// 任务
std::function<void()> task;
while(true)
{
{
std::unique_lock<std::mutex> lock(mtx);
// 当任务队列为空时,等待
cv.wait(lock,[this]{return stop || !tasks.empty();});
// 如果线程池已经停止,则退出循环
if(stop && tasks.empty()) {
return;
}
// 从任务队列中取出一个任务
task = std::move(tasks.front());
tasks.pop();
}
// 执行任务
task();
}
});
}
}
提交任务
输入任务f
使用完美转发,将任务f封装为std::packaged_task
使用future获取任务f回执
将封装的任务f放入任务队列tasks中
通知条件变量cv,唤醒线程执行任务
返回任务回执
template<class F>
auto ThreadPool::enqueue(F&&f)->std::future<decltype(f())> {
// 函数类型
using return_type = decltype(f());
auto task = std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));
// 获取任务的结果
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(mtx);
if(stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
// 将任务放入队列中
tasks.emplace([task](){(*task)();});
}
cv.notify_one();
return res;
}
销毁线程池
销毁时触发
将stop设置为true
通知所有线程,线程池已经停止
等待所有线程结束
ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(mtx);
stop = true;
}
cv.notify_all();
for(std::thread& worker:workers) {
worker.join();
}
}
案例
磁盘io密集型任务,如大量读写文件
存在大量文件,文件中存储了大量数据,需要读取文件中的数据,进行计算处理
每一个任务为读取一个文件,处理文件中的数据
生成100个文件,每个文件中存储100个1000,计算所有文件中数字的和,结果为10000000
生成数据
生成100个文件,每个文件中随机存储100个数据
// 创建txt文件
void create_file() {
std::vector<std::string> files;
std::string filename = "data/file";
// 生成100个文件
for (int i = 1; i <= 100; ++i) {
files.push_back(filename + std::to_string(i) + ".txt");
}
for (const auto& file : files) {
std::ofstream f(file);
//写入100行随机4位数
for (int i = 0; i < 100; ++i) {
f << rand() % 10000 << std::endl;
}
f.close();
}
}
数据处理任务
计算文件中所有数字的和
// 计算文件数据和
int count_files(const std::string& filename) {
std::ifstream file(filename);
if (!file) {
throw std::runtime_error("cannot open " + filename);
}
std::string line;
// 计算每行数字和
size_t sum = 0;
while (std::getline(file, line)) {
// 将字符串转换为数字
size_t num = std::stoul(line);
sum += num;
}
return sum;
}
线程池计调用
1.生成文件
2.根据cpu核心数,生成线程池
3.遍历文件列表,每个任务读取一个文件,提交到线程池,获取回执
4.遍历回执,获取每个文件的数据和
5.求和
int main()
{
// 创建文件
create_file();
// 初始化线程池
size_t cpu_cores = std::thread::hardware_concurrency();
int thread_counts = cpu_cores * 2;
std::cout << "cpu cores: " << cpu_cores << std::endl;
ThreadPool pool(thread_counts);
// 文件列表
std::vector<std::string> files;
std::string filename = "data/file";
for (int i = 1; i <= 100; ++i) {
files.push_back(filename + std::to_string(i) + ".txt");
}
// 提交任务到线程池
std::vector<std::future<int>> futures;
for (const auto& file : files)
{
futures.emplace_back
(
pool.enqueue(
[file]{return count_files(file);}
)
);
}
// 获取结果
std::vector<int> results;
for (auto& f : futures) {
try {
results.push_back(f.get());
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
}
// 求和
int data_sum = 0;
for (const auto& result : results)
{
std::cout << filename << ": \n";
std::cout << "sum: " << result << std::endl;
data_sum += result;
}
std::cout << "data sum: " << data_sum << std::endl;
return 0;
}
结果
sum: 100000
data/file:
sum: 100000
data/file:
sum: 100000
data/file:
sum: 100000
data/file:
sum: 100000
data/file:
sum: 100000
data/file:
sum: 100000
data sum: 10000000
结果和预期一致,为1000×100×100=10000000
更多推荐

所有评论(0)