C++多线程笔记

C++ chrono

明确区分时间点时间段

时间点类型:chrono::steady_clock::time_point 等 时间段类型:chrono::milliseconds,chrono::seconds,chrono::minutes ...

利用运算符重载:时间点+时间段=时间点,时间点-时间点=时间段。

1
2
3
4
auto t0 = chrono::steady_clock::now(); // 获取当前时间点
auto t1 = t0 + chrono::seconds(30); // 当前时间点的30秒后
auto dt = t1 - t0; // 获取两个时间点的差(时间段)
int64_t sec = chrono::duration_cast<chrono::seconds>(dt).count(); // 时间差的秒数

跨平台的 sleep:std::this_thread::sleep_for

可以用 std::this_thread::sleep_for 替代 Unix 类操作系统专有的的 usleep。他可以让当前线程休眠一段时间。

1
2
3
4
5
6
7
8
#include <iostream>
#include <thread>
#include <chrono>

int main() {
std::this_thread::sleep_for(std::chrono::milliseconds(400));
return 0;
}

除了接受一个时间段的 sleep_for,还有接受一个时间点的 sleep_until,表示让当前线程休眠直到某个时间点。

1
2
3
4
5
6
7
8
9
#include <iostream>
#include <thread>
#include <chrono>

int main() {
auto t = std::chrono::steady_clock::now() + std::chrono::milliseconds(400);
std::this_thread::sleep_until(t);
return 0;
}

进程与线程

进程是一个应用程序被操作系统拉起来加载到内存之后从开始执行到执行结束的这样一个过程。简单来说,进程是程序(应用程序,可执行文件)的一次执行。比如双击打开一个桌面应用软件就是开启了一个进程。 线程是进程中的一个实体,是被系统独立分配和调度的基本单位。线程是CPU可执行调度的最小单位。也就是说,进程本身并不能获取CPU时间,只有它的线程才可以。

  • 每个线程共享同样的内存空间,开销比较小。
  • 每个进程拥有独立的内存空间,因此开销更大。
  • 对于高性能并行计算,更好的是多线程。

C++ 中的多线程:std::thread

C++11 开始,为多线程提供了语言级别的支持。他用 std::thread 这个类来表示线程。

std::thread 构造函数的参数可以是任意 lambda 表达式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <thread>
#include <string>

void download(std::string file) {
for (int i = 0; i < 10; i++) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
}

void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}

int main() {
std::thread t1([&] {
download("hello.zip");
});
interact();
std::cout << "Waiting for child thread..." << std::endl;
t1.join();
std::cout << "Child thread exited!" << std::endl;
return 0;
}

如果使用CMake,编译时链接设置:

1
2
find_package(Threads REQUIRED)
target_link_libraries(cpptest PUBLIC Threads::Threads)

作为一个 C++ 类,std::thread 同样遵循 RAII 思想和三五法则:因为管理着资源,他自定义了解构函数,删除了拷贝构造/赋值函数,但是提供了移动构造/赋值函数。

当 t1 所在的函数退出时,就会调用 std::thread 的解构函数,这会销毁 t1 线程。

t1.detach() 意味着线程的生命周期不再由当前 std::thread 对象管理,而是在线程退出以后自动销毁自己。

1
2
3
4
5
6
7
8
9
10
11
12
13
void myfunc() {
std::thread t1([&] {
download("hello.zip");
});
t1.detach();
// t1 所代表的线程被分离了,不再随 t1 对象销毁
}

int main() {
myfunc();
interact();
return 0;
}

但是 detach 的问题是进程退出时候不会等待所有子线程执行完毕。所以另一种解法是把 t1 对象移动到一个全局变量去,从而延长其生命周期到 myfunc 函数体外。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <iostream>
#include <thread>
#include <string>
#include <vector>

void download(std::string file) {
for (int i = 0; i < 10; i++) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
}

void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}

class ThreadPool {
std::vector<std::thread> m_pool;

public:
void push_back(std::thread thr) {
m_pool.push_back(std::move(thr));
}

~ThreadPool() { // main 函数退出后会自动调用
for (auto &t: m_pool) t.join(); // 等待池里的线程全部执行完毕
}
};

ThreadPool tpool;

void myfunc() {
std::thread t1([&] {
download("hello.zip");
});
// 移交控制权到全局的 pool 列表,以延长 t1 的生命周期
tpool.push_back(std::move(t1));
}

int main() {
myfunc();
interact();
return 0;
}

C++20 std::jthread 类

C++20 引入了 std::jthread 类,和 std::thread 不同在于:他的解构函数里会自动调用 join() 函数,从而保证 pool 解构时会自动等待全部线程执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <thread>
#include <string>
#include <vector>

void download(std::string file) {
for (int i = 0; i < 10; i++) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
}

void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}

// ~jthread() 解构函数里会自动调用 join(),如果 joinable() 的话
std::vector<std::jthread> pool;

void myfunc() {
std::jthread t1([&] {
download("hello.zip");
});
// 移交控制权到全局的 pool 列表,以延长 t1 的生命周期
pool.push_back(std::move(t1));
}

int main() {
myfunc();
interact();
return 0;
}

异步

std::async

std::async 接受一个带返回值的 lambda,自身返回一个 std::future 对象。lambda 的函数体将在另一个线程里执行。在 main 里面做一些别的事情,最后调用 future 的 get() 方法,如果此时 download 还没完成,会等待 download 完成,并获取 download 的返回值。

除了 get() 会等待线程执行完毕外,wait() 也可以等待他执行完,但是不会返回其值。

只要线程没有执行完,wait() 会无限等下去。而 wait_for() 则可以指定一个最长等待时间,用 chrono 表示单位。他会返回一个 std::future_status 表示等待是否成功。

  • 如果超过这个时间线程还没有执行完毕,则放弃等待,返回 future_status::timeout。
  • 如果线程在指定的时间内执行完毕,则认为等待成功,返回 future_status::ready。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <string>
#include <thread>
#include <future>

int download(std::string file) {
for (int i = 0; i < 10; i++) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
return 404;
}

void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}

int main() {
std::future<int> fret = std::async([&] {
return download("hello.zip");
});
interact();
std::cout << "Waiting for download complete..." << std::endl;
fret.wait();
std::cout << "Wait returned!" << std::endl;
int ret = fret.get();
std::cout << "Download result: " << ret << std::endl;
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// wait_for
int main() {
std::future<int> fret = std::async([&] {
return download("hello.zip");
});
interact();
while (true) {
std::cout << "Waiting for download complete..." << std::endl;
auto stat = fret.wait_for(std::chrono::milliseconds(1000));
if (stat == std::future_status::ready) {
std::cout << "Future is ready!!" << std::endl;
break;
} else {
std::cout << "Future not ready!!" << std::endl;
}
}
int ret = fret.get();
std::cout << "Download result: " << ret << std::endl;
return 0;
}

std::async 的第一个参数可以设为 std::launch::deferred,这时不会创建一个线程来执行,他只会把 lambda 函数体内的运算推迟到 future 的 get() 被调用时。也就是 main 中的 interact 计算完毕后。

download 的执行仍在主线程中,他只是函数式编程范式意义上的异步,而不涉及到真正的多线程。可以用这个实现惰性求值(lazy evaluation)之类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <string>
#include <thread>
#include <future>

int download(std::string file) {
for (int i = 0; i < 10; i++) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
return 404;
}

void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}

int main() {
std::future<int> fret = std::async(std::launch::deferred, [&] {
return download("hello.zip");
});
interact();
int ret = fret.get();
std::cout << "Download result: " << ret << std::endl;
return 0;
}

std::promise

如果不想让 std::async 帮你自动创建线程,想要手动创建线程,可以直接用 std::promise。

然后在线程返回的时候,用 set_value() 设置返回值。在主线程里,用 get_future() 获取其 std::future 对象,进一步 get() 可以等待并获取线程返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <string>
#include <thread>
#include <future>

int download(std::string file) {
for (int i = 0; i < 10; i++) {
std::cout << "Downloading " << file
<< " (" << i * 10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
std::cout << "Download complete: " << file << std::endl;
return 404;
}

void interact() {
std::string name;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}

int main() {
std::promise<int> pret;
std::thread t1([&] {
auto ret = download("hello.zip");
pret.set_value(ret);
});
std::future<int> fret = pret.get_future();

interact();
int ret = fret.get();
std::cout << "Download result: " << ret << std::endl;

t1.join();
return 0;
}

std::future

future 为了三五法则,删除了拷贝构造/赋值函数。如果需要浅拷贝,实现共享同一个 future 对象,可以用 std::shared_future。

如果不需要返回值,std::async 里 lambda 的返回类型可以为 void, 这时 future 对象的类型为 std::future

同理有 std::promise,他的 set_value() 不接受参数,仅仅作为同步用。

互斥量

std::mutex:上锁,防止多个线程同时进入某一代码段。

调用 std::mutex 的 lock() 时,会检测 mutex 是否已经上锁。如果没有锁定,则对 mutex 进行上锁。如果已经锁定,则陷入等待,直到 mutex 被另一个线程解锁后,才再次上锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
std::vector<int> arr;
std::mutex mtx;
std::thread t1([&] {
for (int i = 0; i < 1000; i++) {
mtx.lock();
arr.push_back(1);
mtx.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; i++) {
mtx.lock();
arr.push_back(2);
mtx.unlock();
}
});
t1.join();
t2.join();
return 0;
}

std::lock_guard

根据 RAII 思想,可将锁的持有视为资源,上锁视为锁的获取,解锁视为锁的释放。

std::lock_guard 就是这样一个工具类,他的构造函数里会调用 mtx.lock(),解构函数会调用 mtx.unlock()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
std::vector<int> arr;
std::mutex mtx;
std::thread t1([&] {
for (int i = 0; i < 1000; i++) {
std::lock_guard grd(mtx);
arr.push_back(1);
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; i++) {
std::lock_guard grd(mtx);
arr.push_back(2);
}
});
t1.join();
t2.join();
return 0;
}

std::unique_lock

std::lock_guard 严格在解构时 unlock(),但是有时候我们会希望提前 unlock()。这时可以用 std::unique_lock,他额外存储了一个 flag 表示是否已经被释放。可以直接调用 unique_lock 的 unlock() 函数来提前解锁。他会在解构检测 flag,如果没有释放,则调用 unlock(),否则不调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
std::vector<int> arr;
std::mutex mtx;
std::thread t1([&] {
for (int i = 0; i < 1000; i++) {
std::unique_lock grd(mtx);
arr.push_back(1);
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; i++) {
std::unique_lock grd(mtx);
arr.push_back(2);
grd.unlock();
printf("outside of lock\n");
// grd.lock(); // 如果需要,还可以重新上锁
}
});
t1.join();
t2.join();
return 0;
}

std::unique_lock 的构造函数还可以有一个额外参数,那就是 std::defer_lock。指定了这个参数的话,std::unique_lock 不会在构造函数中调用 mtx.lock(),需要之后再手动调用 grd.lock() 才能上锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

int main() {
std::vector<int> arr;
std::mutex mtx;
std::thread t1([&] {
for (int i = 0; i < 1000; i++) {
std::unique_lock grd(mtx);
arr.push_back(1);
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; i++) {
std::unique_lock grd(mtx, std::defer_lock);
printf("before the lock\n");
grd.lock();
arr.push_back(2);
grd.unlock();
printf("outside of lock\n");
}
});
t1.join();
t2.join();
return 0;
}

不同的对象,各有一个 mutex,独立地上锁,可以避免不必要的锁定,提升高并发时的性能。

也可以用无阻塞的 try_lock(),他在上锁失败时不会陷入等待,而是直接返回 false;如果上锁成功,则会返回 true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <cstdio>
#include <mutex>

std::mutex mtx1;

int main() {
if (mtx1.try_lock())
printf("succeed\n");
else
printf("failed\n");

if (mtx1.try_lock())
printf("succeed\n");
else
printf("failed\n");

mtx1.unlock();
return 0;
}

std::unique_lock 和 std::mutex 具有同样的接口。

这种只要具有某些指定名字的成员函数,就判断一个类是否满足某些功能的思想,在 Python 称为鸭子类型,而 C++ 称为 concept(概念)。比起虚函数和动态多态的接口抽象,concept 使实现和接口更加解耦合且没有性能损失。

死锁

双方都在等着对方释放锁,但是因为等待而无法释放锁,从而要无限制等下去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
std::mutex mtx1;
std::mutex mtx2;

std::thread t1([&] {
for (int i = 0; i < 1000; i++) {
mtx1.lock();
mtx2.lock();
mtx2.unlock();
mtx1.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; i++) {
mtx2.lock();
mtx1.lock();
mtx1.unlock();
mtx2.unlock();
}
});
t1.join();
t2.join();
return 0;
}

解决1:永远不要同时持有两个锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
std::mutex mtx1;
std::mutex mtx2;

std::thread t1([&] {
for (int i = 0; i < 1000; i++) {
mtx1.lock();
mtx1.unlock();
mtx2.lock();
mtx2.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; i++) {
mtx2.lock();
mtx2.unlock();
mtx1.lock();
mtx1.unlock();
}
});
t1.join();
t2.join();
return 0;
}

解决2:保证双方上锁顺序一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
std::mutex mtx1;
std::mutex mtx2;

std::thread t1([&] {
for (int i = 0; i < 1000; i++) {
mtx1.lock();
mtx2.lock();
mtx2.unlock();
mtx1.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; i++) {
mtx1.lock();
mtx2.lock();
mtx2.unlock();
mtx1.unlock();
}
});
t1.join();
t2.join();
return 0;
}

解决3:用 std::lock 同时对多个上锁

可以用标准库的 std::lock(mtx1, mtx2, ...) 函数,一次性对多个 mutex 上锁。他接受任意多个 mutex 作为参数,并且他保证在无论任意线程中调用的顺序是否相同,都不会产生死锁问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
std::mutex mtx1;
std::mutex mtx2;

std::thread t1([&] {
for (int i = 0; i < 1000; i++) {
std::lock(mtx1, mtx2);
mtx1.unlock();
mtx2.unlock();
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; i++) {
std::lock(mtx2, mtx1);
mtx2.unlock();
mtx1.unlock();
}
});
t1.join();
t2.join();
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// RAII

#include <iostream>
#include <string>
#include <thread>
#include <mutex>

int main() {
std::mutex mtx1;
std::mutex mtx2;

std::thread t1([&] {
for (int i = 0; i < 1000; i++) {
std::scoped_lock grd(mtx1, mtx2);
// do something
}
});
std::thread t2([&] {
for (int i = 0; i < 1000; i++) {
std::scoped_lock grd(mtx2, mtx1);
// do something
}
});
t1.join();
t2.join();
return 0;
}

同一个线程重复调用 lock() 也会造成死锁。即使只有一个线程一个锁,如果 lock() 以后又调用 lock(),也会造成死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <mutex>

std::mutex mtx1;

void other() {
mtx1.lock();
// do something
mtx1.unlock();
}

void func() {
mtx1.lock();
other();
mtx1.unlock();
}

int main() {
func();
return 0;
}

解决1:other 里不要再上锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <mutex>

std::mutex mtx1;

/// NOTE: please lock mtx1 before calling other()
void other() {
// do something
}

void func() {
mtx1.lock();
other();
mtx1.unlock();
}

int main() {
func();
return 0;
}

解决2:改用 std::recursive_mutex

他会自动判断是不是同一个线程 lock() 了多次同一个锁,如果是则让计数器加1,之后 unlock() 会让计数器减1,减到0时才真正解锁。但是相比普通的 std::mutex 有一定性能损失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <iostream>
#include <mutex>

std::recursive_mutex mtx1;

void other() {
mtx1.lock();
// do something
mtx1.unlock();
}

void func() {
mtx1.lock();
other();
mtx1.unlock();
}

int main() {
func();
return 0;
}

线程安全的vector封装

vector 不是多线程安全的容器。多个线程同时访问同一个 vector 会出现数据竞争(data-race)现象。

先看看错误的方式:用一个类封装一下对 vector 的访问,使其访问都受到一个 mutex 的保护。会出错了!因为 size() 是 const 函数,而 mutex::lock() 却不是 const 的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

class MTVector {
std::vector<int> m_arr;
std::mutex m_mtx;

public:
void push_back(int val) {
m_mtx.lock();
m_arr.push_back(val);
m_mtx.unlock();
}

# Error: mutex::lock() 却不是 const
size_t size() const {
m_mtx.lock();
size_t ret = m_arr.size();
m_mtx.unlock();
return ret;
}
};

int main() {
MTVector arr;

std::thread t1([&] () {
for (int i = 0; i < 1000; i++) {
arr.push_back(i);
}
});

std::thread t2([&] () {
for (int i = 0; i < 1000; i++) {
arr.push_back(1000 + i);
}
});

t1.join();
t2.join();

std::cout << arr.size() << std::endl;

return 0;
}

解决方法:mutable,让 this 为 const 时仅仅给 m_mtx 开后门,可以用 mutable 关键字修饰他,从而所有成员里只有他不是 const 的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MTVector {
std::vector<int> m_arr;
mutable std::mutex m_mtx;

public:
void push_back(int val) {
m_mtx.lock();
m_arr.push_back(val);
m_mtx.unlock();
}

size_t size() const {
m_mtx.lock();
size_t ret = m_arr.size();
m_mtx.unlock();
return ret;
}
};

读写锁

读可以共享,写必须独占,且写和读不能共存。

标准库提供了 std::shared_mutex。上锁时,要指定你的需求是读还是写,负责调度的读写锁会帮你判断要不要等待。

push_back() 需要修改数据,因需求此为拉,使用 lock() 和 unlock() 的组合。

size() 则只要读取数据,不修改数据,因此可以和别人共享一起喝,使用 lock_shared() 和 unlock_shared() 的组合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <thread>
#include <vector>
#include <shared_mutex>

class MTVector {
std::vector<int> m_arr;
mutable std::shared_mutex m_mtx;

public:
void push_back(int val) {
m_mtx.lock();
m_arr.push_back(val);
m_mtx.unlock();
}

size_t size() const {
m_mtx.lock_shared();
size_t ret = m_arr.size();
m_mtx.unlock_shared();
return ret;
}
};

int main() {
MTVector arr;

std::thread t1([&] () {
for (int i = 0; i < 1000; i++) {
arr.push_back(i);
}
});

std::thread t2([&] () {
for (int i = 0; i < 1000; i++) {
arr.push_back(1000 + i);
}
});

t1.join();
t2.join();

std::cout << arr.size() << std::endl;

return 0;
}

或者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class MTVector {
std::vector<int> m_arr;
mutable std::shared_mutex m_mtx;

public:
void push_back(int val) {
std::unique_lock grd(m_mtx);
m_arr.push_back(val);
}

size_t size() const {
std::shared_lock grd(m_mtx);
return m_arr.size();
}
};

只需一次上锁的访问者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

class MTVector {
std::vector<int> m_arr;
std::mutex m_mtx;

public:
class Accessor {
MTVector &m_that;
std::unique_lock<std::mutex> m_guard;

public:
Accessor(MTVector &that)
: m_that(that), m_guard(that.m_mtx)
{}

void push_back(int val) const {
return m_that.m_arr.push_back(val);
}

size_t size() const {
return m_that.m_arr.size();
}
};

Accessor access() {
return {*this};
}
};

int main() {
MTVector arr;

std::thread t1([&] () {
auto axr = arr.access(); # Accessor 构造函数上锁
for (int i = 0; i < 1000; i++) {
axr.push_back(i);
}
});

std::thread t2([&] () {
auto axr = arr.access();
for (int i = 0; i < 1000; i++) {
axr.push_back(1000 + i);
}
});

t1.join();
t2.join();

std::cout << arr.access().size() << std::endl;

return 0;
}

条件变量

条件变量cv.wait(lck) 将会让当前线程陷入等待。在其他线程中调用 cv.notify_one() 则会唤醒那个陷入等待的线程。

cv.notify_one() 只会唤醒其中一个等待中的线程,而 cv.notify_all() 会唤醒全部。

还可以额外指定一个参数,变成 cv.wait(lck, expr) 的形式,其中 expr 是个 lambda 表达式,只有其返回值为 true 时才会真正唤醒,否则继续等待。wait() 的过程中会暂时 unlock() 这个锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

int main() {
std::condition_variable cv;
std::mutex mtx;
bool ready = false;

std::thread t1([&] {
std::unique_lock lck(mtx);
cv.wait(lck, [&] { return ready; });

std::cout << "t1 is awake" << std::endl;
});

std::cout << "notifying not ready" << std::endl;
cv.notify_one(); // useless now, since ready = false

ready = true;
std::cout << "notifying ready" << std::endl;
cv.notify_one(); // awakening t1, since ready = true

t1.join();

return 0;
}

std::condition_variable 必须和 std::unique_lock<std::mutex> 一起用。因为要保证多个线程被唤醒时 ( cv.notify_all() ),只有一个能够被启动。

std::condition_variable 仅仅支持 std::unique_lock<std::mutex> 作为 wait 的参数,如果需要用其他类型的 mutex 锁,可以用 std::condition_variable_any。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

int main() {
std::condition_variable cv;
std::mutex mtx;

std::thread t1([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t1 is awake" << std::endl;
});

std::thread t2([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t2 is awake" << std::endl;
});

std::thread t3([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t3 is awake" << std::endl;
});

std::this_thread::sleep_for(std::chrono::milliseconds(400));

std::cout << "notifying one" << std::endl;
cv.notify_one(); // awakening t1 only

std::this_thread::sleep_for(std::chrono::milliseconds(400));

std::cout << "notifying all" << std::endl;
cv.notify_all(); // awakening t1 and t2

t1.join();
t2.join();
t3.join();

return 0;
}

实现生成者消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

template <class T>
class MTQueue {
std::condition_variable m_cv;
std::mutex m_mtx;
std::vector<T> m_arr;

public:
T pop() {
std::unique_lock lck(m_mtx);
m_cv.wait(lck, [this] { return !m_arr.empty(); });
T ret = std::move(m_arr.back());
m_arr.pop_back();
return ret;
}

auto pop_hold() {
std::unique_lock lck(m_mtx);
m_cv.wait(lck, [this] { return !m_arr.empty(); });
T ret = std::move(m_arr.back());
m_arr.pop_back();
return std::pair(std::move(ret), std::move(lck));
}

void push(T val) {
std::unique_lock lck(m_mtx);
m_arr.push_back(std::move(val));
m_cv.notify_one();
}

void push_many(std::initializer_list<T> vals) {
std::unique_lock lck(m_mtx);
std::copy(
std::move_iterator(vals.begin()),
std::move_iterator(vals.end()),
std::back_insert_iterator(m_arr));
m_cv.notify_all();
}
};

int main() {
MTQueue<int> foods;

std::thread t1([&] {
for (int i = 0; i < 2; i++) {
auto food = foods.pop();
std::cout << "t1 got food:" << food << std::endl;
}
});

std::thread t2([&] {
for (int i = 0; i < 2; i++) {
auto food = foods.pop();
std::cout << "t2 got food:" << food << std::endl;
}
});

foods.push(42);
foods.push(233);
foods.push_many({666, 4399});

t1.join();
t2.join();

return 0;
}

原子操作

多个线程同时往一个 int 变量里累加,这样肯定会出错,因为 counter += i 在 CPU 看来会变成三个指令:

  1. 读取 counter 变量到 rax 寄存器
  2. rax 寄存器的值加上 1
  3. 把 rax 写入到 counter 变量

现代 CPU 为了高效,使用了大量奇技淫巧,比如他会把一条汇编指令拆分成很多微指令 (micro-ops),三个甚至有点保守估计了。

现代 CPU 还有高速缓存,乱序执行,指令级并行等优化策略,你根本不知道每条指令实际的先后顺序。

mutex 太过重量级,他会让线程被挂起,从而需要通过系统调用,进入内核层,调度到其他线程执行,有很大的开销。只是想要修改一个小小的 int 变量而已,用昂贵的 mutex 严重影响了效率。

atomic

更轻量级的 atomic,对他的 += 等操作,会被编译器转换成专门的指令。

CPU 识别到该指令时,会锁住内存总线,放弃乱序执行等优化策略(将该指令视为一个同步点,强制同步掉之前所有的内存操作),从而向你保证该操作是原子 (atomic) 的,不可分割的。

只需把 int 改成 atomic 即可。写法也有讲究:

1
2
3
counter = counter + 1;  // 错,不能保证原子性
counter += 1; // OK,能保证原子性
counter++; // OK,能保证原子性

除了用方便的运算符重载之外,还可以直接调用相应的函数名,比如: fetch_add 对应于 +=;store 对应于 =;load 用于读取其中的 int 值。

fetch_add会返回其旧值:

1
int old = atm.fetch_add(val)

除了会导致 atm 的值增加 val 外,还会返回 atm 增加前的值,存储到 old。

当然这里也可以 counter++,不过要追加多个的话还是得用到 counter.fetch_add(n)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

int main() {
std::atomic<int> counter;
counter.store(0);

std::vector<int> data(20000);

std::thread t1([&] {
for (int i = 0; i < 10000; i++) {
int index = counter.fetch_add(1);
data[index] = i;
}
});

std::thread t2([&] {
for (int i = 0; i < 10000; i++) {
int index = counter.fetch_add(1);
data[index] = i + 10000;
}
});

t1.join();
t2.join();

std::cout << data[10000] << std::endl;

return 0;
}

exchange(val) 会把 val 写入原子变量,同时返回其旧的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>
#include <atomic>

int main() {
std::atomic<int> counter;

counter.store(0);

int old = counter.exchange(3);
std::cout << "old=" << old << std::endl; // 0

int now = counter.load();
std::cout << "cnt=" << now << std::endl; // 3

return 0;
}

compare_exchange_strong(old, val) ,会读取原子变量的值,比较他是否和 old 相等。如果不相等,则把原子变量的值写入 old。如果相等,则把 val 写入原子变量。返回一个 bool 值,表示是否相等。注意 old 这里传的其实是一个引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <atomic>

int main() {
boolalpha(std::cout);
std::atomic<int> counter;

counter.store(2);

int old = 1;
bool equal = counter.compare_exchange_strong(old, 3);
std::cout << "equal=" << equal << std::endl; // false
std::cout << "old=" << old << std::endl; // 2

int now = counter.load();
std::cout << "cnt=" << now << std::endl; // 2


equal = counter.compare_exchange_strong(old, 3);
std::cout << "equal=" << equal << std::endl; // true
std::cout << "old=" << old << std::endl; // 2

now = counter.load();
std::cout << "cnt=" << now << std::endl; // 3

return 0;
}

compare_exchange_strong 的逻辑,一般简称 CAS (compare-and-swap),他是并行编程最常用的原子操作之一。实际上任何 atomic 操作,包括 fetch_add,都可以基于 CAS 来实现:这就是 Taichi 实现浮点数 atomic_add 的方法。

并发与并行

并发(时间片调度):单核处理器,操作系统通过时间片调度算法,轮换着执行着不同的线程,看起来就好像是同时运行一样,其实每一时刻只有一个线程在运行。目的:异步地处理多个不同的任务,避免同步造成的阻塞。

并行(物理核心并行):多核处理器,每个处理器执行一个线程,真正的同时运行。目的:将一个任务分派到多个核上,从而更快完成任务。

开源工具:因特尔开源的并行编程库TBB(注意使用 2021.5 之前的版本,而不是最近改名成 OneTBB 的版本)

1
sudo apt-get install libtbb-dev

TBB简介

使用任务组,组织任务。一个任务不一定对应一个线程,如果任务数量超过CPU最大的线程数,会由 TBB 在用户层负责调度任务运行在多个预先分配好的线程,而不是由操作系统负责调度线程运行在多个物理核心。

TBB任务队列调度使用 工作窃取法(work-stealing),每个线程一个任务队列,做完本职工作后可以认领其他线程的任务。而不是原始的单一任务队列。

提供了并发版本的STL容器, tbb::concurrent_vector、tbb::concurrent_map 等。

优化了流水线工作流程,更好地利用缓存和程序的局部性。

性能测试工具

git上下载代码库,进行简单测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <iostream>
#include <vector>
#include <cmath>
#include <benchmark/benchmark.h>

constexpr size_t n = 1<<27;
std::vector<float> a(n);

void BM_for(benchmark::State &bm) {
for (auto _: bm) {
// fill a with sin(i)
for (size_t i = 0; i < a.size(); i++) {
a[i] = std::sin(i);
}
}
}
BENCHMARK(BM_for);

void BM_reduce(benchmark::State &bm) {
for (auto _: bm) {
// calculate sum of a
float res = 0;
for (size_t i = 0; i < a.size(); i++) {
res += a[i];
}
// 因为 res 并没有被使用,防止编译优化掉这段for循环
benchmark::DoNotOptimize(res);
}
}
BENCHMARK(BM_reduce);

BENCHMARK_MAIN();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
cmake_minimum_required(VERSION 3.10)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_BUILD_TYPE Release)

project(main LANGUAGES CXX)

add_executable(main main.cpp)

#find_package(OpenMP REQUIRED)
#target_link_libraries(main PUBLIC OpenMP::OpenMP_CXX)

find_package(TBB REQUIRED)
target_link_libraries(main PUBLIC TBB::tbb)

# 注意 OFF 这个设置,关闭后不会搜索是否在系统环境安装了google test 工具包
set(BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "Turn off the fking test!")
add_subdirectory(benchmark)
target_link_libraries(main PUBLIC benchmark)