阻塞队列(block_queue)

什么是阻塞队列:

  • 在多线程编程时当有多个线程访问一个队列时如果队列为空,则获取队列中元素的方法就会阻塞,直到队列中有元素可以获取

解决的问题:

  • 队列的线程安全问题
  • 队列多线程取元素时的轮询判断问题

什么情况下需要阻塞队列:

  • 在c++标准库STL中常用的队列容器都不是线程安全的,我们在多线程编程时总会有类似于生产者消费者这种模型,在这种模型下我们访问队列就需要手动加锁,如果队列不是阻塞式的,我们每次从队列中获取元素的时候都需要对队列进行判断,如果队列为空,就continue,这种做法一般都需要配合一个定时器来实现,定时器的时长很难去设定,如果时间过短会一直去轮询,如果时间过长又会导致队列中消息有积压。下面我们通过一段代码来说明。
#include <thread>
#include <vector>
#include <Windows.h>
#include <iostream>
#include <queue>

std::queue<int> testQueue;
std::mutex mtx;

void Producer()
{
	for (size_t i = 0; i < 100; ++i)
	{
		Sleep(100);
		//push之前先进行加锁
		mtx.lock();
		testQueue.push(i);
		mtx.unlock();
	}
}

void Customer(size_t id)
{
	while (1)
	{
		mtx.lock();
		//需要每次都进行判断,如果队列为空就continue,每次队列的读写都得加锁
		if (testQueue.size() == 0)
		{
			Sleep(1);
			mtx.unlock();
			continue;
		}
		auto value = testQueue.front();
		testQueue.pop();
		mtx.unlock();
		std::cout << "thread id: " << id << " value:" << value << std::endl;
	}	
}

int main()
{
	std::vector<std::thread> vTh;
	for (size_t i = 0; i < 10; ++i)
	{
		std::thread t = std::thread(Customer,i);
		vTh.push_back(std::move(t));
	}

	Producer();
	for (auto& it : vTh)
	{
		std::cout << it.get_id() << std::endl;
		if (it.joinable())
		{
			it.join();
		}
	}
	return 0;
}

阻塞队列的实现:

BlockQueue.h

#include<mutex>
#include<list>
#include<condition_variable>
using namespace std;

template <typename T>
class BlockQueue
{
public:
	BlockQueue(){}

	~BlockQueue(){}

	void Put(const T& x)
	{
		unique_lock<mutex> guard(m_mutex);
		m_queue.push_back(x);
		m_cond.notify_all();
	}

	void Put(T&&x)
	{
		unique_lock<mutex> guard(m_mutex);
		m_queue.push_back(move(x));
		m_cond.notify_all();
	}

	T Take()
	{
		unique_lock<mutex> guard(m_mutex);
		while (m_queue.size() == 0)
			m_cond.wait(guard);
		T front(move(m_queue.front()));
		m_queue.pop_front();
		return move(front);
	}

	size_t Size()
	{
		unique_lock<mutex> guard(m_mutex);
		return m_queue.size();
	}

private:
	mutable mutex m_mutex;
	condition_variable m_cond;
	list<T>     m_queue;
};

main.cpp

#include "BlockQueue.h"
#include <thread>
#include <vector>
#include <Windows.h>
#include <iostream>

BlockQueue<int> testQueue;
std::mutex mtx;

void Produce()
{
	for (size_t i = 0; i < 100; ++i)
	{
		Sleep(100);
		testQueue.Put(i);
	}
}

void Custorm(size_t id)
{
	while (1)
	{
		auto value = testQueue.Take();
		std::cout << "thread id: " << id << " value:" << value << std::endl;
	}	
}

int main()
{
	std::vector<std::thread> vTh;
	for (size_t i = 0; i < 10; ++i)
	{
		std::thread t = std::thread(Custorm,i);
		vTh.push_back(std::move(t));
	}

	Produce();
	for (auto& it : vTh)
	{
		std::cout << it.get_id() << std::endl;
		if (it.joinable())
		{
			it.join();
		}
	}
	return 0;
}

说明:

  • 通过对比上面得消费者和生产者模型,我们可以发现,生产者和消费者在使用的时候不需要关心队列的线程安全问题,其次消费者也省去了每次进行判断轮询的操作。
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