RUST-Async

async 定义了一个可以并发执行的任务,而 await 则触发这个任务并发执行。大多数语言,包括 Rust,async/await 都是一个语法糖(syntactic sugar),它们使用状态机将 Promise/Future 这样的结构包装起来进行处理。

JavaScript 的 Promise 和线程类似,一旦创建就开始执行,对 Promise await 只是为了获取解析出来的值;而 Rust 的 Future,只有在主动 await 后才开始执行。

Future

异步函数(async fn)的返回值是一个奇怪的 impl Future

的结构

1
2
3
4
5
6
7
8
9
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
Ready(T),
Pending,
}

对于线程来说,操作系统负责调度;但操作系统不会去调度用户态的协程(比如 Future),任何使用了协程来处理并发的程序,都需要有一个 executor 来负责协程的调度。

常见的 executor 有 futures, tokio, async-std 等。

Reactor 与异步处理

Reactor pattern 它包含三部分:

  • task,待处理的任务。任务可以被打断,并且把控制权交给 executor,等待之后的调度;
  • executor,一个调度器。维护等待运行的任务(ready queue),以及被阻塞的任务(wait queue);
  • reactor,维护事件队列。当事件来临时,通知 executor 唤醒某个任务等待运行。

Future 是异步任务的数据结构,当 fut.await 时,executor 就会调度并执行它。

tokio 的调度器(executor)会运行在多个线程上,运行线程自己的 ready queue 上的任务(Future),如果没有,就去别的线程的调度器上 stealing 一些过来运行(tokio 实现了 work stealing scheduler)。当某个任务无法再继续取得进展,此时 Future 运行的结果是 Poll::Pending,那么调度器会挂起任务,并设置好合适的唤醒条件(Waker),等待被 reactor 唤醒。

reactor 会利用操作系统提供的异步 I/O,比如 epoll / kqueue / IOCP,来监听操作系统提供的 IO 事件,当遇到满足条件的事件时,就会调用 Waker.wake() 唤醒被挂起的 Future。这个 Future 会回到 ready queue 等待执行。

异步使用环境

非计算密集型

Future 的调度是协作式多任务(Cooperative Multitasking),除非 Future 主动放弃 CPU,不然它就会一直被执行,直到运行结束。

如果你的确需要在 tokio(或者其它异步运行时)下运行运算量很大的代码,那么最好使用 yield 来主动让出 CPU,比如 tokio::task::yield_now(),避免某个计算密集型的任务饿死其它任务。

非 std mutex

大部分时候,标准库的 Mutex 可以用在异步代码中,而且,这是推荐的用法。

然而,标准库的 MutexGuard 不能安全地跨越 await,所以,当我们需要获得锁之后执行异步操作,必须使用 tokio 自带的 Mutex。 因为 tokio 实现了 work-stealing 调度,Future 有可能在不同的线程中执行,普通的 MutexGuard 编译直接就会出错,所以需要使用 tokio 的 Mutex。

线程与异步任务的同步

兼有计算密集和 IO 密集的任务,可以使用 channel 来在线程和future两者之间做同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::thread;

use anyhow::Result;
use blake3::Hasher;
use futures::{SinkExt, StreamExt};
use rayon::prelude::*;
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
};
use tokio_util::codec::{Framed, LinesCodec};

pub const PREFIX_ZERO: &[u8] = &[0, 0, 0];

#[tokio::main]
async fn main() -> Result<()> {
let addr = "0.0.0.0:8080";
let listener = TcpListener::bind(addr).await?;
println!("listen to: {}", addr);

// 创建 tokio task 和 thread 之间的 channel
let (sender, mut receiver) = mpsc::unbounded_channel::<(String, oneshot::Sender<String>)>();

// 使用 thread 处理计算密集型任务
thread::spawn(move || {
// 读取从 tokio task 过来的 msg,这里用的是 blocking_recv,而非 await
while let Some((line, reply)) = receiver.blocking_recv() {
// 计算 pow
let result = match pow(&line) {
Some((hash, nonce)) => format!("hash: {}, once: {}", hash, nonce),
None => "Not found".to_string(),
};
// 把计算结果从 oneshot channel 里发回
if let Err(e) = reply.send(result) {
println!("Failed to send: {}", e);
}
}
});

// 使用 tokio task 处理 IO 密集型任务
loop {
let (stream, addr) = listener.accept().await?;
println!("Accepted: {:?}", addr);
let sender1 = sender.clone();
tokio::spawn(async move {
// 使用 LinesCodec 把 TCP 数据切成一行行字符串处理
let framed = Framed::new(stream, LinesCodec::new());
// split 成 writer 和 reader
let (mut w, mut r) = framed.split();
for line in r.next().await {
// 为每个消息创建一个 oneshot channel
let (reply, reply_receiver) = oneshot::channel();
sender1.send((line?, reply))?;

if let Ok(v) = reply_receiver.await {
w.send(format!("Pow calculated: {}", v)).await?;
}
}
Ok::<_, anyhow::Error>(())
});
}
}

pub fn pow(s: &str) -> Option<(String, u32)> {
let hasher = blake3_base_hash(s.as_bytes());
// 使用 rayon 并发计算 u32 空间下所有 nonce,直到找到有头 N 个 0 的哈希
let nonce = (0..u32::MAX).into_par_iter().find_any(|n| {
let hash = blake3_hash(hasher.clone(), n).as_bytes().to_vec();
&hash[..PREFIX_ZERO.len()] == PREFIX_ZERO
});
nonce.map(|n| {
let hash = blake3_hash(hasher, &n).to_hex().to_string();
(hash, n)
})
}

// 计算携带 nonce 后的哈希
fn blake3_hash(mut hasher: blake3::Hasher, nonce: &u32) -> blake3::Hash {
hasher.update(&nonce.to_be_bytes()[..]);
hasher.finalize()
}

// 计算数据的哈希
fn blake3_base_hash(data: &[u8]) -> Hasher {
let mut hasher = Hasher::new();
hasher.update(data);
hasher
}

Waker

executor 通过调用 poll 方法来让 Future 继续往下执行,如果 poll 方法返回 Poll::Pending,就阻塞 Future,直到 reactor 收到了某个事件,然后调用 Waker.wake() 把 Future 唤醒。

Context 就是 Waker 的一个封装:

1
2
3
4
pub struct Context<'a> {
waker: &'a Waker,
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

内部使用了一个 vtable 来允许的 waker 的行为:

1
2
3
4
5
6
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}

在标准库中,只有这些接口的定义,以及“高层”接口的实现,比如 Waker 下的 wake 方法,只是调用了 vtable 里的 wake() 而已:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
impl Waker {
/// Wake up the task associated with this `Waker`.
#[inline]
pub fn wake(self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
let wake = self.waker.vtable.wake;
let data = self.waker.data;

// Don't call `drop` -- the waker will be consumed by `wake`.
crate::mem::forget(self);

// SAFETY: This is safe because `Waker::from_raw` is the only way
// to initialize `wake` and `data` requiring the user to acknowledge
// that the contract of `RawWaker` is upheld.
unsafe { (wake)(data) };
}
...
}

具体的实现可参看 futures-rs/waker.rs

Poll

executor 处理 Future 时,会不断地调用它的 poll() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hello().await?;

// 等价于
match hello.poll(cx) {
Poll::Ready(result) => return result,
Poll::Pending => return Poll::Pending
}

hello(inner1.await, inner2.await).await?; // inner1 conditioned by inner2

let fut = inner1;
match fut.poll(cx) {
Poll::Ready(Ok(file)) => {
let fut = inner2;
match fut.poll(cx) {
Poll::Ready(result) => return result,
Poll::Pending => return Poll::Pending,
}
}
Poll::Pending => return Poll::Pending,
}

async 函数返回的是一个 Future,所以,还需要把这样的代码封装在一个 Future 的实现里,对外提供出去。

需要实现一个数据结构,把内部的状态保存起来,并为这个数据结构实现 Future。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
enum WriteHelloFile {
Init(String),
// 等待文件创建,此时需要保存 Future 以便多次调用
AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>),
// 等待文件写入,此时需要保存 Future 以便多次调用
AwaitingWrite(impl Future<Output = Result<(), std::io::Error>>),
Done,
}

impl WriteHelloFile {
pub fn new(name: impl Into<String>) -> Self {
Self::Init(name.into())
}
}

impl Future for WriteHelloFile {
type Output = Result<(), std::io::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match this {
// 如果状态是 Init,那么就生成 create Future,把状态切换到 AwaitingCreate
WriteHelloFile::Init(name) => {
let fut = fs::File::create(name);
*self = WriteHelloFile::AwaitingCreate(fut);
}
// 如果状态是 AwaitingCreate,那么 poll create Future
// 如果返回 Poll::Ready(Ok(_)),那么创建 write Future
// 并把状态切换到 Awaiting
WriteHelloFile::AwaitingCreate(fut) => match fut.poll(cx) {
Poll::Ready(Ok(file)) => {
let fut = file.write_all(b"hello world!");
*self = WriteHelloFile::AwaitingWrite(fut);
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
},
// 如果状态是 AwaitingWrite,那么 poll write Future
// 如果返回 Poll::Ready(_),那么状态切换到 Done,整个 Future 执行成功
WriteHelloFile::AwaitingWrite(fut) => match fut.poll(cx) {
Poll::Ready(result) => {
*self = WriteHelloFile::Done;
return Poll::Ready(result);
}
Poll::Pending => return Poll::Pending,
},
// 整个 Future 已经执行完毕
WriteHelloFile::Done => return Poll::Ready(Ok(())),
}
}
}
}

fn write_hello_file_async(name: &str) -> WriteHelloFile {
WriteHelloFile::new(name)
}

Rust 在编译 async fn 或者 async block 时,就会生成类似的状态机的实现。

Pin

在上面实现 Future 的状态机中,AwaitingCreate 引用了 file 这样一个局部变量:

file 被 fut 引用,但 file 会在这个作用域被丢弃。需要把它保存在数据结构中:

1
2
3
4
5
6
7
8
9
10
11
enum WriteHelloFile {
Init(String),
AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>),
AwaitingWrite(AwaitingWriteData),
Done,
}

struct AwaitingWriteData {
fut: impl Future<Output = Result<(), std::io::Error>>,
file: fs::File,
}

在同一个数据结构内部,fut 指向了对 file 的引用,形成自引用结构(Self-Referential Structure)。

自引用结构有一个很大的问题是:一旦它被移动,原本的指针就会指向旧的地址。即,fut 会在移动后指向移动前的 file。

Pin 拿住的是一个可以解引用成 T 的指针类型 P,而不是直接拿原本的类型 T。所以,对于 Pin 而言,都是 Pin<Box>、Pin<&mut T>,但不会是 Pin。因为 Pin 的目的是,把 T 的内存位置锁住,从而避免移动后自引用类型带来的引用失效问题。

此处 Pin 的 AwaitingWriteData 引用,指向同一个内存位置。

自引用数据结构并非只在异步代码里出现,只不过异步代码在内部生成用状态机表述的 Future 时,很容易产生自引用结构。比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#[derive(Debug)]
struct SelfReference {
name: String,
// 在初始化后指向 name
name_ptr: *const String,
}

