一. 互斥量
(一)Mutex系列類
1. std::mutex:獨占的互斥量,不能遞歸使用。
2. std::recursive_mutex:遞歸互斥量。允許同一線程多次獲得該互斥鎖,可以用來解決同一線程需要多次獲取互斥量時死鎖的問題。
3. std::time_mutex和std::recursive_time_mutex:帶超時的互斥量。前者是超時的獨占鎖,后者為超時的遞歸鎖。主要用於獲取鎖時增加超時等待功能,因為有時不知道獲取鎖需要多久,為了不至於一直等待下去,就設置一個等待超時時間。比std::mutex多了兩個超時獲取鎖的接口:try_lock_for和try_lock_until

//1. 獨占互斥鎖,不能遞歸使用 class mutex { public: //std::mutex不支持copy和move操作。 mutex(const mutex&) = delete; mutex& operator=(const mutex&) = delete; constexpr mutex() noexcept; //構造函數:新的對象是未鎖的 ~mutex(); void lock(); //上鎖 void unlock(); //解鎖 bool try_lock(); //嘗試上鎖。成功,返回true。失敗時返回false,但不阻塞。會有三種情況 //(1)如果當前互斥量沒被其他線程占有,則鎖住互斥量,直到該線程調用unlock //(2)如果當前互斥量被其他線程占用,則調用線程返回false,且不會被阻塞 //(3)如果互斥量己被當前線程鎖住,則會產生死鎖 }; //2. 遞歸互斥鎖可以被同一個線程多次加鎖(增加鎖計數),以獲得對互斥鎖對象的多層所有權。 //可以用來解決同一線程需要多次獲取互斥量時死鎖的問題 class recursive_mutex { public : recursive_mutex(const recursive_mutex&) = delete; recursive_mutex& operator=(const recursive_mutex&) = delete; recursive_mutex() noexcept; ~recursive_mutex(); void lock(); void unlock(); //釋放互斥量時需要調用與該鎖層次深度相同次數的unlock(),即lock()次數和unlock()次數相同 bool try_lock() noexcept; }; //3. 帶超時的互斥鎖,不能遞歸使用 class timed_mutex { public : timed_mutex(const timed_mutex&) = delete; timed_mutex& operator=(const timed_mutex&) = delete; timed_mutex(); ~timed_mutex(); void lock(); void unlock(); bool try_lock(); //在指定的relative_time時間內,嘗試獲取*this上的鎖。當relative_time.count()<=0時 //將立即返回,就像調用try_lock()樣。否則會阻塞,直到獲取鎖或超過給定的relative_time時間 //當鎖被調用線程獲取,返回true;反之,返回false. template<typename Rep, typename Period> bool try_lock_for(const std::chrono::duration<Rep, Period>& relative_time); //在指定的absolute_time時間內,嘗試獲取*this上的鎖。當absolute_time<=clock::now()時 //將立即返回,就像調用try_lock()一樣。否則會阻塞,直到獲取鎖或Clock::now()返回的 //時間等於或超過給定的absolute_time的時間。返回值含義與try_lock_for相同。 template<typename Clock, typename Duration> bool try_lock_until(const std::chrono::time_point<Clock, Duration>& absolute_time); }; //4. 帶定時的遞歸互斥鎖 class recursive_timed_mutex { public : recursive_timed_mutex(const recursive_timed_mutex&) = delete; recursive_timed_mutex& operator=(const recursive_timed_mutex&) = delete; recursive_timed_mutex(); ~recursive_timed_mutex(); void lock(); void unlock(); bool try_lock() noexcept; template<typename Rep, typename Period> bool try_lock_for(const std::chrono::duration<Rep, Period>& relative_time); template<typename Clock, typename Duration> bool try_lock_until(const std::chrono::time_point<Clock, Duration>& absolute_time); };
(二)注意事項
1. lock和unlock必須成對出現,否則可能引起未定義行為。為了防止出現這種行為,可用std::lock_guard、std::unique_lock等RAII方式來使用mutex。
2. 盡量不要使用遞歸鎖,主要原因如下:
(1)需要用到遞歸鎖定的多線程互斥處理往往本身就可以簡化,允許遞歸互斥很容易放縱復雜邏輯的產生,從而導致一些多線程同步引起的晦澀問題。
(2)遞歸鎖比起非遞歸鎖,效率會更低一些。
(3)遞歸鎖雖然允許同一線程多次獲得同一個互斥量,可重復獲得最大次數並未具體說明,一旦超過一定次數,再對lock進行調用就會拋出std::system錯誤。
3. 互斥量不允許拷貝,也不允許移動。新創建的互斥量對象是未上鎖的。
4. 多線程中,對多個互斥量須遵循同樣的順序進行加鎖,否則可能會產生死鎖現象。
【編程實驗】mutex系列
#include <iostream> #include <mutex> #include <thread> #include <chrono> #include <list> //1. 獨占互斥量 std::mutex g_mtx; int tickets = 100; //1.1 多線程模擬火車站多窗口售票 void sell_ticket() { while (true) { g_mtx.lock(); if (tickets > 0) { std::cout << "thread id(" << std::this_thread::get_id() << ") sell ticket: " << 101 - tickets-- << std::endl; } else { g_mtx.unlock(); break; } g_mtx.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } //1.2 消息處理系統 class MsgManage { std::mutex mtx; std::list<int> lst; public: MsgManage(){} ~MsgManage() {} void InMsg() { for (int i = 0; i < 100; ++i) { mtx.lock(); std::cout << "insert element: " << i << std::endl; lst.push_back(i); mtx.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(3)); } } void outMsg() { while (true) { int num = 0; mtx.lock(); if (!lst.empty()) { num = lst.front(); lst.pop_front(); std::cout << "remove element: " << num << std::endl; }else { std::cout << "message queue is empty!" << std::endl; } if (num == 99) { mtx.unlock(); break;} mtx.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(4)); } } }; //2. 遞歸互斥量 struct Complex { std::recursive_mutex rmtx; int i; Complex(int i):i(i){} void mul(int x) { rmtx.lock(); i *= x; rmtx.unlock(); } void div(int x) { rmtx.lock(); i /= x; rmtx.unlock(); } void both(int x, int y) { rmtx.lock(); mul(x); //遞歸,會使得同一線程兩次獲取互斥量 div(y); rmtx.unlock(); } }; //3. 帶有超時的互斥量 std::timed_mutex tmutex; void work() { std::chrono::microseconds timeout(100); while (true) { if (tmutex.try_lock_for(timeout)) { std::cout << std::this_thread::get_id() << ": do work with this mutex" << std::endl; std::chrono::microseconds sleepDuration(250); std::this_thread::sleep_for(sleepDuration); tmutex.unlock(); }else { std::cout << std::this_thread::get_id() << ": do work without mutex" << std::endl; std::chrono::microseconds sleepDuration(100); std::this_thread::sleep_for(sleepDuration); } } } using namespace std; int main() { //1. 獨占互斥量 std::thread ths[10]; for (int i = 0; i < 10; ++i) { ths[i] = std::thread(sell_ticket); } for (auto& th : ths) { th.join(); } MsgManage manage; std::thread outMsg(&MsgManage::outMsg, &manage); std::thread inMsg(&MsgManage::InMsg, &manage); inMsg.join(); outMsg.join(); //2. 遞歸互斥量 Complex cmpl(1); cmpl.both(32, 23); //3. 帶超時的互斥量 std::thread t1(work); std::thread t2(work); t1.join(); t2.join(); return 0; }
二. 自解鎖
(一)std::lock_guard

//空的標記類 struct adopt_lock_t {}; //常量對象 constexpr adopt_lock_t adopt_lock {}; // CLASS TEMPLATE lock_guard template <class _Mutex> class lock_guard { //利用析構函數解鎖互斥量 public: using mutex_type = _Mutex; explicit lock_guard(_Mutex& _Mtx) : _MyMutex(_Mtx) { //構造,並加鎖 _MyMutex.lock(); } lock_guard(_Mutex& _Mtx, adopt_lock_t) : _MyMutex(_Mtx) { //構造,但不加鎖 } ~lock_guard() noexcept { // unlock _MyMutex.unlock(); } lock_guard(const lock_guard&) = delete; lock_guard& operator=(const lock_guard&) = delete; private: _Mutex& _MyMutex; };
1. 利用RAII技術在析構函數中對Mutex自動解鎖。但要注意,lock_guard並不負責管理Mutex的生命期。在lock_gurad對象的生命周期內,它所管理的Mutex對象會一直保持上鎖狀態,直至生命周期結束后才被解鎖。不需要,也無法手動通過lock_gurad對Mutex進行上鎖和解鎖操作。從總體上而言,沒有給程序員提供足夠的靈活度來對互斥量的進行上鎖和解鎖控制。
2. 它有兩個重載的構造函數,其中lock_gurad(_Mutex&)會自動對_Mutex進行加鎖,而lock_gurad(_Mutex&,adopt_lock_t)則只構造但不加鎖,因此需要在某個時候通過調用_Mutex本身的lock()進行上鎖 (說明:adopt_lock_t是個空的標簽類,起到通過標簽來重載構造函數的作用)。
3.模板參數表示互斥量類型。如std::mutex、std::recursive_mutex、std::timed_mutex,std::recursive_timed_mutex等。這些互斥量只有幾種基本操作:lock和unlock,以及try_lock系列函數。
4. lock_guard對象不可拷貝和移動。
(二)std::unique_lock

