linux 環境有提供好的pthread_cond_wait() 和 phread_signal()、pthread_broadcast()
windows需要自己封裝,利用semophore控制線程等待和釋放,先簡單談一下設計好后api該
如何使用。
假設我們封裝好條件變量等待函數名字叫做wait(Mutex& mutex),Mutex是之前我們封裝的
條件變量,文章最下邊會給出這些文件的下載地址,在這里讀者當做linux 的mutex即可。我們
封裝的釋放函數為signal(),廣播函數為broadcast。
判斷等待條件變量和邏輯處理如下:
Lock(mutex);
while(條件不滿足)
{
wait(mutex);
}
todo...;
UnLock(mutex);
激活條件變量如下:
Lock(mutex);
todo ...;
if(條件滿足)
{
signal();/broadcast();
}
signal();
UnLock(mutex);
Condition 是我們封裝的條件變量類
這是封裝好api后調用規則,那么先考慮wait內部的基本形式
void Condition::wait(Mutex &mutex)
{
//1 Condition 類中表示阻塞線程數
mblocked ++;
//2 解鎖,釋放互斥量
UnLock(mutex);
//3 阻塞等待 mQueue為信號量
res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE);
//4 做一些判斷和邏輯處理
//5 加鎖
Lock(mutex);
}
wait內部記錄一個阻塞的線程數mblocked,mblocked 是我們封裝Condition類的成員變量,
然后釋放外部的互斥量,然后調用阻塞函數,等待signal喚醒。
當WaitForSingleObject獲取信號后會繼續執行,做一些邏輯判斷,最后將mutex鎖住。
這里用到的mQueue是一個信號量,用信號量可以接受多個喚醒和控制線程喚醒數量。
下面是條件變量釋放的函數,我們先做只是放一個條件變量的api
void Condition::signal()
{
//1阻塞的線程減少
mblocked --;
//2將激活的信號個數設置為1
signals = 1;
//3
if (signals)
{
//釋放信號量
res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mQueue), signals, 0);
ASSERT(res);
}
}
先不要着急往下寫,考慮下這么做真的合適么?
首先之前設計過外部調用
if(條件滿足)
{
signal();/broadcast();
}
這個只要條件滿足就可以激活,所以我們只用mblocked表示阻塞線程數是不夠的,當信號量被激活很多沒有被消耗的情況下
就需要統計當前可用的資源數,那么就在Condition類添加mWait表示當前可用的信號量個數。除此之外,考慮這樣一種情況,
當條件不滿足的時候 線程A調用void wait(Mutex &mutex)函數,wait函數先解鎖再阻塞,對應wait中第2,3步驟。而另一個
線程B當條件滿足時調用 signal函數激活之前阻塞的線程A,對應signal函數中第3步 。原阻塞線程A因為捕獲到信號量,所以
一次走到wait中第4、5步。由於第4和第5步之間沒有加鎖保護,所以這一階段用到的類的成員變量都是不安全的。所以在第3
和第4之間加一個互斥鎖,第5步之后釋放這個互斥鎖。同樣的道理,為了避免此時signal內部調用類的成員變量造成數據不一致
所以signal內部也需要加鎖,在signal內部第1步之前加鎖,第3步之后解鎖,或者第3步之前解鎖都可以。我覺得在第三步之前
釋放會好一些,在釋放信號量之前解鎖,避免死鎖。所以添加一個成員變量mMutex
用於部分代碼互斥。
那么改良后我們的函數如下:
void
Condition::wait(Mutex& mutex)
{
#ifndef WIN32
int ret = pthread_cond_wait(&mId, mutex.getId());
ASSERT(ret == 0);
#else
//1
mBlocked++;
//2
mutex.unlock();
int res = 0;
//3
res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE);
ASSERT(res == WAIT_OBJECT_0);
//用於暫時存儲mWaiting的數值
unsigned wasWaiting = 0;
//4
res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE);
ASSERT(res == WAIT_OBJECT_0);
wasWaiting = mWaiting;
//5
res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex));
ASSERT(res);
//6
mutex.lock();
#endif
}
步驟也做了相應的調整。
void Condition::signal () { #ifndef WIN32 int ret = pthread_cond_signal(&mId); ASSERT(ret == 0); #else unsigned signals = 0; int res = 0;
//1 res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE); ASSERT(res == WAIT_OBJECT_0); //2 if (mWaiting != 0) { if (mBlocked == 0) { res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res); return; } ++mWaiting; --mBlocked; signals = 1; } else { signals = mWaiting = 1; --mBlocked; } //3 res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res); //4 if (signals) { res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mQueue), signals, 0); ASSERT(res); } #endif }
改良后更新了步驟,注釋的就是步驟,方便接下來討論這兩段代碼的隱患,因為僅僅這些還不夠。目前現總結下mMutex作用:
1mMutex用於signal函數內部和wait函數 獲取信號量之后的代碼互斥,保護類的常用變量。
2當不同的線程調用wait等待后獲得激活時,mMutex保證獲得信號量之后的操作是互斥的,安全的。
由於調用wait函數之前需要加外部的互斥鎖,所以不同的線程調用wai函數時第一步的mBlocked++是互斥的,不會出錯。
唯一有可能出錯的是那種情況呢?
就是當signal發出信號后,當前有一個因為調用wait阻塞的線程A捕獲到該信號,進入第四步,修改或者訪問mBlocked變量的值,
與此同時有線程A調用wait函數,此時會進入wait內部第一步mBlocked++,多線程修改和讀取mBlocked會造成數據混亂,
所以此時需要在第一步之前加鎖,第2步之前解鎖,因此添加單個信號量mGate,用於控制當有線程處於解鎖狀態處理mBlocked等
類成員時,其他線程進入wait修改mBlocked值。
這個res = WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE);可以放在wait函數第4步之后,當第4步獲得互斥
資源后,阻塞等待獲取mGate信號,如果沒獲得需要等待別的線程釋放mGate,如果此時mGate不被釋放造成mMutex死鎖。所以
別的線程中先調用 WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE);后調用WaitForSingleObject mMutex會造成
死鎖。需要特別注意。如果規避了這一點,那么就可以避免死鎖。所有情況都對mGate互斥訪問並不友好,出現之前討論的情況只有一種:
就是當前應用程序中至少有一個線程處於等待,而signal釋放信號后,某一個等待的線程繼續執行4后面的操作,外界有新的線程調用wait時
修改mBlocked會出錯。所以只需要在signal函數中判斷當mWaiting數量為0時對mGate加鎖,mWait根據不同情況進行對mGate進行釋放。
修改后的代碼如下:
先封裝一個小函數:
void Condition::enterWait () { int res = 0; res = WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE); ASSERT(res == WAIT_OBJECT_0); ++mBlocked; res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mGate), 1, 0); ASSERT(res); }
對mBlocked起到保護作用
void Condition::wait(Mutex& mutex) { #ifndef WIN32 int ret = pthread_cond_wait(&mId, mutex.getId()); ASSERT(ret == 0); #else //1
enterWait(); //2 mutex.unlock(); int res = 0;
//3 res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE); ASSERT(res == WAIT_OBJECT_0); unsigned wasWaiting = 0; unsigned wasGone = 0; //4 res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE); ASSERT(res == WAIT_OBJECT_0); wasWaiting = mWaiting; wasGone = mGone;
//signal釋放資源后,mWaiting 至少為1 if (wasWaiting != 0) {
//判斷mWaiting 數量為1 if (--mWaiting == 0) {
//如果當前沒有阻塞線程則釋放mGate if (mBlocked != 0) { res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mGate), 1, 0); // open mGate ASSERT(res); wasWaiting = 0; } } } //5 res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res); //6 mutex.lock(); #endif }
對應的signal函數:
void Condition::signal () { #ifndef WIN32 int ret = pthread_cond_signal(&mId); ASSERT(ret == 0); #else unsigned signals = 0; int res = 0;
//1 res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE); ASSERT(res == WAIT_OBJECT_0); if (mWaiting != 0) {
//當前有空閑的信號量並且沒由阻塞的線程 if (mBlocked == 0) { res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res); return; } //如果由阻塞的線程,那么阻塞數量-- ++mWaiting; --mBlocked; signals = 1; } else { //2當空閑的信號量為0時,互斥獲得mGate
res = WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE); ASSERT(res == WAIT_OBJECT_0);
//3 if (mBlocked ) { //如果當前有線程阻塞那么更新計數 signals = mWaiting = 1; --mBlocked; } else {
//由於用戶外部不判斷條件是否成立多次調動signal,此處不處理直接釋放mGate res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mGate), 1, 0); ASSERT(res); } }
//4 res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res);
//5 if (signals) { res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mQueue), signals, 0); ASSERT(res); } #endif }
到目前為止,對於共享對象的保護和同步都做的比較完善了,還要注意一個問題就是虛假喚醒。這是
操作系統可能出現的一種情況,所以需要添加虛假喚醒的邏輯用mGone成員變量表示出錯的或是虛假喚醒的線程數
最終代碼如下:
void Condition::wait(Mutex& mutex) { #ifndef WIN32 int ret = pthread_cond_wait(&mId, mutex.getId()); ASSERT(ret == 0); #else enterWait(); mutex.unlock(); int res = 0; res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE); ASSERT(res == WAIT_OBJECT_0); unsigned wasWaiting = 0; unsigned wasGone = 0; res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE); ASSERT(res == WAIT_OBJECT_0); wasWaiting = mWaiting; wasGone = mGone; if (wasWaiting != 0) { if (--mWaiting == 0) { if (mBlocked != 0) { res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mGate), 1, 0); // open mGate ASSERT(res); wasWaiting = 0; } else if (mGone != 0) { mGone = 0; } } } else if (++mGone == (ULONG_MAX / 2)) { res = WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE); ASSERT(res == WAIT_OBJECT_0); mBlocked -= mGone; res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mGate), 1, 0); ASSERT(res); mGone = 0; } res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res); if (wasWaiting == 1) { for (; wasGone; --wasGone) { res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE); ASSERT(res == WAIT_OBJECT_0); } res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mGate), 1, 0); ASSERT(res); } mutex.lock(); #endif }
wait部分添加了mGone的處理,當mWaiting數量為0進入
res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE);
需要對mGone++表示虛假喚醒的線程數量
if (++mGone == (ULONG_MAX / 2)) { res = WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE); ASSERT(res == WAIT_OBJECT_0); mBlocked -= mGone; res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mGate), 1, 0); ASSERT(res); mGone = 0; }
通過mGate對mBlocked保護起來,當喚醒的個數超過指定值會把多余的mblocked去掉並且把
虛假喚醒數量置空。舉個例子,當mBLocked為1時該線程被虛假喚醒,那么mGone變為1,由於是
虛假喚醒,用戶在外部調用wait函數時通過while循環判斷條件不滿足再次進入wait中enterGate
函數對mBlocked自增,此時mBlocked數量為2,所以當冗余的mBlocked超過指定值,就回去掉
這些mBlocked並將mGone置空。
if (wasWaiting == 1)
{
for (; wasGone; --wasGone) { res = WaitForSingleObject(reinterpret_cast<HANDLE>(mQueue), INFINITE); ASSERT(res == WAIT_OBJECT_0); } res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mGate), 1, 0); ASSERT(res); }
該函數判斷Condation類的mWating變量有1變為0,並且阻塞的線程數為0,因為如果用戶沒有在外邊調用while
判斷條件導致虛假喚醒引起邏輯錯誤,所以為了起到保護作用對那些因為虛假喚醒錯過的信號進行資源占用,
直到信號量都被釋放后才進入mGate釋放。舉一個例子
如果外部調用
Lock(mutex);
if(條件不滿足)
{
wait(mutex);
}
//邏輯處理
...
UnLock(mutex);
當wait執行退出后會執行邏輯,而沒有while判斷條件是否真的滿足。所以我們要對信號量進行控制,保證信號量
數量正確。並且和mBlocked,mWait,等一致。
下面是signal函數最終版本
void Condition::signal () { #ifndef WIN32 int ret = pthread_cond_signal(&mId); ASSERT(ret == 0); #else unsigned signals = 0; int res = 0; res = WaitForSingleObject(reinterpret_cast<HANDLE>(mMutex), INFINITE); ASSERT(res == WAIT_OBJECT_0); if (mWaiting != 0) { if (mBlocked == 0) { res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res); return; } ++mWaiting; --mBlocked; signals = 1; } else { res = WaitForSingleObject(reinterpret_cast<HANDLE>(mGate), INFINITE); ASSERT(res == WAIT_OBJECT_0); if (mBlocked > mGone) { if (mGone != 0) { mBlocked -= mGone; mGone = 0; } signals = mWaiting = 1; --mBlocked; } else { res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mGate), 1, 0); ASSERT(res); } } res = ReleaseMutex(reinterpret_cast<HANDLE>(mMutex)); ASSERT(res); if (signals) { res = ReleaseSemaphore(reinterpret_cast<HANDLE>(mQueue), signals, 0); ASSERT(res); } #endif }
同樣的道理
if (mBlocked > mGone) { if (mGone != 0) { mBlocked -= mGone; mGone = 0; } signals = mWaiting = 1; --mBlocked; }
這個邏輯就是處理當虛假喚醒的mBlocked和mGone等數據准確性。
因為如果是虛假喚醒,用戶通過while(條件不滿足)這個方式繼續調用wait
會導致mBlocked++,假設就一個線程處於阻塞並且因為虛假喚醒通過while循環
重新調用wait函數,而此時mGone比mBlocked小1,所以mBlocked - mGone就是
更新差值給mBlocked,這是真正的處於阻塞的線程數量。
下面是代碼下載地址:
http://download.csdn.net/detail/secondtonone1/9658645
代碼效果測試截圖:


謝謝關注我的微信公眾號:

