C++11並發——多線程條件變量std::condition_variable(四)


https://www.jianshu.com/p/a31d4fb5594f

https://blog.csdn.net/y396397735/article/details/81272752

https://www.cnblogs.com/haippy/p/3252041.html

 

std::condition_variable 是條件變量,

當 std::condition_variable 對象的某個 wait 函數被調用的時候,它使用 std::unique_lock(通過 std::mutex) 來鎖住當前線程。

當前線程會一直被阻塞,直到另外一個線程在相同的 std::condition_variable 對象上調用了 notification 函數來喚醒當前線程。

std::condition_variable 對象通常使用 std::unique_lock<std::mutex> 來等待,如果需要使用另外的 lockable 類型,可以使用 std::condition_variable_any 類,本文后面會講到 std::condition_variable_any 的用法。

#include <iostream>                // std::cout
#include <thread>                // std::thread
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable

std::mutex mtx; // 全局互斥鎖.
std::condition_variable cv; // 全局條件變量.
bool ready = false; // 全局標志位.

void do_print_id(int id)
{
    std::unique_lock <std::mutex> lck(mtx);
    while (!ready) // 如果標志位不為 true, 則等待...
        cv.wait(lck); // 當前線程被阻塞, 當全局標志位變為 true 之后,
    // 線程被喚醒, 繼續往下執行打印線程編號id.
    std::cout << "thread " << id << '\n';
}

void go()
{
    std::unique_lock <std::mutex> lck(mtx);
    ready = true; // 設置全局標志位為 true.
    cv.notify_all(); // 喚醒所有線程.
}

int main()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i = 0; i < 10; ++i)
        threads[i] = std::thread(do_print_id, i);

    std::cout << "10 threads ready to race...\n";
    go(); // go!

  for (auto & th:threads)
        th.join();

    return 0;
}
concurrency ) ./ConditionVariable-basic1 
threads ready to race...
thread 1
thread 0
thread 2
thread 3
thread 4
thread 5
thread 6
thread 7
thread 8
thread 9

好了,對條件變量有了一個基本的了解之后,我們來看看 std::condition_variable 的各個成員函數。

std::condition_variable 構造函數

default (1)
condition_variable();
copy [deleted] (2)
condition_variable (const condition_variable&) = delete;

std::condition_variable 的拷貝構造函數被禁用,只提供了默認構造函數。

std::condition_variable::wait() 介紹

unconditional (1)
void wait (unique_lock<mutex>& lck);
predicate (2)
template <class Predicate>
  void wait (unique_lock<mutex>& lck, Predicate pred);

std::condition_variable 提供了兩種 wait() 函數。當前線程調用 wait() 后將被阻塞(此時當前線程應該獲得了鎖(mutex),不妨設獲得鎖 lck),直到另外某個線程調用 notify_* 喚醒了當前線程。

在線程被阻塞時,該函數會自動調用 lck.unlock() 釋放鎖,使得其他被阻塞在鎖競爭上的線程得以繼續執行。另外,一旦當前線程獲得通知(notified,通常是另外某個線程調用 notify_* 喚醒了當前線程),wait() 函數也是自動調用 lck.lock(),使得 lck 的狀態和 wait 函數被調用時相同。

在第二種情況下(即設置了 Predicate),只有當 pred 條件為 false 時調用 wait() 才會阻塞當前線程,並且在收到其他線程的通知后只有當 pred 為 true 時才會被解除阻塞。因此第二種情況類似以下代碼:

while (!pred()) wait(lck);

 

#include <iostream>                // std::cout
#include <thread>                // std::thread, std::this_thread::yield
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable

std::mutex mtx;
std::condition_variable cv;

int cargo = 0;
bool shipment_available()
{
    return cargo != 0;
}

// 消費者線程.
void consume(int n)
{
    for (int i = 0; i < n; ++i) {
        std::unique_lock <std::mutex> lck(mtx);
        cv.wait(lck, shipment_available);
        std::cout << cargo << '\n';
        cargo = 0;
    }
}

int main()
{
    std::thread consumer_thread(consume, 10); // 消費者線程.

    // 主線程為生產者線程, 生產 10 個物品.
    for (int i = 0; i < 10; ++i) {
        while (shipment_available())
            std::this_thread::yield();
/*
std::this_thread::yield: 當前線程放棄執行,操作系統調度另一線程繼續執行。
即當前線程將未使用完的“CPU時間片”讓給其他線程使用,
等其他線程使用完后再與其他線程一起競爭"CPU"。
std::this_thread::sleep_for: 表示當前線程休眠一段時間,
休眠期間不與其他線程競爭CPU,根據線程需求,等待若干時間。

*/
        std::unique_lock <std::mutex> lck(mtx);
        cargo = i + 1;
        cv.notify_one();
    }

    consumer_thread.join();

    return 0;
}

 

 

1. std::condition_variable

條件變量提供了兩類操作:wait和notify。這兩類操作構成了多線程同步的基礎。

1.1 wait

wait是線程的等待動作,直到其它線程將其喚醒后,才會繼續往下執行。下面通過偽代碼來說明其用法:

std::mutex mutex; std::condition_variable cv; 
// 條件變量與臨界區有關,用來獲取和釋放一個鎖,因此通常會和mutex聯用。
 std::unique_lock lock(mutex); 