//空的標記類 struct adopt_lock_t {}; struct defer_lock_t {}; struct try_to_lock_t {}; //常量對象 constexpr adopt_lock_t adopt_lock {}; constexpr defer_lock_t defer_lock {}; constexpr try_to_lock_t try_to_lock {}; // CLASS TEMPLATE unique_lock template <class _Mutex> class unique_lock { // 在析構函數中自動解鎖mutex public: using mutex_type = _Mutex; // CONSTRUCT, ASSIGN, AND DESTROY unique_lock() noexcept : _Pmtx(nullptr), _Owns(false) { // 默認構造函數 } explicit unique_lock(_Mutex& _Mtx) : _Pmtx(_STD addressof(_Mtx)), _Owns(false) { // 構造並上鎖。 _Pmtx->lock(); //如果其他unique_lock己擁有該_Mtx,則會阻塞等待 _Owns = true; //成功獲取鎖,擁有鎖的所有權。 } unique_lock(_Mutex& _Mtx, adopt_lock_t) : _Pmtx(_STD addressof(_Mtx)), _Owns(true) { // 構造,並假定己上鎖(mutex需要在外面事先被鎖住)。注意擁有鎖的所有權 } unique_lock(_Mutex& _Mtx, defer_lock_t) noexcept : _Pmtx(_STD addressof(_Mtx)), _Owns(false) { // 構造,但不上鎖。false表示並未取得鎖的所有權。 } unique_lock(_Mutex& _Mtx, try_to_lock_t) : _Pmtx(_STD addressof(_Mtx)), _Owns(_Pmtx->try_lock()) { // 構造,並嘗試上鎖。如果上鎖不成功,並不會阻塞當前線程 } template <class _Rep, class _Period> unique_lock(_Mutex& _Mtx, const chrono::duration<_Rep, _Period>& _Rel_time) //在給定的時長內嘗試獲取鎖。 : _Pmtx(_STD addressof(_Mtx)), _Owns(_Pmtx->try_lock_for(_Rel_time)) { // construct and lock with timeout } template <class _Clock, class _Duration> unique_lock(_Mutex& _Mtx, const chrono::time_point<_Clock, _Duration>& _Abs_time) //在給定的時間點內嘗試獲取鎖 : _Pmtx(_STD addressof(_Mtx)), _Owns(_Pmtx->try_lock_until(_Abs_time)) { // construct and lock with timeout } unique_lock(_Mutex& _Mtx, const xtime* _Abs_time) : _Pmtx(_STD addressof(_Mtx)), _Owns(false) { // try to lock until _Abs_time _Owns = _Pmtx->try_lock_until(_Abs_time); } //支持移動構造 unique_lock(unique_lock&& _Other) noexcept : _Pmtx(_Other._Pmtx), _Owns(_Other._Owns) { // 移動拷貝,destructive copy _Other._Pmtx = nullptr; //失去對原mutex的所有權 _Other._Owns = false; } //支持移動賦值 unique_lock& operator=(unique_lock&& _Other) { //移動賦值, destructive copy if (this != _STD addressof(_Other)) { // different, move contents if (_Owns) { _Pmtx->unlock(); } _Pmtx = _Other._Pmtx; _Owns = _Other._Owns; _Other._Pmtx = nullptr; _Other._Owns = false; } return *this; } ~unique_lock() noexcept { // clean up if (_Owns) { _Pmtx->unlock(); //析構函數中解鎖 } } unique_lock(const unique_lock&) = delete; unique_lock& operator=(const unique_lock&) = delete; void lock() { // lock the mutex _Validate(); _Pmtx->lock(); _Owns = true; } _NODISCARD bool try_lock() { // try to lock the mutex _Validate(); _Owns = _Pmtx->try_lock(); return _Owns; } template <class _Rep, class _Period> _NODISCARD bool try_lock_for(const chrono::duration<_Rep, _Period>& _Rel_time) { // try to lock mutex for _Rel_time _Validate(); _Owns = _Pmtx->try_lock_for(_Rel_time); return _Owns; } template <class _Clock, class _Duration> _NODISCARD bool try_lock_until( const chrono::time_point<_Clock, _Duration>& _Abs_time) { // try to lock mutex until _Abs_time _Validate(); _Owns = _Pmtx->try_lock_until(_Abs_time); return _Owns; } _NODISCARD bool try_lock_until(const xtime* _Abs_time) { // try to lock the mutex until _Abs_time _Validate(); _Owns = _Pmtx->try_lock_until(_Abs_time); return _Owns; } void unlock() { // try to unlock the mutex if (!_Pmtx || !_Owns) { _THROW(system_error(_STD make_error_code(errc::operation_not_permitted))); } _Pmtx->unlock(); _Owns = false; } void swap(unique_lock& _Other) noexcept { // swap with _Other _STD swap(_Pmtx, _Other._Pmtx); _STD swap(_Owns, _Other._Owns); } _Mutex* release() noexcept { // 返回指向它所管理的 Mutex 對象的指針,並釋放所有權 _Mutex* _Res = _Pmtx; _Pmtx = nullptr; _Owns = false; return _Res; } _NODISCARD bool owns_lock() const noexcept { // 返回當前 std::unique_lock 對象是否獲得了鎖 return _Owns; } explicit operator bool() const noexcept { // 返回當前 std::unique_lock 對象是否獲得了鎖 return _Owns; } _NODISCARD _Mutex* mutex() const noexcept { // return pointer to managed mutex return _Pmtx; } private: _Mutex* _Pmtx; bool _Owns; //是否擁有鎖(當mutex被lock時,為true;否則為false) void _Validate() const { // check if the mutex can be locked if (!_Pmtx) { _THROW(system_error(_STD make_error_code(errc::operation_not_permitted))); } if (_Owns) { _THROW(system_error(_STD make_error_code(errc::resource_deadlock_would_occur))); } } };
1. 以獨占所有權的方式管理Mutex對象的上鎖和解鎖操作,即沒有其他的unique_lock對象同時擁有某個Mutex對象的所有權。同時要注意,unique_lock並不負責管理Mutex的生命期。
2. 與std::lock_guard一樣,在unique_lock生命期結束后,會對其所管理的Mute進行解鎖(注意:unique_lock只對擁有所有權的mutex才會在析構函數中被自動unlock)。但unique_lock比lock_guard使用更加靈活,功能更加強大
3. unique_lock模板參數的類型與lock_guard類型的含義相同。
4. unique_lock的構造函數:
(1)unique_lock()默認構造函數:新創建的unique_lock對象不管理任何Mute對象。
(2)unique_lock(_Mutex& m):構造並上鎖。如果此時某個另外的unique_lock己管理m對象,則當前線程會被阻塞。
(3)unique_lock(_Mutex& m, adopt_lock_t):構造,並假定m己上鎖。(m需要事先被上鎖,構造結束后unique_lock就擁有m的所有權)
(4)unique_lock(_Mutex& _Mtx, defer_lock_t):構造,但不上鎖。對象創建以后,可以手動調用unique_lock的lock來上鎖,才擁有_Mtx的所有權(再次強調一下,只有擁有所有權的mutex才會在析構函數中被自動unlock)
(5)unique_lock(_Mutex& _Mtx, try_to_lock_t):構造,並嘗試上鎖。如果上鎖不成功,並不會阻塞當前線程。
(6)在給定時長內或給定時間點嘗試獲鎖,並構造unique_lock對象。
5.上鎖/解鎖操作:lock,try_lock,try_lock_for,try_lock_until 和 unlock
(三)std::lock和std::scoped_lock
1. void std::lock(lock0&,lock1&,…,lockN&) 函數模板:
(1)std::lock會嘗試同時對多個可鎖定對象加鎖,它使用免死鎖算法避免死鎖。若任何一個不可用則阻塞。函數返回后,所有可鎖定對象己全部加鎖。如果期間發生異常,則該異常會被拋出,同時釋放所有己獲取的鎖。
(2)由於std::lock是個函數模板,它會為可鎖定對象加鎖,但不會並自動解鎖。因此需要為每個可鎖定對象手工解鎖,也可以用帶有adopt_lock參數的std::lock_guard,或者使用defer_lock參數的unique_lock來輔助解鎖操作。
2. std::scoped_lock類模板(C++17支持)

// CLASS TEMPLATE scoped_lock template <class... _Mutexes> class scoped_lock { // class with destructor that unlocks mutexes public: explicit scoped_lock(_Mutexes&... _Mtxes) : _MyMutexes(_Mtxes...) { // construct and lock _STD lock(_Mtxes...); } explicit scoped_lock(adopt_lock_t, _Mutexes&... _Mtxes) : _MyMutexes(_Mtxes...) { // construct but don't lock } ~scoped_lock() noexcept { // unlock all _For_each_tuple_element(_MyMutexes, [](auto& _Mutex) noexcept { _Mutex.unlock(); }); } scoped_lock(const scoped_lock&) = delete; scoped_lock& operator=(const scoped_lock&) = delete; private: tuple<_Mutexes&...> _MyMutexes; };
(1)是個類模板,它封裝了對std::lock的操作,可以避免對多個互斥量加鎖時造成的死鎖現象。同時利用RAII技術,實現對象析構時自動對這些互斥量進行解鎖。
(2)構造函數:
①scoped_lock(_Mutexes&... _Mtxes):構造scoped_lock,並上鎖。
②scoped_lock(adopt_lock_t, _Mutexes&... _Mtxes):構造但不上鎖。
(3)scoped_lock對象不可復制和移動
【編程實驗】自解鎖系列
#include <iostream> #include <mutex> #include <thread> #include <chrono> #include <vector> using namespace std; //1. std::lock與std::lock_guard或std::unique_lock配合使用 struct Box { explicit Box(int num) : num_things(num) {}; int num_things; std::mutex m; }; void transfer(Box& from, Box& to, int num) { //1. 方法1:使用lock_guard為多個mutex加鎖 //std::lock(from.m, to.m); //注意,這里傳入的是mutex對象。(要先加鎖) //std::lock_guard<std::mutex> lck1(from.m, std::adopt_lock); //adopt_lock:己假定from.m被上鎖 //std::lock_guard<std::mutex> lck2(to.m, std::adopt_lock); //2. 方法2:使用unique_lock為多個mutex加鎖 //必須使用std::defer_lock,而不是std::adopt_lock參數。因為當傳入adopt_lock時,表示己假定事先己上鎖, //就不能再為其加鎖,否則會拋出異常。而std::defer_lock表示加鎖操作會延遲到后續調用std::lock()時。 std::unique_lock<std::mutex> lck1(from.m, std::defer_lock); //defer:未實際加鎖 std::unique_lock<std::mutex> lck2(to.m, std::defer_lock); //同上 //同時鎖定兩個lock,而不死鎖 std::lock(lck1, lck2); //會調用lck1和lck2的lock()。注意,這里傳入的是unique_lock對象 from.num_things -= num; to.num_things += num; //from.m 與to.m的互斥解鎖於unique_lock或lock_guard的析構函數 } //2. 使用std::scoped_lock來鎖定多個互斥量 struct Employee { std::string id; std::vector<std::string> lunch_partners; //午餐合伙人 std::mutex m; std::string output() const { std::string ret = "Employee " + id + " has lunch partners: "; for (const auto& parner : lunch_partners) ret += parner + " "; return ret; } Employee(std::string id) :id(id){} }; void send_mail(Employee&, Employee&) { //模擬耗時的發信操作 std::this_thread::sleep_for(std::chrono::seconds(1)); } //分配午餐合伙人 void assign_lunch_partner(Employee& e1, Employee& e2) { static std::mutex io_mutex; //用於同步std::cout/std::endl操作 { std::lock_guard<std::mutex> lk(io_mutex); std::cout << e1.id << " and " << e2.id << " are waiting for locks" << std::endl; } //scoped_lock可以避免像使用std::lock/std::unique_lock那樣繁瑣的使用方式。 //使用scoped_lock可同時獲取兩個鎖,而不必擔心對assign_lunch_partner的其它調用會造成死鎖 { std::scoped_lock lk(e1.m, e2.m); { std::lock_guard<std::mutex> lk(io_mutex); std::cout << e1.id << " and " << e2.id << " got locks" << std::endl; } e1.lunch_partners.push_back(e2.id); e2.lunch_partners.push_back(e1.id); } send_mail(e1, e2); send_mail(e2, e1); } int main() { //1. std::lock的使用(配合std::lock_guard或std::unique_lock使用) Box b1(100); Box b2(50); std::thread t1(transfer, std::ref(b1), std::ref(b2), 10); std::thread t2(transfer, std::ref(b1), std::ref(b2), 5); t1.join(); t2.join(); std::cout << "box 1 num = " << b1.num_things << std::endl; std::cout << "box 2 num = " << b2.num_things << std::endl; //2. scoped_lock的使用: Employee alice("alice"), bob("bob"), christina("christina"), dave("dave"); //在線程中指派合伙人,因為內部的發郵件給用戶操作會消耗較長時間。 std::vector<std::thread> threads; threads.emplace_back(assign_lunch_partner, std::ref(alice), std::ref(bob)); threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(bob)); threads.emplace_back(assign_lunch_partner, std::ref(christina), std::ref(alice)); threads.emplace_back(assign_lunch_partner, std::ref(dave), std::ref(bob)); for (auto& th : threads) th.join(); std::cout << alice.output() << endl; std::cout << bob.output() << endl; std::cout << christina.output() << endl; std::cout << dave.output() << endl; return 0; } /*輸出結果 box 1 num = 85 box 2 num = 65 alice and bob are waiting for locks alice and bob got locks christina and alice are waiting for locks christina and alice got locks christina and bob are waiting for locks christina and bob got locks dave and bob are waiting for locks dave and bob got locks Employee alice has lunch partners: bob christina Employee bob has lunch partners: alice christina dave Employee christina has lunch partners: alice bob Employee dave has lunch partners: bob */