协程浅析

协程是可以暂停执行,之后恢复的函数。

有栈协程 vs 无栈协程

有栈协程通过固定大小的栈空间作为协程运行时空间,切换协程就是这些固定大小空间直接的切换,协程内的程序逻辑全部运行在这段空间内。而无栈协程将 frame 保存在堆空间(C++20 是这样做的),依据协程的状态进行统一的调度执行。 有栈协程,每个协程会有固定大小的空间作为栈内存使用。 无栈协程,每个协程会有一个协程帧作为状态和变量的管理对象,然后进行类似状态机的函数调用过程。 无栈协程内存紧凑,但是不适合递归。有栈协程虽然适合递归,但是分配的固定空间,可能会浪费,也可能会不足,这是有栈协程的问题。 总的来说:

  • 无栈的切换速度要远高于有栈。
  • 无栈协程更加适合IO场景。
  • 无栈协程相比普通函数会有额外开销。

更详细的对比,可以参考这篇优质的量化文档:基于async_simple的协程性能定量分析

有栈协程

有栈协程不关心协程函数体是什么,直接开辟一边内存(goroutine 中是 2000 Bytes),将这片空间和整个函数体,告知调度器去调度。而协程内部,会实现某个函数或者调度点,运行到此处时,会识别到当前处于协程空间中,会转移执行权(比如,等待 IO 时)。协程并没有显式的 await 暂停点出现,而是在协程内部实现。

协程栈也可以有其他不同的实现方式:

  • 分段栈(Segmented Stack):编译时,为协程栈增加栈内存检测标记,检测是否够用,是否需要申请更多内存。
  • 拷贝栈(Copy Stack):采用和 std::vector 相同的方式,动态扩展并拷贝栈。
  • 共享栈(Shared Stack):首先申请一块较大内存的共享栈,然后每次运行新协程前,将协程栈内存 copy 共享栈中,随着协程的运行计算真正用到的内存,将其 copy 到一段合适的内存中。libco 采用的就是这种方式。
  • 虚拟内存栈(Virtual Memory Stack):为每个协程申请虚拟内存,不读写就不会占物理内存,按照实际使用的内存量申请物理内存页面。

协程调度方式:

  • 栈式调度,按照调用栈关系,每次执行调用栈栈顶的协程
  • 星型调度,每次执行完一个协程,必须切换回调度线程,然后再进入下一个协程
  • 环形调度,协程之间有调用链,但是执行完最后一个协程后,直接返回调度线程

协程调度的优化方法有:

  • 结合多线程,充分利用多核能力。
  • 每个线程设计协程队列,同时可以划分不同的队列类型,区分能够在多线程间进行转移的协程,和不能转移的协程。
  • 采用负载均衡策略,调度协程,比如,空闲线程(可以定义为超过某个"水位")可以窃取忙碌线程协程队列中的任务。
  • 队列设计优化,比如对于短时快速任务,可以使用 spinlock;对于线程内的就绪队列,可以视情况,设计为多写一读的结构,仅在写端加锁。

保存上下文,则是将寄存器的值保存下来。goroutine 也是采用有栈协程的方式实现的。 boost.context, boost.coroutine, ucontext(unix), fiber(windows) 这些库提供底层 API,没有实现协程调度模块,不适合直接做应用项目。另外 ucontext 被 fiber 库性能相对会较差一点。boost.context 的性能会更好。 腾讯的 libco 虽然做了协程调度,也兼容很多第三方库的调用接口,但是也需要对底层机制较为清楚,才能写出更稳定的代码。比如,不同的系统、不同版本的Linux都需要不同的汇编代码。

一个简单的汇编程序示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 保存8个32位元素的数组,数组首地址保存在 eax 寄存器
movl %esp, 28(%eax)
movl %ebp, 24(%eax)
movl %esi, 20(%eax)
movl %edi, 16(%eax)
movl %edx, 12(%eax)
movl %ecx, 8(%eax)
movl %ebx, 4(%eax)
// eax 是第一个元素

// 假设新上下文中从 esp + 8 的位置开始恢复数组元素到寄存器
movl 8(%esp), %eax
movl 4(%eax), %ebx
movl 8(%eax), %ecx
movl 12(%eax), %edx
movl 16(%eax), %edi
movl 20(%eax), %esi
movl 24(%eax), %ebp
movl 28(%eax), %esp

假设这是一个函数内的一段代码,其中 eax 为传入数组首地址作为第一个参数,esp + 8 为另一个数组首地址作为第二个参数。以上代码,就将是将当前上下文寄存器保存到第一个参数的数组中,然后从第二个数组中加载新的上下文的寄存器的值。

这种嵌套切换上下文的方式,保证当前调用栈栈顶的协程先执行结束,然后执行前一个协程。这种有明显层级切换结构的协程,也称为 非对称协程

ucontext_t

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 上下文结构体
typedef struct ucontext_t {
// 当前上下文结束后,下一个激活的上下文对象的指针,只在当前上下文是由makecontext创建时有效
struct ucontext_t *uc_link;
// 当前上下文的信号屏蔽掩码
sigset_t uc_sigmask;
// 当前上下文使用的栈内存空间,只在当前上下文是由makecontext创建时有效
stack_t uc_stack;
// 平台相关的上下文具体内容,包含寄存器的值
mcontext_t uc_mcontext;
...
} ucontext_t;


// 获取当前的上下文
int getcontext(ucontext_t *ucp);

// 将 getcontext 获取到的上下文指针ucp,与一个函数 func 进行绑定,支持指定func运行时的参数,
// 在调用 makecontext 之前,必须手动给 ucp 分配一段内存空间,存储在 ucp->uc_stack 中,作为 func 函数运行时的栈空间,
// 可以指定 ucp->uc_link,func 运行结束后恢复 uc_link 指向的上下文
// makecontext 执行完后,ucp就与函数func绑定了,调用 setcontext 或 swapcontext 激活 ucp 时,func 就会被运行
// 如果不指定 uc_link,那 func函数结束时必须显式调用 setcontext 或 swapcontext 以重新指定一个有效的上下文,否则程序就跑飞了
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);


// 恢复 ucp 指向的上下文,这个函数会跳转到 ucp上下文 对应的函数中执行,相当于变相调用了函数,但是不会返回。
int setcontext(const ucontext_t *ucp);

// 恢复 ucp 指向的上下文,同时将当前的上下文存储到 oucp 中
// 和 setcontext 一样,swapcontext 会跳转到 ucp上下文 对应的函数中执行,但是不会返回
int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);

无栈协程

无栈协程的典型实现是 async / await 模型。async 标志会使得编译器,创建一个协程帧,保存函数参数、跨越协程暂停点的局部变量。await 表示程序在这里转移执行权,等待 await 后面的程序执行完成,编译器应该在这之后去生成转移执行权的代码。

无栈协程本质上是一个状态机(state machine),利用系统栈进行协程切换。恢复执行所需的数据,和程序运行时的栈是分离存储的。比如把协程的 frame 数据放在堆中,而不是栈中,也就不会在函数返回时被回收。从而可以实现异步执行的顺序代码,不用显式的回调来处理非阻塞的输入和输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 无栈协程抽象出的一个类
struct abstract_coroutine {
int val;
int state = 0;

// interface
void run() {
switch (state) {
case 0:
return sub_proc_1();
case 1:
return sub_proc_2();
case 2:
return sub_proc_3();
}
}

void sub_proc_1() {
val = 0;
state = 1;
}

void sub_proc_2() {
val = 1;
state = 2;
}

void sub_proc_3() {
val--;
}
};

每个部分执行完成后,状态转移。下一次调用 run 时,执行下一个函数调用。这种方式,没有有栈协程的显式上下文切换。这种方式和函数执行和返回几乎没有区别。难点在于如何实现高效地协程调度机制。

对称协程 vs 非对称协程

非对称协程,协程之间有严格的父子关系,由调度器负责所有调度。

对称式协程,协程之间地位平等。

Coroutines in C

LLVM Coroutines 实现的无栈协程伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void *f(int n) {
void *hdl = CORO_BEGIN(malloc);
for (int i = n;; ++i) {
CORO_SUSPEND(hdl);
print(i);
CORO_SUSPEND(hdl);
print(-i);
}
CORO_END(hdl, free);
}

int main() {
void *coro = f(1);
for (int i = 0; i < 4; ++i) {
CORO_RESUME(coro);
}
CORO_DESTROY(coro);
}

以上程序的协程帧数据可以定义如下:

1
2
3
4
struct f.frame {
// 只有 i 跨越了协程的调用,而 n 在协程 SUSPEND 处理之前已经没用了
int i;
};

而编译器会继续优化,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void *f(int n) {
void *hdl = CORO_BEGIN(malloc);
f.frame *frame = (f.frame *)hdl;
for (frame->i = n;; ++frame->i) {
frame->suspend_index = 0;
r0:
CORO_SUSPEND(hdl);
print(frame->i);
frame->suspend_index = 1;
r1:
CORO_SUSPEND(hdl);
print(-frame->i);
}
CORO_END(hdl, free);
}

// 此时协程帧数据
struct f.frame {
int suspend_index;
int i;
};

编译器同时,会生成三个函数调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// coroutine start function
void* f(int *n) {
void* hdl = CORO_BEGIN(malloc);
f.frame* frame = (f.frame*)hdl;
frame->ResumeFn = &f.resume;
frame->DestroyFn = &f.destroy;
frame->i = n;
frame->suspend_index = 0;
return coro_hdl;
}

// coroutine resume function
void f.resume(f.frame *frame) {
switch (frame->suspend_index) {
case 0:
goto r0;
default:
goto r1;
}
for (frame->i = n;; ++frame->i) {
frame->suspend_index = 0;
r0:
CORO_SUSPEND(hdl);
print(frame->i);
frame->suspend_index = 1;
r1:
CORO_SUSPEND(hdl);
print(-frame->i);
}
CORO_END(hdl, free);
}

// coroutine destroy function
void f.destroy(f.frame *frame) {
switch (frame->suspend_index) {

}
free(frame);
}

此时的协程帧中数据为:

1
2
3
4
5
6
struct f.frame {
FnPtr ResumeFn;
FnPtr DestroyFn;
int suspend_index;
int i;
};

LLVM Coroutines 中定义了以下宏,用于处理协程的不同状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#define CORO_SUSPEND()                                                         \
switch (__builtin_coro_suspend()) { \
case -1: \
goto coro_Suspend; \
case 1: \
goto coro_Cleanup; \
default: \
break; \
}

#define CORO_END(hdl, FreeFunc) \
coro_Cleanup : { \
void *coro_mem = __builtin_coro_free(hdl); \
if (coro_mem) \
FreeFunc(coro_mem); \
} \
coro_Suspend: \
__builtin_coro_end(); \
return coro_hdl;

C++20 协程

C++20 协程是无栈协程。定义中包含了以下3种表达式之一的函数,就是一个协程。

  • co_await 表达式——用于暂停执行,直到恢复:
1
2
3
4
5
6
7
8
9
task<> tcp_echo_server()
{
char data[1024];
while (true)
{
std::size_t n = co_await socket.async_read_some(buffer(data));
co_await async_write(socket, buffer(data, n));
}
}
  • co_yield 表达式——用于暂停执行并返回一个值:
1
2
3
4
5
generator<int> iota(int n = 0)
{
while (true)
co_yield n++;
}
  • co_return 语句——用于完成执行并返回一个值:
1
2
3
4
lazy<int> f()
{
co_return 7;
}

每个协程都与下列对象关联:

  • 承诺(promise)对象,在协程内部操纵。协程通过此对象提交其结果或异常。
  • 协程句柄 (coroutine handle),在协程外部操纵。这是用于恢复协程执行或销毁协程帧的不带所有权句柄。
  • 协程状态 (coroutine state),它是一个动态存储分配(除非自定义分配方式)的内部对象,其包含:
    • promise 对象
    • 各个形参(全部按值复制)
    • 当前暂停点的一些表示,使得程序在恢复时知晓要从何处继续,销毁时知晓有哪些局部变量在作用域内
    • 生存期超过当前暂停点的局部变量和临时量

当协程开始执行时,它进行下列操作:

  • 用 operator new 分配协程状态对象
  • 将所有函数形参复制到协程状态中:按值传递的形参被移动或复制,按引用传递的参数保持为引用(因此,如果在被指代对象的生存期结束后恢复协程,它可能变成悬垂引用)
  • 调用承诺对象的构造函数。如果承诺类型拥有接收所有协程形参的构造函数,那么以复制后的协程实参调用该构造函数。否则调用其默认构造函数。
  • 调用 promise.get_return_object() 并将结果保存在局部变量中。该调用的结果将在协程首次暂停时返回给调用方。至此并包含这个步骤为止,任何抛出的异常均传播回调用方,而非置于 promise 中。
  • 调用 promise.initial_suspend() 并 co_await 它的结果。典型的承诺类型 Promise 要么(对于惰性启动的协程)返回std::suspend_always,要么(对于急切启动的协程)返回std::suspend_never
  • 当 co_await promise.initial_suspend() 恢复时,开始协程体的执行。

一个跨线程调度协程示例

程序实现了 producer 协程运行在线程2中, 500ms 后生成一个 val ;consumer 协程运行在线程 1 中,并会被调度到线程2 中执行,最后读取出 val。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <coroutine>
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

using namespace std;


struct Task {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;

handle_type h; // 协程的 handle
std::future<void> fut;

//=================================================================================
// promise
struct promise_type {
int result;
coroutine_handle<> h_next = nullptr; // 调度的下一个协程

Task get_return_object() {
return Task{ handle_type::from_promise(*this) };
}

suspend_always initial_suspend() { return {};};

auto final_suspend() noexcept {
// 被 await 后,如果 h_next 不为空,则返回 h_next(调度到指定协程),否则返回 std::noop_coroutine() (调度到协程的调用者)
struct next_awaiter { // as a promise
promise_type* self;

bool await_ready() noexcept { return false; } // will suspend
void await_resume() noexcept {};

coroutine_handle<> await_suspend(coroutine_handle<promise_type> hdl) noexcept {
if (hdl.promise().h_next) {
return hdl.promise().h_next; // 返回 h_next ,即调度的到下一个协程
} else {
return std::noop_coroutine();
}
}
};
return next_awaiter{ this };
}

// called at co_return
void return_value(int i) { result = i; }

void unhandled_exception() {}
};

//=================================================================================
// awaiter definition
bool await_ready() { return false; } // go to await_suspend
// co_await will call await_suspend
void await_suspend(handle_type hdl) {
h.promise().h_next = hdl; // 设置下一个协程(consume)
// 另开线程执行 producer 协程 resume() , 此时异步地开始执行 produce() 函数
fut = std::async([this]() { h.resume(); });
}
int await_resume() { return h.promise().result; }
};


// 协程 producer
Task produce() {
// producer 协程 resume() ,从这里开始执行
int val = 1111;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
cout << std::this_thread::get_id() << " produce: " << val << endl;
co_return val; // promise::return_value(int)
// final_suspend 返回 consumer 的 coroutine_handle,即调度到 consumer 协程
// 注意,此时 producer 协程还存在,同时,当前运行在 async 线程中
}


// 协程 consumer
Task consume(Task& fut) {
cout << std::this_thread::get_id() << " consume: waiting " << endl;
// 这里的 co_await 的是 producer (fut) ,传入 producer::await_suspend 的是 consumer 的 coroutine_handle
// 从 producer::await_suspend 开始执行
int val = co_await fut;
// 从 producer::final_suspend 执行结束后,调度到 consumer 协程,从此处开始执行
// 在此之前,由 producer::await_resume 返回的结果,保存到 val
// !!!注意,此时 consumer 协程运行在 async 线程中
cout << std::this_thread::get_id() << " consume: " << val << endl;
co_return 0; // promise::return_value(int)
// 协程结束执行,即 async 线程结束
}