// 此處會釋放lock,然后在cv上等待,直到其它線程通過cv.notify_xxx來喚醒當前線程,
//cv被喚醒后會再次對lock進行上鎖,然后wait函數才會返回。 
// wait返回后可以安全的使用mutex保護的臨界區內的數據。此時mutex仍為上鎖狀態 cv.wait(lock) 

 

除wait外, 條件變量還提供了wait_for和wait_until,這兩個名稱是不是看着有點兒眼熟,std::mutex也提供了_for和_until操作。在C++11多線程編程中,需要等待一段時間的操作,
一般情況下都會有xxx_for和xxx_until版本。前者用於等待指定時長,后者用於等待到指定的時間。

 

1.2 notify

了解了wait,notify就簡單多了:喚醒wait在該條件變量上的線程。notify有兩個版本:notify_one和notify_all。

  • notify_one 喚醒等待的一個線程,注意只喚醒一個。
  • notify_all 喚醒所有等待的線程。使用該函數時應避免出現驚群效應

其使用方式見下例:

std::mutex mutex;
 std::condition_variable cv; 
std::unique_lock lock(mutex); 
// 所有等待在cv變量上的線程都會被喚醒。但直到lock釋放了mutex,被喚醒的線程才會從wait返回。 
cv.notify_all(lock)

 

// conditionVariable.cpp

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex mutex_;
std::condition_variable condVar;

void doTheWork(){
  std::cout << "Processing shared data." << std::endl;
}

void waitingForWork(){
    std::cout << "Worker: Waiting for work." << std::endl;

    std::unique_lock<std::mutex> lck(mutex_);
    condVar.wait(lck);
    doTheWork();
    std::cout << "Work done." << std::endl;
}

void setDataReady(){
    std::cout << "Sender: Data is ready."  << std::endl;
    condVar.notify_one();
}

int main(){

  std::cout << std::endl;

  std::thread t1(waitingForWork);
  std::thread t2(setDataReady);

  t1.join();
  t2.join();

  std::cout << std::endl;

}

該程序有兩個子線程: t1和t2。 它們在第33行和第34行中獲得可調用的有效負載(函數或函子) waitingForWork和setDataReady。

函數setDataReady通過使用條件變量condVar調用condVar.notify_one()進行通知。 在持有鎖的同時,線程T2正在等待其通知: condVar.wait(lck).

 

虛假的喚醒

細節決定成敗。事實上,可能發生的是,接收方在發送方發出通知之前完成了任務。 這怎么可能呢?接收方對虛假的喚醒很敏感。所以即使沒有通知發生,接收方也有可能會醒來。

為了保護它,我不得不向等待方法添加一個判斷。 這就是我在下一個例子中所做的:

 

// conditionVariableFixed.cpp

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex mutex_;
std::condition_variable condVar;

bool dataReady;

void doTheWork(){
  std::cout << "Processing shared data." << std::endl;
}

void waitingForWork(){
    std::cout << "Worker: Waiting for work." << std::endl;

    std::unique_lock<std::mutex> lck(mutex_);
    condVar.wait(lck,[]{return dataReady;});
    doTheWork();
    std::cout << "Work done." << std::endl;
}

void setDataReady(){
    std::lock_guard<std::mutex> lck(mutex_);
    dataReady=true;
    std::cout << "Sender: Data is ready."  << std::endl;
    condVar.notify_one();
}

int main(){

  std::cout << std::endl;

  std::thread t1(waitingForWork);
  std::thread t2(setDataReady);

  t1.join();
  t2.join();

  std::cout << std::endl;

}
View Code

與第一個示例的關鍵區別是在第11行中使用了一個布爾變量dataReady 作為附加條件。 dataReady在第28行中被設置為true。

它在函數waitingForWork中被檢查:

condVar.wait(lck,[]{return dataReady;})

這就是為什么wait方法有一個額外的重載,它接受一個判定。判定是個callable,它返回true或false。 
在此示例中,callable是lambda函數。因此,條件變量檢查兩個條件:判定是否為真,通知是否發生。

關於dataReady
dataReady是個共享變量,將會被改變。所以我不得不用鎖來保護它。
因為線程T1只設置和釋放鎖一次,所以std::lock_guard已經夠用了。但是線程t2就不行了,wait方法將持續鎖定和解鎖互斥體。所以我需要更強大的鎖:std::unique_lock。
但這還不是全部,條件變量有很多挑戰,它們必須用鎖來保護,並且易受虛假喚醒的影響。
大多數用例都很容易用tasks來解決,后續再說task問題。

喚醒不了

條件變量的異常行為還是有的。大約每10次執行一次conditionVariable.cpp就會發生一些奇怪的現象:

我不知道怎么回事,這種現象完全違背了我對條件變量的直覺。
在安東尼·威廉姆斯的支持下,我解開了謎團。
問題在於,如果發送方在接收方進入等待狀態之前發送通知,則通知會丟失。C ++標准同時也將條件變量描述為同步機制,“condition_variable類是一個同步原語,可以用來同時阻塞一個線程或多個線程。。。”。
因此,通知消息已經丟失了,但是接收方還在等啊和等啊等啊等啊…
怎么解決這個問題呢?去除掉wait第二個參數的判定可以有效幫助喚醒。實際上,在判定設置為真的情況下,接收器也能夠獨立於發送者的通知進而繼續其工作。

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM