选取合适的 demo 是头等大事

* 以协程为目标,涉及到的新语法会简单说明,不涉及的不旁征博引,很多新语法都是有了某种需求才创建的,理解这种需求本身比硬学语法规则更为重要

* 若语法的原理非常简单,也会简单展开讲讲,有利于透过现象看本质,用起来更得心应手

上一篇文章里不光探讨了协程的本质,还说明了一系列 C++20 协程概念:

* 协程体

* 协程状态

* 承诺对象

* 返回对象

* 协程句柄

及它们之间的关系:

并简单说明了接入 C++20 协程时用户需要实现的类型、接口、及其含义。如果没有这些内容铺垫,看本文时会有很多地方将会难以理解,还没看过的小伙伴,墙裂建议先看那篇。

工具还是之前介绍过的 C++ Insights 和 Compile Explorer,也在上一篇中介绍过了,这里不再赘述。

协程调度器

话不多说,直接上 demo:

#include <coroutine>
#include <iostream>
#include <queue>
#include <functional>
#include <thread>

class SingleThreadScheduler {
public:
    void schedule(std::function<void()> task) {
        tasks.push(std::move(task));
    }

    void run() {
        while (!tasks.empty()) {
            auto task = tasks.front();
            tasks.pop();
            task();  
        }
    }

private:
    std::queue<std::function<void()>> tasks;
};

struct AsyncTask {
    struct promise_type {
        AsyncTask get_return_object() { 
            return AsyncTask(std::coroutine_handle<promise_type>::from_promise(*this)); 
        }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };

    std::coroutine_handle<promise_type> handle;

    explicit AsyncTask(std::coroutine_handle<promise_type> h) : handle(h) {}
    ~AsyncTask() { if (handle) handle.destroy(); }
};

struct ScheduleAwaiter {
    SingleThreadScheduler* scheduler;

    bool await_ready() const { return false; }
    void await_suspend(std::coroutine_handle<> h) {
        scheduler->schedule([h] { h.resume(); });
    }
    void await_resume() {}
};

AsyncTask demo_coroutine(SingleThreadScheduler& scheduler, int id) {
    std::cout << "Task " << id << " started on thread: " 
              << std::this_thread::get_id() << std::endl;

    co_await ScheduleAwaiter{&scheduler}; 

    std::cout << "Task " << id << " resumed on thread: " 
              << std::this_thread::get_id() << std::endl;
  
    co_await ScheduleAwaiter{&scheduler}; 

    std::cout << "Task " << id << " finish on thread: " 
              << std::this_thread::get_id() << std::endl;
}

int main() {
    SingleThreadScheduler scheduler;

    auto task1 = demo_coroutine(scheduler, 1);
    auto task2 = demo_coroutine(scheduler, 2);
    auto task3 = demo_coroutine(scheduler, 3);

    std::cout << "init done" << std::endl;  
    scheduler.run();
}

这个例子演示了拥有三个协程任务的单线程协程调度器,有如下输出:

Task 1 started on thread: 128258074408768
Task 2 started on thread: 128258074408768
Task 3 started on thread: 128258074408768
init done
Task 1 resumed on thread: 128258074408768
Task 2 resumed on thread: 128258074408768
Task 3 resumed on thread: 128258074408768
Task 1 finish on thread: 128258074408768
Task 2 finish on thread: 128258074408768
Task 3 finish on thread: 128258074408768

用户只需要调用SingleThreadScheduler::run 方法,就可以源源不断的驱动注册在其上的协程运行了!

demo 比较长,下面分段看下。

#include <coroutine>
#include <iostream>
#include <queue>
#include <functional>
#include <thread>

调度器类型,schedule 方法注册协程,run 会阻塞当前线程、不停的运行其上的协程,协程 resume 方法被包裹在 std::function 中,放置在先进先出的队列里,保证执行的先后顺序

class SingleThreadScheduler {
public:
    void schedule(std::function<void()> task) {
        tasks.push(std::move(task));
    }

    void run() {
        while (!tasks.empty()) {
            auto task = tasks.front();
            tasks.pop();
            task();  
        }
    }

private:
    std::queue<std::function<void()>> tasks;
};

