RUST-Async

async 定义了一个可以并发执行的任务,而 await 则触发这个任务并发执行。大多数语言,包括 Rust,async/await 都是一个语法糖(syntactic sugar),它们使用状态机将 Promise/Future 这样的结构包装起来进行处理。

JavaScript 的 Promise 和线程类似,一旦创建就开始执行,对 Promise await 只是为了获取解析出来的值;而 Rust 的 Future,只有在主动 await 后才开始执行。

Future

异步函数(async fn)的返回值是一个奇怪的 impl Future

的结构

1
2
3
4
5
6
7
8
9
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
Ready(T),
Pending,
}

对于线程来说,操作系统负责调度;但操作系统不会去调度用户态的协程(比如 Future),任何使用了协程来处理并发的程序,都需要有一个 executor 来负责协程的调度。

常见的 executor 有 futures, tokio, async-std 等。

Reactor 与异步处理

Reactor pattern 它包含三部分:

  • task,待处理的任务。任务可以被打断,并且把控制权交给 executor,等待之后的调度;
  • executor,一个调度器。维护等待运行的任务(ready queue),以及被阻塞的任务(wait queue);
  • reactor,维护事件队列。当事件来临时,通知 executor 唤醒某个任务等待运行。

Future 是异步任务的数据结构,当 fut.await 时,executor 就会调度并执行它。

tokio 的调度器(executor)会运行在多个线程上,运行线程自己的 ready queue 上的任务(Future),如果没有,就去别的线程的调度器上 stealing 一些过来运行(tokio 实现了 work stealing scheduler)。当某个任务无法再继续取得进展,此时 Future 运行的结果是 Poll::Pending,那么调度器会挂起任务,并设置好合适的唤醒条件(Waker),等待被 reactor 唤醒。

reactor 会利用操作系统提供的异步 I/O,比如 epoll / kqueue / IOCP,来监听操作系统提供的 IO 事件,当遇到满足条件的事件时,就会调用 Waker.wake() 唤醒被挂起的 Future。这个 Future 会回到 ready queue 等待执行。

异步使用环境

非计算密集型

Future 的调度是协作式多任务(Cooperative Multitasking),除非 Future 主动放弃 CPU,不然它就会一直被执行,直到运行结束。

如果你的确需要在 tokio(或者其它异步运行时)下运行运算量很大的代码,那么最好使用 yield 来主动让出 CPU,比如 tokio::task::yield_now(),避免某个计算密集型的任务饿死其它任务。

非 std mutex

大部分时候,标准库的 Mutex 可以用在异步代码中,而且,这是推荐的用法。

然而,标准库的 MutexGuard 不能安全地跨越 await,所以,当我们需要获得锁之后执行异步操作,必须使用 tokio 自带的 Mutex。 因为 tokio 实现了 work-stealing 调度,Future 有可能在不同的线程中执行,普通的 MutexGuard 编译直接就会出错,所以需要使用 tokio 的 Mutex。

线程与异步任务的同步

兼有计算密集和 IO 密集的任务,可以使用 channel 来在线程和future两者之间做同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::thread;

use anyhow::Result;
use blake3::Hasher;
use futures::{SinkExt, StreamExt};
use rayon::prelude::*;
use tokio::{
net::TcpListener,
sync::{mpsc, oneshot},
};
use tokio_util::codec::{Framed, LinesCodec};

pub const PREFIX_ZERO: &[u8] = &[0, 0, 0];

#[tokio::main]
async fn main() -> Result<()> {
let addr = "0.0.0.0:8080";
let listener = TcpListener::bind(addr).await?;
println!("listen to: {}", addr);

// 创建 tokio task 和 thread 之间的 channel
let (sender, mut receiver) = mpsc::unbounded_channel::<(String, oneshot::Sender<String>)>();

// 使用 thread 处理计算密集型任务
thread::spawn(move || {
// 读取从 tokio task 过来的 msg,这里用的是 blocking_recv,而非 await
while let Some((line, reply)) = receiver.blocking_recv() {
// 计算 pow
let result = match pow(&line) {
Some((hash, nonce)) => format!("hash: {}, once: {}", hash, nonce),
None => "Not found".to_string(),
};
// 把计算结果从 oneshot channel 里发回
if let Err(e) = reply.send(result) {
println!("Failed to send: {}", e);
}
}
});

// 使用 tokio task 处理 IO 密集型任务
loop {
let (stream, addr) = listener.accept().await?;
println!("Accepted: {:?}", addr);
let sender1 = sender.clone();
tokio::spawn(async move {
// 使用 LinesCodec 把 TCP 数据切成一行行字符串处理
let framed = Framed::new(stream, LinesCodec::new());
// split 成 writer 和 reader
let (mut w, mut r) = framed.split();
for line in r.next().await {
// 为每个消息创建一个 oneshot channel
let (reply, reply_receiver) = oneshot::channel();
sender1.send((line?, reply))?;

if let Ok(v) = reply_receiver.await {
w.send(format!("Pow calculated: {}", v)).await?;
}
}
Ok::<_, anyhow::Error>(())
});
}
}

pub fn pow(s: &str) -> Option<(String, u32)> {
let hasher = blake3_base_hash(s.as_bytes());
// 使用 rayon 并发计算 u32 空间下所有 nonce,直到找到有头 N 个 0 的哈希
let nonce = (0..u32::MAX).into_par_iter().find_any(|n| {
let hash = blake3_hash(hasher.clone(), n).as_bytes().to_vec();
&hash[..PREFIX_ZERO.len()] == PREFIX_ZERO
});
nonce.map(|n| {
let hash = blake3_hash(hasher, &n).to_hex().to_string();
(hash, n)
})
}

// 计算携带 nonce 后的哈希
fn blake3_hash(mut hasher: blake3::Hasher, nonce: &u32) -> blake3::Hash {
hasher.update(&nonce.to_be_bytes()[..]);
hasher.finalize()
}

// 计算数据的哈希
fn blake3_base_hash(data: &[u8]) -> Hasher {
let mut hasher = Hasher::new();
hasher.update(data);
hasher
}

Waker

executor 通过调用 poll 方法来让 Future 继续往下执行,如果 poll 方法返回 Poll::Pending,就阻塞 Future,直到 reactor 收到了某个事件,然后调用 Waker.wake() 把 Future 唤醒。

Context 就是 Waker 的一个封装:

1
2
3
4
pub struct Context<'a> {
waker: &'a Waker,
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

内部使用了一个 vtable 来允许的 waker 的行为:

1
2
3
4
5
6
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}

在标准库中,只有这些接口的定义,以及“高层”接口的实现,比如 Waker 下的 wake 方法,只是调用了 vtable 里的 wake() 而已:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
impl Waker {
/// Wake up the task associated with this `Waker`.
#[inline]
pub fn wake(self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
let wake = self.waker.vtable.wake;
let data = self.waker.data;

// Don't call `drop` -- the waker will be consumed by `wake`.
crate::mem::forget(self);

// SAFETY: This is safe because `Waker::from_raw` is the only way
// to initialize `wake` and `data` requiring the user to acknowledge
// that the contract of `RawWaker` is upheld.
unsafe { (wake)(data) };
}
...
}

具体的实现可参看 futures-rs/waker.rs

Poll

executor 处理 Future 时,会不断地调用它的 poll() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
hello().await?;

// 等价于
match hello.poll(cx) {
Poll::Ready(result) => return result,
Poll::Pending => return Poll::Pending
}

hello(inner1.await, inner2.await).await?; // inner1 conditioned by inner2

let fut = inner1;
match fut.poll(cx) {
Poll::Ready(Ok(file)) => {
let fut = inner2;
match fut.poll(cx) {
Poll::Ready(result) => return result,
Poll::Pending => return Poll::Pending,
}
}
Poll::Pending => return Poll::Pending,
}

async 函数返回的是一个 Future,所以,还需要把这样的代码封装在一个 Future 的实现里,对外提供出去。

需要实现一个数据结构,把内部的状态保存起来,并为这个数据结构实现 Future。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
enum WriteHelloFile {
Init(String),
// 等待文件创建,此时需要保存 Future 以便多次调用
AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>),
// 等待文件写入,此时需要保存 Future 以便多次调用
AwaitingWrite(impl Future<Output = Result<(), std::io::Error>>),
Done,
}

impl WriteHelloFile {
pub fn new(name: impl Into<String>) -> Self {
Self::Init(name.into())
}
}

impl Future for WriteHelloFile {
type Output = Result<(), std::io::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match this {
// 如果状态是 Init,那么就生成 create Future,把状态切换到 AwaitingCreate
WriteHelloFile::Init(name) => {
let fut = fs::File::create(name);
*self = WriteHelloFile::AwaitingCreate(fut);
}
// 如果状态是 AwaitingCreate,那么 poll create Future
// 如果返回 Poll::Ready(Ok(_)),那么创建 write Future
// 并把状态切换到 Awaiting
WriteHelloFile::AwaitingCreate(fut) => match fut.poll(cx) {
Poll::Ready(Ok(file)) => {
let fut = file.write_all(b"hello world!");
*self = WriteHelloFile::AwaitingWrite(fut);
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
},
// 如果状态是 AwaitingWrite,那么 poll write Future
// 如果返回 Poll::Ready(_),那么状态切换到 Done,整个 Future 执行成功
WriteHelloFile::AwaitingWrite(fut) => match fut.poll(cx) {
Poll::Ready(result) => {
*self = WriteHelloFile::Done;
return Poll::Ready(result);
}
Poll::Pending => return Poll::Pending,
},
// 整个 Future 已经执行完毕
WriteHelloFile::Done => return Poll::Ready(Ok(())),
}
}
}
}

fn write_hello_file_async(name: &str) -> WriteHelloFile {
WriteHelloFile::new(name)
}

Rust 在编译 async fn 或者 async block 时,就会生成类似的状态机的实现。

Pin

在上面实现 Future 的状态机中,AwaitingCreate 引用了 file 这样一个局部变量:

file 被 fut 引用,但 file 会在这个作用域被丢弃。需要把它保存在数据结构中:

1
2
3
4
5
6
7
8
9
10
11
enum WriteHelloFile {
Init(String),
AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>),
AwaitingWrite(AwaitingWriteData),
Done,
}

struct AwaitingWriteData {
fut: impl Future<Output = Result<(), std::io::Error>>,
file: fs::File,
}

在同一个数据结构内部,fut 指向了对 file 的引用,形成自引用结构(Self-Referential Structure)。

自引用结构有一个很大的问题是:一旦它被移动,原本的指针就会指向旧的地址。即,fut 会在移动后指向移动前的 file。

Pin 拿住的是一个可以解引用成 T 的指针类型 P,而不是直接拿原本的类型 T。所以,对于 Pin 而言,都是 Pin<Box>、Pin<&mut T>,但不会是 Pin。因为 Pin 的目的是,把 T 的内存位置锁住,从而避免移动后自引用类型带来的引用失效问题。

此处 Pin 的 AwaitingWriteData 引用,指向同一个内存位置。

自引用数据结构并非只在异步代码里出现,只不过异步代码在内部生成用状态机表述的 Future 时,很容易产生自引用结构。比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#[derive(Debug)]
struct SelfReference {
name: String,
// 在初始化后指向 name
name_ptr: *const String,
}

impl SelfReference {
pub fn new(name: impl Into<String>) -> Self {
SelfReference {
name: name.into(),
name_ptr: std::ptr::null(),
}
}

pub fn init(&mut self) {
self.name_ptr = &self.name as *const String;
}

pub fn print_name(&self) {
println!(
"struct {:p}: (name: {:p} name_ptr: {:p}), name: {}, name_ref: {}",
self,
&self.name,
self.name_ptr,
self.name,
// 在使用 ptr 是需要 unsafe
// SAFETY: 这里 name_ptr 潜在不安全,会指向旧的位置
unsafe { &*self.name_ptr },
);
}
}

// memcpy
fn move_it(data: SelfReference) -> SelfReference {
data
}

fn move_creates_issue() -> SelfReference {
let mut data = SelfReference::new("Tyr");
data.init();

// 不 move,一切正常
data.print_name();

let data = move_it(data);

// move 之后,name_ref 指向的位置是已经失效的地址
// 只不过现在 move 前的地址还没被回收
data.print_name();
data
}

fn mem_swap_creates_issue() {
let mut data1 = SelfReference::new("Tyr");
data1.init();

let mut data2 = SelfReference::new("Lindsey");
data2.init();

// name: Tyr, name_ref: Tyr
data1.print_name();
// name: Lindsey, name_ref: Lindsey
data2.print_name();

std::mem::swap(&mut data1, &mut data2);
// name: Lindsey, name_ref: Tyr
data1.print_name();
// name: Tyr, name_ref: Lindsey
data2.print_name();
}

fn main() {
let data = move_creates_issue();
println!("data: {:?}", data);
// 若不注释下面这行,程序运行会直接 segment error, move 前的地址已经被回收
// data.print_name();
print!("\\n");
mem_swap_creates_issue();
}

创建了一个自引用结构 SelfReference,它里面的 name_ref 指向了 name。正常使用它时,没有任何问题,但一旦对这个结构做 move 操作,name_ref 指向的位置还会是 move 前 name 的地址,这就引发了问题。

使用 std::mem:swap,也会出现类似的问题,一旦 swap,两个数据的内容交换,然而,name_ref 指向的地址还是旧的。

Pin 对解决这类问题很关键,如果试图移动被 Pin 住的数据结构,要么,编译器会通过编译错误阻止你;要么,使用 unsafe Rust,自己负责其安全性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
use std::{marker::PhantomPinned, pin::Pin};

#[derive(Debug)]
struct SelfReference {
name: String,
// 在初始化后指向 name
name_ptr: *const String,
// PhantomPinned 占位符
_marker: PhantomPinned,
}

impl SelfReference {
pub fn new(name: impl Into<String>) -> Self {
SelfReference {
name: name.into(),
name_ptr: std::ptr::null(),
_marker: PhantomPinned,
}
}

pub fn init(self: Pin<&mut Self>) {
let name_ptr = &self.name as *const String;
// SAFETY: 这里并不会把任何数据从 &mut SelfReference 中移走
let this = unsafe { self.get_unchecked_mut() };
this.name_ptr = name_ptr;
}

pub fn print_name(self: Pin<&Self>) {
println!(
"struct {:p}: (name: {:p} name_ptr: {:p}), name: {}, name_ref: {}",
self,
&self.name,
self.name_ptr,
self.name,
// 在使用 ptr 是需要 unsafe
// SAFETY: 因为数据不会移动,所以这里 name_ptr 是安全的
unsafe { &*self.name_ptr },
);
}
}

fn move_pinned(data: Pin<&mut SelfReference>) {
println!("{:?} ({:p})", data, &data);
}

#[allow(dead_code)]
fn move_it(data: SelfReference) {
println!("{:?} ({:p})", data, &data);
}

fn move_creates_issue() {
let mut data = SelfReference::new("Tyr");
let mut data = unsafe { Pin::new_unchecked(&mut data) };
SelfReference::init(data.as_mut());

// 不 move,一切正常
data.as_ref().print_name();

// 现在只能拿到 pinned 后的数据
move_pinned(data.as_mut());
println!("{:?} ({:p})", data, &data);

// 无法拿回 Pin 之前的 SelfReference 结构,
// expected struct `SelfReference`
// found struct `Pin<&mut SelfReference>`
// move_it(data);
}

fn main() {
move_creates_issue();
}

Unpin

1
pub auto trait Unpin {}

Pin 是为了让某个数据结构无法合法地移动,而 Unpin 则相当于声明数据结构是可以移动的,它的作用类似于 Send / Sync,通过类型约束来告诉编译器哪些行为是合法的、哪些不是。

在 Rust 中,绝大多数数据结构都是可以移动的,所以它们都自动实现了 Unpin

即便这些结构被 Pin 包裹,它们依旧可以进行移动:

1
2
3
4
5
6
7
// 需要一个可变引用来调用 `mem::replace`
// 可以通过(隐式)调用 `Pin::deref_mut` 来获得这样的引用,但这只有在 `String` 实现了 `Unpin` 后才有可能

let mut string = "this".to_string();
let mut pinned_string = Pin::new(&mut string);

mem::replace(&mut *pinned_string, "other".to_string());

当不希望一个数据结构被移动,可以使用 !Unpin。在 Rust 里,实现了 !Unpin 的,除了内部结构(比如 Future),主要就是 PhantomPinned:

1
2
pub struct PhantomPinned;
impl !Unpin for PhantomPinned {}

所以,如果你希望你的数据结构不能被移动,可以为其添加 PhantomPinned 字段来隐式声明 !Unpin。

当数据结构满足 Unpin 时,创建 Pin 以及使用 Pin(主要是 DerefMut)都可以不使用 unsafe:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 如果实现了 Unpin,可以通过安全接口创建和进行 DerefMut
impl<P: Deref<Target: Unpin>> Pin<P> {
pub const fn new(pointer: P) -> Pin<P> {
// SAFETY: the value pointed to is `Unpin`, and so has no requirements
// around pinning.
unsafe { Pin::new_unchecked(pointer) }
}
pub const fn into_inner(pin: Pin<P>) -> P {
pin.pointer
}
}

impl<P: DerefMut<Target: Unpin>> DerefMut for Pin<P> {
fn deref_mut(&mut self) -> &mut P::Target {
Pin::get_mut(Pin::as_mut(self))
}
}

// 如果没有实现 Unpin,只能通过 unsafe 接口创建,不能使用 DerefMut
impl<P: Deref> Pin<P> {
pub const unsafe fn new_unchecked(pointer: P) -> Pin<P> {
Pin { pointer }
}

pub const unsafe fn into_inner_unchecked(pin: Pin<P>) -> P {
pin.pointer
}
}

Box 默认实现了 Unpin。 Box 在堆上分配内存,而指针本身是可以移动的。

Future 的类型

1
2
3
4
5
6
7
8
9
fn main() {
let fut = async { 42 };

println!("type of fut is: {}", get_type_name(&fut));
}

fn get_type_name<T>(_: &T) -> &'static str {
std::any::type_name::<T>()
}

它的输出如下:

1
type of fut is: core::future::from_generator::GenFuture<xxx::main::{{closure}}>

GenFuture 的定义:

1
2
3
4
5
6
7
8
9
10
struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);

pub trait Generator<R = ()> {
type Yield;
type Return;
fn resume(
self: Pin<&mut Self>,
arg: R
) -> GeneratorState<Self::Yield, Self::Return>;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};
use std::pin::Pin;

fn main() {
let mut generator = || {
yield 1;
return "foo"
};

match Pin::new(&mut generator).resume(()) {
GeneratorState::Yielded(1) => {}
_ => panic!("unexpected return from resume"),
}
match Pin::new(&mut generator).resume(()) {
GeneratorState::Complete("foo") => {}
_ => panic!("unexpected return from resume"),
}
}

类似 python 中的 yield。

Stream

对于 Iterator,可以不断调用其 next() 方法,获得新的值,直到 Iterator 返回 None。Iterator 是阻塞式返回数据的,每次调用 next(),必然独占 CPU 直到得到一个结果,而异步的 Stream 是非阻塞的,在等待的过程中会空出 CPU 做其他事情。

Iterator 和 Stream的源码定义(参考 Future 库):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
pub trait Iterator {
type Item;
fn next(&mut self) -> Option<Self::Item>;

fn size_hint(&self) -> (usize, Option<usize>) { ... }
fn map<B, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> B { ... }
...
}

pub trait Stream {
type Item;

// 和 Future 的 poll() 方法很像,和 Iterator 版本的 next() 的作用类似
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

fn size_hint(&self) -> (usize, Option<usize>) { ... }
}

pub trait StreamExt: Stream {
fn next(&mut self) -> Next<'_, Self> where Self: Unpin { ... }
fn map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T { ... }
...
}

poll_next() 调用起来不方便,我们需要自己处理 Poll 状态,所以,StreamExt 提供了 next() 方法,返回一个实现了 Future trait 的 Next 结构,这样,我们就可以直接通过 stream.next().await 来获取下一个值了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
pub trait StreamExt: Stream {
fn next(&mut self) -> Next<'_, Self> where Self: Unpin {
assert_future::<Option<Self::Item>, _>(Next::new(self))
}
}

pub struct Next<'a, St: ?Sized> {
stream: &'a mut St,
}

// 如果 Stream Unpin 那么 Next 也是 Unpin
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}

impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
Self { stream }
}
}

// Next 实现了 Future,每次 poll() 实际上就是从 stream 中 poll_next()
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
type Output = Option<St::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.stream.poll_next_unpin(cx)
}
}

一般来说,异步操作数据结构,如果使用了泛型参数,只要内部没有自引用数据,就应该实现 Unpin。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
use futures::prelude::*;

#[tokio::main]
async fn main() {
let mut st = stream::iter(1..10)
.filter(|x| future::ready(x % 2 == 0))
.map(|x| x * x);

while let Some(x) = st.next().await {
println!("Got item: {}", x);
}
}

一个 fib Stream 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use futures::{prelude::*, stream::poll_fn};
use std::task::Poll;

#[tokio::main]
async fn main() {
consume(fib().take(10)).await;
consume(fib1(10)).await;
// unfold 产生的 Unfold stream 没有实现 Unpin,
// 将其 Pin<Box<T>> ,使其满足 consume 的接口
consume(fib2(10).boxed()).await;
}

async fn consume(mut st: impl Stream<Item = i32> + Unpin) {
while let Some(v) = st.next().await {
print!("{} ", v);
}
print!("\\n");
}

// 使用 repeat_with 创建 stream
fn fib() -> impl Stream<Item = i32> {
let mut a = 1;
let mut b = 1;
stream::repeat_with(move || {
let c = a + b;
a = b;
b = c;
b
})
}

// 使用 poll_fn 创建 stream,可以通过返回 Poll::Ready(None) 来结束
fn fib1(mut n: usize) -> impl Stream<Item = i32> {
let mut a = 1;
let mut b = 1;
poll_fn(move |_cx| -> Poll<Option<i32>> {
if n == 0 {
return Poll::Ready(None);
}
n -= 1;
let c = a + b;
a = b;
b = c;
Poll::Ready(Some(b))
})
}

fn fib2(n: usize) -> impl Stream<Item = i32> {
stream::unfold((n, (1, 1)), |(mut n, (a, b))| async move {
if n == 0 {
None
} else {
n -= 1;
let c = a + b;
// c 作为 poll_next() 的返回值,(n, (a, b)) 作为 state
Some((c, (n, (b, c))))
}
})
}

值得注意的是,使用 unfold 的时候,同时使用了局部变量和 Future,所以生成的 Stream 没有实现 Unpin。

Pin<Box> 是一种很简单的方法,能将数据 Pin 在堆上,可以使用 StreamExt 的 boxed() 方法来生成一个 Pin<Box>。

pin_project 库,它提供了一些便利的宏,方便我们操作数据结构里需要被 pin 住的字段。在数据结构中,可以使用 #pin 来声明某个字段在使用的时候需要被封装为 Pin

Result 方法的 transpose 可以方便地进行 Result<Option> 和 Option<Result> 的转化。

示例,当读取自定义数据结构内部的 file 字段,但又不想把 File 暴露出来,可以封装 tokio::fs::File 结构,并实现 AsyncRead trait:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use anyhow::Result;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::{
fs::File,
io::{AsyncRead, AsyncReadExt, ReadBuf},
};

#[pin_project]
struct FileWrapper {
#[pin]
file: File,
}

impl FileWrapper {
pub async fn try_new(name: &str) -> Result<Self> {
let file = File::open(name).await?;
Ok(Self { file })
}
}

impl AsyncRead for FileWrapper {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
self.project().file.poll_read(cx, buf)
}
}

