測試時應包含以下頭文件:
#include <iostream> // std::cout
#include <chrono> // std::chrono::milliseconds
#include <thread> // std::thread
#include <mutex> // std::timed_mutex
#include <condition_variable>//條件變量
一、C++11中提供了std::mutex互斥量,共包含四種類型:
std::mutex:最基本的mutex類。
std::recursive_mutex:遞歸mutex類,能多次鎖定而不死鎖。
std::time_mutex:定時mutex類,可以鎖定一定的時間。
std::recursive_timed_mutex:定時遞歸mutex類。
——> > > std::mutex: std::mutex是C++中最基本的互斥量,提供了獨占所有權的特性,std::mutex提供了以下成員函數:
構造函數:std::mutex不允許拷貝構造,也不允許move拷貝,最初產生的mutex對象是處於unlocked狀態的。
lock():調用線程將鎖住該互斥量,線程調用該函數會發生以下3種情況:
(1)如果該互斥量當前沒有被鎖住,則調用線程將該互斥量鎖住,直到調用unlock之前,該線程一直擁有該鎖。
(2)如果當前互斥量被其他線程鎖住,則當前的調用線程被阻塞住。
(3)如果當前互斥量被當前調用線程鎖住,則會產生死鎖,,也就是說同一個線程中不允許鎖兩次。
unlock():解鎖,釋放對互斥量的所有權。
try_lock():嘗試鎖住互斥量,如果互斥量被其他線程占有,則當前線程也不會被阻塞,線程調用該函數會出現下面3種情況:
(1)如果當前互斥量沒有被其他線程占有,則該線程鎖住互斥量,直到該線程調用unlock釋放互斥量。
(2)如果當前互斥量被其他線程鎖住,則當前調用線程返回false,而並不會被阻塞掉。
(3)如果當前互斥量被當前調用線程鎖住,則會產生死鎖。
——> > > std::recursive_mutex: std::recursive_mutex與std::mutex類似,但是它能夠進行多次lock,這樣能夠規避一些死鎖問題:
(1)有時候會在兩個函數中分別對數據進行lock,如果在一個函數中又調用了另一個函數,此時如果使用std::mutex將會死鎖,而用std::recursive_mutex則不會。
(2)lock和unlock的數量必須相等:看起來std::recursive_mutex很不錯,但是使用的時候也需要多注意,否則會出錯。
(3)性能的問題,std::recursive_mutex的性能會比較差一些,可用接口auto begin = std::chrono::system_clock::now();測試。
——> > > std::time_mutex和std::recursive_timed_mutex: 這兩種互斥量類型和不帶time的相比,多了兩個成員函數:
try_lock_for():函數參數表示一個時間范圍,在這一段時間范圍之內線程如果沒有獲得鎖,則保持阻塞;如果在此期間其他線程釋放了鎖,則該線程可獲得該互斥鎖;如果超時(指定時間范圍內沒有獲得鎖),則函數調用返回false。
try_lock_until():函數參數表示一個時刻,在這一時刻之前線程如果沒有獲得鎖則保持阻塞;如果在此時刻前其他線程釋放了鎖,則該線程可獲得該互斥鎖;如果超過指定時刻沒有獲得鎖,則函數調用返回false。
二、C++11標准提供兩種基本鎖類型std::lock_guard和std::unique_lock,其模板類型可以是以上四種鎖,方便線程對互斥量鎖定解鎖,直到對象作用域結束。
互斥對象管理類模板的加鎖策略
前面提到std::lock_guard、std::unique_lock和std::shared_lock類模板在構造時是否加鎖是可選的,C++11提供了3種加鎖策略。
| 策略 | tag type | 描述 |
|---|---|---|
| (默認) | 無 | 請求鎖,阻塞當前線程直到成功獲得鎖。 |
| std::defer_lock | std::defer_lock_t | 不請求鎖。 |
| std::try_to_lock | std::try_to_lock_t | 嘗試請求鎖,但不阻塞線程,鎖不可用時也會立即返回。 |
| std::adopt_lock | std::adopt_lock_t | 假定當前線程已經獲得互斥對象的所有權,所以不再請求鎖。 |
下表列出了互斥對象管理類模板對各策略的支持情況。
| 策略 | std::lock_guard | std::unique_lock | std::shared_lock |
|---|---|---|---|
| (默認) | √ | √ | √(共享) |
| std::defer_lock | × | √ | √ |
| std::try_to_lock | × | √ | √ |
| std::adopt_lock | √ | √ | √ |
通過對lock和unlock進行一次薄的封裝,實現自動unlock的功能,兩者大部分功能是一樣的。
std::lock_guard 沒有多余的接口,構造函數時拿到鎖,析構函數時釋放鎖,更省時。
std::unique_lock提供了更好的上鎖和解鎖的控制,也更加靈活,提供了lock, unlock, try_lock等接口,所以更占資源和時間。支持std::lock_guard的功能,並且能夠和condition_variable一起使用來控制線程同步。
std::mutex mut; void insert_data() { std::lock_guard<std::mutex> lk(mut); queue.push_back(data); } void process_data() { std::unqiue_lock<std::mutex> lk(mut); queue.pop(); }
通過實現一個線程安全的隊列來說明兩者之間的差別
//.h頭文件 定義模板類
template <typename T>
class ThreadSafeQueue{ public: void Insert(T value); void Popup(T &value); bool Empety(); private: mutable std::mutex mut_; std::queue<T> que_;
//C++11新引入的condition_variable條件變量 std::condition_variable cond_; };
//.cpp文件 模板類中方法的實現
template <typename T>
void ThreadSafeQueue::Insert(T value){ std::lock_guard<std::mutex> lk(mut_); que_.push_back(value); cond_.notify_one(); } template <typename T>
void ThreadSafeQueue::Popup(T &value){ std::unique_lock<std::mutex> lk(mut_); //等待中的線程如果在等待期間需要解鎖mutex,並在之后重新將其鎖定 //wait函數會鎖定mutex。當lambda表達式返回false時,wait函數會解鎖mutex同時會將當前線程置於阻塞或等待狀態
cond_.wait(lk, [this]{return !que_.empety();}); value = que_.front(); que_.pop(); } template <typename T>
bool ThreadSafeQueue::Empty() const{ std::lock_guard<std::mutex> lk(mut_); return que_.empty(); }
三、std::try_lock、std::lock、std::call_once
——> > > std::try_lock支持嘗試對多個互斥量進行鎖定,嘗試鎖定成功返回-1,否則返回鎖定失敗的互斥量的位置,例如第一個鎖定失敗返回0、第二個失敗返回1。
int main () { std::mutex mtx1; std::mutex mtx2; if (-1 == std::try_lock(mtx1, mtx2)) { std::cout << "locked" << std::endl; mtx1.unlock(); mtx2.unlock(); } return 0; }
——> > > std::lock支持對多個鎖鎖定,並且避免死鎖的出現,以下代碼運行時有可能出現死鎖的情況:
void func(std::mutex* mtx1, std::mutex* mtx2, int index) { std::lock_guard<std::mutex> lock1(std::adopt_lock); std::lock_guard<std::mutex> lock2(std::adopt_lock); std::cout << index << "out\n"; } int main () { std::mutex mtx1; std::mutex mtx2; // 兩個線程的互斥量鎖定順序不同,可能造成死鎖
std::thread t1(func, &mtx1, &mtx2, 1); std::thread t2(func, &mtx2, &mtx1, 2); t1.join(); t2.join(); return 0; }
//而用std::lock能避免多個鎖出現死鎖:
void func(std::mutex* mtx1, std::mutex* mtx2, int index) { std::lock(*mtx1, *mtx2); // 同時鎖定 // std::adopt_lock作用是聲明互斥量已在本線程鎖定,std::lock_guard只是保證互斥量在作用域結束時被釋放
std::lock_guard<std::mutex> lock1(*mtx1, std::adopt_lock); std::lock_guard<std::mutex> lock2(*mtx2, std::adopt_lock); // 等價方法:【展示std::lock_guard和std::unique_lock使用時的區別】 //std::unique_lock<std::mutex> lock1(from.m, std::defer_lock); //std::unique_lock<std::mutex> lock2(to.m, std::defer_lock); //std::lock(lock1, lock2); std::cout << index << "out\n"; } int main () { std::mutex mtx1; std::mutex mtx2; std::thread t1(func, &mtx1, &mtx2, 1); std::thread t2(func, &mtx2, &mtx1, 2); t1.join(); t2.join(); return 0; }
——> > > std::call_once的作用是即使在多線程的情況下,也只執行一次指定的可調用對象(可以是函數、成員函數、函數對象、lambda函數),需要通過配合std::once_flag實現。具體的細節如下:
若在調用 call_once 的時刻, flag 指示已經調用了f指定的可調用對象,則 call_once 立即返回,就是說不再執行可調用對象。
否則,call_once 會調用指定的可調用對象。若該調用對象拋異常,則傳播異常給 call_once 的調用方,並且不翻轉 flag ,讓下一次調用仍然執行。若該調用正常返回,則翻轉 flag ,並保證同一 flag不在執行可調用對象。
四、條件變量std::condition_variable:
條件變量是並發程序設計中的一種控制結構。多個線程訪問一個共享資源(或稱臨界區)時,不但需要用互斥鎖實現獨享訪問以避免並發錯誤(稱為競爭危害),在獲得互斥鎖進入臨界區后還需要檢驗特定條件是否成立:C++11中引入了條件變量,其相關內容均在<condition_variable>中。這里主要介紹std::condition_variable類。
(1)、如果不滿足該條件,擁有互斥鎖的線程應該釋放該互斥鎖,把自身阻塞(block)並掛到(suspend)條件變量的線程隊列中
(2)、如果滿足該條件,擁有互斥鎖的線程在臨界區內訪問共享資源,在退出臨界區時通知(notify)在條件變量的線程隊列中處於阻塞狀態的線程,被通知的線程必須重新申請對該互斥鎖加鎖。
條件變量std::condition_variable用於多線程之間的通信,它可以阻塞一個或同時阻塞多個線程。std::condition_variable需要與std::unique_lock配合使用。std::condition_variable效果上相當於包裝了pthread庫中的pthread_cond_*()系列的函數。
當std::condition_variable對象的某個wait函數被調用的時候,它使用std::unique_lock(通過std::mutex)來鎖住當前線程。當前線程會一直被阻塞,直到另外一個線程在相同的std::condition_variable對象上調用了notification函數來喚醒當前線程。
std::condition_variable對象通常使用std::unique_lock<std::mutex>來等待,如果需要使用另外的lockable類型,可以使用std::condition_variable_any類。
std::condition_variable類的成員函數:
(1)、構造函數:僅支持默認構造函數,拷貝、賦值和移動(move)均是被禁用的。
(2)、wait:當前線程調用wait()后將被阻塞,直到另外某個線程調用notify_*喚醒當前線程;當線程被阻塞時,該函數會自動調用std::mutex的unlock()釋放鎖,使得其它被阻塞在鎖競爭上的線程得以繼續執行。一旦當前線程獲得通知(notify,通常是另外某個線程調用notify_*喚醒了當前線程),wait()函數也是自動調用std::mutex的lock()。
wait分為無條件被阻塞和帶條件的被阻塞兩種。
——> > > 無條件被阻塞:調用該函數前,當前線程應該已經對unique_lock<mutex> lck完成了加鎖。所有使用同一個條件變量的線程必須在wait函數中使用同一個unique_lock<mutex>。該wait函數內部會自動調用lck.unlock()對互斥鎖解鎖,使得其他被阻塞在互斥鎖上的線程恢復執行。使用本函數被阻塞的當前線程在獲得通知(notified,通過別的線程調用 notify_*系列的函數)而被喚醒后,wait()函數恢復執行並自動調用lck.lock()對互斥鎖加鎖。
——> > > 帶條件的被阻塞:wait函數設置了謂詞(Predicate),只有當pred條件為false時調用該wait函數才會阻塞當前線程,並且在收到其它線程的通知后只有當pred為true時才會被解除阻塞。因此,等效於while (!pred()) wait(lck).
(3)、wait_for:與wait()類似,只是wait_for可以指定一個時間段,在當前線程收到通知或者指定的時間超時之前,該線程都會處於阻塞狀態。而一旦超時或者收到了其它線程的通知,wait_for返回,剩下的步驟和wait類似。
(4)、wait_until:與wait_for類似,只是wait_until可以指定一個時間點,在當前線程收到通知或者指定的時間點超時之前,該線程都會處於阻塞狀態。而一旦超時或者收到了其它線程的通知,wait_until返回,剩下的處理步驟和wait類似。
(5)、notify_all: 喚醒所有的wait線程,如果當前沒有等待線程,則該函數什么也不做。
(6)、notify_one:喚醒某個wait線程,如果當前沒有等待線程,則該函數什么也不做;如果同時存在多個等待線程,則喚醒某個線程是不確定的(unspecified)。
條件變化存在虛假喚醒的情況,因此在線程被喚醒后需要檢查條件是否滿足。無論是notify_one或notify_all都是類似於發出脈沖信號,如果對wait的調用發生在notify之后是不會被喚醒的,所以接收者在使用wait等待之前也需要檢查條件是否滿足。
std::condition_variable_any類與std::condition_variable用法一樣,區別僅在於std::condition_variable_any的wait函數可以接受任何lockable參數,而std::condition_variable只能接受std::unique_lock<std::mutex>類型的參數。
std::notify_all_at_thread_exit函數:當調用該函數的線程退出時,所有在cond條件變量上等待的線程都會收到通知。
#include "condition_variable.hpp" #include <iostream> #include <chrono> #include <thread> #include <mutex> #include <condition_variable> #include <string>
namespace condition_variable_ { //////////////////////////////////////////////////////////////////////
// reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/
std::mutex mtx; std::condition_variable cv; bool ready = false; static void print_id(int id) { std::unique_lock<std::mutex> lck(mtx); while (!ready) cv.wait(lck); // ...
std::cout << "thread " << id << '\n'; } static void go() { std::unique_lock<std::mutex> lck(mtx); ready = true; cv.notify_all(); } int test_condition_variable_1() { std::thread threads[10]; // spawn 10 threads:
for (int i = 0; i<10; ++i) threads[i] = std::thread(print_id, i); std::cout << "10 threads ready to race...\n"; go(); // go!
for (auto& th : threads) th.join(); return 0; } /////////////////////////////////////////////////////////////////////////
// reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/wait/
// condition_variable::wait: Wait until notified, // The execution of the current thread (which shall have locked lck's mutex) is blocked until notified. // At the moment of blocking the thread, the function automatically calls lck.unlock(), allowing other locked threads to continue. // If pred is specified, the function only blocks if pred returns false, // and notifications can only unblock the thread when it becomes true (which is specially useful to check against spurious wake-up calls).
std::mutex mtx2; std::condition_variable cv2; int cargo = 0; static bool shipment_available() { return cargo != 0; } static void consume(int n) { for (int i = 0; i<n; ++i) { std::unique_lock<std::mutex> lck(mtx2); cv2.wait(lck, shipment_available); // consume:
std::cout << cargo << '\n'; cargo = 0; std::cout << "****: " << cargo << std::endl; } } int test_condition_variable_wait() { std::thread consumer_thread(consume, 10); // produce 10 items when needed:
for (int i = 0; i<10; ++i) { while (shipment_available()) std::this_thread::yield(); std::unique_lock<std::mutex> lck(mtx2); cargo = i + 1; cv2.notify_one(); } consumer_thread.join(); return 0; } /////////////////////////////////////////////////////////////////////////// // reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/wait_for/
// condition_variable::wait_for: Wait for timeout or until notified // The execution of the current thread (which shall have locked lck's mutex) is blocked during rel_time, // or until notified (if the latter happens first). // At the moment of blocking the thread, the function automatically calls lck.unlock(), // allowing other locked threads to continue.
std::condition_variable cv3; int value; static void read_value() { std::cin >> value; cv3.notify_one(); } int test_condition_variable_wait_for() { std::cout << "Please, enter an integer (I'll be printing dots): \n"; std::thread th(read_value); std::mutex mtx; std::unique_lock<std::mutex> lck(mtx); while (cv3.wait_for(lck, std::chrono::seconds(1)) == std::cv_status::timeout) { std::cout << '.' << std::endl; } std::cout << "You entered: " << value << '\n'; th.join(); return 0; } ////////////////////////////////////////////////////////////////// // reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/notify_one/
// condition_variable::notify_one: Notify one, Unblocks one of the threads currently waiting for this condition. // If no threads are waiting, the function does nothing. // If more than one, it is unspecified which of the threads is selected.
std::mutex mtx4; std::condition_variable produce4, consume4; int cargo4 = 0; // shared value by producers and consumers
static void consumer4() { std::unique_lock<std::mutex> lck(mtx4); while (cargo4 == 0) consume4.wait(lck); std::cout << cargo4 << '\n'; cargo4 = 0; produce4.notify_one(); } static void producer(int id) { std::unique_lock<std::mutex> lck(mtx4); while (cargo4 != 0) produce4.wait(lck); cargo4 = id; consume4.notify_one(); } int test_condition_variable_notify_one() { std::thread consumers[10], producers[10]; // spawn 10 consumers and 10 producers:
for (int i = 0; i<10; ++i) { consumers[i] = std::thread(consumer4); producers[i] = std::thread(producer, i + 1); } // join them back:
for (int i = 0; i<10; ++i) { producers[i].join(); consumers[i].join(); } return 0; } /////////////////////////////////////////////////////////////
// reference: http://www.cplusplus.com/reference/condition_variable/condition_variable/notify_all/
// condition_variable::notify_all: Notify all, Unblocks all threads currently waiting for this condition. // If no threads are waiting, the function does nothing.
std::mutex mtx5; std::condition_variable cv5; bool ready5 = false; static void print_id5(int id) { std::unique_lock<std::mutex> lck(mtx5); while (!ready5) cv5.wait(lck); // ...
std::cout << "thread " << id << '\n'; } static void go5() { std::unique_lock<std::mutex> lck(mtx5); ready5 = true; cv5.notify_all(); } int test_condition_variable_notify_all() { std::thread threads[10]; // spawn 10 threads:
for (int i = 0; i<10; ++i) threads[i] = std::thread(print_id5, i); std::cout << "10 threads ready to race...\n"; go5(); // go!
for (auto& th : threads) th.join(); return 0; } //////////////////////////////////////////////////////////// // reference: http://en.cppreference.com/w/cpp/thread/condition_variable
std::mutex m; std::condition_variable cv6; std::string data; bool ready6 = false; bool processed = false; static void worker_thread() { // Wait until main() sends data
std::unique_lock<std::mutex> lk(m); cv6.wait(lk, []{return ready6; }); // after the wait, we own the lock.
std::cout << "Worker thread is processing data\n"; data += " after processing"; // Send data back to main()
processed = true; std::cout << "Worker thread signals data processing completed\n"; // Manual unlocking is done before notifying, to avoid waking up // the waiting thread only to block again (see notify_one for details)
lk.unlock(); cv6.notify_one(); } int test_condition_variable_2() { std::thread worker(worker_thread); data = "Example data"; // send data to the worker thread
{ std::lock_guard<std::mutex> lk(m); ready6 = true; std::cout << "main() signals data ready for processing\n"; } cv6.notify_one(); // wait for the worker
{ std::unique_lock<std::mutex> lk(m); cv6.wait(lk, []{return processed; }); } std::cout << "Back in main(), data = " << data << '\n'; worker.join(); return 0; } } // namespace condition_variable_
