协程是可以暂停执行,之后恢复的函数。
有栈协程 vs 无栈协程
有栈协程通过固定大小的栈空间作为协程运行时空间,切换协程就是这些固定大小空间直接的切换,协程内的程序逻辑全部运行在这段空间内。而无栈协程将
frame 保存在堆空间(C++20
是这样做的),依据协程的状态进行统一的调度执行。
有栈协程,每个协程会有固定大小的空间作为栈内存使用。
无栈协程,每个协程会有一个协程帧作为状态和变量的管理对象,然后进行类似状态机的函数调用过程。
无栈协程内存紧凑,但是不适合递归。有栈协程虽然适合递归,但是分配的固定空间,可能会浪费,也可能会不足,这是有栈协程的问题。
总的来说:
无栈的切换速度要远高于有栈。
无栈协程更加适合IO场景。
无栈协程相比普通函数会有额外开销。
更详细的对比,可以参考这篇优质的量化文档:基于async_simple的协程性能定量分析
有栈协程
有栈协程不关心协程函数体是什么,直接开辟一边内存(goroutine 中是 2000
Bytes),将这片空间和整个函数体,告知调度器去调度。而协程内部,会实现某个函数或者调度点,运行到此处时,会识别到当前处于协程空间中,会转移执行权(比如,等待
IO 时)。协程并没有显式的 await 暂停点出现,而是在协程内部实现。
协程栈也可以有其他不同的实现方式:
分段栈(Segmented
Stack):编译时,为协程栈增加栈内存检测标记,检测是否够用,是否需要申请更多内存。
拷贝栈(Copy Stack):采用和 std::vector
相同的方式,动态扩展并拷贝栈。
共享栈(Shared
Stack):首先申请一块较大内存的共享栈,然后每次运行新协程前,将协程栈内存
copy 共享栈中,随着协程的运行计算真正用到的内存,将其 copy
到一段合适的内存中。libco 采用的就是这种方式。
虚拟内存栈(Virtual Memory
Stack):为每个协程申请虚拟内存,不读写就不会占物理内存,按照实际使用的内存量申请物理内存页面。
协程调度方式:
栈式调度,按照调用栈关系,每次执行调用栈栈顶的协程
星型调度,每次执行完一个协程,必须切换回调度线程,然后再进入下一个协程
环形调度,协程之间有调用链,但是执行完最后一个协程后,直接返回调度线程
协程调度的优化方法有:
结合多线程,充分利用多核能力。
每个线程设计协程队列,同时可以划分不同的队列类型,区分能够在多线程间进行转移的协程,和不能转移的协程。
采用负载均衡策略,调度协程,比如,空闲线程(可以定义为超过某个"水位")可以窃取忙碌线程协程队列中的任务。
队列设计优化,比如对于短时快速任务,可以使用
spinlock;对于线程内的就绪队列,可以视情况,设计为多写一读的结构,仅在写端加锁。
保存上下文,则是将寄存器的值保存下来。goroutine
也是采用有栈协程的方式实现的。 boost.context, boost.coroutine,
ucontext(unix), fiber(windows) 这些库提供底层
API,没有实现协程调度模块,不适合直接做应用项目。另外 ucontext 被 fiber
库性能相对会较差一点。boost.context 的性能会更好。 腾讯的 libco
虽然做了协程调度,也兼容很多第三方库的调用接口,但是也需要对底层机制较为清楚,才能写出更稳定的代码。比如,不同的系统、不同版本的Linux都需要不同的汇编代码。
一个简单的汇编程序示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 // 保存8个32位元素的数组,数组首地址保存在 eax 寄存器 movl %esp, 28(%eax) movl %ebp, 24(%eax) movl %esi, 20(%eax) movl %edi, 16(%eax) movl %edx, 12(%eax) movl %ecx, 8(%eax) movl %ebx, 4(%eax) // eax 是第一个元素 // 假设新上下文中从 esp + 8 的位置开始恢复数组元素到寄存器 movl 8(%esp), %eax movl 4(%eax), %ebx movl 8(%eax), %ecx movl 12(%eax), %edx movl 16(%eax), %edi movl 20(%eax), %esi movl 24(%eax), %ebp movl 28(%eax), %esp
假设这是一个函数内的一段代码,其中 eax
为传入数组首地址作为第一个参数,esp + 8
为另一个数组首地址作为第二个参数。以上代码,就将是将当前上下文寄存器保存到第一个参数的数组中,然后从第二个数组中加载新的上下文的寄存器的值。
这种嵌套切换上下文的方式,保证当前调用栈栈顶的协程先执行结束,然后执行前一个协程。这种有明显层级切换结构的协程,也称为
非对称协程 。
ucontext_t
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 typedef struct ucontext_t { struct ucontext_t *uc_link ; sigset_t uc_sigmask; stack_t uc_stack; mcontext_t uc_mcontext; ... } ucontext_t ;int getcontext (ucontext_t *ucp) ;void makecontext (ucontext_t *ucp, void (*func)(), int argc, ...) ;int setcontext (const ucontext_t *ucp) ;int swapcontext (ucontext_t *oucp, const ucontext_t *ucp) ;
无栈协程
无栈协程的典型实现是 async / await
模型。async
标志会使得编译器,创建一个协程帧,保存函数参数、跨越协程暂停点的局部变量。await
表示程序在这里转移执行权,等待 await
后面的程序执行完成,编译器应该在这之后去生成转移执行权的代码。
无栈协程本质上是一个状态机(state
machine),利用系统栈进行协程切换。恢复执行所需的数据,和程序运行时的栈是分离存储的。比如把协程的
frame
数据放在堆中,而不是栈中,也就不会在函数返回时被回收。从而可以实现异步执行的顺序代码,不用显式的回调来处理非阻塞的输入和输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 struct abstract_coroutine { int val; int state = 0 ; void run () { switch (state) { case 0 : return sub_proc_1(); case 1 : return sub_proc_2(); case 2 : return sub_proc_3(); } } void sub_proc_1 () { val = 0 ; state = 1 ; } void sub_proc_2 () { val = 1 ; state = 2 ; } void sub_proc_3 () { val--; } };
每个部分执行完成后,状态转移。下一次调用 run
时,执行下一个函数调用。这种方式,没有有栈协程的显式上下文切换。这种方式和函数执行和返回几乎没有区别。难点在于如何实现高效地协程调度机制。
对称协程 vs 非对称协程
非对称协程,协程之间有严格的父子关系,由调度器负责所有调度。
对称式协程,协程之间地位平等。
Coroutines in C
LLVM Coroutines 实现的无栈协程伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void *f (int n) { void *hdl = CORO_BEGIN(malloc ); for (int i = n;; ++i) { CORO_SUSPEND(hdl); print (i); CORO_SUSPEND(hdl); print (-i); } CORO_END(hdl, free ); }int main () { void *coro = f(1 ); for (int i = 0 ; i < 4 ; ++i) { CORO_RESUME(coro); } CORO_DESTROY(coro); }
以上程序的协程帧数据可以定义如下:
struct f .frame { int i; };
而编译器会继续优化,比如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void *f (int n) { void *hdl = CORO_BEGIN(malloc ); f.frame *frame = (f.frame *)hdl; for (frame->i = n;; ++frame->i) { frame->suspend_index = 0 ; r0: CORO_SUSPEND(hdl); print (frame->i); frame->suspend_index = 1 ; r1: CORO_SUSPEND(hdl); print (-frame->i); } CORO_END(hdl, free ); }struct f .frame { int suspend_index; int i; };
编译器同时,会生成三个函数调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 void * f (int *n) { void * hdl = CORO_BEGIN(malloc ); f.frame* frame = (f.frame*)hdl; frame->ResumeFn = &f.resume; frame->DestroyFn = &f.destroy; frame->i = n; frame->suspend_index = 0 ; return coro_hdl; }void f.resume(f.frame *frame) { switch (frame->suspend_index) { case 0 : goto r0; default : goto r1; } for (frame->i = n;; ++frame->i) { frame->suspend_index = 0 ; r0: CORO_SUSPEND(hdl); print (frame->i); frame->suspend_index = 1 ; r1: CORO_SUSPEND(hdl); print (-frame->i); } CORO_END(hdl, free ); }void f.destroy(f.frame *frame) { switch (frame->suspend_index) { … } free (frame); }
此时的协程帧中数据为:
struct f .frame { FnPtr ResumeFn; FnPtr DestroyFn; int suspend_index; int i; };
LLVM Coroutines 中定义了以下宏,用于处理协程的不同状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 #define CORO_SUSPEND() \ switch (__builtin_coro_suspend()) { \ case -1 : \ goto coro_Suspend; \ case 1 : \ goto coro_Cleanup; \ default : \ break ; \ }#define CORO_END(hdl, FreeFunc) \ coro_Cleanup : { \ void *coro_mem = __builtin_coro_free(hdl); \ if (coro_mem) \ FreeFunc(coro_mem); \ } \ coro_Suspend: \ __builtin_coro_end(); \ return coro_hdl;
C++20 协程
C++20
协程是无栈协程。定义中包含了以下3种表达式之一的函数,就是一个协程。
co_await 表达式——用于暂停执行,直到恢复:
task<> tcp_echo_server () { char data[1024 ]; while (true ) { std ::size_t n = co_await socket.async_read_some(buffer(data)); co_await async_write (socket, buffer(data, n)) ; } }
co_yield 表达式——用于暂停执行并返回一个值:
generator<int > iota (int n = 0 ) { while (true ) co_yield n++; }
co_return 语句——用于完成执行并返回一个值:
lazy<int > f () { co_return 7 ; }
每个协程都与下列对象关联:
承诺(promise)对象 ,在协程内部操纵。协程通过此对象提交其结果或异常。
协程句柄 (coroutine
handle) ,在协程外部操纵。这是用于恢复协程执行或销毁协程帧的不带所有权句柄。
协程状态 (coroutine
state) ,它是一个动态存储分配(除非自定义分配方式)的内部对象,其包含:
promise 对象
各个形参(全部按值复制)
当前暂停点的一些表示,使得程序在恢复时知晓要从何处继续,销毁时知晓有哪些局部变量在作用域内
生存期超过当前暂停点的局部变量和临时量
当协程开始执行时,它进行下列操作:
用 operator new 分配 协程状态对象
将所有函数形参复制到协程状态中:按值传递的形参被移动或复制,按引用传递的参数保持为引用(因此,如果在被指代对象的生存期结束后恢复协程,它可能变成悬垂引用)
调用承诺对象的构造函数。如果承诺类型拥有接收所有协程形参的构造函数,那么以复制后的协程实参调用该构造函数。否则调用其默认构造函数。
调用 promise.get_return_object() 并将结果保存在局部变量中。该调用的结果将在协程首次暂停时返回给调用方。至此并包含这个步骤为止,任何抛出的异常均传播回调用方,而非置于
promise 中。
调用 promise.initial_suspend() 并 co_await 它的结果。典型的承诺类型 Promise
要么(对于惰性启动的协程)返回std::suspend_always ,要么(对于急切启动的协程)返回std::suspend_never 。
当 co_await
promise.initial_suspend() 恢复时,开始协程体的执行。
一个跨线程调度协程示例
程序实现了 producer 协程运行在线程2中, 500ms 后生成一个 val
;consumer 协程运行在线程 1 中,并会被调度到线程2 中执行,最后读取出
val。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 #include <coroutine> #include <chrono> #include <future> #include <iostream> #include <thread> using namespace std ;struct Task { struct promise_type ; using handle_type = std ::coroutine_handle<promise_type>; handle_type h; std ::future <void > fut; struct promise_type { int result; coroutine_handle<> h_next = nullptr ; Task get_return_object () { return Task{ handle_type::from_promise(*this ) }; } suspend_always initial_suspend () { return {};}; auto final_suspend () noexcept { struct next_awaiter { promise_type* self; bool await_ready () noexcept { return false ; } void await_resume () noexcept {}; coroutine_handle<> await_suspend (coroutine_handle<promise_type> hdl) noexcept { if (hdl.promise().h_next) { return hdl.promise().h_next; } else { return std ::noop_coroutine(); } } }; return next_awaiter{ this }; } void return_value (int i) { result = i; } void unhandled_exception () {} }; bool await_ready () { return false ; } void await_suspend (handle_type hdl) { h.promise().h_next = hdl; fut = std ::async([this ]() { h.resume(); }); } int await_resume () { return h.promise().result; } };Task produce () { int val = 1111 ; std ::this_thread::sleep_for(std ::chrono::milliseconds(500 )); cout << std ::this_thread::get_id() << " produce: " << val << endl ; co_return val; }Task consume (Task& fut) { cout << std ::this_thread::get_id() << " consume: waiting " << endl ; int val = co_await fut; cout << std ::this_thread::get_id() << " consume: " << val << endl ; co_return 0 ; }int main () { auto prod = produce(); auto cons = consume(prod); cons.h.resume(); while (!cons.h.done()) { cout << std ::this_thread::get_id() << " main: waiting " << endl ; std ::this_thread::sleep_for(std ::chrono::milliseconds(100 )); } cout << std ::this_thread::get_id() << " main: done" << endl ; return 0 ; }
co_await
作为一个一元操作符,暂停协程并将控制权返回给调用方。
将操作对象转化为可等待体 awaitable 对象
调用 awaiter.await_ready()
返回 false
暂停协程,此时协程已经完全暂停,可以在不同线程间转移协程句柄,而且不需要额外同步处理。
调用 awaiter.await_suspend(handle) ,其中 handle
表示当前协程的协程句柄:
await_suspend 返回 void
,那么将控制权返回给协程的调用方,而暂停当前协程。
await_suspend 返回 true,也将控制权返回给调用方;返回 false
,则继续执行当前协程。
await_suspend 返回 其他协程的句柄,那么 handle.resume()
恢复执行其他协程,最终会返回到当前协程。
await_suspend 抛出异常,那么会向上一层调用者抛出异常。
返回 true 不会暂停协程
调用 awaiter.await_resume() ,它的返回结果就是 co_await expr
的返回结果。
由于如果执行到
awaiter.await_suspend,那么协程已经完全暂停,可以在不同线程间转移协程句柄,而且不需要额外同步处理。比如,将协程句柄写入回调函数,在某个异步
IO 操作完成时运行该协程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 #include <coroutine> #include <iostream> #include <stdexcept> #include <thread> auto switch_to_new_thread (std ::jthread& out) { struct awaitable { std ::jthread* p_out; bool await_ready () { return false ; } void await_suspend (std ::coroutine_handle<> h) { std ::jthread& out = *p_out; if (out.joinable()) { throw std ::runtime_error("thread already started, needed to be null, then generated by internal suspend" ); } out = std ::jthread([h] {h.resume();}); std ::cout << "New thread " << out.get_id() << " started\n" ; } void await_resume () {} }; return awaitable{&out}; }struct task { struct promise_type { task get_return_object () { return {}; } std ::suspend_never initial_suspend () { return {}; } std ::suspend_never final_suspend () noexcept { return {}; } void return_void () {} void unhandled_exception () {} }; };task resuming_on_new_thread (std ::jthread& out) { std ::cout << "coroutine start, current thread " << std ::this_thread::get_id() << std ::endl ; co_await switch_to_new_thread (out) ; std ::cout << "coroutine end, current thread " << std ::this_thread::get_id() << std ::endl ; }int main () { std ::jthread out; resuming_on_new_thread(out); return 0 ; }
std::suspend_always 及 std::suspend_never 就是标准库提供的两个
awaitable 对象。
co_yield
yield
表达式向调用方返回一个值并暂停当前协程,在生成器函数中很常见。
co_yield exprco_yield {initializer list }co_await promise.yield_value(expr)
yield_value 生成器将实参保存到生成器对象中,并返回 suspend_always
,并将控制权交还给调用方。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 #include <coroutine> #include <exception> #include <iostream> #include <memory> template <typename T>struct Generator { struct promise_type ; using handle_type = std ::coroutine_handle<promise_type>; struct promise_type { T value; std ::exception_ptr exception; Generator get_return_object () { return Generator(handle_type::from_promise(*this )); } std ::suspend_always initial_suspend () { return {}; } std ::suspend_always final_suspend () noexcept { return {}; } void unhandled_exception () { exception = std ::current_exception(); } template <std ::convertible_to<T> From> std ::suspend_always yield_value (From&& from) { value = std ::forward<From>(from); return {}; } void return_void () {} }; handle_type h; Generator(handle_type hdl) : h(hdl) {} ~Generator() { h.destroy(); } explicit operator bool () { fill(); return !h.done(); } T operator () () { fill(); m_full = false ; return std ::move(h.promise().value); }private : bool m_full = false ; void fill () { if (!m_full) { h(); if (h.promise().exception) { std ::rethrow_exception(h.promise().exception); } m_full = true ; } } }; Generator<uint64_t > fibonacci_sequence(unsigned n) { if (n == 0 ) { co_return ; } if (n > 94 ) { throw std ::runtime_error("n too large, number will overflow" ); } co_yield 0 ; if (n == 1 ) { co_return ; } co_yield 1 ; if (n == 2 ) { co_return ; } uint64_t a = 0 ; uint64_t b = 1 ; for (unsigned i = 2 ; i < n; ++i) { uint64_t s = a + b; co_yield s; a = b; b = s; } }int main () { try { auto gen = fibonacci_sequence(1 ); for (int j = 0 ; gen; ++j) { std ::cout << "fibonacci[" << j << "] = " << gen() << '\n' ; } } catch (const std ::exception& e) { std ::cerr << "exception: " << e.what() << '\n' ; } catch (...) { std ::cerr << "unknown exception\n" ; } return 0 ; }
co_return
co_return expr; 相当于
p.return_value(expr); goto final_suspend;
。 co_return;
相当于 p.return_void(expr); goto final_suspend;
。
coroutine_handle
类模板 coroutine_handle
能用于表示,暂停或执行中的协程。使用时要注意避免 dangling
问题,可能会导致 undefined behavior。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 #include <coroutine> #include <iostream> #include <optional> template <std ::movable T>class Generator {public : struct promise_type { std ::optional<T> current_value; Generator<T> get_return_object () { return Generator{Handle::from_promise(*this )}; } static std ::suspend_always initial_suspend () noexcept { return {}; } static std ::suspend_always final_suspend () noexcept { return {}; } std ::suspend_always yield_value (T value) noexcept { current_value = std ::move(value); return {}; } void await_transform () = delete ; [[noreturn]] void unhandled_exception () { throw ; } }; using Handle = std ::coroutine_handle<promise_type>; explicit Generator (const Handle coroutine) : m_coroutine (coroutine) {} Generator() = default ; ~Generator() { if (m_coroutine) { m_coroutine.destroy(); } } Generator(const Generator&) = delete ; Generator& operator =(const Generator&) = delete ; Generator(Generator&& other) : m_coroutine(other.m_coroutine) { other.m_coroutine = {}; } Generator& operator =(Generator&& other) { if (this != &other) { if (m_coroutine) { m_coroutine.destroy(); } m_coroutine = other.m_coroutine; other.m_coroutine = {}; } return *this ; } class Iter { public : void operator ++() { m_coroutine.resume(); } const T& operator *() { return *m_coroutine.promise().current_value; } bool operator ==(std ::default_sentinel_t ) const { return !m_coroutine || m_coroutine.done(); } explicit Iter (const Handle coroutine) : m_coroutine (coroutine) {} private : Handle m_coroutine; }; Iter begin () { if (m_coroutine) { m_coroutine.resume(); } return Iter{m_coroutine}; } std ::default_sentinel_t end () { return {}; }private : Handle m_coroutine; };template <std ::integral T>Generator<T> range (T first, T last) { while (first < last) { co_yield first; first++; } }int main () { for (const char i : range(65 , 91 )) { std ::cout << i << ' ' ; } std ::cout << '\n' ; return 0 ; }
数据成员
ptr : void* ,指向协程状态的指针; 成员函数
构造函数
operator= 拷贝赋值
operator coroutine_handle<> 获得类型擦除的 coroutine_handle
,就是说将不同类型的模版,返回统一的类型
done : 检查协程是否完成
operator bool : 检查当前 handle 是否表示协程
operator() 或者 resume : 恢复协程执行
destroy : 销毁协程
promise : 访问协程的 promise 对象
from_promise : 从一个 promise 对象,创建 coroutine_handle
address : 返回支撑协程的底层指针
from_address : 从指针地址导入协程
C++20 还为 coroutine_handle
提供了散列特化支持,std::hash。
应用
在设计网络应用时,可以在需要等待IO输入时,将协程 co_await
交还控制权到协程的调用者或者调度到其他协程。当 epoll
中来数据了,触发了IO事件,再在相应的回调函数中 resume 协程的调用。
IO 多路复用
More to read