- condition_variable
- wait()
- notify_one
- notify_all
condition_variable
條件變量的實際用途:
比如有兩個線程A和B,在線程A中等待一個條件滿足,(消息隊列中有要處理的消息),線程B專門往隊列中丟數據。當B往線程中放入數據,同時B通知線程A,開始往下執行。在服務器的后台設計中,有一個線程,阻塞式地讀取消息,並且將其解析,放入隊列中,此時線程B還通知A,要從隊列中去拿請求,並進行處理。
a) socket技術使得服務器中的程序能夠像打開文件一樣來讀取數據。
b) 線程B讀取數據,並將其放入到消息隊列中。
c) 線程B喚醒線程A,讓線程A從隊列中拿數據。
d) 服務器處理請求完成並返回結果。
通過條件變量類可以使得A等待B:
復習原先的代碼:(通過雙重鎖定,使得每次都判斷是否為空,如果為空那么就取得鎖)
class ProcessRequest { public: //把命令加入到一個隊列 void inMsgRecvQueue() { for (int i = 0; i < 100000; ++i) { //std::lock_guard<std::mutex> sbguard(my_mutex); cout << "插入一個元素" << endl; m_msgRecvQueue.push_back(i); //假設這個隊列表示玩家的命令 } //占用時間片 } bool outMsgLULProc(int &command) { //通過雙重鎖定,避免每次進來程序都鎖定。 if (!m_msgRecvQueue.empty()) { std::lock_guard<std::mutex> sbguard(my_mutex); if (!m_msgRecvQueue.empty()) { int command = m_msgRecvQueue.front(); m_msgRecvQueue.pop_front(); return true; } return false; } } //把命令移出一個隊列 void outMsgRecvQueue() { int command = 0; for (int i = 0; i < 100000; ++i) { bool result = outMsgLULProc(command); if (result == true) { cout << "outMsgRecvQueue() 執行,取出一個元素" << endl; } else { cout << "outMsgRecvQueue() 還執行,但是消息隊列為空" << endl; //消息隊列為空 } //占用時間片 } } private: std::list<int> m_msgRecvQueue; //容器,用於表示玩家的發送過來命令 std::mutex my_mutex; };
使用類std::condition_variable來替代雙重鎖定,用來等待一個條件達成,這個類需要和互斥量配合工作,用的時候需要生成類的對象。
pirvate: std::condition_variable my_condition;
wait()
出隊列修改: wait是卡在這里的,需要修改入隊列的線程。
void outMsgRecvQueue() { int command = 0; while (true) { std::unique_lock<std::mutex> sbguard1(my_mutex); my_condition.wait(sbguard1, [this] { if (!m_msgRecvQueue.empty())//lambda表達式就是一個可調用對象(函數) return true; else return false; }); //wait用來等待一個東西 //Wait(para1, para2) //para1: 互斥量 //para2:第二個參數Lambda表達式的返回值是False // 那么將解鎖互斥量,並阻塞本行,直到其他線程調用 notify_one() //如果沒有第二個參數,那么就跟第二個參數返回False效果一樣 } }
當然wait()之后可以提早解開 unique_lock(),然后執行邏輯。
notify_one
將原來阻塞的進程喚醒了。wait就開始恢復干活了,恢復之后
a) wait() 不斷嘗試獲取互斥量鎖,嘗試拿這個鎖。如果獲取不到鎖,流程就卡在wait這里,如果獲取到,wait就走下來了。
b) 實際上獲取到了鎖就等於上了鎖。如果wait有第二個參數(lambda),就判斷lambda表達式,
如果表達式為false,又將互斥量解鎖。然后另一個線程又休眠。
如果表達式為true,則wait返回,流程走下來(此時互斥鎖被鎖着)。
如果wait沒有第二個參數,則wait返回
void inMsgRecvQueue() { for (int i = 0; i < 100; ++i) { std::lock_guard<std::mutex> sbguard(my_mutex); m_msgRecvQueue.push_back(i); //假設這個隊列表示玩家的命令 cout << "插入一個元素" << endl; my_condition.notify_one(); } //占用時間片 }
同時獲取鎖的可能性:
1) void inMsgRecvQueue()
2) void outMsgRecvQueue()
可能出現同時競爭一個鎖的可能性,也就是說如果運行到了outMsgRecvQueue()的邏輯執行語句的時候,隊列中至少進去了一個元素,那么就有可能出現in和out並不是按序執行的情況。
out在執行邏輯語句的時候有延遲,此時如果in喚醒,out並不是卡在wait()的狀態,那么此時notify_one()調用就沒有效果。
深入思考
寫代碼用在商業中,必須理解。
在線程入口函數中, 隊列中可能會存在多條數據,這個時候處理不過來怎么辦?開更多的線程處理?或者限流,超過200條數據未處理,就卡住?