【C++】五分钟带你了解C++系列(七):综合案例——线程池
·
我们的《五分钟带你了解C++系列》终于迎来了尾声!在这里用一个”富含“C++特性的线程池作为综合案例来为本系列画上句号。如果你没有了解过C++,或者对线程池不熟悉,可以先参考纯C实现线程池。
一、线程池设计
线程池内部包括:
- 任务队列(阻塞队列)
- 工作队列(一组固定数量的工作线程)
并且提供对外接口:Post()用于推送任务,这里的任务(task)是可调用对象,具体而言,可以是本系列六讲过的用packaged_task包装好的函数
阻塞队列BlockingQueue,阻塞队列封装了公共方法Push()和Pop()用于操作队列元素、Destroy()用于销毁。操作时依赖std::mutex锁来管理。
线程池结构如下图所示:

流程:
任务(task)通过调用Post()进入线程池,而后通过Post()进入任务队列task_queue,检测到队列不为空时,唤醒工作线程从队列中Pop()任务并执行它。
二、阻塞队列实现
阻塞队列的底层数据结构设计为std::queue,我们需要为阻塞队列设计Pop、Push方法用于出入队列,用mutex管理竞争风险,一个Cancel方法用于删除队列,并且私有成员中包含nonblock_变量来表示目前队列是否阻塞(有任务则不阻塞,无任务则阻塞),有任务时用condition_variable来唤醒工作线程
template <typename T> //可变参数模板
class BlockingQueue {
public:
BlockingQueue(bool nonblock = false) : nonblock_(nonblock) {
}
void Push(const T &value) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(value);
not_empty_.notify_one();
}
//正常 pop 弹出元素
//异常 pop 没有返回元素
//为空的时候阻塞队列
bool Pop(T &value) {
//可手动unlock
std::unique_lock<std::mutex> lock(mutex_);
// 1. mutex_.unlock()
// 2. 条件queue_.empty() && !nonblock_ 线程在wait中阻塞
// notify_one notify_all唤醒线程
// 3. 假设满足条件 mutex_.lock()
// 4. 不满足条件又回到2
not_empty_.wait(lock, [this]{ return !queue_.empty() || nonblock_;});
if (queue_.empty()) return false;
value = queue_.front();
queue_.pop();
return true;
}
//解除阻塞
void Cancel() {
std::lock_guard<std::mutex> lock(mutex_);
nonblock_ = true;
not_empty_.notify_all();
}
private:
bool nonblock_;
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable not_empty_;
};
三、线程池实现
线程池本体则包含两个数据结构:任务队列task_queue_和工作线程workers_,任务队列用blockingqueue来实现,工作线程用thread vector容器,一个公共方法Post用于外部推送任务到线程池。
任务队列中的元素类型设计为function<void>无返回值的可适配各种可调用对象,工作线程的thread则在创建时绑定Worker()线程执行函数——不断取出任务并执行
class ThreadPool {
public:
//线程池初始化
explicit ThreadPool(int threads_num) {
for (size_t i = 0; i < threads_num; ++i) {
workers_.emplace_back([this]{Workers();});
}
}
~ThreadPool() {
//析构函数
task_queue_.Cancel();
for (auto &worker: workers_) {
if (worker.joinable()) {
worker.join();
}
}
}
void Post(std::function<void()> task) {//异步
task_queue_.Push(task);
}
private:
void Workers() {
while (true) {
std::function<void()> task;
if (!task_queue_.Pop(task)) {
break;
}
task();
}
}
//std的queue不是线程安全的,以下代码不好
//std::queue<std::function<void()>> queue_;
//用阻塞队列
BlockingQueue<std::function<void()>> task_queue_;
//线程集合
std::vector<std::thread> workers_;
};
更多推荐
所有评论(0)