1. 條件變量概述
多線程訪問一個共享資源(或稱臨界區),不僅需要用互斥鎖實現獨享訪問避免並發錯誤,在獲得互斥鎖進入臨界區后,有時還需檢查特定條件是否成立。
當某個線程修改測試條件后,將通知其它正在等待條件的線程繼續往下執行。
條件變量需要和一個互斥鎖綁定,這個互斥鎖的作用為:a. 互斥地訪問臨界資源。 b. 保護測試條件。
1)wait線程從條件不滿足,等待到重新執行過程,以 pthread_cond_wait 為例。
a. (wait前必須先加鎖)調用線程將自己放入等待隊列,mutex解鎖。(調用線程己加入等待隊列並解鎖,此時,允許其他線程改變“測試條件”)
b. 掛起,等待pthread_cond_signal或pthread_cond_broadcast去喚醒。(其他線程改變測試條件,當條件滿足時會發出通知)
c. 被喚醒,mutex加鎖。
關於條件變量的幾個問題:
(1) 為什么在pthread_cond_wait之前需要加鎖?
mutex是用來保護“測試條件”的,調用者將mutex傳遞給pthread_cond_wait,該函數內部會自動將調用線程放到等待隊列中,然后再解鎖mutex,
並等待“測試條件”成立。這種做法關閉了從我們檢測“測試條件”的時刻到將線程放入到等待隊列之間的這段“時間窗口”,使得“測試條件”
在線程加入等待隊列之前不會被其他線程修改,從而確保調用線程不會錯過“測試條件”的改變。最后,當pthread_cond_wait返回前,mutex又被上鎖了。
(2) 為什么使用while語句來循環判斷“測試條件”而不使用if語句?
線程API存在一個事實(很多語言中都如此,不僅僅是C++),就是即使在沒有通知條件變量的情況下線程也可能被喚醒,這樣的喚醒稱為虛假喚醒
(spurious wakeups),但此時“測試條件”往往並沒有被滿足。因此正確的做法是,通過while循環確認等待的“測試條件”是否確己發生並將其作
為喚醒后的首個動作來處理,一旦確認是“虛假喚醒”則繼續wait等待。而如果使用if語句,則喚醒后無法進行這種確認從而可能導致錯誤。
(3)pthread_cond_signal 和 pthread_mutex_unlock順序問題
a. pthread_cond_signal放於pthread_mutex_unlock之前
在上面對wait線程的解析中,我們可以看到,wait線程被喚醒后是會對mutex重新加鎖的,但此時鎖可能還沒有被notify線程釋放(會發生這
種現象就是因為系統對線程的調度),會造成等待線程從內核中喚醒然后又回到內核空間(因為cond_wait返回后會有原子加鎖的行為),
所以一來一回會有性能的問題。但在Linux中推薦使用這種模式。
b. pthread_cond_signal放於pthread_mutex_unlock之后
不會出現之前說的那個潛在的性能損耗,因為在signal之前就已經釋放鎖了。但如果unlock和signal之前,有個低優先級的線程正在mutex上
等待的話,那么這個低優先級的線程就會搶占高優先級的線程(cond_wait的線程)。
2)notify線程:在wait線程阻塞期間,notify線程獲取互斥鎖並進入臨界區內訪問共享資源,然后改變測試條件,當條件滿足時通知在條件變
量上等待的wait線程。wait線程確認條件成立后重新申請對該互斥鎖加鎖,否則繼續等待。
條件變量類部分定義如下:
class condition_variable { public: using native_handle_type = _Cnd_t; condition_variable() { _Cnd_init_in_situ(_Mycnd()); } // 默認構造函數 ~condition_variable() noexcept { _Cnd_destroy_in_situ(_Mycnd()); } // 析構函數 condition_variable(const condition_variable&) = delete; condition_variable& operator=(const condition_variable&) = delete; // 不可復制和移動 void notify_one() noexcept { _Check_C_return(_Cnd_signal(_Mycnd())); } // 喚醒一個等待線程 void notify_all() noexcept { _Check_C_return(_Cnd_broadcast(_Mycnd())); } // 喚醒所有的等待線程 void wait(unique_lock<mutex>& _Lck) { // 等待,直到被喚醒 _Check_C_return(_Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx())); } template <class _Predicate> void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // 等待信號並測試條件 while (!_Pred()) { // 判斷測試條件,只有當Pred不成立時才阻塞 wait(_Lck); } } };
使用條件變量的wait線程基本流程:
2. mutex+condition_variable實現信號量
1)P和V操作:信號量是一個整數 count,提供兩個原子(atom,不可分割)操作:P 操作和 V 操作,或是說 wait 和 signal 操作。
a. P操作 (wait操作):count 減1,如果 count < 0 那么掛起執行線程。
--count; //表示申請一個資源 if (count < 0) //表示沒有空閑資源 { 調用進程進入等待隊列Queue; 阻塞進程; }
b. V操作 (signal操作):count 加1,如果 count <= 0 那么喚醒一個執行線程。
++count; //表示釋放一個資源 if (count <= 0) //表示有進程處於阻塞狀態 { 從等待隊列Queue中取出一個進程P; 進程P進入就緒隊列; }
來一個進程取一把鎖(count減1),如果發現鎖的數量小於0,即沒有鎖了? 於是只能進行(wait),直到有其它進程釋放出一把鎖為止。
進程的事情辦完后,要出去了,還回一把鎖(count加1),如果發現 count <=0,即有進程在等,於是把自己的鎖給它,喚醒一個等待的線程。
2)代碼實現如下
class semaphore { public: semaphore(int value = 1): count(value) {} void P() { std::unique_lock<std::mutex> lock(mutex); if (--count < 0) condition.wait(lock); } void V() { std::lock_guard<std::mutex> lock(mutex); if(++count <= 0) condition.notify_one(); } private: int count; std::mutex mutex; std::condition_variable condition; };