Rust Web
OSI 七层模型中,一般关注比较多的是网络层,传输层和应用层。Rust
中对于网络开发的支持是比较完善的。
low level 的开发工具:libpnet, smoltcp
序列化工具:nom, serde
网络基础库:tokio, std::net
日志库:tracing, slog
网络安全支持:rustls, orion, dalek-cryptography, ring, rustCrypto,
snow
应用层网络:warp, axum, hyper, reqwest, tungstenite, actix-web,
rocket, tonic, quinn, quiche, s2n-quic
p2p 网络:libp2p
比如,std::net 为整个 TCP/IP 协议栈的使用提供了封装。tokio::net
提供了和 std::net 几乎一致的封装,且实现了高性能的异步网络。
std::net
std::net 下提供了处理 TCP / UDP 的数据结构,以及一些辅助结构:
TCP:TcpListener / TcpStream,处理服务器的监听以及客户端的连接
UDP:UdpSocket,处理 UDP socket
IpAddr 是 IPv4 和 IPv6 地址的封装;SocketAddr,表示 IP 地址 +
端口的数据结构
server 示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 use std::{ io::{Read, Write}, net::TcpListener, thread, };fn main () { let listener = TcpListener::bind("0.0.0.0:9527" ).unwrap(); loop { let (mut stream, addr) = listener.accept().unwrap(); println! ("Accepted a new connection: {}" , addr); thread::spawn(move || { let mut buf = [0u8 ; 12 ]; stream.read_exact(&mut buf).unwrap(); println! ("data: {:?}" , String ::from_utf8_lossy(&buf)); stream.write_all(b"glad to meet you!" ).unwrap(); }); } }
client 示例
use std::{ io::{Read, Write}, net::TcpStream, };fn main () { let mut stream = TcpStream::connect("127.0.0.1:9527" ).unwrap(); stream.write_all(b"hello world!" ).unwrap(); let mut buf = [0u8 ; 17 ]; stream.read_exact(&mut buf).unwrap(); println! ("data: {:?}" , String ::from_utf8_lossy(&buf)); }
TcpStream 虽然没有实现 Drop trait,但是内部包装了
sys_common::net::TcpStream ,然后它又包装了 Socket。而Socket
是一个平台相关的结构,比如,在 Unix 下的实现是
FileDesc,然后它内部是一个 OwnedFd,最终会调用 libc::close(self.fd)
来关闭 fd,也就关闭了 TcpStream。
另外,loop + spawn
是处理网络连接的基本方式,但是带来的问题是,使用线程处理频繁连接和退出的网络连接,会有效率上的问题,以及线程间如何共享公共数据?
大量连接问题
从资源的角度,过多的线程占用过多的内存,Rust 缺省的栈大小是 2M,10k
连接就会占用 20G 内存(可以通过 std::thread::Builder::new().stack_size()
修改默认栈大小);从算力的角度,太多线程在连接数据到达时,会来回切换线程,导致
CPU 忙碌,无法处理更多的连接请求。
对于潜在的有大量连接的网络服务,使用线程不是一个好的方式。
使用协程由两种实现方式,一种是 Golang 风格的有栈协程,一种是 Rust
异步处理这样的无栈协程。比如,使用 tokio 这样的协程库。
共享数据问题
在 Rust 的简单场景下,可使用 Arc<T>
处理不需要修改的共享,使用 Arc<RwLock<T>>
处理需要进行修改的共享。
使用 Lock
处理时,自然会影响系统的吞吐量。一种方式时,降低锁的粒度,将一个全局锁,分解为多个局部的独立的锁。另一种方式是,使用异步消息,代替共享竞争,比如:std::sync::mpsc::channel,
crossbeam_channel, tokio::sync::mpsc::channel, flume::unbounded。
网络数据
在 HTTP 协议下,基本上使用 JSON 构建 REST API / JSON API 。serde
定义的 Rust 数据结构具备 Serialize/Deserialize 的能力,然后用 serde_json
生成序列化后的 JSON 数据。
如果需要自定义协议,可以使用TLV(Type-Length-Value)的形式来描述协议数据,或者使用
protobuf 这类二进制协议。
当消息内容是不定长时,需要约定好如何区分消息的边界,如何识别一个完整的消息帧。比如,使用特殊的边界符号,或者使用长度字段。gRPC
使用了五个字节的 Length-Prefixed-Message ,包含一个字节的压缩标志和四个字节的消息长度。在处理
gRPC 消息时,先读取 5 个字节,取出长度 N,再读取 N
个字节的消息内容。
消息尾添加 \r\n
一般用于基于文本的协议,比如 HTTP 头 /
POP3 / Redis 的 RESP 协议等。
而基于二进制的消息体,则在消息前加入消息内容长度字段会更稳定。当然两种方式也可以一起使用。
使用 LengthDelimitedCodec 自动添加帧内消息内容长度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 use anyhow::Result ;use bytes::Bytes;use futures::{SinkExt, StreamExt};use tokio::net::TcpListener;use tokio_util::codec::{Framed, LengthDelimitedCodec};#[tokio::main] async fn main () -> Result <()> { let listener = TcpListener::bind("127.0.0.1:9527" ).await ?; loop { let (stream, addr) = listener.accept().await ?; println! ("accepted: {:?}" , addr); let mut stream = Framed::new(stream, LengthDelimitedCodec::new()); tokio::spawn(async move { while let Some (Ok (data)) = stream.next().await { println! ("Got: {:?}" , String ::from_utf8_lossy(&data)); stream.send(Bytes::from("goodbye world!" )).await .unwrap(); } }); } }
client 示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 use anyhow::Result ;use bytes::Bytes;use futures::{SinkExt, StreamExt};use tokio::net::TcpStream;use tokio_util::codec::{Framed, LengthDelimitedCodec};#[tokio::main] async fn main () -> Result <()> { let stream = TcpStream::connect("127.0.0.1:9527" ).await ?; let mut stream = Framed::new(stream, LengthDelimitedCodec::new()); stream.send(Bytes::from("hello world" )).await ?; if let Some (Ok (data)) = stream.next().await { println! ("Got: {:?}" , String ::from_utf8_lossy(&data)); } Ok (()) }
其它问题
队头阻塞问题
Web 请求响应的模式中,可能出现队头阻塞的问题,包括 TCP 队头阻塞,以及
HTTP 队头阻塞。
一般 HTTP 应用层队头阻塞,可以通过分离控制平面和数据平面来解决。FTP,
SIP 就是分离控制平面和数据平面的典型代表。另外,HTTP/2
通过多路复用,可以并行处理多个请求,不同 stream
之间数据可以乱序返回,以此解决 HTTP 队头阻塞的问题。
HTTP/3 通过 QUIC 协议,在 UDP 上实现了类似 TCP
的可靠传输,并且实现了类似 HTTP/2 的多路复用。
P2P 网络
P2P 网络中,节点之间直接通信,不需要经过中心服务器。P2P
网络中,节点之间可能存在多条通信路径,节点之间可以相互通信,节点可以随时加入和离开网络。
P2P 网络中,节点需要处理 NAT
穿越问题,节点需要处理防火墙穿越问题。
主流的解决方式是,使用 STUN/TURN/ICE 协议,发现自己的公网 IP/Port
地址,然后在 bootstrap/signaling server
中注册自己的地址,并发现邻居的地址。在此之上,节点还可以加入某个或者某些
topic,然后通过某些协议(比如 gossip)在整个 topic 下扩散消息。
libp2p 是 Rust 中流行的 P2P 网络库。节点之间通信协议有:
节点发现: mDNS, bootstrap, Kad DHT
内容发现:gossipsub, floodsub, Kad DHT
节点路由:Kad DHT
在网络安全方面,TLS
虽然能很好地保护客户端/服务器模型,然而证书的创建、发放以及信任对 P2P
网络是个问题,所以 P2P 网络倾向于使用自己的安全协议,或者使用 noise
protocol,来构建安全等级可以媲美 TLS 1.3 的安全协议。
一个简单的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 use anyhow::Result ;use futures::StreamExt;use libp2p::{ core::upgrade, floodsub::{self , Floodsub, FloodsubEvent, Topic}, identity, mdns::{Mdns, MdnsEvent}, noise, swarm::{NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent}, tcp::TokioTcpConfig, yamux, NetworkBehaviour, PeerId, Swarm, Transport, };use std::borrow::Cow;use tokio::io::{stdin, AsyncBufReadExt, BufReader};#[derive(NetworkBehaviour)] #[behaviour(event_process = true)] struct ChatBehavior { floodsub: Floodsub, mdns: Mdns, }impl ChatBehavior { pub async fn new (id: PeerId) -> Result <Self > { Ok (Self { mdns: Mdns::new(Default ::default()).await ?, floodsub: Floodsub::new(id), }) } }impl NetworkBehaviourEventProcess<FloodsubEvent> for ChatBehavior { fn inject_event (&mut self , event: FloodsubEvent) { if let FloodsubEvent::Message(msg) = event { let text = String ::from_utf8_lossy(&msg.data); println! ("{:?}: {:?}" , msg.source, text); } } }impl NetworkBehaviourEventProcess<MdnsEvent> for ChatBehavior { fn inject_event (&mut self , event: MdnsEvent) { match event { MdnsEvent::Discovered(list) => { for (id, addr) in list { println! ("Got peer: {} with addr {}" , &id, &addr); self .floodsub.add_node_to_partial_view(id); } } MdnsEvent::Expired(list) => { for (id, addr) in list { println! ("Removed peer: {} with addr {}" , &id, &addr); self .floodsub.remove_node_from_partial_view(&id); } } } } }#[tokio::main] async fn main () -> Result <()> { let name = match std::env::args().nth(1 ) { Some (arg) => Cow::Owned(arg), None => Cow::Borrowed("default" ), }; let topic = floodsub::Topic::new(name); let mut swarm = create_swarm(topic.clone()).await ?; swarm.listen_on("/ip4/127.0.0.1/tcp/0" .parse()?)?; let mut stdin = BufReader::new(stdin()).lines(); loop { tokio::select! { line = stdin.next_line() => { let line = line?.expect("stdin closed" ); swarm.behaviour_mut().floodsub.publish(topic.clone(), line.as_bytes()); } event = swarm.select_next_some() => { if let SwarmEvent::NewListenAddr { address, .. } = event { println! ("Listening on {:?}" , address); } } } } }async fn create_swarm (topic: Topic) -> Result <Swarm<ChatBehavior>> { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); println! ("Local peer id: {:?}" , peer_id); let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys)?; let transport = TokioTcpConfig::new() .nodelay(true ) .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(yamux::YamuxConfig::default()) .boxed(); let mut behavior = ChatBehavior::new(peer_id.clone()).await ?; behavior.floodsub.subscribe(topic.clone()); let swarm = SwarmBuilder::new(transport, behavior, peer_id) .executor(Box ::new(|fut| { tokio::spawn(fut); })) .build(); Ok (swarm) }