RUST-Async async 定义了一个可以并发执行的任务,而 await 则触发这个任务并发执行。大多数语言,包括 Rust,async/await 都是一个语法糖(syntactic sugar),它们使用状态机将 Promise/Future 这样的结构包装起来进行处理。 JavaScript 的 Promise 和线程类似,一旦创建就开始执行,对 Promise await 只是为了获取解析出来的值;而 Rust 的 Future,只有在主动 await 后才开始执行。 Future 异步函数(async fn)的返回值是一个奇怪的 impl Future 的结构 123456789pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;}pub enum Poll<T> { Ready(T), Pending,} 对于线程来说,操作系统负责调度;但操作系统不会去调度用户态的协程(比如 Future),任何使用了协程来处理并发的程序,都需要有一个 executor 来负责协程的调度。 常见的 executor 有 futures, tokio, async-std 等。 Reactor 与异步处理 Reactor pattern 它包含三部分: task,待处理的任务。任务可以被打断,并且把控制权交给 executor,等待之后的调度; executor,一个调度器。维护等待运行的任务(ready queue),以及被阻塞的任务(wait queue); reactor,维护事件队列。当事件来临时,通知 executor 唤醒某个任务等待运行。 Future 是异步任务的数据结构,当 fut.await 时,executor 就会调度并执行它。 tokio 的调度器(executor)会运行在多个线程上,运行线程自己的 ready queue 上的任务(Future),如果没有,就去别的线程的调度器上 stealing 一些过来运行(tokio 实现了 work stealing scheduler)。当某个任务无法再继续取得进展,此时 Future 运行的结果是 Poll::Pending,那么调度器会挂起任务,并设置好合适的唤醒条件(Waker),等待被 reactor 唤醒。 reactor 会利用操作系统提供的异步 I/O,比如 epoll / kqueue / IOCP,来监听操作系统提供的 IO 事件,当遇到满足条件的事件时,就会调用 Waker.wake() 唤醒被挂起的 Future。这个 Future 会回到 ready queue 等待执行。 异步使用环境 非计算密集型 Future 的调度是协作式多任务(Cooperative Multitasking),除非 Future 主动放弃 CPU,不然它就会一直被执行,直到运行结束。 如果你的确需要在 tokio(或者其它异步运行时)下运行运算量很大的代码,那么最好使用 yield 来主动让出 CPU,比如 tokio::task::yield_now(),避免某个计算密集型的任务饿死其它任务。 非 std mutex 大部分时候,标准库的 Mutex 可以用在异步代码中,而且,这是推荐的用法。 然而,标准库的 MutexGuard 不能安全地跨越 await,所以,当我们需要获得锁之后执行异步操作,必须使用 tokio 自带的 Mutex。 因为 tokio 实现了 work-stealing 调度,Future 有可能在不同的线程中执行,普通的 MutexGuard 编译直接就会出错,所以需要使用 tokio 的 Mutex。 线程与异步任务的同步 兼有计算密集和 IO 密集的任务,可以使用 channel 来在线程和future两者之间做同步。 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788use std::thread;use anyhow::Result;use blake3::Hasher;use futures::{SinkExt, StreamExt};use rayon::prelude::*;use tokio::{ net::TcpListener, sync::{mpsc, oneshot},};use tokio_util::codec::{Framed, LinesCodec};pub const PREFIX_ZERO: &[u8] = &[0, 0, 0];#[tokio::main]async fn main() -> Result<()> { let addr = "0.0.0.0:8080"; let listener = TcpListener::bind(addr).await?; println!("listen to: {}", addr); // 创建 tokio task 和 thread 之间的 channel let (sender, mut receiver) = mpsc::unbounded_channel::<(String, oneshot::Sender<String>)>(); // 使用 thread 处理计算密集型任务 thread::spawn(move || { // 读取从 tokio task 过来的 msg,这里用的是 blocking_recv,而非 await while let Some((line, reply)) = receiver.blocking_recv() { // 计算 pow let result = match pow(&line) { Some((hash, nonce)) => format!("hash: {}, once: {}", hash, nonce), None => "Not found".to_string(), }; // 把计算结果从 oneshot channel 里发回 if let Err(e) = reply.send(result) { println!("Failed to send: {}", e); } } }); // 使用 tokio task 处理 IO 密集型任务 loop { let (stream, addr) = listener.accept().await?; println!("Accepted: {:?}", addr); let sender1 = sender.clone(); tokio::spawn(async move { // 使用 LinesCodec 把 TCP 数据切成一行行字符串处理 let framed = Framed::new(stream, LinesCodec::new()); // split 成 writer 和 reader let (mut w, mut r) = framed.split(); for line in r.next().await { // 为每个消息创建一个 oneshot channel let (reply, reply_receiver) = oneshot::channel(); sender1.send((line?, reply))?; if let Ok(v) = reply_receiver.await { w.send(format!("Pow calculated: {}", v)).await?; } } Ok::<_, anyhow::Error>(()) }); }}pub fn pow(s: &str) -> Option<(String, u32)> { let hasher = blake3_base_hash(s.as_bytes()); // 使用 rayon 并发计算 u32 空间下所有 nonce,直到找到有头 N 个 0 的哈希 let nonce = (0..u32::MAX).into_par_iter().find_any(|n| { let hash = blake3_hash(hasher.clone(), n).as_bytes().to_vec(); &hash[..PREFIX_ZERO.len()] == PREFIX_ZERO }); nonce.map(|n| { let hash = blake3_hash(hasher, &n).to_hex().to_string(); (hash, n) })}// 计算携带 nonce 后的哈希fn blake3_hash(mut hasher: blake3::Hasher, nonce: &u32) -> blake3::Hash { hasher.update(&nonce.to_be_bytes()[..]); hasher.finalize()}// 计算数据的哈希fn blake3_base_hash(data: &[u8]) -> Hasher { let mut hasher = Hasher::new(); hasher.update(data); hasher} Waker executor 通过调用 poll 方法来让 Future 继续往下执行,如果 poll 方法返回 Poll::Pending,就阻塞 Future,直到 reactor 收到了某个事件,然后调用 Waker.wake() 把 Future 唤醒。 Context 就是 Waker 的一个封装: 1234pub struct Context<'a> { waker: &'a Waker, _marker: PhantomData<fn(&'a ()) -> &'a ()>,} 内部使用了一个 vtable 来允许的 waker 的行为: 123456pub struct RawWakerVTable { clone: unsafe fn(*const ()) -> RawWaker, wake: unsafe fn(*const ()), wake_by_ref: unsafe fn(*const ()), drop: unsafe fn(*const ()),} 在标准库中,只有这些接口的定义,以及“高层”接口的实现,比如 Waker 下的 wake 方法,只是调用了 vtable 里的 wake() 而已: 12345678910111213141516171819impl Waker { /// Wake up the task associated with this `Waker`. #[inline] pub fn wake(self) { // The actual wakeup call is delegated through a virtual function call // to the implementation which is defined by the executor. let wake = self.waker.vtable.wake; let data = self.waker.data; // Don't call `drop` -- the waker will be consumed by `wake`. crate::mem::forget(self); // SAFETY: This is safe because `Waker::from_raw` is the only way // to initialize `wake` and `data` requiring the user to acknowledge // that the contract of `RawWaker` is upheld. unsafe { (wake)(data) }; } ...} 具体的实现可参看 futures-rs/waker.rs。 Poll executor 处理 Future 时,会不断地调用它的 poll() 方法: 123456789101112131415161718192021hello().await?;// 等价于match hello.poll(cx) { Poll::Ready(result) => return result, Poll::Pending => return Poll::Pending}hello(inner1.await, inner2.await).await?; // inner1 conditioned by inner2let fut = inner1;match fut.poll(cx) { Poll::Ready(Ok(file)) => { let fut = inner2; match fut.poll(cx) { Poll::Ready(result) => return result, Poll::Pending => return Poll::Pending, } } Poll::Pending => return Poll::Pending,} async 函数返回的是一个 Future,所以,还需要把这样的代码封装在一个 Future 的实现里,对外提供出去。 需要实现一个数据结构,把内部的状态保存起来,并为这个数据结构实现 Future。 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657enum WriteHelloFile { Init(String), // 等待文件创建,此时需要保存 Future 以便多次调用 AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>), // 等待文件写入,此时需要保存 Future 以便多次调用 AwaitingWrite(impl Future<Output = Result<(), std::io::Error>>), Done,}impl WriteHelloFile { pub fn new(name: impl Into<String>) -> Self { Self::Init(name.into()) }}impl Future for WriteHelloFile { type Output = Result<(), std::io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = self.get_mut(); loop { match this { // 如果状态是 Init,那么就生成 create Future,把状态切换到 AwaitingCreate WriteHelloFile::Init(name) => { let fut = fs::File::create(name); *self = WriteHelloFile::AwaitingCreate(fut); } // 如果状态是 AwaitingCreate,那么 poll create Future // 如果返回 Poll::Ready(Ok(_)),那么创建 write Future // 并把状态切换到 Awaiting WriteHelloFile::AwaitingCreate(fut) => match fut.poll(cx) { Poll::Ready(Ok(file)) => { let fut = file.write_all(b"hello world!"); *self = WriteHelloFile::AwaitingWrite(fut); } Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => return Poll::Pending, }, // 如果状态是 AwaitingWrite,那么 poll write Future // 如果返回 Poll::Ready(_),那么状态切换到 Done,整个 Future 执行成功 WriteHelloFile::AwaitingWrite(fut) => match fut.poll(cx) { Poll::Ready(result) => { *self = WriteHelloFile::Done; return Poll::Ready(result); } Poll::Pending => return Poll::Pending, }, // 整个 Future 已经执行完毕 WriteHelloFile::Done => return Poll::Ready(Ok(())), } } }}fn write_hello_file_async(name: &str) -> WriteHelloFile { WriteHelloFile::new(name)} Rust 在编译 async fn 或者 async block 时,就会生成类似的状态机的实现。 Pin 在上面实现 Future 的状态机中,AwaitingCreate 引用了 file 这样一个局部变量: file 被 fut 引用,但 file 会在这个作用域被丢弃。需要把它保存在数据结构中: 1234567891011enum WriteHelloFile { Init(String), AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>), AwaitingWrite(AwaitingWriteData), Done,}struct AwaitingWriteData { fut: impl Future<Output = Result<(), std::io::Error>>, file: fs::File,} 在同一个数据结构内部,fut 指向了对 file 的引用,形成自引用结构(Self-Referential Structure)。 自引用结构有一个很大的问题是:一旦它被移动,原本的指针就会指向旧的地址。即,fut 会在移动后指向移动前的 file。 Pin 拿住的是一个可以解引用成 T 的指针类型 P,而不是直接拿原本的类型 T。所以,对于 Pin 而言,都是 Pin<Box>、Pin<&mut T>,但不会是 Pin。因为 Pin 的目的是,把 T 的内存位置锁住,从而避免移动后自引用类型带来的引用失效问题。 此处 Pin 的 AwaitingWriteData 引用,指向同一个内存位置。 自引用数据结构并非只在异步代码里出现,只不过异步代码在内部生成用状态机表述的 Future 时,很容易产生自引用结构。比如: 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980#[derive(Debug)]struct SelfReference { name: String, // 在初始化后指向 name name_ptr: *const String,}impl SelfReference { pub fn new(name: impl Into<String>) -> Self { SelfReference { name: name.into(), name_ptr: std::ptr::null(), } } pub fn init(&mut self) { self.name_ptr = &self.name as *const String; } pub fn print_name(&self) { println!( "struct {:p}: (name: {:p} name_ptr: {:p}), name: {}, name_ref: {}", self, &self.name, self.name_ptr, self.name, // 在使用 ptr 是需要 unsafe // SAFETY: 这里 name_ptr 潜在不安全,会指向旧的位置 unsafe { &*self.name_ptr }, ); }}// memcpyfn move_it(data: SelfReference) -> SelfReference { data}fn move_creates_issue() -> SelfReference { let mut data = SelfReference::new("Tyr"); data.init(); // 不 move,一切正常 data.print_name(); let data = move_it(data); // move 之后,name_ref 指向的位置是已经失效的地址 // 只不过现在 move 前的地址还没被回收 data.print_name(); data}fn mem_swap_creates_issue() { let mut data1 = SelfReference::new("Tyr"); data1.init(); let mut data2 = SelfReference::new("Lindsey"); data2.init(); // name: Tyr, name_ref: Tyr data1.print_name(); // name: Lindsey, name_ref: Lindsey data2.print_name(); std::mem::swap(&mut data1, &mut data2); // name: Lindsey, name_ref: Tyr data1.print_name(); // name: Tyr, name_ref: Lindsey data2.print_name();}fn main() { let data = move_creates_issue(); println!("data: {:?}", data); // 若不注释下面这行,程序运行会直接 segment error, move 前的地址已经被回收 // data.print_name(); print!("\\n"); mem_swap_creates_issue();} 创建了一个自引用结构 SelfReference,它里面的 name_ref 指向了 name。正常使用它时,没有任何问题,但一旦对这个结构做 move 操作,name_ref 指向的位置还会是 move 前 name 的地址,这就引发了问题。 使用 std::mem:swap,也会出现类似的问题,一旦 swap,两个数据的内容交换,然而,name_ref 指向的地址还是旧的。 Pin 对解决这类问题很关键,如果试图移动被 Pin 住的数据结构,要么,编译器会通过编译错误阻止你;要么,使用 unsafe Rust,自己负责其安全性。 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071use std::{marker::PhantomPinned, pin::Pin};#[derive(Debug)]struct SelfReference { name: String, // 在初始化后指向 name name_ptr: *const String, // PhantomPinned 占位符 _marker: PhantomPinned,}impl SelfReference { pub fn new(name: impl Into<String>) -> Self { SelfReference { name: name.into(), name_ptr: std::ptr::null(), _marker: PhantomPinned, } } pub fn init(self: Pin<&mut Self>) { let name_ptr = &self.name as *const String; // SAFETY: 这里并不会把任何数据从 &mut SelfReference 中移走 let this = unsafe { self.get_unchecked_mut() }; this.name_ptr = name_ptr; } pub fn print_name(self: Pin<&Self>) { println!( "struct {:p}: (name: {:p} name_ptr: {:p}), name: {}, name_ref: {}", self, &self.name, self.name_ptr, self.name, // 在使用 ptr 是需要 unsafe // SAFETY: 因为数据不会移动,所以这里 name_ptr 是安全的 unsafe { &*self.name_ptr }, ); }}fn move_pinned(data: Pin<&mut SelfReference>) { println!("{:?} ({:p})", data, &data);}#[allow(dead_code)]fn move_it(data: SelfReference) { println!("{:?} ({:p})", data, &data);}fn move_creates_issue() { let mut data = SelfReference::new("Tyr"); let mut data = unsafe { Pin::new_unchecked(&mut data) }; SelfReference::init(data.as_mut()); // 不 move,一切正常 data.as_ref().print_name(); // 现在只能拿到 pinned 后的数据 move_pinned(data.as_mut()); println!("{:?} ({:p})", data, &data); // 无法拿回 Pin 之前的 SelfReference 结构, // expected struct `SelfReference` // found struct `Pin<&mut SelfReference>` // move_it(data);}fn main() { move_creates_issue();} Unpin 1pub auto trait Unpin {} Pin 是为了让某个数据结构无法合法地移动,而 Unpin 则相当于声明数据结构是可以移动的,它的作用类似于 Send / Sync,通过类型约束来告诉编译器哪些行为是合法的、哪些不是。 在 Rust 中,绝大多数数据结构都是可以移动的,所以它们都自动实现了 Unpin。 即便这些结构被 Pin 包裹,它们依旧可以进行移动: 1234567// 需要一个可变引用来调用 `mem::replace`// 可以通过(隐式)调用 `Pin::deref_mut` 来获得这样的引用,但这只有在 `String` 实现了 `Unpin` 后才有可能let mut string = "this".to_string();let mut pinned_string = Pin::new(&mut string);mem::replace(&mut *pinned_string, "other".to_string()); 当不希望一个数据结构被移动,可以使用 !Unpin。在 Rust 里,实现了 !Unpin 的,除了内部结构(比如 Future),主要就是 PhantomPinned: 12pub struct PhantomPinned;impl !Unpin for PhantomPinned {} 所以,如果你希望你的数据结构不能被移动,可以为其添加 PhantomPinned 字段来隐式声明 !Unpin。 当数据结构满足 Unpin 时,创建 Pin 以及使用 Pin(主要是 DerefMut)都可以不使用 unsafe: 12345678910111213141516171819202122232425262728// 如果实现了 Unpin,可以通过安全接口创建和进行 DerefMutimpl<P: Deref<Target: Unpin>> Pin<P> { pub const fn new(pointer: P) -> Pin<P> { // SAFETY: the value pointed to is `Unpin`, and so has no requirements // around pinning. unsafe { Pin::new_unchecked(pointer) } } pub const fn into_inner(pin: Pin<P>) -> P { pin.pointer }}impl<P: DerefMut<Target: Unpin>> DerefMut for Pin<P> { fn deref_mut(&mut self) -> &mut P::Target { Pin::get_mut(Pin::as_mut(self)) }}// 如果没有实现 Unpin,只能通过 unsafe 接口创建,不能使用 DerefMutimpl<P: Deref> Pin<P> { pub const unsafe fn new_unchecked(pointer: P) -> Pin<P> { Pin { pointer } } pub const unsafe fn into_inner_unchecked(pin: Pin<P>) -> P { pin.pointer }} Box 默认实现了 Unpin。 Box 在堆上分配内存,而指针本身是可以移动的。 Future 的类型 123456789fn main() { let fut = async { 42 }; println!("type of fut is: {}", get_type_name(&fut));}fn get_type_name<T>(_: &T) -> &'static str { std::any::type_name::<T>()} 它的输出如下: 1type of fut is: core::future::from_generator::GenFuture<xxx::main::{{closure}}> GenFuture 的定义: 12345678910struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);pub trait Generator<R = ()> { type Yield; type Return; fn resume( self: Pin<&mut Self>, arg: R ) -> GeneratorState<Self::Yield, Self::Return>;} 1234567891011121314151617181920#![feature(generators, generator_trait)]use std::ops::{Generator, GeneratorState};use std::pin::Pin;fn main() { let mut generator = || { yield 1; return "foo" }; match Pin::new(&mut generator).resume(()) { GeneratorState::Yielded(1) => {} _ => panic!("unexpected return from resume"), } match Pin::new(&mut generator).resume(()) { GeneratorState::Complete("foo") => {} _ => panic!("unexpected return from resume"), }} 类似 python 中的 yield。 Stream 对于 Iterator,可以不断调用其 next() 方法,获得新的值,直到 Iterator 返回 None。Iterator 是阻塞式返回数据的,每次调用 next(),必然独占 CPU 直到得到一个结果,而异步的 Stream 是非阻塞的,在等待的过程中会空出 CPU 做其他事情。 Iterator 和 Stream的源码定义(参考 Future 库): 1234567891011121314151617181920212223pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; fn size_hint(&self) -> (usize, Option<usize>) { ... } fn map<B, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> B { ... } ...}pub trait Stream { type Item; // 和 Future 的 poll() 方法很像,和 Iterator 版本的 next() 的作用类似 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; fn size_hint(&self) -> (usize, Option<usize>) { ... }}pub trait StreamExt: Stream { fn next(&mut self) -> Next<'_, Self> where Self: Unpin { ... } fn map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T { ... } ...} poll_next() 调用起来不方便,我们需要自己处理 Poll 状态,所以,StreamExt 提供了 next() 方法,返回一个实现了 Future trait 的 Next 结构,这样,我们就可以直接通过 stream.next().await 来获取下一个值了。 123456789101112131415161718192021222324252627pub trait StreamExt: Stream { fn next(&mut self) -> Next<'_, Self> where Self: Unpin { assert_future::<Option<Self::Item>, _>(Next::new(self)) }}pub struct Next<'a, St: ?Sized> { stream: &'a mut St,}// 如果 Stream Unpin 那么 Next 也是 Unpinimpl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { Self { stream } }}// Next 实现了 Future,每次 poll() 实际上就是从 stream 中 poll_next()impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { type Output = Option<St::Item>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.stream.poll_next_unpin(cx) }} 一般来说,异步操作数据结构,如果使用了泛型参数,只要内部没有自引用数据,就应该实现 Unpin。 示例: 123456789101112use futures::prelude::*;#[tokio::main]async fn main() { let mut st = stream::iter(1..10) .filter(|x| future::ready(x % 2 == 0)) .map(|x| x * x); while let Some(x) = st.next().await { println!("Got item: {}", x); }} 一个 fib Stream 示例: 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859use futures::{prelude::*, stream::poll_fn};use std::task::Poll;#[tokio::main]async fn main() { consume(fib().take(10)).await; consume(fib1(10)).await; // unfold 产生的 Unfold stream 没有实现 Unpin, // 将其 Pin<Box<T>> ,使其满足 consume 的接口 consume(fib2(10).boxed()).await;}async fn consume(mut st: impl Stream<Item = i32> + Unpin) { while let Some(v) = st.next().await { print!("{} ", v); } print!("\\n");}// 使用 repeat_with 创建 streamfn fib() -> impl Stream<Item = i32> { let mut a = 1; let mut b = 1; stream::repeat_with(move || { let c = a + b; a = b; b = c; b })}// 使用 poll_fn 创建 stream,可以通过返回 Poll::Ready(None) 来结束fn fib1(mut n: usize) -> impl Stream<Item = i32> { let mut a = 1; let mut b = 1; poll_fn(move |_cx| -> Poll<Option<i32>> { if n == 0 { return Poll::Ready(None); } n -= 1; let c = a + b; a = b; b = c; Poll::Ready(Some(b)) })}fn fib2(n: usize) -> impl Stream<Item = i32> { stream::unfold((n, (1, 1)), |(mut n, (a, b))| async move { if n == 0 { None } else { n -= 1; let c = a + b; // c 作为 poll_next() 的返回值,(n, (a, b)) 作为 state Some((c, (n, (b, c)))) } })} 值得注意的是,使用 unfold 的时候,同时使用了局部变量和 Future,所以生成的 Stream 没有实现 Unpin。 Pin<Box> 是一种很简单的方法,能将数据 Pin 在堆上,可以使用 StreamExt 的 boxed() 方法来生成一个 Pin<Box>。 pin_project 库,它提供了一些便利的宏,方便我们操作数据结构里需要被 pin 住的字段。在数据结构中,可以使用 #pin 来声明某个字段在使用的时候需要被封装为 Pin。 Result 方法的 transpose 可以方便地进行 Result<Option> 和 Option<Result> 的转化。 示例,当读取自定义数据结构内部的 file 字段,但又不想把 File 暴露出来,可以封装 tokio::fs::File 结构,并实现 AsyncRead trait: 123456789101112131415161718192021222324252627282930313233343536373839404142use anyhow::Result;use pin_project::pin_project;use std::{ pin::Pin, task::{Context, Poll},};use tokio::{ fs::File, io::{AsyncRead, AsyncReadExt, ReadBuf},};#[pin_project]struct FileWrapper { #[pin] file: File,}impl FileWrapper { pub async fn try_new(name: &str) -> Result<Self> { let file = File::open(name).await?; Ok(Self { file }) }}impl AsyncRead for FileWrapper { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>> { self.project().file.poll_read(cx, buf) }}#[tokio::main]async fn main() -> Result<()> { let mut file = FileWrapper::try_new("./Cargo.toml").await?; let mut buffer = String::new(); file.read_to_string(&mut buffer).await?; println!("{}", buffer); Ok(())} Sink 异步IO还有一个比较独特的 Sink trait,一个用于发送一系列异步值的接口。 Sink trait 的定义: 123456789101112131415161718192021222324252627pub trait Sink<Item> { type Error; // 用来准备 Sink 使其可以发送数据。只有 poll_ready() 返回 Poll::Ready(Ok(())) 后,Sink 才会开展后续的动作 fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>; // 开始发送数据到 Sink // 但是start_send() 并不保证数据被发送完毕,所以调用者要调用 poll_flush() 或者 poll_close() 来保证完整发送 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>; // 将任何尚未发送的数据 flush 到这个 Sink fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>; // 将任何尚未发送的数据 flush 到这个 Sink,并关闭这个 Sink fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Result<(), Self::Error>>;}pub trait SinkExt<Item>: Sink<Item> { ... fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin { ... } ...} 和 Stream trait 不同的是,Sink trait 的 Item 是 trait 的泛型参数,而不是关联类型。 一般而言,当 trait 接受某个 input,应该使用泛型参数,比如 Add;当它输出某个 output,那么应该使用关联类型,比如 Future、Stream、Iterator 等。 Item 对于 Sink 来说是输入,所以使用泛型参数。 示例,注意实现 pull_xxx 方法时,不能再直接使用异步函数,需要视为 future ,并调用 future 的 poll 函数,这时可能会需要 Box::pin 生成堆上的 trait object: 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374use anyhow::Result;use bytes::{BufMut, BytesMut};use futures::{Sink, SinkExt};use pin_project::pin_project;use std::{ pin::Pin, task::{Context, Poll},};use tokio::{fs::File, io::AsyncWrite};#[pin_project]struct FileSink { #[pin] file: File, buf: BytesMut,}impl FileSink { pub fn new(file: File) -> Self { Self { file, buf: BytesMut::new(), } }}impl Sink<&str> for FileSink { type Error = std::io::Error; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } fn start_send(self: Pin<&mut Self>, item: &str) -> Result<(), Self::Error> { let this = self.project(); eprint!("{}", item); this.buf.put(item.as_bytes()); Ok(()) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { // 如果想 project() 多次,需要先把 self reborrow let this = self.as_mut().project(); // 这个 buffer 和 self 无关,所以传入 poll_write() 时,不会有对 self 的引用问题 let buf = this.buf.split_to(this.buf.len()); if buf.is_empty() { return Poll::Ready(Ok(())); } if let Err(e) = futures::ready!(this.file.poll_write(cx, &buf[..])) { return Poll::Ready(Err(e)); } self.project().file.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { let this = self.project(); this.file.poll_shutdown(cx) }}#[tokio::main]async fn main() -> Result<()> { let file_sink = FileSink::new(File::create("/tmp/hello").await?); // pin_mut 可以把变量 pin 住 futures::pin_mut!(file_sink); file_sink.send("hello\\n").await?; file_sink.send("world!\\n").await?; file_sink.send("Tyr!\\n").await?; Ok(())} futures 里提供了 sink::unfold 方法,实现创建 Sink,类似 stream::unfold: 1234567891011121314151617181920212223242526use anyhow::Result;use futures::prelude::*;use tokio::{fs::File, io::AsyncWriteExt};#[tokio::main]async fn main() -> Result<()> { let file_sink = writer(File::create("/tmp/hello").await?); // pin_mut 把变量 pin 住 futures::pin_mut!(file_sink); if let Err(_) = file_sink.send("hello\\n").await { println!("Error on send"); } if let Err(_) = file_sink.send("world!\\n").await { println!("Error on send"); } Ok(())}/// 使用 unfold 生成一个 Sink 数据结构fn writer<'a>(file: File) -> impl Sink<&'a str> { sink::unfold(file, |mut file, line: &'a str| async move { file.write_all(line.as_bytes()).await?; eprint!("Received: {}", line); Ok::<_, std::io::Error>(file) })} Notes Rust Rust 本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处! Rust-Send-Sync 上一篇 Rust-TLS 下一篇 Please enable JavaScript to view the comments