C++11之前沒有對並發編程提供語言級別的支持,這使得我們在編寫可移植的並發程序時,存在諸多的不便。現在C++11增加了線程以及線程相關的類,很方便地支持了並發編程,使得編寫的多線程程序的可移植性得到了很大的提高。
1. 線程
1.1 線程的創建
#inclde <thread>
std::thead t(ThreadFun, parm1, parm2,...);
t.join();或t.detach();
join會阻塞線程,直到線程函數結束
detach讓線程和線程對象分離,讓線程作為后台線程去執行,但需要注意,detach之后就無法再和線程發生聯系了,等於說失去了對線程的控制權。
2. 互斥量
C++11提供了以下4中語義的互斥量:
std::mutex:獨占互斥量,不能遞歸使用
std::timed_mutex:帶超時的獨占互斥量,不能遞歸使用
std::recursive_mutex:遞歸互斥量,不帶超時功能
std::recursive_timed_mutex:帶超時的遞歸互斥量
2.1 獨占互斥量
std::mutex m_mutex;
mutex.lock();
do something;
mutex.unlock();
注意:使用std::lock_guard<mutex> locker(m_mutex);可以簡化lock/unlock的寫法,同時也更安全,因為lock_guard在構造的時候會自動鎖定互斥量,而在退出作用域后進行析構時就會自動解鎖,從而保證了互斥量的正確操作。
try_lock()嘗試鎖定互斥量,如果成功則返回true
2.2 遞歸的獨占互斥量
需要注意的是盡量不要使用遞歸鎖:
(1)需要用到遞歸鎖的多線程互斥處理本身就應該可以簡化的,運行遞歸互斥很容易放縱復雜邏輯的產生
(2)遞歸鎖比起非遞歸鎖要麻煩,效率低
(3)遞歸鎖雖然允許同一個線程多次獲得同一個互斥量,可重復獲得的最大次數並未具體說明,一旦超過一定次數會拋出std::system錯誤
2.3 帶超時的互斥量和遞歸帶超時的互斥量
std::timed_mutex比std::mutex多了兩個超時獲取鎖的接口,try_lock_for和try_lock_until,這兩個接口是用開設置獲取互斥量的超時時間,使用時可以用一個while循環去不斷地獲取互斥量。
std::timed_mutex mutex;
std::chrono::secord timeout(2);
if (mutex.try_lock_for(timeout))
cout << "do work with the mutex" << endl;
else:
cout << "do work without the mutex" << endl;
3. 條件變量
3.1 說明
條件變量用於線程的同步機制,它能阻塞一個或多個線程,直到收到另外一個線程發出的同質或者超時,才會喚醒當前阻塞的線程。條件變量需要和互斥變量結合起來用。
C++提供了兩種條件變量:
(1)condition_variable,配合std::unique_lock<std::mutex>進行wait操作
(2)condition_variable_any,和任意帶有lock,unlock語義的mutex搭配使用,比較靈活,但效率比condition_variable低
注意以下函數的使用:
(1)std::lock_guard,它利用了RAII機制可以保證mutex的安全釋放
(2)std::unique_lock與lock_guard的區別在與,前者可以自由地釋放mutex,而后者需要等到std::lock_guard變量生命周期結束時才能釋放。
3.2 示例實現消息循環隊列
3.2.1 實現代碼
// 使用C++11的新特性實現線程安全的循環消息隊列 #pragma once #include<iostream> #include<mutex> #include<condition_variable> using namespace std; #define MAXQUEUELEN 32 template<typename T> class CMsgQueue { public: CMsgQueue() { m_pQueue = new T[MAXQUEUELEN]; m_nHead = m_nTail = 0; } ~CMsgQueue() { Clear(); } void Push(T msg) { unique_lock<mutex> lock(m_Mutex); while (IsFull()) { cout << "消息隊列已經滿,請等待..." << endl; m_ConditionVar.wait(lock); } cout << "Push: " << msg << endl; m_pQueue[m_nTail] = msg; m_nTail = m_nTail % MAXQUEUELEN + 1; m_ConditionVar.notify_all(); } bool IsFull() { return (m_nTail + 1) % MAXQUEUELEN == m_nHead; } bool IsEmpty() { return m_nTail == m_nHead; } T Pop() { unique_lock<mutex> lock(m_Mutex); while (IsEmpty()) { cout << "消息隊列為空,請等待..." << endl; m_ConditionVar.wait(lock); } T msg = m_pQueue[m_nHead]; cout << "Pop: " << msg << endl; m_nHead = m_nHead % MAXQUEUELEN + 1; m_ConditionVar.notify_all(); return msg; } void Clear() { if (m_pQueue != NULL) { delete[] m_pQueue; m_pQueue = NULL; } } private: T *m_pQueue; int m_nHead; int m_nTail; mutex m_Mutex; condition_variable m_ConditionVar; };
3.2.2 測試
void fun1(CMsgQueue<int> *pQueue) { for (int i = 0; i < 40; i++) { //this_thread::sleep_for(chrono::seconds(2)); pQueue->Push(i); } } void fun2(CMsgQueue<int> *pQueue) { for (int i = 0; i < 20; i++) { this_thread::sleep_for(chrono::seconds(2)); pQueue->Pop(); } } void main() { CMsgQueue<int> *pQueue = new CMsgQueue<int>; thread t1(fun1, pQueue); thread t2(fun2, pQueue); t1.join(); t2.join(); return; }