RUST Part3

Rust Web

OSI 七层模型中,一般关注比较多的是网络层,传输层和应用层。Rust 中对于网络开发的支持是比较完善的。

  • low level 的开发工具:libpnet, smoltcp
  • 序列化工具:nom, serde
  • 网络基础库:tokio, std::net
  • 日志库:tracing, slog
  • 网络安全支持:rustls, orion, dalek-cryptography, ring, rustCrypto, snow
  • 应用层网络:warp, axum, hyper, reqwest, tungstenite, actix-web, rocket, tonic, quinn, quiche, s2n-quic
  • p2p 网络:libp2p

比如,std::net 为整个 TCP/IP 协议栈的使用提供了封装。tokio::net 提供了和 std::net 几乎一致的封装,且实现了高性能的异步网络。

std::net

std::net 下提供了处理 TCP / UDP 的数据结构,以及一些辅助结构:

  • TCP:TcpListener / TcpStream,处理服务器的监听以及客户端的连接
  • UDP:UdpSocket,处理 UDP socket
  • IpAddr 是 IPv4 和 IPv6 地址的封装;SocketAddr,表示 IP 地址 + 端口的数据结构

server 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::{
io::{Read, Write},
net::TcpListener,
thread,
};

fn main() {
let listener = TcpListener::bind("0.0.0.0:9527").unwrap();
loop {
let (mut stream, addr) = listener.accept().unwrap();
println!("Accepted a new connection: {}", addr);
thread::spawn(move || {
let mut buf = [0u8; 12];
stream.read_exact(&mut buf).unwrap();
println!("data: {:?}", String::from_utf8_lossy(&buf));

stream.write_all(b"glad to meet you!").unwrap();
});
}
}

client 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use std::{
io::{Read, Write},
net::TcpStream,
};

fn main() {
let mut stream = TcpStream::connect("127.0.0.1:9527").unwrap();

stream.write_all(b"hello world!").unwrap();

let mut buf = [0u8; 17];
stream.read_exact(&mut buf).unwrap();
println!("data: {:?}", String::from_utf8_lossy(&buf));
}

TcpStream 虽然没有实现 Drop trait,但是内部包装了 sys_common::net::TcpStream ,然后它又包装了 Socket。而Socket 是一个平台相关的结构,比如,在 Unix 下的实现是 FileDesc,然后它内部是一个 OwnedFd,最终会调用 libc::close(self.fd) 来关闭 fd,也就关闭了 TcpStream。

另外,loop + spawn 是处理网络连接的基本方式,但是带来的问题是,使用线程处理频繁连接和退出的网络连接,会有效率上的问题,以及线程间如何共享公共数据?

大量连接问题

从资源的角度,过多的线程占用过多的内存,Rust 缺省的栈大小是 2M,10k 连接就会占用 20G 内存(可以通过 std::thread::Builder::new().stack_size() 修改默认栈大小);从算力的角度,太多线程在连接数据到达时,会来回切换线程,导致 CPU 忙碌,无法处理更多的连接请求。

对于潜在的有大量连接的网络服务,使用线程不是一个好的方式。

使用协程由两种实现方式,一种是 Golang 风格的有栈协程,一种是 Rust 异步处理这样的无栈协程。比如,使用 tokio 这样的协程库。

共享数据问题

在 Rust 的简单场景下,可使用 Arc<T> 处理不需要修改的共享,使用 Arc<RwLock<T>> 处理需要进行修改的共享。

使用 Lock 处理时,自然会影响系统的吞吐量。一种方式时,降低锁的粒度,将一个全局锁,分解为多个局部的独立的锁。另一种方式是,使用异步消息,代替共享竞争,比如:std::sync::mpsc::channel, crossbeam_channel, tokio::sync::mpsc::channel, flume::unbounded。

网络数据

在 HTTP 协议下,基本上使用 JSON 构建 REST API / JSON API 。serde 定义的 Rust 数据结构具备 Serialize/Deserialize 的能力,然后用 serde_json 生成序列化后的 JSON 数据。

如果需要自定义协议,可以使用TLV(Type-Length-Value)的形式来描述协议数据,或者使用 protobuf 这类二进制协议。

当消息内容是不定长时,需要约定好如何区分消息的边界,如何识别一个完整的消息帧。比如,使用特殊的边界符号,或者使用长度字段。gRPC 使用了五个字节的 Length-Prefixed-Message,包含一个字节的压缩标志和四个字节的消息长度。在处理 gRPC 消息时,先读取 5 个字节,取出长度 N,再读取 N 个字节的消息内容。

消息尾添加 \r\n 一般用于基于文本的协议,比如 HTTP 头 / POP3 / Redis 的 RESP 协议等。

而基于二进制的消息体,则在消息前加入消息内容长度字段会更稳定。当然两种方式也可以一起使用。

使用 LengthDelimitedCodec 自动添加帧内消息内容长度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use anyhow::Result;
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio_util::codec::{Framed, LengthDelimitedCodec};

#[tokio::main]
async fn main() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:9527").await?;
loop {
let (stream, addr) = listener.accept().await?;
println!("accepted: {:?}", addr);
// LengthDelimitedCodec 默认 4 字节长度
let mut stream = Framed::new(stream, LengthDelimitedCodec::new());

tokio::spawn(async move {
// 这里解析得到的 data 会只包含消息主体,不包含长度
while let Some(Ok(data)) = stream.next().await {
println!("Got: {:?}", String::from_utf8_lossy(&data));
// 发送的消息只需要写入消息主体,不需要提供长度
// Framed/LengthDelimitedCodec 会自动计算并添加
stream.send(Bytes::from("goodbye world!")).await.unwrap();
}
});
}
}

client 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use anyhow::Result;
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LengthDelimitedCodec};

#[tokio::main]
async fn main() -> Result<()> {
let stream = TcpStream::connect("127.0.0.1:9527").await?;
let mut stream = Framed::new(stream, LengthDelimitedCodec::new());
stream.send(Bytes::from("hello world")).await?;

if let Some(Ok(data)) = stream.next().await {
println!("Got: {:?}", String::from_utf8_lossy(&data));
}

Ok(())
}

其它问题

队头阻塞问题

Web 请求响应的模式中,可能出现队头阻塞的问题,包括 TCP 队头阻塞,以及 HTTP 队头阻塞。

一般 HTTP 应用层队头阻塞,可以通过分离控制平面和数据平面来解决。FTP, SIP 就是分离控制平面和数据平面的典型代表。另外,HTTP/2 通过多路复用,可以并行处理多个请求,不同 stream 之间数据可以乱序返回,以此解决 HTTP 队头阻塞的问题。

HTTP/3 通过 QUIC 协议,在 UDP 上实现了类似 TCP 的可靠传输,并且实现了类似 HTTP/2 的多路复用。

P2P 网络

P2P 网络中,节点之间直接通信,不需要经过中心服务器。P2P 网络中,节点之间可能存在多条通信路径,节点之间可以相互通信,节点可以随时加入和离开网络。

P2P 网络中,节点需要处理 NAT 穿越问题,节点需要处理防火墙穿越问题。

主流的解决方式是,使用 STUN/TURN/ICE 协议,发现自己的公网 IP/Port 地址,然后在 bootstrap/signaling server 中注册自己的地址,并发现邻居的地址。在此之上,节点还可以加入某个或者某些 topic,然后通过某些协议(比如 gossip)在整个 topic 下扩散消息。

libp2p 是 Rust 中流行的 P2P 网络库。节点之间通信协议有:

  • 节点发现: mDNS, bootstrap, Kad DHT
  • 内容发现:gossipsub, floodsub, Kad DHT
  • 节点路由:Kad DHT

在网络安全方面,TLS 虽然能很好地保护客户端/服务器模型,然而证书的创建、发放以及信任对 P2P 网络是个问题,所以 P2P 网络倾向于使用自己的安全协议,或者使用 noise protocol,来构建安全等级可以媲美 TLS 1.3 的安全协议。

一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use anyhow::Result;
use futures::StreamExt;
use libp2p::{
core::upgrade,
floodsub::{self, Floodsub, FloodsubEvent, Topic},
identity,
mdns::{Mdns, MdnsEvent},
noise,
swarm::{NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent},
tcp::TokioTcpConfig,
yamux, NetworkBehaviour, PeerId, Swarm, Transport,
};
use std::borrow::Cow;
use tokio::io::{stdin, AsyncBufReadExt, BufReader};

/// 处理 p2p 网络的 behavior 数据结构
#[derive(NetworkBehaviour)]
#[behaviour(event_process = true)]
struct ChatBehavior {
/// flood subscription,比较浪费带宽,gossipsub 是更好的选择
floodsub: Floodsub,
/// 本地节点发现机制
mdns: Mdns,
}

impl ChatBehavior {
/// 创建一个新的 ChatBehavior
pub async fn new(id: PeerId) -> Result<Self> {
Ok(Self {
mdns: Mdns::new(Default::default()).await?,
floodsub: Floodsub::new(id),
})
}
}

impl NetworkBehaviourEventProcess<FloodsubEvent> for ChatBehavior {
// 处理 floodsub 产生的消息
fn inject_event(&mut self, event: FloodsubEvent) {
if let FloodsubEvent::Message(msg) = event {
let text = String::from_utf8_lossy(&msg.data);
println!("{:?}: {:?}", msg.source, text);
}
}
}

impl NetworkBehaviourEventProcess<MdnsEvent> for ChatBehavior {
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
// 把 mdns 发现的新的 peer 加入到 floodsub 的 view 中
for (id, addr) in list {
println!("Got peer: {} with addr {}", &id, &addr);
self.floodsub.add_node_to_partial_view(id);
}
}
MdnsEvent::Expired(list) => {
// 把 mdns 发现的离开的 peer 加入到 floodsub 的 view 中
for (id, addr) in list {
println!("Removed peer: {} with addr {}", &id, &addr);
self.floodsub.remove_node_from_partial_view(&id);
}
}
}
}
}

#[tokio::main]
async fn main() -> Result<()> {
let name = match std::env::args().nth(1) {
Some(arg) => Cow::Owned(arg),
None => Cow::Borrowed("default"),
};

// 创建 floodsub topic
let topic = floodsub::Topic::new(name);

// 创建 swarm
let mut swarm = create_swarm(topic.clone()).await?;

swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)?;

let mut stdin = BufReader::new(stdin()).lines();
loop {
tokio::select! {
line = stdin.next_line() => {
let line = line?.expect("stdin closed");
swarm.behaviour_mut().floodsub.publish(topic.clone(), line.as_bytes());
}
event = swarm.select_next_some() => {
if let SwarmEvent::NewListenAddr { address, .. } = event {
println!("Listening on {:?}", address);
}
}
}
}
}

async fn create_swarm(topic: Topic) -> Result<Swarm<ChatBehavior>> {
// 创建 identity(密钥对)
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
println!("Local peer id: {:?}", peer_id);

// 使用 noise protocol 来处理加密和认证
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys)?;

// 创建传输层
let transport = TokioTcpConfig::new()
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(yamux::YamuxConfig::default())
.boxed();

let mut behavior = ChatBehavior::new(peer_id.clone()).await?;
behavior.floodsub.subscribe(topic.clone());
let swarm = SwarmBuilder::new(transport, behavior, peer_id)
.executor(Box::new(|fut| {
tokio::spawn(fut);
}))
.build();

Ok(swarm)
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!