RUST-concurrency

并发是程序处理多个任务的能力,难点在于多个任务之间如何协作,如何同步状态。几种常见的工作模式:自由竞争模式、map/reduce 模式、DAG 模式。

  • 在自由竞争模式下,多个并发任务会竞争同一个临界区的访问权。任务之间在何时、以何种方式去访问临界区,是不确定的,或者说是最为灵活的,只要在进入临界区时获得独占访问即可。
  • map/reduce 模式,把工作打散,按照相同的处理完成后,再按照一定的顺序将结果组织起来。
  • DAG 模式,把工作切成不相交的、有依赖关系的子任务,然后按依赖关系并发执行。

当处理复杂问题的时候,一种方法是先厘清其脉络,用分治的思想把问题拆解成正交的子问题,然后组合合适的并发模式来处理这些子问题。

Atomic

Atomic 是所有并发原语的基础,它为并发任务的同步奠定了坚实的基础。先看一个 Lock 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use std::{cell::RefCell, fmt, sync::Arc, thread};

struct Lock<T> {
locked: RefCell<bool>,
data: RefCell<T>,
}

impl<T> fmt::Debug for Lock<T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Lock<{:?}>", self.data.borrow())
}
}

// SAFETY: 我们确信 Lock<T> 很安全,可以在多个线程中共享
unsafe impl<T> Sync for Lock<T> {}

impl<T> Lock<T> {
pub fn new(data: T) -> Self {
Self {
data: RefCell::new(data),
locked: RefCell::new(false),
}
}

pub fn lock(&self, op: impl FnOnce(&mut T)) {
// 如果没拿到锁,就一直 spin
while *self.locked.borrow() != false {} // **1

// 拿到,赶紧加锁
*self.locked.borrow_mut() = true; // **2

// 开始干活
op(&mut self.data.borrow_mut()); // **3

// 解锁
*self.locked.borrow_mut() = false; // **4
}
}

fn main() {
let data = Arc::new(Lock::new(0));

let data1 = data.clone();
let t1 = thread::spawn(move || {
data1.lock(|v| *v += 10);
});

let data2 = data.clone();
let t2 = thread::spawn(move || {
data2.lock(|v| *v *= 10);
});
t1.join().unwrap();
t2.join().unwrap();

println!("data: {:?}", data);
}

在 lock() 方法里,拿不到锁的并发任务会一直 spin,拿到锁的任务可以干活,干完活后会解锁,这样之前 spin 的任务会竞争到锁,进入临界区。这里的实现有一些潜在的不易发现的问题:

  • 多核情况下,1 和 2 之间,有可能其它线程也碰巧 spin 结束,把 locked 修改为 true。这样,存在多个线程拿到这把锁,破坏了任何线程都有独占访问的保证。
  • 单核情况下,1 和 2 之间,也可能因为操作系统的可抢占式调度,导致上述问题发生。
  • 编译器会最大程度优化生成的指令,如果操作之间没有依赖关系,可能会生成乱序的机器码,比如3 被优化放在 1 之前,从而破坏了这个 lock 的保证。注意这里的 3 和 1 之间没有依赖关系。
  • 即便编译器不做乱序处理,CPU 也会最大程度做指令的乱序执行,让流水线的效率最高。同样会发生 3 的问题。

这个锁的行为是未定义的。可能大部分时间如我们所愿,但会随机出现奇奇怪怪的行为。一旦这样的事情发生,bug 可能会以各种不同的面貌出现在系统的各个角落。而且,这样的 bug 几乎是无解的,因为它很难稳定复现,表现行为很不一致,甚至,只在某个 CPU 下出现。

为了解决上面这段代码的问题,我们必须在 CPU 层面做一些保证,让某些操作成为原子操作。

CAS

最基础的保证是:可以通过一条指令读取某个内存地址,判断其值是否等于某个前置值,如果相等,将其修改为新的值。这就是 Compare-and-swap 操作,简称CAS。它是操作系统的几乎所有并发原语的基石,使得我们能实现一个可以正常工作的锁。

上文中的代码示例可以将 1 改写为:

1
while self.locked.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {}

如果 locked 当前的值是 false,就将其改成 true。这整个操作在一条指令里完成,不会被其它线程打断或者修改;如果 locked 的当前值不是 false,那么就会返回错误,我们会在此不停 spin,直到前置条件得到满足。这里,compare_exchange 是 Rust 提供的 CAS 操作,它会被编译成 CPU 的对应 CAS 指令。

同样在释放锁的时候,相应地需要使用 atomic 的版本,而非直接赋值成 false:

1
self.locked.store(false, Ordering::Release);

Ordering

通过使用 compare_exchange ,规避了 1 和 2 面临的问题,但对于和编译器/CPU自动优化相关的 3 和 4,我们还需要一些额外处理。这就是这个函数里额外的两个和 Ordering 有关。

  • Relaxed,这是最宽松的规则,它对编译器和 CPU 不做任何限制,可以乱序执行。
  • Release,当写入数据的时候:
    • 对于当前线程,任何读取或写入操作都不能被乱序排在这个 store 之后
    • 对于其它线程,如果使用了 Acquire 来读取这个 atomic 的数据,那么它们看到的是修改后的结果。所以能保证读到最新的值。
  • Acquire 是当读取数据的时候:
    • 对于当前线程,任何读取或者写入操作都不能被乱序排在这个读取之前。
    • 对于其它线程,如果使用了 Release 来修改数据,那么,修改的值对当前线程可见。
  • AcqRel 是Acquire 和 Release 的结合,同时拥有 Acquire 和 Release 的保证。
    • 一般用在 fetch_xxx 上,比如对一个 atomic 自增 1,你希望这个操作之前和之后的读取或写入操作不会被乱序,并且操作的结果对其它线程可见。
  • SeqCst 是最严格的 ordering,除了 AcqRel 的保证外,它还保证所有线程看到的所有 SeqCst 操作的顺序是一致的。

因为 CAS 和 ordering 都是系统级的操作,所以这里描述的 Ordering 的用途在各种语言中都大同小异。

对于 Rust 来说,它的 atomic 原语继承于 C++。C++ 关于 ordering 的文档要清晰得多。

由于 compare_exchange 会独占访问,所以可以进行以下优化:

1
2
3
4
5
6
7
8
9
while self
.locked
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
// compare_exchange 需要独占访问,当拿不到锁时,我们
// 检测 locked 的状态,直到其 unlocked 后,再尝试拿锁
while self.locked.load(Ordering::Relaxed) == true {}
}

CAS 是个代价比较高的操作,它需要获得对应内存的独占访问(exclusive access),我们希望失败的时候只是简单读取 atomic 的状态,只有符合条件的时候再去做独占访问,进行 CAS。

一个全局 Metrics 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::{
collections::HashMap,
sync::atomic::{AtomicUsize, Ordering},
};

// server statistics
pub struct Metrics(HashMap<&'static str, AtomicUsize>);

impl Metrics {
pub fn new(names: &[&'static str]) -> Self {
let mut metrics: HashMap<&'static str, AtomicUsize> = HashMap::new();
for name in names.iter() {
metrics.insert(name, AtomicUsize::new(0));
}
Self(metrics)
}

pub fn inc(&self, name: &'static str) {
if let Some(m) = self.0.get(name) {
m.fetch_add(1, Ordering::Relaxed);
}
}

pub fn add(&self, name: &'static str, val: usize) {
if let Some(m) = self.0.get(name) {
m.fetch_add(val, Ordering::Relaxed);
}
}

pub fn dec(&self, name: &'static str) {
if let Some(m) = self.0.get(name) {
m.fetch_sub(1, Ordering::Relaxed);
}
}

pub fn snapshot(&self) -> Vec<(&'static str, usize)> {
self.0
.iter()
.map(|(k, v)| (*k, v.load(Ordering::Relaxed)))
.collect()
}
}

lazy_static! {
pub(crate) static ref METRICS: Metrics = Metrics::new(&[
"topics",
"clients",
"peers",
"broadcasts",
"servers",
"states",
"subscribers"
]);
}

fn main() {
METRICS.inc("topics");
METRICS.inc("subscribers");

println!("{:?}", METRICS.snapshot());
}

Mutex

通过 SpinLock 做互斥的实现方式有使用场景的限制:如果受保护的临界区太大,那么整体的性能会急剧下降, CPU 忙等,浪费资源还不干实事,不适合作为一种通用的处理方法。更通用的解决方案是:当多个线程竞争同一个 Mutex 时,获得锁的线程得到临界区的访问,其它线程被挂起,放入该 Mutex 上的一个等待队列里。当获得锁的线程完成工作,退出临界区时,Mutex 会给等待队列发一个信号,把队列中第一个线程唤醒,于是这个线程可以进行后续的访问。

SpinLock和 Mutex 最大的不同是,使用 SpinLock,线程在忙等(busy wait),而使用 Mutex lock,线程在等待锁的时候会被调度出去,等锁可用时再被调度回来。

线程的上下文切换代价很大,所以频繁将线程挂起再唤醒,会降低整个系统的效率。所以很多 Mutex 具体的实现会将 SpinLock(确切地说是 spin wait)和线程挂起结合使用:线程的 lock 请求如果拿不到会先尝试 spin 一会,然后再挂起添加到等待队列。Rust 下的 parking_lot 就是这样实现的。

如果新来的线程恰巧在 spin 过程中拿到了锁,而当前等待队列中还有其它线程在等待锁,那么等待的线程只能继续等待下去,这不符合 FIFO,不适合那些需要严格按先来后到排队的使用场景。为此,parking_lot 提供了 fair mutex。

Mutex 的实现依赖于 CPU 提供的 atomic。可以把 Mutex 想象成一个粒度更大的 atomic,只不过这个 atomic 无法由 CPU 保证,而是通过软件算法来实现。

Condvar

Condvar 有两种状态:

  • 等待(wait):线程在队列中等待,直到满足某个条件。
  • 通知(notify):当 condvar 的条件满足时,当前线程通知其他等待的线程可以被唤醒。

Condvar 往往和 Mutex 一起使用:Mutex 用于保证条件在读写时互斥,Condvar 用于控制线程的等待和唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);

thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
eprintln!("I'm a happy worker!");
// 通知主线程
cvar.notify_one();
loop {
thread::sleep(Duration::from_secs(1));
println!("working...");
}
});

// 等待工作线程的通知
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
// 唤醒哪个 Mutex 下等待的线程
started = cvar.wait(started).unwrap();
}
eprintln!("Worker started!");
}

Channel

Channel 把锁封装在了队列写入和读取的小块区域内,然后把读者和写者完全分离,使得读者读取数据和写者写入数据,对开发者而言,除了潜在的上下文切换外,完全和锁无关,就像访问一个本地队列一样。对于大部分并发问题,都可以用 Channel 或者类似的思想来处理(比如 actor model)。

Rust 提供了以下四种 Channel:

  • oneshot:这可能是最简单的 Channel,写者就只发一次数据,而读者也只读一次。
  • rendezvous:适用于只需要通过 Channel 来控制线程间的同步,并不需要发送数据。rendezvous channel 是 channel size 为 0 的一种特殊情况。rendezvous channel 其实也就是 Mutex + Condvar 的一个包装。
  • bounded:bounded channel 有一个队列,但队列有上限。一旦队列被写满了,写者也需要被挂起等待。当阻塞发生后,读者一旦读取数据,channel 内部就会使用 Condvar 的 notify_one 通知写者,唤醒某个写者使其能够继续写入。 > 一般会用到 Mutex + Condvar + VecDeque 来实现;如果不用 Condvar,可以直接使用 thread::park + thread::notify 来完成(flume 的做法);如果不用 VecDeque,也可以使用双向链表或者其它的 ring buffer 的实现。
  • unbounded:queue 没有上限,如果写满了,就自动扩容。我们知道,Rust 的很多数据结构如 Vec 、VecDeque 都是自动扩容的。unbounded 和 bounded 相比,除了不阻塞写者,其它实现都很类似。

