并发编程(c++)——4.流水线模式
流水线的思想和池的思想对应,池将任务分解成并行的小任务,每个任务之间没有依赖,这样可以在同时处理多个任务。
如果任务间有依赖关系,则池的思想就无法处理,此时可以将任务分为多个阶段,阶段间前后依赖,每个阶段串行。
当有多个这样的任务运行时,每个阶段的处理都可以错位并行,这就是流水线的思想。
背景问题
在处理单独的cpu密集或者io密集任务时,此时任务容易分为大量并行小块,使用线程池处理高效快速。
但是如果任务由cpu密集和io密集混合组成,此时使用线程池处理效率不高,因为cpu密集任务和io密集任务会相互阻塞,导致资源浪费。
解决方案
将任务分为独立并行任务,将单一并行任务分为多个阶段,每个阶段串行,让每个串行任务重叠执行。
这样cpu密集任务和io密集任务不会相互阻塞,提高效率。
具体来说就是将cpu和io的密集任务分解为io+cpu的多个任务,再将io和cpu分解为对应阶段,运行状态如下。
运行状态
任务1 1 2 3 4 5
任务2 0 1 2 3 4 5
任务3 0 0 1 2 3 4 5
任务4 0 0 0 1 2 3 4 5
任务5 0 0 0 0 1 2 3 4 5
每个并行任务都是串行的,先进行阶段1,再进行阶段2,再进行阶段3,再进行阶段4,再进行阶段5。
但是多个并行任务在不同阶段时是并行的。
任务1进行阶段二时,任务2进行阶段一。同理,任务1进行阶段三时,任务2进行阶段二,任务3进行阶段一。
这样不同任务就是部分重叠的。这样可以充分利用cpu资源,提高效率。
这种运行方式,只有一个任务时就是串行,没有提升。
当有多个任务时,每个任务都是串行的,但是多个任务在不同阶段时是并行的,这样就可以充分利用cpu资源,提高效率。
算法
1.分阶段缓冲
将原本紧密耦合的串行任务拆分为独立的处理阶段,每个阶段拥有独立的输入/输出缓冲区
2.异步转发
前一个阶段的输出作为后一个阶段的输入,通过转发线程的方式连接相邻阶段
3.哨兵中止
添加哨兵机制,哨兵在流水线中流转,通知流水线每个阶段结束。
代码实现
#include <iostream>
#include <vector>
#include <functional>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <climits>
template<typename T>
class PipelineStage {
private:
std::function<T(T)> processor_;
std::queue<T> input_queue_;
std::queue<T> output_queue_;
std::mutex mtx_;
std::condition_variable cv_input;
std::condition_variable cv_output;
bool stop_ = false;
std::thread worker_;
public:
PipelineStage(std::function<T(T)> processor)
: processor_(processor) {
worker_ = std::thread(&PipelineStage::run, this);
}
~PipelineStage() {
stop();
if (worker_.joinable()) worker_.join();
}
void enqueue(T data) {
{
std::lock_guard<std::mutex> lock(mtx_);
input_queue_.push(data);
}
cv_input.notify_one();
}
bool dequeue(T& data) {
std::unique_lock<std::mutex> lock(mtx_);
cv_output.wait(lock, [this] {
return !output_queue_.empty() || stop_;
});
if (stop_ && output_queue_.empty()) {
return false;
}
data = output_queue_.front();
output_queue_.pop();
return true;
}
void stop() {
{
std::lock_guard<std::mutex> lock(mtx_);
stop_ = true;
}
cv_input.notify_all();
cv_output.notify_all();
}
private:
void run() {
while (true) {
T input;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_input.wait(lock, [this] {
return !input_queue_.empty() || stop_;
});
if (stop_ && input_queue_.empty()) {
break;
}
if (!input_queue_.empty()) {
input = input_queue_.front();
input_queue_.pop();
}
}
// 哨兵检测:遇到 INT_MAX 就传递哨兵并退出
if (input == INT_MAX) {
{
std::lock_guard<std::mutex> lock(mtx_);
output_queue_.push(INT_MAX); // 传递哨兵
}
cv_output.notify_one();
break;
}
// 处理数据
T output = processor_(input);
{
std::lock_guard<std::mutex> lock(mtx_);
output_queue_.push(output);
}
cv_output.notify_one();
}
}
};
template<typename T>
class Pipeline {
private:
std::vector<PipelineStage<T>*> stages_;
public:
~Pipeline() {
for (auto stage : stages_) {
delete stage;
}
}
void add_stage(std::function<T(T)> processor) {
stages_.push_back(new PipelineStage<T>(processor));
}
void process(const std::vector<T>& inputs) {
// 连接各个阶段
for (size_t i = 0; i < stages_.size() - 1; ++i) {
auto producer = stages_[i];
auto consumer = stages_[i + 1];
// 启动转发线程
std::thread([producer, consumer]() {
T data;
while (producer->dequeue(data)) {
consumer->enqueue(data);
}
}).detach();
}
// 向第一个阶段输入数据
for (const auto& input : inputs)
{
stages_[0]->enqueue(input);
}
// 发送哨兵(INT_MAX 表示结束)
stages_[0]->enqueue(INT_MAX);
// 收集最后一个阶段的输出
std::vector<T> results;
T result;
while (stages_.back()->dequeue(result)) {
if (result == INT_MAX) break; // 遇到哨兵就停止收集
results.push_back(result);
}
// 输出结果
for (const auto& r : results) {
std::cout << "最终结果: " << r << std::endl;
}
}
};
测试
#include "thread_flow_line_improve.hpp"
#include <iostream>
#include <chrono>
// 使用示例
int main() {
Pipeline<int> pipeline;
// 添加处理阶段
pipeline.add_stage([](int x) {
std::cout << "阶段1: " << x << " → " << x * 2 << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return x * 2;
});
pipeline.add_stage([](int x) {
std::cout << "阶段2: " << x << " → " << x + 10 << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return x + 10;
});
pipeline.add_stage([](int x) {
std::cout << "阶段3: " << x << " → " << x * x << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return x * x;
});
std::vector<int> inputs = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
// std::vector<int> inputs = {1};
// 记录处理时间
auto start = std::chrono::high_resolution_clock::now();
pipeline.process(inputs);
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = (end - start)*1000;
std::cout << "流水线处理时间: " << elapsed.count() << " 毫秒" << std::endl;
// 非流水线处理
auto func1 = [](int x) {
std::cout << "阶段1: " << x << " → " << x * 2 << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return x * 2;
};
auto func2 = [](int x) {
std::cout << "阶段2: " << x << " → " << x + 10 << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return x + 10;
};
auto func3 = [](int x) {
std::cout << "阶段3: " << x << " → " << x * x << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return x * x;
};
std::vector<int> inputs2 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
start = std::chrono::high_resolution_clock::now();
for (int x : inputs2) {
x = func1(x);
x = func2(x);
x = func3(x);
}
end = std::chrono::high_resolution_clock::now();
elapsed = (end - start)*1000;
std::cout << "非流水线处理时间: " << elapsed.count() << " 毫秒" << std::endl;
return 0;
}
运行结果
阶段1: 1 → 2
阶段1: 2阶段2: 2 → 12
→ 4
阶段1: 3 → 阶段2: 4 → 14
阶段3: 12 → 6
144
阶段1: 阶段2: 4 → 8
6阶段3: 14 → → 16
196
阶段1: 5 → 10
阶段2: 阶段3: 16 → 8 → 256
18
阶段3: 18 → 阶段1: 6 → 12
阶段2: 10 → 20
324
阶段2: 12 → 22
阶段1: 7 → 14
阶段3: 20 → 400
阶段1: 8 → 16
阶段2: 14 → 阶段3: 2422 → 484
阶段3: 24 → 阶段1: 9 → 18
阶段2: 16 → 26
576
阶段3: 阶段2: 18 → 28
阶段1: 10 → 20
26 → 676
阶段3: 28 → 784
阶段2: 20 → 30
阶段3: 30 → 900
最终结果: 144
最终结果: 196
最终结果: 256
最终结果: 324
最终结果: 400
最终结果: 484
最终结果: 576
最终结果: 676
最终结果: 784
最终结果: 900
流水线处理时间: 1331.53 毫秒
阶段1: 1 → 2
阶段2: 2 → 12
阶段3: 12 → 144
阶段1: 2 → 4
阶段2: 4 → 14
阶段3: 14 → 196
阶段1: 3 → 6
阶段2: 6 → 16
阶段3: 16 → 256
阶段1: 4 → 8
阶段2: 8 → 18
阶段3: 18 → 324
阶段1: 5 → 10
阶段2: 10 → 20
阶段3: 20 → 400
阶段1: 6 → 12
阶段2: 12 → 22
阶段3: 22 → 484
阶段1: 7 → 14
阶段2: 14 → 24
阶段3: 24 → 576
阶段1: 8 → 16
阶段2: 16 → 26
阶段3: 26 → 676
阶段1: 9 → 18
阶段2: 18 → 28
阶段3: 28 → 784
阶段1: 10 → 20
阶段2: 20 → 30
阶段3: 30 → 900
非流水线处理时间: 3258.74 毫秒
结论
流水线处理因为每个阶段可以错位并行处理数据,从而提高了整体的处理效率。
在上述示例中,流水线处理的时间明显小于非流水线处理的时间。
流水线和线程池思想相互补充,是并发编程的核心。
更多推荐

所有评论(0)