流水线的思想和池的思想对应,池将任务分解成并行的小任务,每个任务之间没有依赖,这样可以在同时处理多个任务。
如果任务间有依赖关系,则池的思想就无法处理,此时可以将任务分为多个阶段,阶段间前后依赖,每个阶段串行。
当有多个这样的任务运行时,每个阶段的处理都可以错位并行,这就是流水线的思想。

背景问题

在处理单独的cpu密集或者io密集任务时,此时任务容易分为大量并行小块,使用线程池处理高效快速。
但是如果任务由cpu密集和io密集混合组成,此时使用线程池处理效率不高,因为cpu密集任务和io密集任务会相互阻塞,导致资源浪费。

解决方案

将任务分为独立并行任务,将单一并行任务分为多个阶段,每个阶段串行,让每个串行任务重叠执行。
这样cpu密集任务和io密集任务不会相互阻塞,提高效率。
具体来说就是将cpu和io的密集任务分解为io+cpu的多个任务,再将io和cpu分解为对应阶段,运行状态如下。

运行状态

任务1 1 2 3 4 5
任务2 0 1 2 3 4 5
任务3 0 0 1 2 3 4 5
任务4 0 0 0 1 2 3 4 5
任务5 0 0 0 0 1 2 3 4 5

每个并行任务都是串行的,先进行阶段1,再进行阶段2,再进行阶段3,再进行阶段4,再进行阶段5。
但是多个并行任务在不同阶段时是并行的。
任务1进行阶段二时,任务2进行阶段一。同理,任务1进行阶段三时,任务2进行阶段二,任务3进行阶段一。
这样不同任务就是部分重叠的。这样可以充分利用cpu资源,提高效率。

这种运行方式,只有一个任务时就是串行,没有提升。
当有多个任务时,每个任务都是串行的,但是多个任务在不同阶段时是并行的,这样就可以充分利用cpu资源,提高效率。

算法

1.分阶段缓冲
将原本紧密耦合的串行任务拆分为独立的处理阶段,每个阶段拥有独立的输入/输出缓冲区
2.异步转发
前一个阶段的输出作为后一个阶段的输入,通过转发线程的方式连接相邻阶段
3.哨兵中止
添加哨兵机制,哨兵在流水线中流转,通知流水线每个阶段结束。

代码实现

#include <iostream>
#include <vector>
#include <functional>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <climits>  

template<typename T>
class PipelineStage {
private:
    std::function<T(T)> processor_;
    std::queue<T> input_queue_;
    std::queue<T> output_queue_;
    std::mutex mtx_;
    std::condition_variable cv_input;
    std::condition_variable cv_output;
    bool stop_ = false;
    std::thread worker_;
    
public:
    PipelineStage(std::function<T(T)> processor) 
        : processor_(processor) {
        worker_ = std::thread(&PipelineStage::run, this);
    }
    
    ~PipelineStage() {
        stop();
        if (worker_.joinable()) worker_.join();
    }
    
    void enqueue(T data) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            input_queue_.push(data);
        }
        cv_input.notify_one();
    }
    
    bool dequeue(T& data) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_output.wait(lock, [this] { 
            return !output_queue_.empty() || stop_; 
        });
        
        if (stop_ && output_queue_.empty()) {
            return false;
        }
        
        data = output_queue_.front();
        output_queue_.pop();
        return true;
    }
    
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stop_ = true;
        }
        cv_input.notify_all();
        cv_output.notify_all();
    }
    
private:
    void run() {
        while (true) {
            T input;
            {
                std::unique_lock<std::mutex> lock(mtx_);
                cv_input.wait(lock, [this] { 
                    return !input_queue_.empty() || stop_; 
                });
                
                if (stop_ && input_queue_.empty()) {
                    break;
                }
                
                if (!input_queue_.empty()) {
                    input = input_queue_.front();
                    input_queue_.pop();
                }
            }

            // 哨兵检测:遇到 INT_MAX 就传递哨兵并退出
            if (input == INT_MAX) {
                {
                    std::lock_guard<std::mutex> lock(mtx_);
                    output_queue_.push(INT_MAX);  // 传递哨兵
                }
                cv_output.notify_one();
                break;
            }
            
            // 处理数据
            T output = processor_(input);
            
            {
                std::lock_guard<std::mutex> lock(mtx_);
                output_queue_.push(output);
            }
            cv_output.notify_one();
        }
    }
};

template<typename T>
class Pipeline {
private:
    std::vector<PipelineStage<T>*> stages_;
    
public:
    ~Pipeline() {
        for (auto stage : stages_) {
            delete stage;
        }
    }
    
    void add_stage(std::function<T(T)> processor) {
        stages_.push_back(new PipelineStage<T>(processor));
    }
    
    void process(const std::vector<T>& inputs) {
        // 连接各个阶段
        for (size_t i = 0; i < stages_.size() - 1; ++i) {
            auto producer = stages_[i];
            auto consumer = stages_[i + 1];
            
            // 启动转发线程
            std::thread([producer, consumer]() {
                T data;
                while (producer->dequeue(data)) {
                    consumer->enqueue(data);
                }
            }).detach();
        }
        
        // 向第一个阶段输入数据
        for (const auto& input : inputs) 
        {
            stages_[0]->enqueue(input);
        }

        // 发送哨兵(INT_MAX 表示结束)
        stages_[0]->enqueue(INT_MAX);
        
        // 收集最后一个阶段的输出
        std::vector<T> results;
        T result;
        while (stages_.back()->dequeue(result)) {
            if (result == INT_MAX) break;  // 遇到哨兵就停止收集
            results.push_back(result);
        }

        // 输出结果
        for (const auto& r : results) {
            std::cout << "最终结果: " << r << std::endl;
        }

    }
};

测试

#include "thread_flow_line_improve.hpp"

#include <iostream>
#include <chrono>

// 使用示例
int main() {
    Pipeline<int> pipeline;
    
    // 添加处理阶段
    pipeline.add_stage([](int x) { 
        std::cout << "阶段1: " << x << " → " << x * 2 << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return x * 2; 
    });
    
    pipeline.add_stage([](int x) { 
        std::cout << "阶段2: " << x << " → " << x + 10 << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return x + 10; 
    });
    
    pipeline.add_stage([](int x) { 
        std::cout << "阶段3: " << x << " → " << x * x << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return x * x; 
    });
    
    std::vector<int> inputs = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    // std::vector<int> inputs = {1};
    // 记录处理时间
    auto start = std::chrono::high_resolution_clock::now();
    pipeline.process(inputs);
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = (end - start)*1000;
    std::cout << "流水线处理时间: " << elapsed.count() << " 毫秒" << std::endl;

    // 非流水线处理
    auto func1 = [](int x) {
        std::cout << "阶段1: " << x << " → " << x * 2 << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return x * 2;
    };
    auto func2 = [](int x) {
        std::cout << "阶段2: " << x << " → " << x + 10 << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return x + 10;
    };
    auto func3 = [](int x) {
        std::cout << "阶段3: " << x << " → " << x * x << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return x * x;
    };
    std::vector<int> inputs2 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    start = std::chrono::high_resolution_clock::now();
    for (int x : inputs2) {
        x = func1(x);
        x = func2(x);
        x = func3(x);
    }
    end = std::chrono::high_resolution_clock::now();
    elapsed = (end - start)*1000;
    std::cout << "非流水线处理时间: " << elapsed.count() << " 毫秒" << std::endl;

    
    return 0;
}

运行结果

阶段1: 1 → 2
阶段1: 2阶段2: 2 → 12
 → 4
阶段1: 3 → 阶段2: 4 → 14
阶段3: 12 → 6
144
阶段1: 阶段2: 4 → 8
6阶段3: 14 →  → 16
196
阶段1: 5 → 10
阶段2: 阶段3: 16 → 8 → 256
18
阶段3: 18 → 阶段1: 6 → 12
阶段2: 10 → 20
324
阶段2: 12 → 22
阶段1: 7 → 14
阶段3: 20 → 400
阶段1: 8 → 16
阶段2: 14 → 阶段3: 2422 → 484

阶段3: 24 → 阶段1: 9 → 18
阶段2: 16 → 26
576
阶段3: 阶段2: 18 → 28
阶段1: 10 → 20
26 → 676
阶段3: 28 → 784
阶段2: 20 → 30
阶段3: 30 → 900
最终结果: 144
最终结果: 196
最终结果: 256
最终结果: 324
最终结果: 400
最终结果: 484
最终结果: 576
最终结果: 676
最终结果: 784
最终结果: 900
流水线处理时间: 1331.53 毫秒
阶段1: 1 → 2
阶段2: 2 → 12
阶段3: 12 → 144
阶段1: 2 → 4
阶段2: 4 → 14
阶段3: 14 → 196
阶段1: 3 → 6
阶段2: 6 → 16
阶段3: 16 → 256
阶段1: 4 → 8
阶段2: 8 → 18
阶段3: 18 → 324
阶段1: 5 → 10
阶段2: 10 → 20
阶段3: 20 → 400
阶段1: 6 → 12
阶段2: 12 → 22
阶段3: 22 → 484
阶段1: 7 → 14
阶段2: 14 → 24
阶段3: 24 → 576
阶段1: 8 → 16
阶段2: 16 → 26
阶段3: 26 → 676
阶段1: 9 → 18
阶段2: 18 → 28
阶段3: 28 → 784
阶段1: 10 → 20
阶段2: 20 → 30
阶段3: 30 → 900
非流水线处理时间: 3258.74 毫秒

结论

流水线处理因为每个阶段可以错位并行处理数据,从而提高了整体的处理效率。
在上述示例中,流水线处理的时间明显小于非流水线处理的时间。
流水线和线程池思想相互补充,是并发编程的核心。

更多推荐