impl SelfReference {
pub fn new(name: impl Into<String>) -> Self {
SelfReference {
name: name.into(),
name_ptr: std::ptr::null(),
}
}

pub fn init(&mut self) {
self.name_ptr = &self.name as *const String;
}

pub fn print_name(&self) {
println!(
"struct {:p}: (name: {:p} name_ptr: {:p}), name: {}, name_ref: {}",
self,
&self.name,
self.name_ptr,
self.name,
// 在使用 ptr 是需要 unsafe
// SAFETY: 这里 name_ptr 潜在不安全,会指向旧的位置
unsafe { &*self.name_ptr },
);
}
}

// memcpy
fn move_it(data: SelfReference) -> SelfReference {
data
}

fn move_creates_issue() -> SelfReference {
let mut data = SelfReference::new("Tyr");
data.init();

// 不 move,一切正常
data.print_name();

let data = move_it(data);

// move 之后,name_ref 指向的位置是已经失效的地址
// 只不过现在 move 前的地址还没被回收
data.print_name();
data
}

fn mem_swap_creates_issue() {
let mut data1 = SelfReference::new("Tyr");
data1.init();

let mut data2 = SelfReference::new("Lindsey");
data2.init();

// name: Tyr, name_ref: Tyr
data1.print_name();
// name: Lindsey, name_ref: Lindsey
data2.print_name();

std::mem::swap(&mut data1, &mut data2);
// name: Lindsey, name_ref: Tyr
data1.print_name();
// name: Tyr, name_ref: Lindsey
data2.print_name();
}

fn main() {
let data = move_creates_issue();
println!("data: {:?}", data);
// 若不注释下面这行,程序运行会直接 segment error, move 前的地址已经被回收
// data.print_name();
print!("\\n");
mem_swap_creates_issue();
}

创建了一个自引用结构 SelfReference,它里面的 name_ref 指向了 name。正常使用它时,没有任何问题,但一旦对这个结构做 move 操作,name_ref 指向的位置还会是 move 前 name 的地址,这就引发了问题。

使用 std::mem:swap,也会出现类似的问题,一旦 swap,两个数据的内容交换,然而,name_ref 指向的地址还是旧的。

Pin 对解决这类问题很关键,如果试图移动被 Pin 住的数据结构,要么,编译器会通过编译错误阻止你;要么,使用 unsafe Rust,自己负责其安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use std::{marker::PhantomPinned, pin::Pin};

#[derive(Debug)]
struct SelfReference {
name: String,
// 在初始化后指向 name
name_ptr: *const String,
// PhantomPinned 占位符
_marker: PhantomPinned,
}

impl SelfReference {
pub fn new(name: impl Into<String>) -> Self {
SelfReference {
name: name.into(),
name_ptr: std::ptr::null(),
_marker: PhantomPinned,
}
}

pub fn init(self: Pin<&mut Self>) {
let name_ptr = &self.name as *const String;
// SAFETY: 这里并不会把任何数据从 &mut SelfReference 中移走
let this = unsafe { self.get_unchecked_mut() };
this.name_ptr = name_ptr;
}

pub fn print_name(self: Pin<&Self>) {
println!(
"struct {:p}: (name: {:p} name_ptr: {:p}), name: {}, name_ref: {}",
self,
&self.name,
self.name_ptr,
self.name,
// 在使用 ptr 是需要 unsafe
// SAFETY: 因为数据不会移动,所以这里 name_ptr 是安全的
unsafe { &*self.name_ptr },
);
}
}

fn move_pinned(data: Pin<&mut SelfReference>) {
println!("{:?} ({:p})", data, &data);
}

#[allow(dead_code)]
fn move_it(data: SelfReference) {
println!("{:?} ({:p})", data, &data);
}

fn move_creates_issue() {
let mut data = SelfReference::new("Tyr");
let mut data = unsafe { Pin::new_unchecked(&mut data) };
SelfReference::init(data.as_mut());

// 不 move,一切正常
data.as_ref().print_name();

// 现在只能拿到 pinned 后的数据
move_pinned(data.as_mut());
println!("{:?} ({:p})", data, &data);

// 无法拿回 Pin 之前的 SelfReference 结构,
// expected struct `SelfReference`
// found struct `Pin<&mut SelfReference>`
// move_it(data);
}

fn main() {
move_creates_issue();
}

Unpin

1
pub auto trait Unpin {}

Pin 是为了让某个数据结构无法合法地移动,而 Unpin 则相当于声明数据结构是可以移动的,它的作用类似于 Send / Sync,通过类型约束来告诉编译器哪些行为是合法的、哪些不是。

在 Rust 中,绝大多数数据结构都是可以移动的,所以它们都自动实现了 Unpin

即便这些结构被 Pin 包裹,它们依旧可以进行移动:

1
2
3
4
5
6
7
// 需要一个可变引用来调用 `mem::replace`
// 可以通过(隐式)调用 `Pin::deref_mut` 来获得这样的引用,但这只有在 `String` 实现了 `Unpin` 后才有可能

let mut string = "this".to_string();
let mut pinned_string = Pin::new(&mut string);

mem::replace(&mut *pinned_string, "other".to_string());

当不希望一个数据结构被移动,可以使用 !Unpin。在 Rust 里,实现了 !Unpin 的,除了内部结构(比如 Future),主要就是 PhantomPinned:

1
2
pub struct PhantomPinned;
impl !Unpin for PhantomPinned {}

所以,如果你希望你的数据结构不能被移动,可以为其添加 PhantomPinned 字段来隐式声明 !Unpin。

当数据结构满足 Unpin 时,创建 Pin 以及使用 Pin(主要是 DerefMut)都可以不使用 unsafe:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 如果实现了 Unpin,可以通过安全接口创建和进行 DerefMut
impl<P: Deref<Target: Unpin>> Pin<P> {
pub const fn new(pointer: P) -> Pin<P> {
// SAFETY: the value pointed to is `Unpin`, and so has no requirements
// around pinning.
unsafe { Pin::new_unchecked(pointer) }
}
pub const fn into_inner(pin: Pin<P>) -> P {
pin.pointer
}
}

impl<P: DerefMut<Target: Unpin>> DerefMut for Pin<P> {
fn deref_mut(&mut self) -> &mut P::Target {
Pin::get_mut(Pin::as_mut(self))
}
}

// 如果没有实现 Unpin,只能通过 unsafe 接口创建,不能使用 DerefMut
impl<P: Deref> Pin<P> {
pub const unsafe fn new_unchecked(pointer: P) -> Pin<P> {
Pin { pointer }
}

pub const unsafe fn into_inner_unchecked(pin: Pin<P>) -> P {
pin.pointer
}
}

Box 默认实现了 Unpin。 Box 在堆上分配内存,而指针本身是可以移动的。

Future 的类型

1
2
3
4
5
6
7
8
9
fn main() {
let fut = async { 42 };

println!("type of fut is: {}", get_type_name(&fut));
}

fn get_type_name<T>(_: &T) -> &'static str {
std::any::type_name::<T>()
}

它的输出如下:

1
type of fut is: core::future::from_generator::GenFuture<xxx::main::{{closure}}>

GenFuture 的定义:

1
2
3
4
5
6
7
8
9
10
struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);

pub trait Generator<R = ()> {
type Yield;
type Return;
fn resume(
self: Pin<&mut Self>,
arg: R
) -> GeneratorState<Self::Yield, Self::Return>;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};
use std::pin::Pin;

fn main() {
let mut generator = || {
yield 1;
return "foo"
};

match Pin::new(&mut generator).resume(()) {
GeneratorState::Yielded(1) => {}
_ => panic!("unexpected return from resume"),
}
match Pin::new(&mut generator).resume(()) {
GeneratorState::Complete("foo") => {}
_ => panic!("unexpected return from resume"),
}
}

类似 python 中的 yield。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!