我们的《五分钟带你了解C++系列》终于迎来了尾声!在这里用一个”富含“C++特性的线程池作为综合案例来为本系列画上句号。如果你没有了解过C++,或者对线程池不熟悉,可以先参考纯C实现线程池

一、线程池设计

线程池内部包括:

  • 任务队列(阻塞队列)
  • 工作队列(一组固定数量的工作线程)

并且提供对外接口:Post()用于推送任务,这里的任务(task)是可调用对象,具体而言,可以是本系列六讲过的用packaged_task包装好的函数

阻塞队列BlockingQueue,阻塞队列封装了公共方法Push()Pop()用于操作队列元素、Destroy()用于销毁。操作时依赖std::mutex锁来管理。

线程池结构如下图所示:

在这里插入图片描述
流程
任务(task)通过调用Post()进入线程池,而后通过Post()进入任务队列task_queue,检测到队列不为空时,唤醒工作线程从队列中Pop()任务并执行它。

二、阻塞队列实现

阻塞队列的底层数据结构设计为std::queue,我们需要为阻塞队列设计Pop、Push方法用于出入队列,用mutex管理竞争风险,一个Cancel方法用于删除队列,并且私有成员中包含nonblock_变量来表示目前队列是否阻塞(有任务则不阻塞,无任务则阻塞),有任务时用condition_variable来唤醒工作线程

template <typename T> //可变参数模板
class BlockingQueue {
public: 

    BlockingQueue(bool nonblock = false) : nonblock_(nonblock) {
        
    }
    void Push(const T &value) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(value);
        not_empty_.notify_one();
    }

    //正常 pop 弹出元素
    //异常 pop 没有返回元素
    //为空的时候阻塞队列
    bool Pop(T &value) {
        //可手动unlock
        std::unique_lock<std::mutex> lock(mutex_);

        // 1. mutex_.unlock()
        // 2. 条件queue_.empty() && !nonblock_ 线程在wait中阻塞
        // notify_one notify_all唤醒线程
        // 3. 假设满足条件 mutex_.lock()
        // 4. 不满足条件又回到2
        not_empty_.wait(lock, [this]{ return !queue_.empty() || nonblock_;});
        if (queue_.empty()) return false;

        value = queue_.front();
        queue_.pop();
        return true;
    }

    //解除阻塞
    void Cancel() {
        std::lock_guard<std::mutex> lock(mutex_);
        nonblock_ = true;
        not_empty_.notify_all();
    }

private:
    bool nonblock_;
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable not_empty_; 

};

三、线程池实现

线程池本体则包含两个数据结构:任务队列task_queue_和工作线程workers_,任务队列用blockingqueue来实现,工作线程用thread vector容器,一个公共方法Post用于外部推送任务到线程池。

任务队列中的元素类型设计为function<void>无返回值的可适配各种可调用对象,工作线程的thread则在创建时绑定Worker()线程执行函数——不断取出任务并执行

class ThreadPool {
public:
    //线程池初始化
    explicit ThreadPool(int threads_num) {
        for (size_t i = 0; i < threads_num; ++i) {
            workers_.emplace_back([this]{Workers();});
        }
    }

    ~ThreadPool() {
        //析构函数
        task_queue_.Cancel();
        for (auto &worker: workers_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }
    void Post(std::function<void()> task) {//异步
        task_queue_.Push(task);
    }

private:
    void Workers() {
        while (true) {
            std::function<void()> task;
            if (!task_queue_.Pop(task)) {
                break;
            }
            task();
        }
    }
    //std的queue不是线程安全的,以下代码不好
    //std::queue<std::function<void()>> queue_;    
    //用阻塞队列
    BlockingQueue<std::function<void()>> task_queue_;
    //线程集合
    std::vector<std::thread> workers_;
};

更多推荐