int main() {
// 初始化协程
auto prod = produce();
auto cons = consume(prod);

// consumer 协程 ,从 co_await 开始执行
cons.h.resume();

// 等待协程 consumer 协程执行完毕,但是当前程序不会阻塞,而是忙等待
while (!cons.h.done()) {
cout << std::this_thread::get_id() << " main: waiting " << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

cout << std::this_thread::get_id() << " main: done" << endl;

return 0; // 主线程结束,协程析构
}

co_await

作为一个一元操作符,暂停协程并将控制权返回给调用方。

  1. 将操作对象转化为可等待体 awaitable 对象
  2. 调用 awaiter.await_ready()
    • 返回 false 暂停协程,此时协程已经完全暂停,可以在不同线程间转移协程句柄,而且不需要额外同步处理。
      • 调用 awaiter.await_suspend(handle) ,其中 handle 表示当前协程的协程句柄:
        • await_suspend 返回 void ,那么将控制权返回给协程的调用方,而暂停当前协程。
        • await_suspend 返回 true,也将控制权返回给调用方;返回 false ,则继续执行当前协程。
        • await_suspend 返回 其他协程的句柄,那么 handle.resume() 恢复执行其他协程,最终会返回到当前协程。
        • await_suspend 抛出异常,那么会向上一层调用者抛出异常。
    • 返回 true 不会暂停协程
  3. 调用 awaiter.await_resume() ,它的返回结果就是 co_await expr 的返回结果。

由于如果执行到 awaiter.await_suspend,那么协程已经完全暂停,可以在不同线程间转移协程句柄,而且不需要额外同步处理。比如,将协程句柄写入回调函数,在某个异步 IO 操作完成时运行该协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>


auto switch_to_new_thread(std::jthread& out) {
struct awaitable {
std::jthread* p_out;
bool await_ready() {
return false;
}
void await_suspend(std::coroutine_handle<> h) {
std::jthread& out = *p_out;
if (out.joinable()) {
throw std::runtime_error("thread already started, needed to be null, then generated by internal suspend");
}
out = std::jthread([h] {h.resume();});
std::cout << "New thread " << out.get_id() << " started\n";
}

void await_resume() {}
};

return awaitable{&out};
}


struct task {
struct promise_type {
task get_return_object() {
return {};
}
std::suspend_never initial_suspend() {
return {};
}
std::suspend_never final_suspend() noexcept {
return {};
}

void return_void() {}

void unhandled_exception() {}
};
};


task resuming_on_new_thread(std::jthread& out) {
std::cout << "coroutine start, current thread " << std::this_thread::get_id() << std::endl;
co_await switch_to_new_thread(out);
std::cout << "coroutine end, current thread " << std::this_thread::get_id() << std::endl;
}


int main() {
std::jthread out;
resuming_on_new_thread(out);
return 0;
}

std::suspend_always 及 std::suspend_never 就是标准库提供的两个 awaitable 对象。

co_yield

yield 表达式向调用方返回一个值并暂停当前协程,在生成器函数中很常见。

1
2
3
4
5
6
co_yield expr
// 或者
co_yield {initializer list}

// 等价于
co_await promise.yield_value(expr)

yield_value 生成器将实参保存到生成器对象中,并返回 suspend_always ,并将控制权交还给调用方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#include <coroutine>
#include <exception>
#include <iostream>
#include <memory>

template <typename T>
struct Generator {
// 定义一个 promise_type 类型,用于生成 handle_type
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
struct promise_type {
T value; // 在 promise 中暂存了值
std::exception_ptr exception;

Generator get_return_object() {
return Generator(handle_type::from_promise(*this));
}

std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() { exception = std::current_exception(); }

template <std::convertible_to<T> From>
std::suspend_always yield_value(From&& from) {
value = std::forward<From>(from);
return {};
}

void return_void() {}
};

// 构造函数传入 handle_type,用于生成 Generator 对象
handle_type h;
Generator(handle_type hdl) : h(hdl) {}
~Generator() { h.destroy(); }

explicit operator bool() {
fill(); // 调用 fill() 方法,填充 promise 中的值和异常对象;如果过没有使用 operator(),则不会重复执行协程
return !h.done();
}

T operator()() {
fill();
m_full = false;
return std::move(h.promise().value); // 返回 promise 中暂存的值
}

private:
bool m_full = false;

void fill() {
if (!m_full) {
h(); // 调用 handle_type::operator(),触发协程执行
// handle 中存储的 promise 对象,暂存了当前的目标变量值拷贝和程序异常对象
if (h.promise().exception) {
std::rethrow_exception(h.promise().exception);
}
m_full = true;
}
}

};


Generator<uint64_t>
fibonacci_sequence(unsigned n) {
if (n == 0) {
co_return ;
}
if (n > 94) {
throw std::runtime_error("n too large, number will overflow");
}

co_yield 0;
if (n == 1) {
co_return;
}

co_yield 1;
if (n == 2) {
co_return;
}

uint64_t a = 0;
uint64_t b = 1;

for (unsigned i = 2; i < n; ++i) {
uint64_t s = a + b;
co_yield s;
a = b;
b = s;
}
}


int main() {
try {
auto gen = fibonacci_sequence(1);
for (int j = 0; gen; ++j) {
std::cout << "fibonacci[" << j << "] = " << gen() << '\n';
}
} catch (const std::exception& e) {
std::cerr << "exception: " << e.what() << '\n';
} catch (...) {
std::cerr << "unknown exception\n";
}

return 0;
}

co_return

co_return expr; 相当于 p.return_value(expr); goto final_suspend; 。 co_return; 相当于 p.return_void(expr); goto final_suspend;

coroutine_handle

类模板 coroutine_handle 能用于表示,暂停或执行中的协程。使用时要注意避免 dangling 问题,可能会导致 undefined behavior。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <coroutine>
#include <iostream>
#include <optional>


template <std::movable T>
class Generator {
public:
//=================================================================================
// promise_type
struct promise_type {
std::optional<T> current_value;

Generator<T> get_return_object() {
return Generator{Handle::from_promise(*this)};
}

static std::suspend_always initial_suspend() noexcept { return {}; }
static std::suspend_always final_suspend() noexcept { return {}; }

std::suspend_always yield_value(T value) noexcept {
current_value = std::move(value);
return {};
}

void await_transform() = delete;
[[noreturn]] void unhandled_exception() { throw; }
};

using Handle = std::coroutine_handle<promise_type>;

explicit Generator(const Handle coroutine) : m_coroutine(coroutine) {}

//=================================================================================
// constructor
Generator() = default;
~Generator() {
if (m_coroutine) {
m_coroutine.destroy();
}
}

Generator(const Generator&) = delete;
Generator& operator=(const Generator&) = delete;

Generator(Generator&& other) : m_coroutine(other.m_coroutine) {
other.m_coroutine = {};
}
Generator& operator=(Generator&& other) {
if (this != &other) {
if (m_coroutine) {
m_coroutine.destroy();
}
m_coroutine = other.m_coroutine;
other.m_coroutine = {};
}
return *this;
}

//=================================================================================
// for loop support
class Iter {
public:
void operator++() {
m_coroutine.resume();
}
const T& operator*() {
return *m_coroutine.promise().current_value;
}

// out of range
bool operator==(std::default_sentinel_t) const {
return !m_coroutine || m_coroutine.done();
}

explicit Iter(const Handle coroutine) : m_coroutine(coroutine) {}

private:
Handle m_coroutine;
};

Iter begin() {
if (m_coroutine) {
m_coroutine.resume();
}
return Iter{m_coroutine};
}

std::default_sentinel_t end() { return {}; }

private:
Handle m_coroutine;
};


template <std::integral T>
Generator<T> range(T first, T last) {
while (first < last) {
co_yield first;
first++;
}
}


int main() {
for (const char i : range(65, 91)) {
std::cout << i << ' ';
}
std::cout << '\n';
return 0;
}

数据成员

  • ptr : void* ,指向协程状态的指针; 成员函数
  • 构造函数
  • operator= 拷贝赋值
  • operator coroutine_handle<> 获得类型擦除的 coroutine_handle ,就是说将不同类型的模版,返回统一的类型
  • done : 检查协程是否完成
  • operator bool : 检查当前 handle 是否表示协程
  • operator() 或者 resume : 恢复协程执行
  • destroy : 销毁协程
  • promise : 访问协程的 promise 对象
  • from_promise : 从一个 promise 对象,创建 coroutine_handle
  • address : 返回支撑协程的底层指针
  • from_address : 从指针地址导入协程

C++20 还为 coroutine_handle 提供了散列特化支持,std::hash

应用

在设计网络应用时,可以在需要等待IO输入时,将协程 co_await 交还控制权到协程的调用者或者调度到其他协程。当 epoll 中来数据了,触发了IO事件,再在相应的回调函数中 resume 协程的调用。

IO 多路复用

More to read


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!