协程返回对象的定义,与之前大体一样,包含了承诺对象与协程句柄,承诺对象主要的变化是:1) initial_suspend 不再挂起协程; 2) 增加了 return_void 接口; 3) 减少了 yield_value 接口;

struct AsyncTask {
    struct promise_type {
        AsyncTask get_return_object() { 
            return AsyncTask(std::coroutine_handle<promise_type>::from_promise(*this)); 
        }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };

    std::coroutine_handle<promise_type> handle;

    explicit AsyncTask(std::coroutine_handle<promise_type> h) : handle(h) {}
    ~AsyncTask() { if (handle) handle.destroy(); }
};

专用的等待对象,主要实现了 await_suspend 方法以便在协程挂起时、向调度器注册协程 resume 方法。增加这个等待对象一来可以挂起协程,二来方便获取协程句柄及其 resume 方法

struct ScheduleAwaiter {
    SingleThreadScheduler* scheduler;

    bool await_ready() const { return false; }
    void await_suspend(std::coroutine_handle<> h) {
        scheduler->schedule([h] { h.resume(); });
    }
    void await_resume() {}
};

协程体,接收调度器、返回返回对象,内部 co_await 等待两次异步事件,会产生两次中断,每次中断前将 resume 注册到调度器,以便之后唤醒时继续执行,直到协程结束

AsyncTask demo_coroutine(SingleThreadScheduler& scheduler, int id) {
    std::cout << "Task " << id << " started on thread: " 
              << std::this_thread::get_id() << std::endl;

    co_await ScheduleAwaiter{&scheduler}; 

    std::cout << "Task " << id << " resumed on thread: " 
              << std::this_thread::get_id() << std::endl;
  
    co_await ScheduleAwaiter{&scheduler}; 

    std::cout << "Task " << id << " finish on thread: " 
              << std::this_thread::get_id() << std::endl;
}

程序入口,初始化调度器与三个协程任务,最后 run 搞定一切

int main() {
    SingleThreadScheduler scheduler;

    auto task1 = demo_coroutine(scheduler, 1);
    auto task2 = demo_coroutine(scheduler, 2);
    auto task3 = demo_coroutine(scheduler, 3);

    std::cout << "init done" << std::endl; 
    scheduler.run();
}

这里完善一条规则:

* 若协程体中有明确的 co_yield,则承诺对象必需实现 yield_value 接口;

* 若协程体中有明确的 co_return xxx,则承诺对象必需实现 return_value 接口;

* 若协程体中有明确的 co_return 或没有任何 co_return,则承诺对象至少需要实现 return_void 接口。

相比之前的例子,没有显式的 co_yield 和 co_return,这里承诺对象只需要实现 return_void 即可,规范上说没实现的话可能导致未定义行为,实测 clang 去掉没引发崩溃,不过最好还是带上。

老规矩,下面有请 C++ Insights 上场,看看编译器底层做的工作与之前相比有何差异:

查看代码


内容比较多,只捡关键的看下:

struct __demo_coroutineFrame
{
  void (*resume_fn)(__demo_coroutineFrame *);
  void (*destroy_fn)(__demo_coroutineFrame *);
  std::__coroutine_traits_impl<AsyncTask>::promise_type __promise;
  int __suspend_index;
  bool __initial_await_suspend_called;
  SingleThreadScheduler & scheduler;
  int id;
  std::suspend_never __suspend_52_11;         // initial_suspend
  ScheduleAwaiter __suspend_56_14;            // 第一个 co_await
  ScheduleAwaiter __suspend_61_14;            // 第二个 co_await
  std::suspend_always __suspend_52_11_1;      // final_suspend
};

协程状态基本结构与之前一致,除了返回类型、参数、栈变量外,等待对象的数量与类型也发生了变更,看起来编译器根据返回值类型推导直接得到了成员类型 (std::suspend_neverSchedulerAwaitersuspend_always等)。

下面进入协程的 resume 方法看看,它是整个协程的核心:

/* This function invoked by coroutine_handle<>::resume() */
void __demo_coroutineResume(__demo_coroutineFrame * __f)
{
  try 
  {

熟悉的 duff device 上场


    /* Create a switch to get to the correct resume point */
    switch(__f->__suspend_index) {
      case 0: break;
      case 1: goto __resume_demo_coroutine_1;
      case 2: goto __resume_demo_coroutine_2;
      case 3: goto __resume_demo_coroutine_3;
      case 4: goto __resume_demo_coroutine_4;
    }

 promise_type::initial_suspend 返回 suspend_never 导致这里不挂起,协程直接略过这个条件继续运行,这也是 main 中 init done 输出位于 Task N start on thread 输出之后的原因,在构建并返回返回对象前就会向下执行到第一个 co_await

    /* co_await insights.cpp:52 */
    __f->__suspend_52_11 = __f->__promise.initial_suspend();
    if(!__f->__suspend_52_11.await_ready()) {
      __f->__suspend_52_11.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
      __f->__suspend_index = 1;
      __f->__initial_await_suspend_called = true;
      return;
    } 

    __resume_demo_coroutine_1:
    __f->__suspend_52_11.await_resume();
    std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " started on thread: "), std::this_thread::get_id()).operator<<(std::endl);
    

 第一个 co_await,ScheduleAwaiter 会挂起协程,挂起前调用的 ScheduleAwaiter::await_suspend 将 resume 添加到调度器队列,以便下次唤醒

    /* co_await insights.cpp:56 */
    __f->__suspend_56_14 = ScheduleAwaiter{&__f->scheduler};
    if(!__f->__suspend_56_14.await_ready()) {
      __f->__suspend_56_14.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
      __f->__suspend_index = 2;
      return;
    } 

再次被调度器调度到时,根据状态值与 switch-case 直接跳转到这里执行。由于调度器内部使用先进先出队列,因此三个协程任务是严格按顺序执行的

    __resume_demo_coroutine_2:
    __f->__suspend_56_14.await_resume();
    std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " resumed on thread: "), std::this_thread::get_id()).operator<<(std::endl);
    

 第二个 co_await,如法炮制

    /* co_await insights.cpp:61 */
    __f->__suspend_61_14 = ScheduleAwaiter{&__f->scheduler};
    if(!__f->__suspend_61_14.await_ready()) {
      __f->__suspend_61_14.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
      __f->__suspend_index = 3;
      return;
    } 
    
    __resume_demo_coroutine_3:
    __f->__suspend_61_14.await_resume();
    std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " finish on thread: "), std::this_thread::get_id()).operator<<(std::endl);

协程退出前,没有 co_yield 或 co_return xxx 显示调用,则默认调用 co_return 无参版本,对应的就是 return_void 啦;如果有未捕获的异常,promise_type::unhandle_exception 将会被调用进而退出整个进程

    /* co_return insights.cpp:52 */
    __f->__promise.return_void()/* implicit */;
    goto __final_suspend;
  } catch(...) {
    if(!__f->__initial_await_suspend_called) {
      throw ;
    } 
    
    __f->__promise.unhandled_exception();
  }

协程继续运行,promise_type::final_suspend 返回 suspend_always 会导致协程挂起,配合返回对象的析构函数可以销毁协程

  __final_suspend:
  
  /* co_await insights.cpp:52 */
  __f->__suspend_52_11_1 = __f->__promise.final_suspend();
  if(!__f->__suspend_52_11_1.await_ready()) {
    __f->__suspend_52_11_1.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
    __f->__suspend_index = 4;
    return;
  } 

就不会走到这里协程体的自动销毁逻辑啰

  __resume_demo_coroutine_4:
  __f->destroy_fn(__f);
}

有上一篇文章的铺垫,看起来没什么尿点,下面来一张图总览下:

为了便于理解只画了一个协程任务的执行顺序,跟着箭头方向和标号就能梳理清楚啦。

final_suspend 与协程自清理

上面例子中,每个协程的返回对象需要保存在临时变量 task1/2/3 中,不然在调度器运行时会因协程状态销毁而崩溃:

int main() {
    SingleThreadScheduler scheduler;

    demo_coroutine(scheduler, 1);
    demo_coroutine(scheduler, 2);
    demo_coroutine(scheduler, 3);

更多推荐