前言

并行计算库充分利用多核的优势,通过并行运算提高程序效率,业界有两个知名的c++并行库,一个是intel开发的TBB,一个是微软开发的PPL

TBB(Intel® Threading Building Blocks )

TBBintel用标准c++写的一个开源的并行计算库。它的目的是提升数据并行计算的能力,可以在其官网下载最新的库和文档。TBB主要功能:并行算法、任务调度、并行容器、同步原语、内存分配器。

PPL(Parallel Patterns Library)

PPL是微软开发的并行计算库,它的功能和TBB是差不多的,主要是在windows上使用。二者在并行算法的使用上基本上是一样的, 但还是有些差异:TBBtask没有PPLtask强大,PPLtask可以链式连续执行还可以组合任务,TBBtask则不行。

PPL C++ 库与 C# 并行库TaskParallelLibrary的设计理念、基本框架以及接口使用上非常类似,熟悉C#并行库的朋友上手C++版的PPL非常容易。下面我将介绍微软跨平台PPL的一个简易实现pplx,该库是附在微软的开源项目 cpprestsdk 中的。

pplx 并行库

C++ REST SDK 是 Microsoft 的一个开源跨平台项目, 其使用大量现代异步 C++ API 实现了一个基于 HTTP / HTTPS 协议的 B/S 组件,使用该组件,可以方便地进行高性能RESTfulHTTP / HTTPS 服务器、客户端开发,且可以在WindowsLinuxOSXiOSAndroid各平台下使用。

当然今天我要介绍的主角是该项目中的并行库PPLX。下面先介绍如何编译安装cpprestsdk,然后介绍如何使用并行库PPLX。以下都是在 Ubuntu 系统上进行。

编译安装

有两种方式可以安装cpprestsdk,一种是直接用 apt-get 安装,另一种是从源码安装。

通过 apt-get 安装
sudo apt-get install libcpprest-dev
从 source 编译安装

ref: How to build for Linux

1, 系统要求: Ubuntu 16.04 及之后的版本

2, 安装必要的工具:boost库,ninja 用于编译,

sudo apt-get install g++ git libwebsocketpp-dev openssl libssl-dev ninja-build

sudo apt-get install libboost-atomic-dev libboost-thread-dev libboost-system-dev libboost-date-time-dev libboost-regex-dev libboost-filesystem-dev libboost-random-dev libboost-chrono-dev libboost-serialization-dev 

3, 下载代码

git clone https://github.com/Microsoft/cpprestsdk.git casablanca

4, 编译:

cd casablanca
mkdir build.release
cd build.release
cmake -G Ninja .. -DCMAKE_BUILD_TYPE=Release
ninja

如果想编译成 debug 版本,把上面代码中的 release/Release 修改为 debug/Debug 即可。

5, 编译完成之后,跑一下 test_runner 测试验证一下:

cd Release/Binaries
./test_runner *_test.so

或者运行 bing 搜索示例:

cd Release/Binaries
./BingRequest kesalin kesalin.html

6, 安装:

sudo ninja install
sudo ldconfig

7, 编译单个文件的参数:

g++ -std=c++11 my_file.cpp -o my_file -lboost_system -lcrypto -lssl -lcpprest
./my_file

使用 pplx 并行库

创建并运行任务

可以通过多种途径创建任务:

//构造函数
auto task = pplx::task<int>([](){
    return 10;
});
 
//lambda
auto task = []()->pplx::task<int>{
    return pplx::task_from_result(10);
};
 
//create_task
auto task = pplx::create_task([](){
    return 10;
});
 
//create_task 创建延迟任务
pplx::task_completion_event<int> tce;// task_completion_event 需按值传递
auto task = pplx::create_task(tce);

也可以创建任务链:

pplx::task<std::string> create_print_task(const std::string& init_value)
{
    return pplx::create_task([init_value]() {
        std::cout <<"Current value:" << init_value << std::endl;
        return std::string("Value 2");
    })

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 3");
    })

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 4");
    });
}

使用task.get()或者task.wait()执行任务:

  • 阻塞方式get(): 阻塞直到任务执行完成,并返回任务结果,当任务取消时,抛出task_canceled异常,发生其它异常也会被抛出;
  • 非阻塞方式wait():等待任务到达终止状态,然后返回任务状态:completedcanceled,如果发生异常会被抛出。
void test_task_chain()
{
    auto task_chain = create_print_task("Value 1");
    task_chain.then([](std::string value) {
        std::cout << "Result value: " << value << std::endl;
        return value;
    })

    // process exception
    .then([](pplx::task<std::string> previousTask) {
        try {
            previousTask.get();
        }
        catch (const std::exception& e) {
            std::cout << "exception: " << e.what() << std::endl;
        }
    })

    .wait();
}
组任务

可以创建和执行一组任务,根据需要来选择是全部执行再返回,还是执行任一任务就返回。

  • when_all:返回组任务,只有当所有任务都完成时组任务才会返回成功;如果任一任务被取消或者抛出异常,则组任务会完成并处理取消状态,在组任务get()或者wait()时抛出异常。如果任务类型为task<T>,则组任务类型为task<vector<T>>
  • when_any:返回组任务,当任一任务完成时组任务就会返回成功;如果所有任务都被取消或者抛出异常,则组任务会完成并处理取消状态,并且如果任何任务发生异常,在组任务get或者wait时抛出异常。如果任务类型为task<T>,则组任务类型为task<T, size_t>size_t 返回完成任务的索引。
void test_group_tasks()
{
    auto sleep_print = [](int seconds, const std::string& info) {
        if (seconds > 0) {
            sleep(seconds);
        }

        std::cout << info << std::endl;
    };

    auto/*std::array<pplx::task<int>, 3>*/ tasks = {
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 1"); return 1; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 2"); return 2; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(4, "Task 3"); return 3; })
    };

    {
        std::cout << "=== when_all ===" << std::endl;

        auto joinTask = pplx::when_all(std::begin(tasks), std::end(tasks));
        auto result = joinTask.wait();
        std::cout << "All joined thread. result: " << result << std::endl;
    }

    {
        std::cout << "=== when_any ===" << std::endl;

        auto joinTask = pplx::when_any(std::begin(tasks), std::end(tasks))
        .then([](std::pair<int, size_t> result) {
            std::cout << "First task to finish returns "
                  << result.first
                  << " and has index "
                  << result.second << std::endl;
        });

        auto result = joinTask.wait();
        std::cout << "Any joined thread. result: " << result << std::endl;
    }
}
取消任务

cancellation_token_source 通过封装一个 cancellation_token 指针来提供取消操作,通过cancellation_token.is_canceled()在执行任务的过程中判断任务是否要被取消。

示例中的任务会循环执行,直到显式取消任务:

void test_cancellation()
{
    pplx::cancellation_token_source cts;
    std::cout << "Creating task..." << std::endl;

    auto task = pplx::create_task([cts]{
        bool moreToDo = true;
        while (moreToDo) {
           if (cts.get_token().is_canceled()) {
               return;
           }
           else {
               moreToDo = []()->bool {
                   std::cout << "Performing work at " << now() << std::endl;
                   sleep(1);
                   return true;
               }();
           }
        }
    });

    sleep(3);

    std::cout << "Canceling task... " << now() << std::endl;
    cts.cancel();

    std::cout << "Waiting for task to complete... " << now() << std::endl;
    task.wait();

    std::cout << "Done. " << now() << std::endl;
}

当要在异步任务链中支持取消时,需要将cancellation_token作为构造task的参数传递,然后结合task.wait()判断是否要取消:

void test_cancellation_async()
{
    pplx::cancellation_token_source cts;
    auto task = pplx::task<void>([cts]() {
        std::cout << "Cancel continue_task" << std::endl;
        cts.cancel();
    })

    .then([]() {
        std::cout << "This will not run" << std::endl;
    }, cts.get_token());

    try {
        if (task.wait() == pplx::task_status::canceled) {
            std::cout<<"Taks has been canceled"<<std::endl;
        }
        else {
            task.get();
        }
    }
    catch (const std::exception& e) {
        std::cout << "exception: " << e.what() << std::endl;
    }
}
处理异常

之前说过如果任务发生异常,会在get或者wait时抛出,但是如果希望在异步任务链中判定之前执行是否发生异常做出操作时,可以采用另外的方式。
当使用task.then时一般是这样写的:

task<T>.then([](T t){
     //处理任务结果t
})

这时候进入then时之前的任务已经执行完成了,task.then有另外一种写法,能够在then时并没有执行任务:

task<T>.then([](task<T> task){
       try 
       {
              task.get(); //使用get或者wait执行任务
       }
       catch(...)
       {
           //处理异常
       }
})

示例:

void test_task_exception()
{
    auto task_chain = create_print_task("Value 1");
    task_chain.then([](std::string value) {
        // uncomment this line to throw an exception.
        throw std::runtime_error("An exception happened!");
        std::cout << "Result value: " << value << std::endl;
        return value;
    })

    // process exception
    .then([](pplx::task<std::string> previousTask) {
        try {
            previousTask.get();
        }
        catch (const std::exception& e) {
            std::cout << "exception: " << e.what() << std::endl;
        }
    })

    .wait();
}

本文完整代码

// g++ -std=c++11 pplxdemo.cpp -o pplxdemo -lboost_system -lcrypto -lssl -lcpprest

#include <pplx/pplxtasks.h>
#include <iostream>
#include <sstream>
#include <vector>
#include <functional>
#include <iomanip>
#include <ctime>
#include <thread>
#include <chrono>
#include <stdexcept>

std::string now()
{
    auto t = std::time(nullptr);
    auto tm = *std::localtime(&t);

    std::ostringstream oss;
    oss << std::put_time(&tm, "%Y-%m-%d %H-%M-%S");
    auto str = oss.str();
    return str;
}

void sleep(int seconds)
{
    std::this_thread::sleep_for(std::chrono::seconds(seconds));
}

pplx::task<std::string> create_print_task(const std::string& init_value)
{
    return pplx::create_task([init_value]() {
        std::cout <<"Current value:" << init_value << std::endl;
        return std::string("Value 2");
    })

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 3");
    })

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 4");
    });
}

void test_task_chain()
{
    auto task_chain = create_print_task("Value 1");
    task_chain.then([](std::string value) {
        // uncomment this line to throw an exception.
        // throw std::runtime_error("An exception happened!");
        std::cout << "Result value: " << value << std::endl;
        return value;
    })

    // process exception
    .then([](pplx::task<std::string> previousTask) {
        try {
            previousTask.get();
        }
        catch (const std::exception& e) {
            std::cout << "exception: " << e.what() << std::endl;
        }
    })

    .wait();
}

void test_group_tasks()
{
    auto sleep_print = [](int seconds, const std::string& info) {
        if (seconds > 0) {
            sleep(seconds);
        }

        std::cout << info << std::endl;
    };

    auto/*std::array<pplx::task<int>, 3>*/ tasks = {
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 1"); return 1; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 2"); return 2; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(4, "Task 3"); return 3; })
    };

    {
        std::cout << "=== when_all ===" << std::endl;

        auto joinTask = pplx::when_all(std::begin(tasks), std::end(tasks));
        auto result = joinTask.wait();
        std::cout << "All joined thread. result: " << result << std::endl;
    }

    {
        std::cout << "=== when_any ===" << std::endl;

        auto joinTask = pplx::when_any(std::begin(tasks), std::end(tasks))
        .then([](std::pair<int, size_t> result) {
            std::cout << "First task to finish returns "
                  << result.first
                  << " and has index "
                  << result.second << std::endl;
        });

        auto result = joinTask.wait();
        std::cout << "Any joined thread. result: " << result << std::endl;
    }
}

void test_cancellation()
{
    pplx::cancellation_token_source cts;
    std::cout << "Creating task..." << std::endl;

    auto task = pplx::create_task([cts]{
        bool moreToDo = true;
        while (moreToDo) {
           if (cts.get_token().is_canceled()) {
               return;
           }
           else {
               moreToDo = []()->bool {
                   std::cout << "Performing work at " << now() << std::endl;
                   sleep(1);
                   return true;
               }();
           }
        }
    });

    sleep(3);

    std::cout << "Canceling task... " << now() << std::endl;
    cts.cancel();

    std::cout << "Waiting for task to complete... " << now() << std::endl;
    task.wait();

    std::cout << "Done. " << now() << std::endl;
}

void test_cancellation_async()
{
    pplx::cancellation_token_source cts;
    auto task = pplx::task<void>([cts]() {
        std::cout << "Cancel continue_task" << std::endl;
        cts.cancel();
    })

    .then([]() {
        std::cout << "This will not run" << std::endl;
    }, cts.get_token());

    try {
        if (task.wait() == pplx::task_status::canceled) {
            std::cout<<"Taks has been canceled"<<std::endl;
        }
        else {
            task.get();
        }
    }
    catch (const std::exception& e) {
        std::cout << "exception: " << e.what() << std::endl;
    }
}

int main(int argc, char* args[])
{
    std::cout << "==== pplx demo ====" << std::endl;
    test_task_chain();
    test_group_tasks();
    test_cancellation();
    test_cancellation_async();
    return 0;
}
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