C++11 生產者消費者


下面是一個生產者消費者問題,來介紹condition_variable的用法。當線程間的共享數據發生變化的時候,可以通過condition_variable來通知其他的線程。消費者wait 直到生產者通知其狀態發生改變,Condition_variable是使用方法如下:

·當持有鎖之后,線程調用wait

·wait解開持有的互斥鎖(mutex),阻塞本線程,並將自己加入到喚醒隊列中

·當收到通知(notification),該線程從阻塞中恢復,並加入互斥鎖隊列(mutex queue)

 線程被喚醒之后繼續持有鎖運行。

 

Condition variable有兩種類型:condition_variable 和 condition_variable_any,前一種效率更高,但是使用不夠靈活,只支持std::unique_lock<std::mutex>類型的互斥鎖;后一種比較靈活,支持所有類型的鎖,但是效率稍微低一些。

有一點需要注意的是使用condition variable進行通信的線程,condition variable 需要使用相同的互斥信號量(mutex)。

下面來看例子:(當按下回車鍵之后停止)

#include <thread>

#include <iostream>

#include <mutex>

#include <queue>

#include <condition_variable>

#include <atomic>

using namespace std;

int main()
{

    mutex lockBuffer; //申明互斥信號量

    volatile bool ArretDemande = false; //使生產、消費過程的結束

    queue<long> buffer;       

    condition_variable_any cndNotifierConsommateurs;//condition variable

    condition_variable_any cndNotifierProducteur;   
 
    thread ThreadProducteur([&]()//生產者線程
    {
       
        std::atomic<long> interlock;//對interlock的操作將是原子的

        interlock=1;   

        while(true)
        {               

                std::this_thread::sleep_for (chrono::milliseconds (15));               

                long element=interlock.fetch_add (1);//【1】

                lockBuffer.lock ();

                while(buffer.size()==10 && ArretDemande ==false)
                {
                   
                    cndNotifierProducteur.wait (lockBuffer);//【2】

                }

                if (ArretDemande==true)

                {                   

                    lockBuffer.unlock ();

                    cndNotifierConsommateurs.notify_one ();//【3】

                    break;

                }

                buffer.push(element);

                cout << "Production unlement :" << element << " size :" << buffer.size() << endl;

                lockBuffer.unlock ();

                cndNotifierConsommateurs.notify_one ();

        }

    } );
thread ThreadConsommateur([
&]() { while(true) { lockBuffer.lock (); while(buffer.empty () && ArretDemande==false) { cndNotifierConsommateurs.wait(lockBuffer); } if (ArretDemande==true && buffer.empty ()) { lockBuffer.unlock(); cndNotifierProducteur.notify_one (); break; } long element=buffer.front(); buffer.pop (); cout << "Consommation element :" << element << " size :" << buffer.size() << endl; lockBuffer.unlock (); cndNotifierProducteur.notify_one (); } } ); std::cout << "Pour arreter pressez [ENTREZ]" << std::endl; getchar(); std::cout << "Arret demande" << endl ArretDemande=true; ThreadProducteur.join(); ThreadConsommateur.join(); cout<<"Main Thread"<<endl; return 0; }

 

運行結果:

對程序進行一下說明,程序中有三個線程,主線程、生產者線程、消費者線程,三個線程之間亂序執行,通過一些全局變量來控制他們的執行順序。主線程的作用是控制生產消費過程是否結束,當程序運行之后,主線程通過getchar()接收一個輸入,接收到輸入后會將ArretDemande設置為true,另外兩個線程會終止。生產者線程將生產出來的數據放在一個queue類型的buffer中,並解鎖,通知消費之線程,buffer中最多“能”存10個數據,如果buffer中已經有10個數據還沒有被取走,則會通知消費者線程“消費”,如果ArretDmande被置位,則打開鎖,並通知消費之線程。消費者線程主要是將buffer中的數據取出來,當buffer為空的時候阻塞自己,並通知生產者線程,當ArretDemande被置位,且已經消費完產品則解鎖,並通知生產者線程。需要注意的是需要通信的生產者和消費者這兩個線程通過condition variable來實現通信,必須操作同一個mutex,這里是lockbuffer,並且每次Notify都會打開當前鎖。

程序中對interlock進行的操作是原子的,interlock.fet_add(N),效果是將interlock加N,然后返回interlock在加N之前的值,atomic類型是通過一定的內存順序規則來實現這個過程的。

雖然conditon_variable 只能支持std::unique_lock<std::mutex>類型的互斥鎖,但是在大部分情況下已經夠用,而且使用std::unique_lock<std::mutex>會比較簡單,因為std::unique_lock<std::mutex>在聲明的時候就會初始化,在生命周期結束之后就會自動解鎖,因此我們不用太花精力來考慮什么時候解鎖。我們來看看下面這段程序:

#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
#include <queue>
#include <chrono>
 
int main()
{
    std::queue<int> produced_nums;
    std::mutex m;;
    std::condition_variable cond_var;
    bool done = false;
    bool notified = false;
 
    std::thread producer([&]() {
        for ( int i = 0; i < 5; ++i) {
            std::this_thread::sleep_for(std::chrono:: seconds(1));
            std:: unique_lock<std::mutex > lock(m);  //May lock mutex after construction, unlock before destruction.
            std::cout << "producing " << i << '\n' ;
            produced_nums.push(i);
            notified = true;
       cond_var.notify_one();
} done = true; cond_var.notify_one(); }); //cond_var.notify_one(); std::thread consumer([&]() { while (!done) { std:: unique_lock<std::mutex > lock(m); while (!notified) { // loop to avoid spurious wakeups cond_var.wait(lock); } while (!produced_nums.empty()) { std::cout << "consuming " << produced_nums.front() << '\n'; produced_nums.pop(); } notified = false; } }); producer.join(); consumer.join(); return 0; }

 運行結果:

C:\Windows\system32\cmd.exe /c producer_consumer.exe
producing 0
consuming 0
producing 1
consuming 1
producing 2
consuming 2
producing 3
consuming 3
producing 4
consuming 4
Hit any key to close this window...

更新:2012年8月4日16:53:25


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM