C++ chrono
利用 C++ 强类型的特点,明确区分时间点与时间段。
时间点类型:chrono::steady_clock::time_point 等;时间段类型:chrono::milliseconds、chrono::seconds 等。
#include <chrono>
#include <cstdint>

// A time point: "now" on the monotonic clock.
auto t0 = std::chrono::steady_clock::now();
// time point + duration -> time point (30 seconds after t0).
auto t1 = t0 + std::chrono::seconds(30);
// time point - time point -> duration.
auto dt = t1 - t0;
// Convert the duration to whole seconds and read out the raw tick count.
std::int64_t sec = std::chrono::duration_cast<std::chrono::seconds>(dt).count();
可以用 std::this_thread::sleep_for 替代 Unix 类操作系统专有的 usleep,让当前线程休眠一段时间后再继续执行。
#include <iostream>
#include <thread>
#include <chrono>

// Puts the calling thread to sleep for a duration (400 ms).
int main() {
    std::this_thread::sleep_for(std::chrono::milliseconds(400));
    return 0;
}
除了接受一个时间段的 sleep_for,还有接受一个时间点的 sleep_until,表示让当前线程休眠直到某个时间点。
#include <iostream>
#include <thread>
#include <chrono>

// Puts the calling thread to sleep until a given time point (now + 400 ms).
int main() {
    auto t = std::chrono::steady_clock::now() + std::chrono::milliseconds(400);
    std::this_thread::sleep_until(t);
    return 0;
}
C++ 中的多线程:std::thread
C++11 开始,为多线程提供了语言级别的支持。他用 std::thread 这个类来表示线程。
std::thread 构造函数的参数可以是任意 lambda 表达式,线程启动后就会执行这个 lambda 里的内容。
#include <iostream>
#include <thread>
#include <string>
#include <chrono>  // std::chrono::milliseconds is used below

// Simulates a download: prints progress in 10% steps, sleeping 400 ms per step.
void download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
}

// Reads a name from stdin and greets the user.
void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

int main() {
    // The download runs on its own thread while main keeps interacting.
    std::thread t1([&] {
        download("hello.zip");
    });
    interact();
    std::cout << "Waiting for child thread..." << std::endl;
    // Block until the download thread has finished.
    t1.join();
    std::cout << "Child thread exited!" << std::endl;
    return 0;
}
# Locate the platform's thread library; fail configuration if it is missing.
find_package(Threads REQUIRED)
# Link the imported Threads::Threads target into the cpptest target.
target_link_libraries(cpptest PUBLIC Threads::Threads)
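作为一个 C++ 类,std::thread 同样遵循 RAII 思想和三五法则:他自定义了解构函数,删除了拷贝构造/赋值函数,但提供了移动构造/赋值函数。
当 t1 所在的函数退出时,就会调用 std::thread 的解构函数,这会销毁 t1 线程(准确地说:若此时线程仍是 joinable 的,解构函数会调用 std::terminate 终止整个程序)。
解决方案:调用成员函数 detach() 分离该线程——意味着线程的生命周期不再由当前 std::thread 对象管理,而是在线程退出以后自动销毁自己。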
// Starts the download on a background thread and returns immediately.
void myfunc() {
    std::thread worker([&] {
        download("hello.zip");
    });
    // Detach so the thread is no longer owned by `worker`, which is
    // destroyed when this function returns.
    worker.detach();
}

int main() {
    myfunc();
    interact();
    // Nothing waits for the detached download before the process exits.
    return 0;
}
但是 detach
的问题是进程退出时候不会等待所有子线程执行完毕。所以另一种解法是把 t1
对象移动到一个全局变量去,从而延长其生命周期到 myfunc 函数体外。
#include <iostream>
#include <thread>
#include <string>
#include <vector>
#include <chrono>   // std::chrono::milliseconds
#include <utility>  // std::move

// Simulates a download: prints progress in 10% steps, sleeping 400 ms per step.
void download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
}

// Reads a name from stdin and greets the user.
void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

// Owns a set of threads and joins all of them when destroyed.
class ThreadPool {
    std::vector<std::thread> m_pool;

public:
    // Takes ownership of `thr` (std::thread is move-only).
    void push_back(std::thread thr) {
        m_pool.push_back(std::move(thr));
    }

    ~ThreadPool() {
        for (auto &t : m_pool) {
            // join() on a non-joinable thread throws; skip those.
            if (t.joinable())
                t.join();
        }
    }
};

// Global pool: its destructor runs at program exit and waits for the threads.
ThreadPool tpool;

// Starts the download and hands the thread over to the global pool so that it
// outlives this function.
void myfunc() {
    std::thread t1([&] {
        download("hello.zip");
    });
    tpool.push_back(std::move(t1));
}

int main() {
    myfunc();
    interact();
    return 0;
}
C++20 std::jthread 类
C++20 引入了 std::jthread 类,和 std::thread
不同在于:他的解构函数里会自动调用 join() 函数,从而保证 pool 解构时会自动等待全部线程执行完毕。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 #include <iostream> #include <thread> #include <string> #include <vector> void download (std ::string file) { for (int i = 0 ; i < 10 ; i++) { std ::cout << "Downloading " << file << " (" << i * 10 << "%)..." << std ::endl ; std ::this_thread::sleep_for(std ::chrono::milliseconds(400 )); } std ::cout << "Download complete: " << file << std ::endl ; }void interact () { std ::string name; std ::cin >> name; std ::cout << "Hi, " << name << std ::endl ; }std ::vector <std ::jthread> pool;void myfunc () { std ::jthread t1 ([&] { download("hello.zip" ); }) ; pool.push_back(std ::move (t1)); }int main () { myfunc(); interact(); return 0 ; }
std::async 接受一个带返回值的 lambda,自身返回一个 std::future
对象。lambda 的函数体将在另一个线程里执行。在 main
里面做一些别的事情,最后调用 future 的 get() 方法,如果此时 download
还没完成,会等待 download 完成,并获取 download 的返回值。
除了 get() 会等待线程执行完毕外,wait() 也可以等待他执行完,但是不会返回其值。
只要线程没有执行完,wait() 会无限等下去。而 wait_for()
则可以指定一个最长等待时间,用 chrono 表示单位。他会返回一个
std::future_status 表示等待是否成功:在限定时间内执行完毕返回 std::future_status::ready,超时则返回 std::future_status::timeout。
#include <iostream>
#include <string>
#include <thread>
#include <future>
#include <chrono>  // std::chrono::milliseconds

// Simulates a download (10 steps of 400 ms) and returns a status code.
int download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
    return 404;
}

// Reads a name from stdin and greets the user.
void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

int main() {
    // Launch the download asynchronously; the future carries its return value.
    std::future<int> fret = std::async([&] {
        return download("hello.zip");
    });
    interact();
    std::cout << "Waiting for download complete..." << std::endl;
    // wait() blocks until the result is ready but does not fetch it.
    fret.wait();
    std::cout << "Wait returned!" << std::endl;
    // get() retrieves the value (it would also wait if still running).
    int ret = fret.get();
    std::cout << "Download result: " << ret << std::endl;
    return 0;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int main () { std ::future <int > fret = std ::async([&] { return download("hello.zip" ); }); interact(); while (true ) { std ::cout << "Waiting for download complete..." << std ::endl ; auto stat = fret.wait_for(std ::chrono::milliseconds(1000 )); if (stat == std ::future_status::ready ) { std ::cout << "Future is ready!!" << std ::endl ; break ; } else { std ::cout << "Future not ready!!" << std ::endl ; } } int ret = fret.get (); std ::cout << "Download result: " << ret << std ::endl ; return 0 ; }
std::async 的第一个参数可以设为
std::launch::deferred,这时不会创建一个线程来执行,他只会把 lambda
函数体内的运算推迟 到 future 的 get() 被调用时。也就是
main 中的 interact 计算完毕后。
#include <iostream>
#include <string>
#include <thread>
#include <future>
#include <chrono>  // std::chrono::milliseconds

// Simulates a download (10 steps of 400 ms) and returns a status code.
int download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
    return 404;
}

// Reads a name from stdin and greets the user.
void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

int main() {
    // std::launch::deferred: no thread is created; the lambda runs lazily on
    // the thread that first calls get() on the future.
    std::future<int> fret = std::async(std::launch::deferred, [&] {
        return download("hello.zip");
    });
    interact();
    // The download actually executes here, inside get().
    int ret = fret.get();
    std::cout << "Download result: " << ret << std::endl;
    return 0;
}
如果不想让 std::async 帮你自动创建线程,想要手动创建线程,可以直接用 std::promise。
然后在线程返回的时候,用 set_value() 设置返回值。在主线程里,用
get_future() 获取其 std::future 对象,进一步 get() 可以等待并获取线程返回值。
#include <iostream>
#include <string>
#include <thread>
#include <future>
#include <chrono>  // std::chrono::milliseconds

// Simulates a download (10 steps of 400 ms) and returns a status code.
int download(std::string file) {
    for (int i = 0; i < 10; i++) {
        std::cout << "Downloading " << file
                  << " (" << i * 10 << "%)..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
    std::cout << "Download complete: " << file << std::endl;
    return 404;
}

// Reads a name from stdin and greets the user.
void interact() {
    std::string name;
    std::cin >> name;
    std::cout << "Hi, " << name << std::endl;
}

int main() {
    // The promise is the writing end; the future obtained from it is the
    // reading end of the same shared state.
    std::promise<int> pret;
    std::thread t1([&] {
        auto ret = download("hello.zip");
        pret.set_value(ret);  // publish the result to the future
    });
    std::future<int> fret = pret.get_future();

    interact();
    // Blocks until set_value() has been called on the worker thread.
    int ret = fret.get();
    std::cout << "Download result: " << ret << std::endl;

    t1.join();
    return 0;
}
std::future 删除了拷贝构造/赋值函数,只能移动。如果需要浅拷贝,让多处共享同一个 future 对象,可以用 std::shared_future。
如果不需要返回值,std::async 里 lambda 的返回类型可以为 void,这时
future 对象的类型为 std::future<void>。
同理有 std::promise<void>,他的 set_value() 不接受参数,仅仅作为同步用,不传递任何实际的值。
调用 std::mutex 的 lock() 时,会检测 mutex
是否已经上锁。如果没有锁定,则对 mutex
进行上锁。如果已经锁定,则陷入等待,直到 mutex 被另一个线程解锁后,才再次上锁。调用 unlock() 则会进行解锁操作。
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

// Two threads append to the same vector; every push_back is bracketed by an
// explicit lock()/unlock() pair on the shared mutex.
int main() {
    std::vector<int> arr;
    std::mutex mtx;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            mtx.lock();
            arr.push_back(1);
            mtx.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            mtx.lock();
            arr.push_back(2);
            mtx.unlock();
        }
    });

    t1.join();
    t2.join();
    return 0;
}
std::lock_guard 就是这样一个工具类,他的构造函数里会调用
mtx.lock(),解构函数会调用 mtx.unlock()。
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

// Same as the manual lock()/unlock() version, but std::lock_guard locks in its
// constructor and unlocks in its destructor at the end of each iteration.
int main() {
    std::vector<int> arr;
    std::mutex mtx;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::lock_guard grd(mtx);
            arr.push_back(1);
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            std::lock_guard grd(mtx);
            arr.push_back(2);
        }
    });

    t1.join();
    t2.join();
    return 0;
}
std::lock_guard 严格在解构时 unlock(),但是有时候我们会希望提前
unlock()。这时可以用 std::unique_lock,他额外存储了一个 flag
表示是否已经被释放。可以直接调用 unique_lock 的 unlock()
函数来提前解锁。他会在解构时检测 flag,如果没有释放,则调用 unlock();否则不再调用。
#include <cstdio>  // printf
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

// std::unique_lock behaves like lock_guard but can be released early.
int main() {
    std::vector<int> arr;
    std::mutex mtx;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::unique_lock grd(mtx);
            arr.push_back(1);
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            std::unique_lock grd(mtx);
            arr.push_back(2);
            // Release early; the destructor sees the lock is no longer held
            // and does not unlock a second time.
            grd.unlock();
            printf("outside of lock\n");
        }
    });

    t1.join();
    t2.join();
    return 0;
}
std::unique_lock 的构造函数还可以有一个额外参数,那就是 std::defer_lock。指定了这个参数的话,std::unique_lock
不会在构造函数中调用 mtx.lock(),需要之后再手动调用 grd.lock() 才能上锁。
#include <cstdio>  // printf
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>

// Demonstrates std::defer_lock: construct the unique_lock without locking,
// then lock it explicitly later.
int main() {
    std::vector<int> arr;
    std::mutex mtx;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::unique_lock grd(mtx);
            arr.push_back(1);
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            // Not locked yet: defer_lock skips mtx.lock() in the constructor.
            std::unique_lock grd(mtx, std::defer_lock);
            printf("before the lock\n");
            grd.lock();
            arr.push_back(2);
            grd.unlock();
            printf("outside of lock\n");
        }
    });

    t1.join();
    t2.join();
    return 0;
}
也可以用无阻塞的 try_lock(),他在上锁失败时不会陷入等待,而是直接返回
false;如果上锁成功,则会返回 true。
#include <cstdio>
#include <mutex>
#include <thread>

std::mutex mtx1;

// Demonstrates non-blocking try_lock(): the first attempt finds the mutex free,
// the second attempt (while it is still held) fails without waiting.
int main() {
    // Remember the outcome so we only unlock what we actually own
    // (try_lock() is allowed to fail spuriously).
    const bool held = mtx1.try_lock();
    if (held)
        printf("succeed\n");
    else
        printf("failed\n");

    // The second attempt is made from another thread: calling try_lock() again
    // on the thread that already owns a std::mutex is undefined behavior.
    std::thread other([] {
        if (mtx1.try_lock()) {
            printf("succeed\n");
            mtx1.unlock();
        } else {
            printf("failed\n");
        }
    });
    other.join();

    if (held)
        mtx1.unlock();
    return 0;
}
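std::unique_lock 和 std::mutex 具有同样的接口(lock()、unlock()、try_lock() 等),因此 std::unique_lock 也可以作为 std::lock_guard 的构造参数。
这种只要具有指定名字的成员函数就认为满足某种功能的思想,在 Python 称为鸭子类型,而 C++ 称为 concept(概念)。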
#include <iostream>
#include <string>
#include <thread>
#include <mutex>

// INTENTIONALLY BROKEN demo: the two threads take the same two mutexes in
// opposite order, so each can end up holding one while waiting forever for
// the other (deadlock).
int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            mtx1.lock();
            mtx2.lock();
            mtx2.unlock();
            mtx1.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            mtx2.lock();
            mtx1.lock();
            mtx1.unlock();
            mtx2.unlock();
        }
    });

    t1.join();
    t2.join();
    return 0;
}
#include <iostream>
#include <string>
#include <thread>
#include <mutex>

// Deadlock fix 1: a thread never holds more than one mutex at a time; each
// lock is released before the next one is taken.
int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            mtx1.lock();
            mtx1.unlock();
            mtx2.lock();
            mtx2.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            mtx2.lock();
            mtx2.unlock();
            mtx1.lock();
            mtx1.unlock();
        }
    });

    t1.join();
    t2.join();
    return 0;
}
#include <iostream>
#include <string>
#include <thread>
#include <mutex>

// Deadlock fix 2: both threads acquire the mutexes in the same order
// (mtx1 first, then mtx2).
int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            mtx1.lock();
            mtx2.lock();
            mtx2.unlock();
            mtx1.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            mtx1.lock();
            mtx2.lock();
            mtx2.unlock();
            mtx1.unlock();
        }
    });

    t1.join();
    t2.join();
    return 0;
}
解决3:用 std::lock 同时对多个 mutex 上锁
可以用标准库的 std::lock(mtx1, mtx2, ...) 函数,一次性对多个 mutex
上锁。他接受任意多个 mutex 作为参数,并且保证无论各个线程中传入参数的顺序是否相同,都不会产生死锁问题。
#include <iostream>
#include <string>
#include <thread>
#include <mutex>

// Deadlock fix 3: std::lock acquires all given mutexes together using a
// deadlock-avoidance algorithm, so the argument order may differ per thread.
int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::lock(mtx1, mtx2);
            mtx1.unlock();
            mtx2.unlock();
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            std::lock(mtx2, mtx1);
            mtx2.unlock();
            mtx1.unlock();
        }
    });

    t1.join();
    t2.join();
    return 0;
}
#include <iostream>
#include <string>
#include <thread>
#include <mutex>

// RAII counterpart of std::lock: std::scoped_lock locks every mutex passed to
// it and unlocks them all in its destructor.
int main() {
    std::mutex mtx1;
    std::mutex mtx2;

    std::thread t1([&] {
        for (int i = 0; i < 1000; i++) {
            std::scoped_lock grd(mtx1, mtx2);
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 1000; i++) {
            std::scoped_lock grd(mtx2, mtx1);
        }
    });

    t1.join();
    t2.join();
    return 0;
}
同一个线程重复调用 lock() 也会造成死锁。即使只有一个线程一个锁,如果
lock() 以后又调用 lock(),也会造成死锁。
#include <iostream>
#include <mutex>

std::mutex mtx1;

// Locks mtx1 on its own; harmless unless the caller already holds it.
void other() {
    mtx1.lock();
    mtx1.unlock();
}

// INTENTIONALLY BROKEN demo: func() holds mtx1 and then calls other(), which
// tries to lock the same non-recursive mutex again on the same thread.
void func() {
    mtx1.lock();
    other();
    mtx1.unlock();
}

int main() {
    func();
    return 0;
}
解决1:other 里不要再上锁
#include <iostream>
#include <mutex>

std::mutex mtx1;

// Fix 1: other() does no locking of its own; it relies on the caller
// (func) already holding mtx1.
void other() {
}

// Holds mtx1 for the duration of the call to other().
void func() {
    mtx1.lock();
    other();
    mtx1.unlock();
}

int main() {
    func();
    return 0;
}
解决2:改用 std::recursive_mutex
他会自动判断是不是同一个线程 lock()
了多次同一个锁,如果是则让计数器加1,之后 unlock()
会让计数器减1,减到0时才真正解锁。但是相比普通的 std::mutex 有一定性能损失。
#include <iostream>
#include <mutex>

// Fix 2: a recursive mutex may be locked repeatedly by the thread that
// already owns it; it is released once unlock() has been called as many times.
std::recursive_mutex mtx1;

void other() {
    mtx1.lock();    // nested lock on the owning thread: allowed
    mtx1.unlock();
}

void func() {
    mtx1.lock();
    other();
    mtx1.unlock();  // the mutex is actually released here
}

int main() {
    func();
    return 0;
}
vector 不是多线程安全的容器。多个线程同时访问同一个 vector 会出现数据竞争 (data-race) 现象。
先看看错误的方式:用一个类封装一下对 vector
的访问,使其访问都受到一个 mutex 的保护。然而这样会出错!因为
size() 是 const 函数,而 mutex::lock() 却不是 const 的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 #include <iostream> #include <thread> #include <vector> #include <mutex> class MTVector { std ::vector <int > m_arr; std ::mutex m_mtx;public : void push_back (int val) { m_mtx.lock(); m_arr.push_back(val); m_mtx.unlock(); } # Error: mutex::lock() 却不是 const 的 size_t size () const { m_mtx.lock(); size_t ret = m_arr.size (); m_mtx.unlock(); return ret; } };int main () { MTVector arr; std ::thread t1 ([&] () { for (int i = 0 ; i < 1000 ; i++) { arr.push_back(i); } }) ; std ::thread t2 ([&] () { for (int i = 0 ; i < 1000 ; i++) { arr.push_back(1000 + i); } }) ; t1.join(); t2.join(); std ::cout << arr.size () << std ::endl ; return 0 ; }
解决方法:mutable。size() 在逻辑上仍是 const 的,因此为了让 this 为 const 时仅仅给 m_mtx
开后门,可以用 mutable 关键字修饰他,从而所有成员里只有他不是 const 的。
// A std::vector<int> whose accesses are serialized by a mutex.
class MTVector {
    std::vector<int> m_arr;
    // `mutable` lets const member functions lock the mutex.
    mutable std::mutex m_mtx;

public:
    // Appends `val` under the lock. The guard releases the mutex even if
    // the underlying push_back throws (e.g. on allocation failure).
    void push_back(int val) {
        std::lock_guard<std::mutex> grd(m_mtx);
        m_arr.push_back(val);
    }

    // Returns the current number of elements, read under the lock.
    size_t size() const {
        std::lock_guard<std::mutex> grd(m_mtx);
        return m_arr.size();
    }
};
push_back() 需要修改数据,因此需求为“拉”(独占访问),使用 lock() 和 unlock() 的组合。
size() 则只要读取数据,不修改数据,因此可以和别人共享一起“喝”(共享访问),使用
lock_shared() 和 unlock_shared() 的组合。
#include <iostream>
#include <thread>
#include <vector>
#include <shared_mutex>

// Vector wrapper guarded by a reader/writer mutex.
class MTVector {
    std::vector<int> m_arr;
    mutable std::shared_mutex m_mtx;

public:
    // Writer: takes the mutex exclusively.
    void push_back(int val) {
        m_mtx.lock();
        m_arr.push_back(val);
        m_mtx.unlock();
    }

    // Reader: takes the mutex in shared mode, so several readers may hold
    // it at the same time.
    size_t size() const {
        m_mtx.lock_shared();
        size_t ret = m_arr.size();
        m_mtx.unlock_shared();
        return ret;
    }
};

int main() {
    MTVector arr;

    std::thread t1([&]() {
        for (int i = 0; i < 1000; i++) {
            arr.push_back(i);
        }
    });
    std::thread t2([&]() {
        for (int i = 0; i < 1000; i++) {
            arr.push_back(1000 + i);
        }
    });

    t1.join();
    t2.join();

    std::cout << arr.size() << std::endl;
    return 0;
}
// Vector wrapper guarded by a reader/writer mutex, using RAII lock objects
// instead of manual lock()/unlock() calls.
class MTVector {
    std::vector<int> m_arr;
    mutable std::shared_mutex m_mtx;

public:
    // Appends `val` while holding the mutex exclusively.
    void push_back(int val) {
        std::unique_lock<std::shared_mutex> writer{m_mtx};
        m_arr.push_back(val);
    }

    // Returns the element count, read while holding the mutex in shared mode.
    size_t size() const {
        std::shared_lock<std::shared_mutex> reader{m_mtx};
        const size_t count = m_arr.size();
        return count;
    }
};
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>

// Vector that can only be reached through an Accessor, which holds the mutex
// for as long as the Accessor object lives.
class MTVector {
    std::vector<int> m_arr;
    std::mutex m_mtx;

public:
    class Accessor {
        MTVector &m_that;
        std::unique_lock<std::mutex> m_guard;

    public:
        // Locks that.m_mtx; it is released when the Accessor is destroyed.
        Accessor(MTVector &that)
            : m_that(that), m_guard(that.m_mtx)
        {}

        void push_back(int val) const {
            return m_that.m_arr.push_back(val);
        }

        size_t size() const {
            return m_that.m_arr.size();
        }
    };

    // Returns an Accessor that owns the lock on this vector.
    Accessor access() {
        return {*this};
    }
};

int main() {
    MTVector arr;

    std::thread t1([&]() {
        auto axr = arr.access();  // the Accessor constructor takes the lock
        for (int i = 0; i < 1000; i++) {
            axr.push_back(i);
        }
    });
    std::thread t2([&]() {
        auto axr = arr.access();
        for (int i = 0; i < 1000; i++) {
            axr.push_back(1000 + i);
        }
    });

    t1.join();
    t2.join();

    std::cout << arr.access().size() << std::endl;
    return 0;
}
条件变量:cv.wait(lck) 将会让当前线程陷入等待。在其他线程中调用
cv.notify_one() 则会唤醒那个陷入等待的线程。
cv.notify_one() 只会唤醒其中一个等待中的线程,而 cv.notify_all() 会唤醒全部等待中的线程。
还可以额外指定一个参数,变成 cv.wait(lck, expr) 的形式,其中 expr
是个 lambda 表达式,只有其返回值为 true
时才会真正唤醒,否则继续等待。wait() 的过程中会暂时 unlock() 这个锁,返回前再重新上锁。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

// Demonstrates cv.wait(lck, pred): the waiter is only released once the
// predicate (`ready`) is true, so a notification sent earlier has no effect.
int main() {
    std::condition_variable cv;
    std::mutex mtx;
    bool ready = false;  // shared state, protected by mtx

    std::thread t1([&] {
        std::unique_lock lck(mtx);
        cv.wait(lck, [&] { return ready; });
        std::cout << "t1 is awake" << std::endl;
    });

    std::cout << "notifying not ready" << std::endl;
    cv.notify_one();  // useless: ready is still false, t1 keeps waiting

    {
        // The flag must be modified while holding the mutex; otherwise the
        // write races with the predicate read in t1 and the wakeup can be lost.
        std::lock_guard grd(mtx);
        ready = true;
    }
    std::cout << "notifying ready" << std::endl;
    cv.notify_one();  // ready is true now, t1 wakes up

    t1.join();
    return 0;
}
std::condition_variable 必须和 std::unique_lock<std::mutex>
一起用。因为要保证多个线程被唤醒时(cv.notify_all()),同一时刻只有一个线程能持有锁并继续执行;如果不需要独占,可在 wait() 返回后调用 lck.unlock()。
std::condition_variable 仅仅支持 std::unique_lock<std::mutex>
作为 wait 的参数,如果需要用其他类型的 mutex 锁,可以用 std::condition_variable_any。
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <chrono>  // std::chrono::milliseconds

// Demonstrates notify_one() vs notify_all() with three waiting threads.
// NOTE(review): wait() is used without a predicate here, so the demo relies on
// the sleeps to let all threads start waiting before the first notification.
int main() {
    std::condition_variable cv;
    std::mutex mtx;

    std::thread t1([&] {
        std::unique_lock lck(mtx);
        cv.wait(lck);
        std::cout << "t1 is awake" << std::endl;
    });
    std::thread t2([&] {
        std::unique_lock lck(mtx);
        cv.wait(lck);
        std::cout << "t2 is awake" << std::endl;
    });
    std::thread t3([&] {
        std::unique_lock lck(mtx);
        cv.wait(lck);
        std::cout << "t3 is awake" << std::endl;
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(400));

    std::cout << "notifying one" << std::endl;
    cv.notify_one();  // wakes a single waiter

    std::this_thread::sleep_for(std::chrono::milliseconds(400));

    std::cout << "notifying all" << std::endl;
    cv.notify_all();  // wakes every remaining waiter

    t1.join();
    t2.join();
    t3.join();
    return 0;
}
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <initializer_list>
#include <utility>  // std::move, std::pair

// Thread-safe container with blocking pop. Elements are taken from the back
// of the underlying vector, i.e. the most recently pushed element comes out first.
template <class T>
class MTQueue {
    std::condition_variable m_cv;
    std::mutex m_mtx;
    std::vector<T> m_arr;

public:
    // Blocks until an element is available, then removes and returns it.
    T pop() {
        std::unique_lock lck(m_mtx);
        m_cv.wait(lck, [this] { return !m_arr.empty(); });
        T ret = std::move(m_arr.back());
        m_arr.pop_back();
        return ret;
    }

    // Like pop(), but also hands the still-held lock to the caller; the
    // container stays locked until the returned lock object is released.
    auto pop_hold() {
        std::unique_lock lck(m_mtx);
        m_cv.wait(lck, [this] { return !m_arr.empty(); });
        T ret = std::move(m_arr.back());
        m_arr.pop_back();
        return std::pair(std::move(ret), std::move(lck));
    }

    // Adds one element and wakes a single waiting consumer.
    void push(T val) {
        std::unique_lock lck(m_mtx);
        m_arr.push_back(std::move(val));
        m_cv.notify_one();
    }

    // Adds several elements and wakes all waiting consumers.
    void push_many(std::initializer_list<T> vals) {
        std::unique_lock lck(m_mtx);
        // initializer_list elements are const, so they can only be copied;
        // a plain range insert says so directly.
        m_arr.insert(m_arr.end(), vals.begin(), vals.end());
        m_cv.notify_all();
    }
};

int main() {
    MTQueue<int> foods;

    std::thread t1([&] {
        for (int i = 0; i < 2; i++) {
            auto food = foods.pop();
            std::cout << "t1 got food:" << food << std::endl;
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 2; i++) {
            auto food = foods.pop();
            std::cout << "t2 got food:" << food << std::endl;
        }
    });

    foods.push(42);
    foods.push(233);
    foods.push_many({666, 4399});

    t1.join();
    t2.join();
    return 0;
}
多个线程同时往一个 int 变量里累加,这样肯定会出错,因为 counter += i
在 CPU 看来会变成三个指令:
1. 读取 counter 变量到 rax 寄存器
2. rax 寄存器的值加上 i
3. 把 rax 写入到 counter 变量
即使编译器把它优化成一条加法指令也没用:现代 CPU 会把一条指令拆分成多个微指令执行,多个线程同时运行时这些步骤可能相互交错。
可以用 mutex 上锁来避免冲突,但我们只是想修改一个小小的
int 变量而已,用昂贵的 mutex 严重影响了效率。
因此可以用更轻量级的 atomic,对他的 += 等操作,会被编译器转换成专门的指令,
从而保证该操作是原子 (atomic) 的,不可分割的。
只需把 int 改成 std::atomic<int> 即可。写法也有讲究:
// NOT atomic as a whole: a separate atomic load followed by an atomic store,
// so another thread can modify counter in between.
counter = counter + 1 ;
// OK: a single atomic read-modify-write.
counter += 1 ;
// OK: a single atomic read-modify-write.
counter++;
fetch_add 对应于 +=;store 对应于 =;load 用于读取其中的 int 值。
int old = atm.fetch_add(val)
除了会导致 atm 的值增加 val 外,还会返回 atm 增加前的值,存储到 old。
当然这里也可以 counter++,不过要一次追加多个的话还是得用到 counter.fetch_add(n)。
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

// Two threads fill one array concurrently. fetch_add(1) returns the value
// before the increment, so every call yields a distinct index and no two
// writes ever target the same slot.
int main() {
    std::atomic<int> counter;
    counter.store(0);

    std::vector<int> data(20000);

    std::thread t1([&] {
        for (int i = 0; i < 10000; i++) {
            int index = counter.fetch_add(1);
            data[index] = i;
        }
    });
    std::thread t2([&] {
        for (int i = 0; i < 10000; i++) {
            int index = counter.fetch_add(1);
            data[index] = i + 10000;
        }
    });

    t1.join();
    t2.join();

    // Which thread wrote this slot depends on how the threads interleaved.
    std::cout << data[10000] << std::endl;
    return 0;
}
exchange(val) 会把 val 写入原子变量,同时返回其旧的值。
#include <iostream>
#include <atomic>

// Demonstrates exchange(): atomically writes a new value and returns the old one.
int main() {
    std::atomic<int> counter;
    counter.store(0);

    int old = counter.exchange(3);
    std::cout << "old=" << old << std::endl;  // previous value: 0

    int now = counter.load();
    std::cout << "cnt=" << now << std::endl;  // current value: 3
    return 0;
}
compare_exchange_strong(old, val) 会读取原子变量的值,比较他是否和
old 相等。如果不相等,则把原子变量的值写入 old。如果相等,则把 val
写入原子变量。返回一个 bool 值,表示是否相等。注意 old 这里传的是一个引用,因此 compare_exchange_strong 可以修改他的值。
#include <iostream>
#include <atomic>

// Demonstrates compare_exchange_strong (CAS) with one failing and one
// succeeding attempt.
int main() {
    std::cout << std::boolalpha;  // print bools as true/false

    std::atomic<int> counter;
    counter.store(2);

    // Attempt 1: expected (1) != actual (2) -> fails, and `old` is
    // overwritten with the actual value.
    int old = 1;
    bool equal = counter.compare_exchange_strong(old, 3);
    std::cout << "equal=" << equal << std::endl;
    std::cout << "old=" << old << std::endl;

    int now = counter.load();
    std::cout << "cnt=" << now << std::endl;

    // Attempt 2: `old` now holds the actual value -> succeeds, and 3 is
    // written to counter.
    equal = counter.compare_exchange_strong(old, 3);
    std::cout << "equal=" << equal << std::endl;
    std::cout << "old=" << old << std::endl;

    now = counter.load();
    std::cout << "cnt=" << now << std::endl;
    return 0;
}
compare_exchange_strong 的逻辑,一般简称 CAS
(compare-and-swap),他是并行编程最常用的原子操作之一。实际上任何 atomic
操作,包括 fetch_add,都可以基于 CAS 来实现:这就是 Taichi 实现浮点数
atomic_add 的方法。
开源工具:因特尔开源的并行编程库TBB(注意使用 2021.5
之前的版本,而不是最近改名成 OneTBB 的版本)
# Install the TBB development package with apt.
sudo apt-get install libtbb-dev
提供了并发版本的STL容器, tbb::concurrent_vector、tbb::concurrent_map
Google Benchmark:从 git 上下载代码库(放到项目的 benchmark 子目录),进行简单测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 #include <iostream> #include <vector> #include <cmath> #include <benchmark/benchmark.h> constexpr size_t n = 1 <<27 ;std ::vector <float > a (n) ;void BM_for (benchmark::State &bm) { for (auto _: bm) { for (size_t i = 0 ; i < a.size (); i++) { a[i] = std ::sin (i); } } } BENCHMARK(BM_for);void BM_reduce (benchmark::State &bm) { for (auto _: bm) { float res = 0 ; for (size_t i = 0 ; i < a.size (); i++) { res += a[i]; } benchmark::DoNotOptimize(res); } } BENCHMARK(BM_reduce); BENCHMARK_MAIN();
cmake_minimum_required(VERSION 3.10)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_BUILD_TYPE Release)

project(main LANGUAGES CXX)

add_executable(main main.cpp)

# Link against TBB.
find_package(TBB REQUIRED)
target_link_libraries(main PUBLIC TBB::tbb)

# Build Google Benchmark from the ./benchmark subdirectory, with its own
# tests switched off, and link it into the executable.
set(BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "Turn off the fking test!")
add_subdirectory(benchmark)
target_link_libraries(main PUBLIC benchmark)