#[tokio::main]
async fn main() -> Result<()> {
let mut file = FileWrapper::try_new("./Cargo.toml").await?;
let mut buffer = String::new();
file.read_to_string(&mut buffer).await?;
println!("{}", buffer);
Ok(())
}

Sink

异步IO还有一个比较独特的 Sink trait,一个用于发送一系列异步值的接口。

Sink trait 的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
pub trait Sink<Item> {
type Error;
// 用来准备 Sink 使其可以发送数据。只有 poll_ready() 返回 Poll::Ready(Ok(())) 后,Sink 才会开展后续的动作
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), Self::Error>>;
// 开始发送数据到 Sink
// 但是start_send() 并不保证数据被发送完毕,所以调用者要调用 poll_flush() 或者 poll_close() 来保证完整发送
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
// 将任何尚未发送的数据 flush 到这个 Sink
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), Self::Error>>;
// 将任何尚未发送的数据 flush 到这个 Sink,并关闭这个 Sink
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), Self::Error>>;
}

pub trait SinkExt<Item>: Sink<Item> {
...
fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin { ... }
...
}

和 Stream trait 不同的是,Sink trait 的 Item 是 trait 的泛型参数,而不是关联类型。

一般而言,当 trait 接受某个 input,应该使用泛型参数,比如 Add;当它输出某个 output,那么应该使用关联类型,比如 Future、Stream、Iterator 等。

Item 对于 Sink 来说是输入,所以使用泛型参数。

示例,注意实现 pull_xxx 方法时,不能再直接使用异步函数,需要视为 future ,并调用 future 的 poll 函数,这时可能会需要 Box::pin 生成堆上的 trait object:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use anyhow::Result;
use bytes::{BufMut, BytesMut};
use futures::{Sink, SinkExt};
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::{fs::File, io::AsyncWrite};

#[pin_project]
struct FileSink {
#[pin]
file: File,
buf: BytesMut,
}

impl FileSink {
pub fn new(file: File) -> Self {
Self {
file,
buf: BytesMut::new(),
}
}
}

impl Sink<&str> for FileSink {
type Error = std::io::Error;

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn start_send(self: Pin<&mut Self>, item: &str) -> Result<(), Self::Error> {
let this = self.project();
eprint!("{}", item);
this.buf.put(item.as_bytes());
Ok(())
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// 如果想 project() 多次,需要先把 self reborrow
let this = self.as_mut().project();

// 这个 buffer 和 self 无关,所以传入 poll_write() 时,不会有对 self 的引用问题
let buf = this.buf.split_to(this.buf.len());
if buf.is_empty() {
return Poll::Ready(Ok(()));
}

if let Err(e) = futures::ready!(this.file.poll_write(cx, &buf[..])) {
return Poll::Ready(Err(e));
}
self.project().file.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
this.file.poll_shutdown(cx)
}
}


#[tokio::main]
async fn main() -> Result<()> {
let file_sink = FileSink::new(File::create("/tmp/hello").await?);
// pin_mut 可以把变量 pin 住
futures::pin_mut!(file_sink);
file_sink.send("hello\\n").await?;
file_sink.send("world!\\n").await?;
file_sink.send("Tyr!\\n").await?;

Ok(())
}

futures 里提供了 sink::unfold 方法,实现创建 Sink,类似 stream::unfold:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use anyhow::Result;
use futures::prelude::*;
use tokio::{fs::File, io::AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<()> {
let file_sink = writer(File::create("/tmp/hello").await?);
// pin_mut 把变量 pin 住
futures::pin_mut!(file_sink);
if let Err(_) = file_sink.send("hello\\n").await {
println!("Error on send");
}
if let Err(_) = file_sink.send("world!\\n").await {
println!("Error on send");
}
Ok(())
}

/// 使用 unfold 生成一个 Sink 数据结构
fn writer<'a>(file: File) -> impl Sink<&'a str> {
sink::unfold(file, |mut file, line: &'a str| async move {
file.write_all(line.as_bytes()).await?;
eprint!("Received: {}", line);
Ok::<_, std::io::Error>(file)
})
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!