没有调度器的协程不是好协程——零基础深入浅出 C++20 协程
选取合适的 demo 是头等大事
* 以协程为目标,涉及到的新语法会简单说明,不涉及的不旁征博引,很多新语法都是有了某种需求才创建的,理解这种需求本身比硬学语法规则更为重要
* 若语法的原理非常简单,也会简单展开讲讲,有利于透过现象看本质,用起来更得心应手
上一篇文章里不光探讨了协程的本质,还说明了一系列 C++20 协程概念:
* 协程体
* 协程状态
* 承诺对象
* 返回对象
* 协程句柄
及它们之间的关系:

并简单说明了接入 C++20 协程时用户需要实现的类型、接口、及其含义。如果没有这些内容铺垫,看本文时会有很多地方将会难以理解,还没看过的小伙伴,墙裂建议先看那篇。
工具还是之前介绍过的 C++ Insights 和 Compile Explorer,也在上一篇中介绍过了,这里不再赘述。
协程调度器
话不多说,直接上 demo:
#include <coroutine>
#include <iostream>
#include <queue>
#include <functional>
#include <thread>
class SingleThreadScheduler {
public:
void schedule(std::function<void()> task) {
tasks.push(std::move(task));
}
void run() {
while (!tasks.empty()) {
auto task = tasks.front();
tasks.pop();
task();
}
}
private:
std::queue<std::function<void()>> tasks;
};
struct AsyncTask {
struct promise_type {
AsyncTask get_return_object() {
return AsyncTask(std::coroutine_handle<promise_type>::from_promise(*this));
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
};
std::coroutine_handle<promise_type> handle;
explicit AsyncTask(std::coroutine_handle<promise_type> h) : handle(h) {}
~AsyncTask() { if (handle) handle.destroy(); }
};
struct ScheduleAwaiter {
SingleThreadScheduler* scheduler;
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> h) {
scheduler->schedule([h] { h.resume(); });
}
void await_resume() {}
};
AsyncTask demo_coroutine(SingleThreadScheduler& scheduler, int id) {
std::cout << "Task " << id << " started on thread: "
<< std::this_thread::get_id() << std::endl;
co_await ScheduleAwaiter{&scheduler};
std::cout << "Task " << id << " resumed on thread: "
<< std::this_thread::get_id() << std::endl;
co_await ScheduleAwaiter{&scheduler};
std::cout << "Task " << id << " finish on thread: "
<< std::this_thread::get_id() << std::endl;
}
int main() {
SingleThreadScheduler scheduler;
auto task1 = demo_coroutine(scheduler, 1);
auto task2 = demo_coroutine(scheduler, 2);
auto task3 = demo_coroutine(scheduler, 3);
std::cout << "init done" << std::endl;
scheduler.run();
}
这个例子演示了拥有三个协程任务的单线程协程调度器,有如下输出:
Task 1 started on thread: 128258074408768
Task 2 started on thread: 128258074408768
Task 3 started on thread: 128258074408768
init done
Task 1 resumed on thread: 128258074408768
Task 2 resumed on thread: 128258074408768
Task 3 resumed on thread: 128258074408768
Task 1 finish on thread: 128258074408768
Task 2 finish on thread: 128258074408768
Task 3 finish on thread: 128258074408768
用户只需要调用SingleThreadScheduler::run 方法,就可以源源不断的驱动注册在其上的协程运行了!
demo 比较长,下面分段看下。
#include <coroutine>
#include <iostream>
#include <queue>
#include <functional>
#include <thread>
调度器类型,schedule 方法注册协程,run 会阻塞当前线程、不停的运行其上的协程,协程 resume 方法被包裹在 std::function 中,放置在先进先出的队列里,保证执行的先后顺序
class SingleThreadScheduler {
public:
void schedule(std::function<void()> task) {
tasks.push(std::move(task));
}
void run() {
while (!tasks.empty()) {
auto task = tasks.front();
tasks.pop();
task();
}
}
private:
std::queue<std::function<void()>> tasks;
};
协程返回对象的定义,与之前大体一样,包含了承诺对象与协程句柄,承诺对象主要的变化是:1) initial_suspend 不再挂起协程; 2) 增加了 return_void 接口; 3) 减少了 yield_value 接口;
struct AsyncTask {
struct promise_type {
AsyncTask get_return_object() {
return AsyncTask(std::coroutine_handle<promise_type>::from_promise(*this));
}
std::suspend_never initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
};
std::coroutine_handle<promise_type> handle;
explicit AsyncTask(std::coroutine_handle<promise_type> h) : handle(h) {}
~AsyncTask() { if (handle) handle.destroy(); }
};
专用的等待对象,主要实现了 await_suspend 方法以便在协程挂起时、向调度器注册协程 resume 方法。增加这个等待对象一来可以挂起协程,二来方便获取协程句柄及其 resume 方法
struct ScheduleAwaiter {
SingleThreadScheduler* scheduler;
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> h) {
scheduler->schedule([h] { h.resume(); });
}
void await_resume() {}
};
协程体,接收调度器、返回返回对象,内部 co_await 等待两次异步事件,会产生两次中断,每次中断前将 resume 注册到调度器,以便之后唤醒时继续执行,直到协程结束
AsyncTask demo_coroutine(SingleThreadScheduler& scheduler, int id) {
std::cout << "Task " << id << " started on thread: "
<< std::this_thread::get_id() << std::endl;
co_await ScheduleAwaiter{&scheduler};
std::cout << "Task " << id << " resumed on thread: "
<< std::this_thread::get_id() << std::endl;
co_await ScheduleAwaiter{&scheduler};
std::cout << "Task " << id << " finish on thread: "
<< std::this_thread::get_id() << std::endl;
}
程序入口,初始化调度器与三个协程任务,最后 run 搞定一切
int main() {
SingleThreadScheduler scheduler;
auto task1 = demo_coroutine(scheduler, 1);
auto task2 = demo_coroutine(scheduler, 2);
auto task3 = demo_coroutine(scheduler, 3);
std::cout << "init done" << std::endl;
scheduler.run();
}
这里完善一条规则:
* 若协程体中有明确的 co_yield,则承诺对象必需实现 yield_value 接口;
* 若协程体中有明确的 co_return xxx,则承诺对象必需实现 return_value 接口;
* 若协程体中有明确的 co_return 或没有任何 co_return,则承诺对象至少需要实现 return_void 接口。
相比之前的例子,没有显式的 co_yield 和 co_return,这里承诺对象只需要实现 return_void 即可,规范上说没实现的话可能导致未定义行为,实测 clang 去掉没引发崩溃,不过最好还是带上。
老规矩,下面有请 C++ Insights 上场,看看编译器底层做的工作与之前相比有何差异:
查看代码
内容比较多,只捡关键的看下:
struct __demo_coroutineFrame
{
void (*resume_fn)(__demo_coroutineFrame *);
void (*destroy_fn)(__demo_coroutineFrame *);
std::__coroutine_traits_impl<AsyncTask>::promise_type __promise;
int __suspend_index;
bool __initial_await_suspend_called;
SingleThreadScheduler & scheduler;
int id;
std::suspend_never __suspend_52_11; // initial_suspend
ScheduleAwaiter __suspend_56_14; // 第一个 co_await
ScheduleAwaiter __suspend_61_14; // 第二个 co_await
std::suspend_always __suspend_52_11_1; // final_suspend
};
协程状态基本结构与之前一致,除了返回类型、参数、栈变量外,等待对象的数量与类型也发生了变更,看起来编译器根据返回值类型推导直接得到了成员类型 (std::suspend_never、SchedulerAwaiter、suspend_always等)。
下面进入协程的 resume 方法看看,它是整个协程的核心:
/* This function invoked by coroutine_handle<>::resume() */
void __demo_coroutineResume(__demo_coroutineFrame * __f)
{
try
{
熟悉的 duff device 上场
/* Create a switch to get to the correct resume point */
switch(__f->__suspend_index) {
case 0: break;
case 1: goto __resume_demo_coroutine_1;
case 2: goto __resume_demo_coroutine_2;
case 3: goto __resume_demo_coroutine_3;
case 4: goto __resume_demo_coroutine_4;
}
promise_type::initial_suspend 返回 suspend_never 导致这里不挂起,协程直接略过这个条件继续运行,这也是 main 中 init done 输出位于 Task N start on thread 输出之后的原因,在构建并返回返回对象前就会向下执行到第一个 co_await
/* co_await insights.cpp:52 */
__f->__suspend_52_11 = __f->__promise.initial_suspend();
if(!__f->__suspend_52_11.await_ready()) {
__f->__suspend_52_11.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 1;
__f->__initial_await_suspend_called = true;
return;
}
__resume_demo_coroutine_1:
__f->__suspend_52_11.await_resume();
std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " started on thread: "), std::this_thread::get_id()).operator<<(std::endl);
第一个 co_await,ScheduleAwaiter 会挂起协程,挂起前调用的 ScheduleAwaiter::await_suspend 将 resume 添加到调度器队列,以便下次唤醒
/* co_await insights.cpp:56 */
__f->__suspend_56_14 = ScheduleAwaiter{&__f->scheduler};
if(!__f->__suspend_56_14.await_ready()) {
__f->__suspend_56_14.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 2;
return;
}
再次被调度器调度到时,根据状态值与 switch-case 直接跳转到这里执行。由于调度器内部使用先进先出队列,因此三个协程任务是严格按顺序执行的
__resume_demo_coroutine_2:
__f->__suspend_56_14.await_resume();
std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " resumed on thread: "), std::this_thread::get_id()).operator<<(std::endl);
第二个 co_await,如法炮制
/* co_await insights.cpp:61 */
__f->__suspend_61_14 = ScheduleAwaiter{&__f->scheduler};
if(!__f->__suspend_61_14.await_ready()) {
__f->__suspend_61_14.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 3;
return;
}
__resume_demo_coroutine_3:
__f->__suspend_61_14.await_resume();
std::operator<<(std::operator<<(std::operator<<(std::cout, "Task ").operator<<(__f->id), " finish on thread: "), std::this_thread::get_id()).operator<<(std::endl);
协程退出前,没有 co_yield 或 co_return xxx 显示调用,则默认调用 co_return 无参版本,对应的就是 return_void 啦;如果有未捕获的异常,promise_type::unhandle_exception 将会被调用进而退出整个进程
/* co_return insights.cpp:52 */
__f->__promise.return_void()/* implicit */;
goto __final_suspend;
} catch(...) {
if(!__f->__initial_await_suspend_called) {
throw ;
}
__f->__promise.unhandled_exception();
}
协程继续运行,promise_type::final_suspend 返回 suspend_always 会导致协程挂起,配合返回对象的析构函数可以销毁协程
__final_suspend:
/* co_await insights.cpp:52 */
__f->__suspend_52_11_1 = __f->__promise.final_suspend();
if(!__f->__suspend_52_11_1.await_ready()) {
__f->__suspend_52_11_1.await_suspend(std::coroutine_handle<AsyncTask::promise_type>::from_address(static_cast<void *>(__f)).operator std::coroutine_handle<void>());
__f->__suspend_index = 4;
return;
}
就不会走到这里协程体的自动销毁逻辑啰
__resume_demo_coroutine_4:
__f->destroy_fn(__f);
}
有上一篇文章的铺垫,看起来没什么尿点,下面来一张图总览下:

为了便于理解只画了一个协程任务的执行顺序,跟着箭头方向和标号就能梳理清楚啦。
final_suspend 与协程自清理
上面例子中,每个协程的返回对象需要保存在临时变量 task1/2/3 中,不然在调度器运行时会因协程状态销毁而崩溃:
int main() {
SingleThreadScheduler scheduler;
demo_coroutine(scheduler, 1);
demo_coroutine(scheduler, 2);
demo_coroutine(scheduler, 3);更多推荐
所有评论(0)