RUST-Async async 定义了一个可以并发执行的任务,而 await 则触发这个任务并发执行。大多数语言,包括 Rust,async/await 都是一个语法糖(syntactic sugar),它们使用状态机将 Promise/Future 这样的结构包装起来进行处理。 JavaScript 的 Promise 和线程类似,一旦创建就开始执行,对 Promise await 只是为了获取解析出来的值;而 Rust 的 Future,只有在主动 await 后才开始执行。 Future 异步函数(async fn)的返回值是一个奇怪的 impl Future 的结构 123456789pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;}pub enum Poll<T> { Ready(T), Pending,} 对于线程来说,操作系统负责调度;但操作系统不会去调度用户态的协程(比如 Future),任何使用了协程来处理并发的程序,都需要有一个 executor 来负责协程的调度。 常见的 executor 有 futures, tokio, async-std 等。 Reactor 与异步处理 Reactor pattern 它包含三部分: task,待处理的任务。任务可以被打断,并且把控制权交给 executor,等待之后的调度; executor,一个调度器。维护等待运行的任务(ready queue),以及被阻塞的任务(wait queue); reactor,维护事件队列。当事件来临时,通知 executor 唤醒某个任务等待运行。 Future 是异步任务的数据结构,当 fut.await 时,executor 就会调度并执行它。 tokio 的调度器(executor)会运行在多个线程上,运行线程自己的 ready queue 上的任务(Future),如果没有,就去别的线程的调度器上 stealing 一些过来运行(tokio 实现了 work stealing scheduler)。当某个任务无法再继续取得进展,此时 Future 运行的结果是 Poll::Pending,那么调度器会挂起任务,并设置好合适的唤醒条件(Waker),等待被 reactor 唤醒。 reactor 会利用操作系统提供的异步 I/O,比如 epoll / kqueue / IOCP,来监听操作系统提供的 IO 事件,当遇到满足条件的事件时,就会调用 Waker.wake() 唤醒被挂起的 Future。这个 Future 会回到 ready queue 等待执行。 异步使用环境 非计算密集型 Future 的调度是协作式多任务(Cooperative Multitasking),除非 Future 主动放弃 CPU,不然它就会一直被执行,直到运行结束。 如果你的确需要在 tokio(或者其它异步运行时)下运行运算量很大的代码,那么最好使用 yield 来主动让出 CPU,比如 tokio::task::yield_now(),避免某个计算密集型的任务饿死其它任务。 非 std mutex 大部分时候,标准库的 Mutex 可以用在异步代码中,而且,这是推荐的用法。 然而,标准库的 MutexGuard 不能安全地跨越 await,所以,当我们需要获得锁之后执行异步操作,必须使用 tokio 自带的 Mutex。 因为 tokio 实现了 work-stealing 调度,Future 有可能在不同的线程中执行,普通的 MutexGuard 编译直接就会出错,所以需要使用 tokio 的 Mutex。 线程与异步任务的同步 兼有计算密集和 IO 密集的任务,可以使用 channel 来在线程和future两者之间做同步。 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788use std::thread;use anyhow::Result;use blake3::Hasher;use futures::{SinkExt, StreamExt};use rayon::prelude::*;use tokio::{ net::TcpListener, sync::{mpsc, oneshot},};use tokio_util::codec::{Framed, LinesCodec};pub const PREFIX_ZERO: &[u8] = &[0, 0, 0];#[tokio::main]async fn main() -> Result<()> { let addr = "0.0.0.0:8080"; let listener = TcpListener::bind(addr).await?; println!("listen to: {}", addr); // 创建 tokio task 和 thread 之间的 channel let (sender, mut receiver) = mpsc::unbounded_channel::<(String, oneshot::Sender<String>)>(); // 使用 thread 处理计算密集型任务 thread::spawn(move || { // 读取从 tokio task 过来的 msg,这里用的是 blocking_recv,而非 await while let Some((line, reply)) = receiver.blocking_recv() { // 计算 pow let result = match pow(&line) { Some((hash, nonce)) => format!("hash: {}, once: {}", hash, nonce), None => "Not found".to_string(), }; // 把计算结果从 oneshot channel 里发回 if let Err(e) = reply.send(result) { println!("Failed to send: {}", e); } } }); // 使用 tokio task 处理 IO 密集型任务 loop { let (stream, addr) = listener.accept().await?; println!("Accepted: {:?}", addr); let sender1 = sender.clone(); tokio::spawn(async move { // 使用 LinesCodec 把 TCP 数据切成一行行字符串处理 let framed = Framed::new(stream, LinesCodec::new()); // split 成 writer 和 reader let (mut w, mut r) = framed.split(); for line in r.next().await { // 为每个消息创建一个 oneshot channel let (reply, reply_receiver) = oneshot::channel(); sender1.send((line?, reply))?; if let Ok(v) = reply_receiver.await { w.send(format!("Pow calculated: {}", v)).await?; } } Ok::<_, anyhow::Error>(()) }); }}pub fn pow(s: &str) -> Option<(String, u32)> { let hasher = blake3_base_hash(s.as_bytes()); // 使用 rayon 并发计算 u32 空间下所有 nonce,直到找到有头 N 个 0 的哈希 let nonce = (0..u32::MAX).into_par_iter().find_any(|n| { let hash = blake3_hash(hasher.clone(), n).as_bytes().to_vec(); &hash[..PREFIX_ZERO.len()] == PREFIX_ZERO }); nonce.map(|n| { let hash = blake3_hash(hasher, &n).to_hex().to_string(); (hash, n) })}// 计算携带 nonce 后的哈希fn blake3_hash(mut hasher: blake3::Hasher, nonce: &u32) -> blake3::Hash { hasher.update(&nonce.to_be_bytes()[..]); hasher.finalize()}// 计算数据的哈希fn blake3_base_hash(data: &[u8]) -> Hasher { let mut hasher = Hasher::new(); hasher.update(data); hasher} Waker executor 通过调用 poll 方法来让 Future 继续往下执行,如果 poll 方法返回 Poll::Pending,就阻塞 Future,直到 reactor 收到了某个事件,然后调用 Waker.wake() 把 Future 唤醒。 Context 就是 Waker 的一个封装: 1234pub struct Context<'a> { waker: &'a Waker, _marker: PhantomData<fn(&'a ()) -> &'a ()>,} 内部使用了一个 vtable 来允许的 waker 的行为: 123456pub struct RawWakerVTable { clone: unsafe fn(*const ()) -> RawWaker, wake: unsafe fn(*const ()), wake_by_ref: unsafe fn(*const ()), drop: unsafe fn(*const ()),} 在标准库中,只有这些接口的定义,以及“高层”接口的实现,比如 Waker 下的 wake 方法,只是调用了 vtable 里的 wake() 而已: 12345678910111213141516171819impl Waker { /// Wake up the task associated with this `Waker`. #[inline] pub fn wake(self) { // The actual wakeup call is delegated through a virtual function call // to the implementation which is defined by the executor. let wake = self.waker.vtable.wake; let data = self.waker.data; // Don't call `drop` -- the waker will be consumed by `wake`. crate::mem::forget(self); // SAFETY: This is safe because `Waker::from_raw` is the only way // to initialize `wake` and `data` requiring the user to acknowledge // that the contract of `RawWaker` is upheld. unsafe { (wake)(data) }; } ...} 具体的实现可参看 futures-rs/waker.rs。 Poll executor 处理 Future 时,会不断地调用它的 poll() 方法: 123456789101112131415161718192021hello().await?;// 等价于match hello.poll(cx) { Poll::Ready(result) => return result, Poll::Pending => return Poll::Pending}hello(inner1.await, inner2.await).await?; // inner1 conditioned by inner2let fut = inner1;match fut.poll(cx) { Poll::Ready(Ok(file)) => { let fut = inner2; match fut.poll(cx) { Poll::Ready(result) => return result, Poll::Pending => return Poll::Pending, } } Poll::Pending => return Poll::Pending,} async 函数返回的是一个 Future,所以,还需要把这样的代码封装在一个 Future 的实现里,对外提供出去。 需要实现一个数据结构,把内部的状态保存起来,并为这个数据结构实现 Future。 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657enum WriteHelloFile { Init(String), // 等待文件创建,此时需要保存 Future 以便多次调用 AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>), // 等待文件写入,此时需要保存 Future 以便多次调用 AwaitingWrite(impl Future<Output = Result<(), std::io::Error>>), Done,}impl WriteHelloFile { pub fn new(name: impl Into<String>) -> Self { Self::Init(name.into()) }}impl Future for WriteHelloFile { type Output = Result<(), std::io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let this = self.get_mut(); loop { match this { // 如果状态是 Init,那么就生成 create Future,把状态切换到 AwaitingCreate WriteHelloFile::Init(name) => { let fut = fs::File::create(name); *self = WriteHelloFile::AwaitingCreate(fut); } // 如果状态是 AwaitingCreate,那么 poll create Future // 如果返回 Poll::Ready(Ok(_)),那么创建 write Future // 并把状态切换到 Awaiting WriteHelloFile::AwaitingCreate(fut) => match fut.poll(cx) { Poll::Ready(Ok(file)) => { let fut = file.write_all(b"hello world!"); *self = WriteHelloFile::AwaitingWrite(fut); } Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => return Poll::Pending, }, // 如果状态是 AwaitingWrite,那么 poll write Future // 如果返回 Poll::Ready(_),那么状态切换到 Done,整个 Future 执行成功 WriteHelloFile::AwaitingWrite(fut) => match fut.poll(cx) { Poll::Ready(result) => { *self = WriteHelloFile::Done; return Poll::Ready(result); } Poll::Pending => return Poll::Pending, }, // 整个 Future 已经执行完毕 WriteHelloFile::Done => return Poll::Ready(Ok(())), } } }}fn write_hello_file_async(name: &str) -> WriteHelloFile { WriteHelloFile::new(name)} Rust 在编译 async fn 或者 async block 时,就会生成类似的状态机的实现。 Pin 在上面实现 Future 的状态机中,AwaitingCreate 引用了 file 这样一个局部变量: file 被 fut 引用,但 file 会在这个作用域被丢弃。需要把它保存在数据结构中: 1234567891011enum WriteHelloFile { Init(String), AwaitingCreate(impl Future<Output = Result<fs::File, std::io::Error>>), AwaitingWrite(AwaitingWriteData), Done,}struct AwaitingWriteData { fut: impl Future<Output = Result<(), std::io::Error>>, file: fs::File,} 在同一个数据结构内部,fut 指向了对 file 的引用,形成自引用结构(Self-Referential Structure)。 自引用结构有一个很大的问题是:一旦它被移动,原本的指针就会指向旧的地址。即,fut 会在移动后指向移动前的 file。 Pin 拿住的是一个可以解引用成 T 的指针类型 P,而不是直接拿原本的类型 T。所以,对于 Pin 而言,都是 Pin<Box>、Pin<&mut T>,但不会是 Pin。因为 Pin 的目的是,把 T 的内存位置锁住,从而避免移动后自引用类型带来的引用失效问题。 此处 Pin 的 AwaitingWriteData 引用,指向同一个内存位置。 自引用数据结构并非只在异步代码里出现,只不过异步代码在内部生成用状态机表述的 Future 时,很容易产生自引用结构。比如: 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980#[derive(Debug)]struct SelfReference { name: String, // 在初始化后指向 name name_ptr: *const String,}impl SelfReference { pub fn new(name: impl Into<String>) -> Self { SelfReference { name: name.into(), name_ptr: std::ptr::null(), } } pub fn init(&mut self) { self.name_ptr = &self.name as *const String; } pub fn print_name(&self) { println!( "struct {:p}: (name: {:p} name_ptr: {:p}), name: {}, name_ref: {}", self, &self.name, self.name_ptr, self.name, // 在使用 ptr 是需要 unsafe // SAFETY: 这里 name_ptr 潜在不安全,会指向旧的位置 unsafe { &*self.name_ptr }, ); }}// memcpyfn move_it(data: SelfReference) -> SelfReference { data}fn move_creates_issue() -> SelfReference { let mut data = SelfReference::new("Tyr"); data.init(); // 不 move,一切正常 data.print_name(); let data = move_it(data); // move 之后,name_ref 指向的位置是已经失效的地址 // 只不过现在 move 前的地址还没被回收 data.print_name(); data}fn mem_swap_creates_issue() { let mut data1 = SelfReference::new("Tyr"); data1.init(); let mut data2 = SelfReference::new("Lindsey"); data2.init(); // name: Tyr, name_ref: Tyr data1.print_name(); // name: Lindsey, name_ref: Lindsey data2.print_name(); std::mem::swap(&mut data1, &mut data2); // name: Lindsey, name_ref: Tyr data1.print_name(); // name: Tyr, name_ref: Lindsey data2.print_name();}fn main() { let data = move_creates_issue(); println!("data: {:?}", data); // 若不注释下面这行,程序运行会直接 segment error, move 前的地址已经被回收 // data.print_name(); print!("\\n"); mem_swap_creates_issue();} 创建了一个自引用结构 SelfReference,它里面的 name_ref 指向了 name。正常使用它时,没有任何问题,但一旦对这个结构做 move 操作,name_ref 指向的位置还会是 move 前 name 的地址,这就引发了问题。 使用 std::mem:swap,也会出现类似的问题,一旦 swap,两个数据的内容交换,然而,name_ref 指向的地址还是旧的。 Pin 对解决这类问题很关键,如果试图移动被 Pin 住的数据结构,要么,编译器会通过编译错误阻止你;要么,使用 unsafe Rust,自己负责其安全性。 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071use std::{marker::PhantomPinned, pin::Pin};#[derive(Debug)]struct SelfReference { name: String, // 在初始化后指向 name name_ptr: *const String, // PhantomPinned 占位符 _marker: PhantomPinned,}impl SelfReference { pub fn new(name: impl Into<String>) -> Self { SelfReference { name: name.into(), name_ptr: std::ptr::null(), _marker: PhantomPinned, } } pub fn init(self: Pin<&mut Self>) { let name_ptr = &self.name as *const String; // SAFETY: 这里并不会把任何数据从 &mut SelfReference 中移走 let this = unsafe { self.get_unchecked_mut() }; this.name_ptr = name_ptr; } pub fn print_name(self: Pin<&Self>) { println!( "struct {:p}: (name: {:p} name_ptr: {:p}), name: {}, name_ref: {}", self, &self.name, self.name_ptr, self.name, // 在使用 ptr 是需要 unsafe // SAFETY: 因为数据不会移动,所以这里 name_ptr 是安全的 unsafe { &*self.name_ptr }, ); }}fn move_pinned(data: Pin<&mut SelfReference>) { println!("{:?} ({:p})", data, &data);}#[allow(dead_code)]fn move_it(data: SelfReference) { println!("{:?} ({:p})", data, &data);}fn move_creates_issue() { let mut data = SelfReference::new("Tyr"); let mut data = unsafe { Pin::new_unchecked(&mut data) }; SelfReference::init(data.as_mut()); // 不 move,一切正常 data.as_ref().print_name(); // 现在只能拿到 pinned 后的数据 move_pinned(data.as_mut()); println!("{:?} ({:p})", data, &data); // 无法拿回 Pin 之前的 SelfReference 结构, // expected struct `SelfReference` // found struct `Pin<&mut SelfReference>` // move_it(data);}fn main() { move_creates_issue();} Unpin 1pub auto trait Unpin {} Pin 是为了让某个数据结构无法合法地移动,而 Unpin 则相当于声明数据结构是可以移动的,它的作用类似于 Send / Sync,通过类型约束来告诉编译器哪些行为是合法的、哪些不是。 在 Rust 中,绝大多数数据结构都是可以移动的,所以它们都自动实现了 Unpin。 即便这些结构被 Pin 包裹,它们依旧可以进行移动: 1234567// 需要一个可变引用来调用 `mem::replace`// 可以通过(隐式)调用 `Pin::deref_mut` 来获得这样的引用,但这只有在 `String` 实现了 `Unpin` 后才有可能let mut string = "this".to_string();let mut pinned_string = Pin::new(&mut string);mem::replace(&mut *pinned_string, "other".to_string()); 当不希望一个数据结构被移动,可以使用 !Unpin。在 Rust 里,实现了 !Unpin 的,除了内部结构(比如 Future),主要就是 PhantomPinned: 12pub struct PhantomPinned;impl !Unpin for PhantomPinned {} 所以,如果你希望你的数据结构不能被移动,可以为其添加 PhantomPinned 字段来隐式声明 !Unpin。 当数据结构满足 Unpin 时,创建 Pin 以及使用 Pin(主要是 DerefMut)都可以不使用 unsafe: 12345678910111213141516171819202122232425262728// 如果实现了 Unpin,可以通过安全接口创建和进行 DerefMutimpl<P: Deref<Target: Unpin>> Pin<P> { pub const fn new(pointer: P) -> Pin<P> { // SAFETY: the value pointed to is `Unpin`, and so has no requirements // around pinning. unsafe { Pin::new_unchecked(pointer) } } pub const fn into_inner(pin: Pin<P>) -> P { pin.pointer }}impl<P: DerefMut<Target: Unpin>> DerefMut for Pin<P> { fn deref_mut(&mut self) -> &mut P::Target { Pin::get_mut(Pin::as_mut(self)) }}// 如果没有实现 Unpin,只能通过 unsafe 接口创建,不能使用 DerefMutimpl<P: Deref> Pin<P> { pub const unsafe fn new_unchecked(pointer: P) -> Pin<P> { Pin { pointer } } pub const unsafe fn into_inner_unchecked(pin: Pin<P>) -> P { pin.pointer }} Box 默认实现了 Unpin。 Box 在堆上分配内存,而指针本身是可以移动的。 Future 的类型 123456789fn main() { let fut = async { 42 }; println!("type of fut is: {}", get_type_name(&fut));}fn get_type_name<T>(_: &T) -> &'static str { std::any::type_name::<T>()} 它的输出如下: 1type of fut is: core::future::from_generator::GenFuture<xxx::main::{{closure}}> GenFuture 的定义: 12345678910struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);pub trait Generator<R = ()> { type Yield; type Return; fn resume( self: Pin<&mut Self>, arg: R ) -> GeneratorState<Self::Yield, Self::Return>;} 1234567891011121314151617181920#![feature(generators, generator_trait)]use std::ops::{Generator, GeneratorState};use std::pin::Pin;fn main() { let mut generator = || { yield 1; return "foo" }; match Pin::new(&mut generator).resume(()) { GeneratorState::Yielded(1) => {} _ => panic!("unexpected return from resume"), } match Pin::new(&mut generator).resume(()) { GeneratorState::Complete("foo") => {} _ => panic!("unexpected return from resume"), }} 类似 python 中的 yield。 Notes Rust Rust 本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处! Rust-Send-Sync 上一篇 Rust-TLS 下一篇 Please enable JavaScript to view the comments