下面是一個生產者消費者問題,來介紹condition_variable的用法。當線程間的共享數據發生變化的時候,可以通過condition_variable來通知其他的線程。消費者wait 直到生產者通知其狀態發生改變,Condition_variable是使用方法如下:
·當持有鎖之后,線程調用wait
·wait解開持有的互斥鎖(mutex),阻塞本線程,並將自己加入到喚醒隊列中
·當收到通知(notification),該線程從阻塞中恢復,並加入互斥鎖隊列(mutex queue)
線程被喚醒之后繼續持有鎖運行。
Condition variable有兩種類型:condition_variable 和 condition_variable_any,前一種效率更高,但是使用不夠靈活,只支持std::unique_lock<std::mutex>類型的互斥鎖;后一種比較靈活,支持所有類型的鎖,但是效率稍微低一些。
有一點需要注意的是使用condition variable進行通信的線程,condition variable 需要使用相同的互斥信號量(mutex)。
下面來看例子:(當按下回車鍵之后停止)
#include <thread> #include <iostream> #include <mutex> #include <queue> #include <condition_variable> #include <atomic> using namespace std; int main() { mutex lockBuffer; //申明互斥信號量 volatile bool ArretDemande = false; //使生產、消費過程的結束 queue<long> buffer; condition_variable_any cndNotifierConsommateurs;//condition variable condition_variable_any cndNotifierProducteur; thread ThreadProducteur([&]()//生產者線程 { std::atomic<long> interlock;//對interlock的操作將是原子的 interlock=1; while(true) { std::this_thread::sleep_for (chrono::milliseconds (15)); long element=interlock.fetch_add (1);//【1】 lockBuffer.lock (); while(buffer.size()==10 && ArretDemande ==false) { cndNotifierProducteur.wait (lockBuffer);//【2】 } if (ArretDemande==true) { lockBuffer.unlock (); cndNotifierConsommateurs.notify_one ();//【3】 break; } buffer.push(element); cout << "Production unlement :" << element << " size :" << buffer.size() << endl; lockBuffer.unlock (); cndNotifierConsommateurs.notify_one (); } } );
thread ThreadConsommateur([&]() { while(true) { lockBuffer.lock (); while(buffer.empty () && ArretDemande==false) { cndNotifierConsommateurs.wait(lockBuffer); } if (ArretDemande==true && buffer.empty ()) { lockBuffer.unlock(); cndNotifierProducteur.notify_one (); break; } long element=buffer.front(); buffer.pop (); cout << "Consommation element :" << element << " size :" << buffer.size() << endl; lockBuffer.unlock (); cndNotifierProducteur.notify_one (); } } ); std::cout << "Pour arreter pressez [ENTREZ]" << std::endl; getchar(); std::cout << "Arret demande" << endl ArretDemande=true; ThreadProducteur.join(); ThreadConsommateur.join(); cout<<"Main Thread"<<endl; return 0; }
運行結果:
對程序進行一下說明,程序中有三個線程,主線程、生產者線程、消費者線程,三個線程之間亂序執行,通過一些全局變量來控制他們的執行順序。主線程的作用是控制生產消費過程是否結束,當程序運行之后,主線程通過getchar()接收一個輸入,接收到輸入后會將ArretDemande設置為true,另外兩個線程會終止。生產者線程將生產出來的數據放在一個queue類型的buffer中,並解鎖,通知消費之線程,buffer中最多“能”存10個數據,如果buffer中已經有10個數據還沒有被取走,則會通知消費者線程“消費”,如果ArretDmande被置位,則打開鎖,並通知消費之線程。消費者線程主要是將buffer中的數據取出來,當buffer為空的時候阻塞自己,並通知生產者線程,當ArretDemande被置位,且已經消費完產品則解鎖,並通知生產者線程。需要注意的是需要通信的生產者和消費者這兩個線程通過condition variable來實現通信,必須操作同一個mutex,這里是lockbuffer,並且每次Notify都會打開當前鎖。
程序中對interlock進行的操作是原子的,interlock.fet_add(N),效果是將interlock加N,然后返回interlock在加N之前的值,atomic類型是通過一定的內存順序規則來實現這個過程的。
雖然conditon_variable 只能支持std::unique_lock<std::mutex>類型的互斥鎖,但是在大部分情況下已經夠用,而且使用std::unique_lock<std::mutex>會比較簡單,因為std::unique_lock<std::mutex>在聲明的時候就會初始化,在生命周期結束之后就會自動解鎖,因此我們不用太花精力來考慮什么時候解鎖。我們來看看下面這段程序:
#include <condition_variable> #include <mutex> #include <thread> #include <iostream> #include <queue> #include <chrono> int main() { std::queue<int> produced_nums; std::mutex m;; std::condition_variable cond_var; bool done = false; bool notified = false; std::thread producer([&]() { for ( int i = 0; i < 5; ++i) { std::this_thread::sleep_for(std::chrono:: seconds(1)); std:: unique_lock<std::mutex > lock(m); //May lock mutex after construction, unlock before destruction. std::cout << "producing " << i << '\n' ; produced_nums.push(i); notified = true;
cond_var.notify_one(); } done = true; cond_var.notify_one(); }); //cond_var.notify_one(); std::thread consumer([&]() { while (!done) { std:: unique_lock<std::mutex > lock(m); while (!notified) { // loop to avoid spurious wakeups cond_var.wait(lock); } while (!produced_nums.empty()) { std::cout << "consuming " << produced_nums.front() << '\n'; produced_nums.pop(); } notified = false; } }); producer.join(); consumer.join(); return 0; }
運行結果:
C:\Windows\system32\cmd.exe /c producer_consumer.exe
producing 0
consuming 0
producing 1
consuming 1
producing 2
consuming 2
producing 3
consuming 3
producing 4
consuming 4
Hit any key to close this window...
更新:2012年8月4日16:53:25