所有这些 channel 类型,同步和异步的实现思路大同小异,主要的区别在于挂起/唤醒的对象。在同步的世界里,挂起/唤醒的对象是线程;而异步的世界里,是粒度很小的 task。

根据 Channel 读者和写者的数量,Channel 又可以分为:

  • SPSC:Single-Producer Single-Consumer,单生产者,单消费者。最简单,可以不依赖于 Mutex,只用 atomics 就可以实现。
  • SPMC:Single-Producer Multi-Consumer,单生产者,多消费者。需要在消费者这侧读取时加锁。
  • MPSC:Multi-Producer Single-Consumer,多生产者,单消费者。需要在生产者这侧写入时加锁。
  • MPMC:Multi-Producer Multi-Consumer。多生产者,多消费者。需要在生产者写入或者消费者读取时加锁。

在众多 Channel 类型中,使用最广的是 MPSC channel,多生产者,单消费者。通过单消费者来保证,消息的数据结构有独占的写访问。

Actor

actor 是一种有栈协程。每个 actor,有自己的一个独立的、轻量级的调用栈,以及一个用来接受消息的消息队列(mailbox 或者 message queue),外界跟 actor 打交道的唯一手段就是,给它发送消息。

Rust 标准库没有 actor 的实现,但是社区里有比较成熟的 actix,以及 bastion

一个示例 Actor 实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
use actix::prelude::*;
use anyhow::Result;

#[derive(Message, Debug, Clone, PartialEq)]
#[rtype(result = "OutMsg")]
enum InMsg {
Add((usize, usize)),
Concat((String, String)),
}

#[derive(MessageResponse, Debug, Clone, PartialEq)]
enum OutMsg {
Num(usize),
Str(String),
}

// Actor
struct DummyActor;

// 只需要实现 Actor trait和Handler<InMsg> trait
impl Actor for DummyActor {
type Context = Context<Self>;
}

// 实现处理 InMsg 的 Handler trait
impl Handler<InMsg> for DummyActor {
type Result = OutMsg; // <- 返回的消息

fn handle(&mut self, msg: InMsg, _ctx: &mut Self::Context) -> Self::Result {
match msg {
InMsg::Add((a, b)) => OutMsg::Num(a + b),
InMsg::Concat((mut s1, s2)) => {
s1.push_str(&s2);
OutMsg::Str(s1)
}
}
}
}

#[actix::main]
async fn main() -> Result<()> {
let addr = DummyActor.start();
let res = addr.send(InMsg::Add((21, 21))).await?;
let res1 = addr
.send(InMsg::Concat(("hello, ".into(), "world".into())))
.await?;

println!("res: {:?}, res1: {:?}", res, res1);

Ok(())
}

小结

  • Atomic 在处理简单的原生类型时非常有用,如果你可以通过 AtomicXXX 结构进行同步,那么它们是最好的选择。
  • 当你的数据结构无法简单通过 AtomicXXX 进行同步,但你又的确需要在多个线程中共享数据,那么 Mutex / RwLock 可以是一种选择。不过,需要考虑锁的粒度,粒度太大的 Mutex / RwLock 效率很低。
  • 如果有 N 份资源可以供多个并发任务竞争使用,那么,Semaphore 是一个很好的选择。比如做一个 DB 连接池。
  • 当需要在并发任务中通知、协作时,Condvar 提供了最基本的通知机制,而Channel 把这个通知机制进一步广泛扩展开。可以用 Condvar 进行点对点的同步,用 Channel 做一对多、多对一、多对多的同步。

做大部分复杂的系统设计时,Channel 往往是最有力的武器,除了可以让数据穿梭于各个线程、各个异步任务间,它的接口还可以很优雅地跟 stream 适配。

在做整个后端的系统架构时,着眼的是:有哪些服务、服务和服务之间如何通讯、数据如何流动、服务和服务间如何同步。在做某一个服务的架构时,着眼的是有哪些功能性的线程(异步任务)、它们之间的接口是什么样子、数据如何流动、如何同步。

而 Channel 兼具接口、同步和数据流三种功能。

Rust 提供几乎你需要的所有解决方案,可按需选择。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!