c++11中有 mutex (互斥量),有 condition_variable (條件變量),並沒有 semaphore (信號量)。信號量,操作系統中一般都有提,后來 google 說可以使用 mutex+condition_variable 實現一個,后來寫來寫去,都死鎖 (deadlock) ——,O__O"…,后來 google 了一個,整理了一下思路。
信號量
神馬是信號量?信號量是一個整數 count,提供兩個原子(atom,不可分割)操作:P 操作和 V 操作,或是說 wait 和 signal 操作。
- P操作 (wait操作):count 減1;如果 count < 0 那么掛起執行線程;
- V操作 (signal操作):count 加1;如果 count <= 0 那么喚醒一個執行線程;
如何理解這個信號量?為嘛信號量是這個東西?想想互斥量 mutex,相當於一把鎖,如果一個人來了 lock 一下,其他人進不去了;最初的人 unlock 了,又可以進一個人了,進去一個又 lock 住。如果 mutex 鎖在 unlock 狀態下叫做 1 的話,lock 狀態叫 0;1 實際反映的是鎖的數量 !現在可以有多把鎖,數量 count 最初為 n (可以設定);
- 來一個人取一把鎖(count減1),如果發現鎖的數量(count)小於0,豈不言外之意,沒有鎖了 ? 這樣的情況下就要等(wait),或是說懸掛(suspend),或是說阻塞(block);直到神馬時候呢?有人釋放一把鎖給 me 為止。當然了,如果最初就有鎖的話,直接拿一把進去就可以了,O__O"…。
- me 的事情辦完了,要出去了,還回一把鎖(count加1),如果發現 count <=0,言外之意是神馬呢?有人在等丫,O__O"…;好吧,me 把自己的鎖給某一個人,喚醒一個等待的線程。當然如果最初就沒有人等,me 就走 me 的,不用喚醒誰了。
下面的解釋就對應上面的 wait 操作和 signal 操作,也算是它的真實意圖了。wait 和 signal 有神馬用呢?最典型的用法可能就是 count = 1 時候相當於一把互斥鎖丫!當然很多時候,共享的資源有多個,比如有 n 個坑,每個線程占一個坑,這個時候使用信號量這個工具要比 mutex 更合適。算了,問題還集中在 wait 和 signal 的實現上。
問題和思路
- 一個 int 或是 long 變量 count,很好設定;因為 wait 和 signal 都是原子操作,所以至少要一個 mutex 來保證互斥;
- 有時候需要 suspend 一個線程,有時候要 wakeup 一個線程,條件變量是合適人選,所以需要一個 condition_variable;suspend 和 wakeup 都是在 condition_variable 上(這是載體);
- wakeup 一個線程,腫么保證一定會 wakeup 一個 condition_variable 上的一個呢?me 們借助一個輔助變量 wakeups —— 要喚醒線程名額(初值為 0),也就是在 signal 的時候,發現有人在等,就 wakeups++(要喚醒人數+1) 然后通知一下條件變量可以喚醒一個。而 wait 操作如果發現 count 數量不夠,就阻塞,直到 wakeups 有名額(大於0)為止(條件變量的 wait 就是等待該條件發生),當然喚醒一個之后 wakeups 減1。(條件變量設置的條件是 wakeups > 0。)
程序代碼
- /*
- * author: http://p9as.blogspot.com/2012/06/c11-semaphores.html
- * modified by: ilovers
- */
- #include <mutex>
- #include <condition_variable>
- namespace ilovers{
- class semaphore {
- public:
- semaphore(int value=1): count{value}, wakeups{0} {}
- void wait(){
- std::unique_lock<std::mutex> lock{mutex};
- if (--count<0) { // count is not enough ?
- condition.wait(lock, [&]()->bool{ return wakeups>0;}); // suspend and wait ...
- --wakeups; // ok, me wakeup !
- }
- }
- void signal(){
- std::lock_guard<std::mutex> lock{mutex};
- if(++count<=0) { // have some thread suspended ?
- ++wakeups;
- condition.notify_one(); // notify one !
- }
- }
- private:
- int count;
- int wakeups;
- std::mutex mutex;
- std::condition_variable condition;
- };
- };
幾點說明:
- semaphore 組織在 ilovers 命名空間下,使用的時候包含頭文件,然后 ilovers::semaphore sem; 方式使用;默認 count = 1,也就是一個 mutex 的效果;
- sem.wait(); 是 P 操作;sem.signal(); 是 V 操作;
- 程序中構造對象統一使用 c++11 的 {} 方式,雖然 () 依然使用,比如 count{value} 和 count(value),lock{mutex} 和 lock(mutex) 都是可以的;
信號量的用處
信號量的用法大體來說和 mutex m 一樣;m.lock(); 和 m.unlock(); 將需要保護的代碼上下圍起來;mutex 的特點就是 count = 1,典型的(二分)互斥量;而 sem(n) 則是數量有 n 個,比如 n 個坑,都占完的情況下,阻塞當前線程,否則可以進去(占個坑,O__O"…)。
同步兩個線程的順序
如果 semaphore::count = 0 的情況下,一個線程 A 首先 wait 的話,就會被阻塞掉;而一個線程 B 如果不 wait 就直接干活,等干完 signal 一下的話,過一會 A 就會執行了(count = 1,已經有數量了)。這樣就能保證 B 在 A 的前面執行了,類似的方法可以將多個線程 A、B、C、D 按照 D、C、B、A 排個順序執行出來。
- #include <iostream>
- #include <thread>
- #include "ilovers/semaphore"
- std::mutex m;
- ilovers::semaphore ba(0), cb(0), dc(0);
- void a()
- {
- ba.wait(); // b --> a
- std::lock_guard<std::mutex> lock{m};
- std::cout << "thread a" << '\n';
- }
- void b()
- {
- cb.wait(); // c --> b
- std::lock_guard<std::mutex> lock{m};
- std::cout << "thread b" << '\n';
- ba.signal(); // b --> a
- }
- void c()
- {
- dc.wait(); // d --> c
- std::lock_guard<std::mutex> lock{m};
- std::cout << "thread c" << '\n';
- cb.signal(); // c --> b
- }
- void d()
- {
- std::lock_guard<std::mutex> lock{m};
- std::cout << "thread d" << '\n';
- dc.signal(); // d --> c
- }
- int main()
- {
- std::thread th1{a}, th2{b}, th3{c}, th4{d};
- th1.join();
- th2.join();
- th3.join();
- th4.join();
- std::cout << "thread main" << std::endl;
- return 0;
- }
去掉上面的同步的信號量 ba、cb、dc 之后,程序的輸出可能是 a b c d、a c d b、a c b d 等,幾乎不可能是 d c b a 的順序,然后加上三個控制順序的信號量后,輸出的順序就是 d c b a main。對了 std::lock_guard 和 mutex 是為了互斥訪問 std::cout 對象,這點要注意些(cout 一個共享對象/變量)。
后話
如果 u 沒有注意到這個真相,me 這里提一下:count = -5,表明當前有 5 個線程阻塞了(really?),count = 10,表明當前有 10 個位置可以直接用(不用等)。如果 count = N0(初值),而當前 count = 0 表明 N0 個進程在干活,沒有人在等;count = -10,表明 N0 個干活,N0+10 執行了 wait 操作,10 個在等待。
當初 me 認為不需要 wakeups 輔助變量就可以 wait 和 notify,而是直接使用 count,后來發現不對,notify_one 的時候 count 一定是負的,而不是 count > 0(有空閑位置,分配給一個掛起的線程?no !此時的已經沒有掛起的線程了!),根據負的 count 則不能斷定要不要真的從條件變量上卸掉一個線程,O__O"…。比如,假設當前有 5 個掛起來的線程,count = -5,此時有一個線程工作完了,signal/V 操作了一下,count = -4,這個時候 conditon_variable 怎么知道要不要喚醒一個線程呢?但是通過一個 wakeups 就可以:一個線程工作完了,V 操作一下,同時 wakeups++,這個時候 condition_variable 感知到 wakeups 有名額了,然后喚醒一個線程。喚醒的邏輯就是如此。
總趕腳有點撇腳
思來想去,wakeups 這個輔助變量真的有用么?或是說一定需要借助它嗎?wakeups me 們說它的意義是:有一個 V 操作然后去喚醒一個掛起的線程。這里面為嘛要借助一個輔助變量呢?(雖然借助於 wakeups 然后設置條件變量的條件 wakeups > 0 可以。) V 操作的時候直接 notify_one 而 P 操作的時候只是簡單的掛起線程(不設置條件),不是更自然么?好吧,至少 me 目前是這么想的,貌似也是行的通的:
簡單版本
- /*
- * author: ilovers
- */
- #include <mutex>
- #include <condition_variable>
- namespace ilovers{
- class semaphore {
- public:
- semaphore(int value=1): count{value}{}
- void wait(){
- std::unique_lock<std::mutex> lock{mutex};
- if (--count<0) // count is not enough ?
- condition.wait(lock); // suspend and wait...
- }
- void signal(){
- std::lock_guard<std::mutex> lock{mutex};
- if(++count<=0) // have some thread suspended ?
- condition.notify_one(); // notify one !
- }
- private:
- int count;
- std::mutex mutex;
- std::condition_variable condition;
- };
- };
趕腳這個才是更自然的 semaphore 的實現,O__O"…。me 測試了一下 semaphore 作為 mutex 使用(默認),去同步四個線程,每個線程 ++sum 100000 次,發現不帶 wakeups 的簡單版本,速度很慢, 20秒左右;而上面帶 wakeups 的版本則快一些,2 秒左右(當然結果是一致的),O__O"…可能是 condition_variable::wait 操作,帶有條件則執行效率更高的緣故?現在只能這么猜測了。
參考:http://blog.csdn.net/zdarks/article/details/46994767