摘要
本篇文章圍繞以下幾個問題展開:
- 進程和線程的區別
- 何為並發?C++中如何解決並發問題?C++中多線程的基本操作 淺談C++11中的多線程(一) - 唯有自己強大 - 博客園 (cnblogs.com)
- 同步互斥原理以及如何處理數據競爭
- 條件變量和原子操作
一,同步互斥原理
首先說明兩個專業名詞。
臨界資源:對於同一進程的多個線程,進程資源中有些對線程是共享的,但有些資源一次只能供一個線程使用,這樣的資源被稱為臨界資源,也可以叫做互斥資源,即只能被各個線程互斥訪問。
臨界區:線程中對臨界資源實施操作的那段程序,稱之為臨界區。
- 同步是指散步在不同任務之間的若干程序片斷,它們的運行必須嚴格按照規定的某種先后次序來運行,這種先后次序依賴於要完成的特定的任務。比如 A 任務的運行依賴於 B 任務產生的數據。
舉例:假如程序中有一個靜態變量,static int a;線程1負責往里寫入數據,線程2需要讀取其中的數據,那么線程2在讀數據之前必須是線程1寫入了數據,如果不是,那么線程2必須停下來等待線程1的操作結束。這就是線程之間在某些地方上的合作關系,協同工作嘛!
- 互斥是指散步在不同任務之間的若干程序片斷,當某個任務運行其中一個程序片段時,其它任務就不能運行它們之中的任一程序片段,只能等到該任務運行完這個程序片段后才可以運行。
舉例:還是假如程序中有一個靜態變量,static int b;線程1想要往里寫入數據,線程2也想要往里寫入數據,那么此時靜態變量b就是一個臨界資源(互斥資源),即一次只能被一個線程訪問。
同步跟互斥都是針對於線程來說的,可以把這理解為是線程之間的合作關系和制約關系。可以這樣理解,同一進程的各個線程之間不可能是完全獨立的,或多或少會有關系,或是合作關系,或是制約關系。具體的例子放在了同步和互斥的介紹中。
二,如何處理數據競爭
從上面數據競爭形成的條件入手,數據競爭源於並發修改同一數據結構,那么最簡單的處理數據競爭的方法就是對該數據結構采用某種保護機制,確保只有進行修改的線程才能看到數據被修改的中間狀態,從其他訪問線程的角度看,修改不是已經完成就是還未開始。
在線程里也有這么一把鎖——互斥鎖(mutex),互斥鎖是一種簡單的加鎖的方法來控制對共享資源的訪問。只要某一個線程上鎖了,那么就會強行霸占公共資源的訪問權,其他的線程無法訪問直到這個線程解鎖了,從而保護共享資源。
1️⃣ lock與unlock保護共享資源
Mutex全名mutual exclusion(互斥體),是個object對象,用來協助采取獨占排他方式控制對資源的並發訪問。這里的資源可能是個對象,或多個對象的組合。為了獲得獨占式的資源訪問能力,相應的線程必須鎖定(lock) mutex,這樣可以防止其他線程也鎖定mutex,直到第一個線程解鎖(unlock) mutex。mutex類的主要操作函數見下表:
由圖可知互斥鎖只有兩種狀態,即上鎖( lock )和解鎖( unlock )。mutex不僅提供了常規鎖,還為常規鎖可能造成的阻塞提供了嘗試鎖(帶時間的鎖需要帶時間的互斥類timed_mutex支持,具體見下文)。下面先給出一段示例代碼:
// mutex1.cpp 通過互斥體lock與unlock保護共享全局變量 #include <chrono> #include <mutex> #include <thread> #include <iostream> std::chrono::milliseconds interval(100); std::mutex mutex;
//兩個全局變量 int job_shared = 0; //兩個線程都能修改'job_shared',mutex將保護此變量 int job_exclusive = 0; //只有一個線程能修改'job_exclusive',不需要保護 //此線程只能修改 'job_shared' void job_1() { mutex.lock(); std::this_thread::sleep_for(5 * interval); //令‘job_1’持鎖等待 ++job_shared; std::cout << "job_1 shared (" << job_shared << ")\n"; mutex.unlock(); } // 此線程能修改'job_shared'和'job_exclusive' void job_2() { while (true) { //無限循環,直到獲得鎖並修改'job_shared' if (mutex.try_lock()) { //嘗試獲得鎖成功則修改'job_shared' ++job_shared; std::cout << "job_2 shared (" << job_shared << ")\n"; mutex.unlock(); return; } else { //嘗試獲得鎖失敗,接着修改'job_exclusive' ++job_exclusive; std::cout << "job_2 exclusive (" << job_exclusive << ")\n"; std::this_thread::sleep_for(interval); } } } int main() { std::thread thread_1(job_1); std::thread thread_2(job_2); thread_1.join(); thread_2.join(); getchar(); return 0; }
從上面的代碼看,創建了兩個線程和兩個全局變量。
- 全局變量job_exclusive只有job2調用,因此兩線程並不共享,不會產生數據競爭,所以不需要鎖保護。
- 另一個全局變量job_shared是兩線程共享的,會引起數據競爭,因此需要鎖保護。線程thread_1持有互斥鎖lock的時間較長,線程thread_2為免於空閑等待,使用了嘗試鎖try_lock,如果獲得互斥鎖則操作共享變量job_shared,未獲得互斥鎖則操作排他變量job_exclusive,提高多線程效率。
輸出結果:
2️⃣ lock_guard與unique_lock保護共享資源
但lock與unlock必須成對合理配合使用,使用不當可能會造成資源被永遠鎖住,甚至出現死鎖(兩個線程在釋放它們自己的lock之前彼此等待對方的lock)。
相關知識點:
是不是想起了C++另一對兒需要配合使用的對象new與delete,若使用不當可能會造成內存泄漏等嚴重問題,為此C++引入了智能指針shared_ptr與unique_ptr。智能指針借用了RAII技術(Resource Acquisition Is Initialization—使用類來封裝資源的分配和初始化,在構造函數中完成資源的分配和初始化,在析構函數中完成資源的清理,可以保證正確的初始化和資源釋放)對普通指針進行封裝,達到智能管理動態內存釋放的效果。
同樣的,C++也針對lock與unlock引入了智能鎖lock_guard與unique_lock,同樣使用了RAII技術對普通鎖進行封裝,達到智能管理互斥鎖資源釋放的效果。lock_guard與unique_lock的區別如下:
lock_guard:
unique_lock:
從上面兩個支持的操作函數表對比來看,unique_lock功能豐富靈活得多。如果需要實現更復雜的鎖策略可以用unique_lock,如果只需要基本的鎖功能,優先使用更嚴格高效的lock_guard。兩種鎖的簡單概述與策略對比見下表:
如果將上面的普通鎖lock/unlock替換為智能鎖lock_guard,其中job_1函數代碼修改如下:
void job_1() { std::lock_guard<std::mutex> lockg(mutex); //獲取RAII智能鎖,離開作用域會自動析構解鎖 std::this_thread::sleep_for(5 * interval); //令‘job_1’持鎖等待 ++job_shared; std::cout << "job_1 shared (" << job_shared << ")\n"; }
如果也想將job_2的嘗試鎖try_lock也使用智能鎖替代,由於lock_guard鎖策略不支持嘗試鎖,只好使用unique_lock來替代,代碼修改如下(其余代碼和程序執行結果與上面相同):
void job_2() { while (true) { //無限循環,直到獲得鎖並修改'job_shared' std::unique_lock<std::mutex> ulock(mutex, std::try_to_lock); //以嘗試鎖策略創建智能鎖 //嘗試獲得鎖成功則修改'job_shared' if (ulock) { ++job_shared; std::cout << "job_2 shared (" << job_shared << ")\n"; return; } else { //嘗試獲得鎖失敗,接着修改'job_exclusive' ++job_exclusive; std::cout << "job_2 exclusive (" << job_exclusive << ")\n"; std::this_thread::sleep_for(interval); } } }
3️⃣timed_mutex與recursive_mutex提供更強大的鎖
前面介紹的互斥量mutex提供了普通鎖lock/unlock和智能鎖lock_guard/unique_lock,基本能滿足我們大多數對共享數據資源的保護需求。但在某些特殊情況下,我們需要更復雜的功能,比如某個線程中函數的嵌套調用可能帶來對某共享資源的嵌套鎖定需求,mutex在一個線程中卻只能鎖定一次;再比如我們想獲得一個鎖,但不想一直阻塞,只想等待特定長度的時間,mutex也沒提供可設定時間的鎖。針對這些特殊需求,< mutex >庫也提供了下面幾種功能更豐富的互斥類,它們間的區別見下表:
繼續用前面的例子,將mutex替換為timed_mutex,將job_2的嘗試鎖tyr_lock()替換為帶時間的嘗試鎖try_lock_for(duration)。由於改變了嘗試鎖的時間,所以在真正獲得鎖之前的嘗試次數也有變化,該變化體現在嘗試鎖失敗后對排他變量job_exclusive的最終修改結果或修改次數上。更新后的代碼如下所示:
#include <chrono> #include <mutex> #include <thread> #include <iostream> std::chrono::milliseconds interval(100); std::timed_mutex tmutex; int job_shared = 0; //兩個線程都能修改'job_shared',mutex將保護此變量 int job_exclusive = 0; //只有一個線程能修改'job_exclusive',不需要保護 //此線程只能修改 'job_shared' void job_1() { std::lock_guard<std::timed_mutex> lockg(tmutex); //獲取RAII智能鎖,離開作用域會自動析構解鎖 std::this_thread::sleep_for(5 * interval); //令‘job_1’持鎖等待 ++job_shared; std::cout << "job_1 shared (" << job_shared << ")\n"; } // 此線程能修改'job_shared'和'job_exclusive' void job_2() { while (true) { //無限循環,直到獲得鎖並修改'job_shared' std::unique_lock<std::timed_mutex> ulock(tmutex,std::defer_lock); //創建一個智能鎖但先不鎖定 //嘗試獲得鎖成功則修改'job_shared' if (ulock.try_lock_for(3 * interval)) { //在3個interval時間段內嘗試獲得鎖 ++job_shared; std::cout << "job_2 shared (" << job_shared << ")\n"; return; } else { //嘗試獲得鎖失敗,接着修改'job_exclusive' ++job_exclusive; std::cout << "job_2 exclusive (" << job_exclusive << ")\n"; std::this_thread::sleep_for(interval); } } } int main() { std::thread thread_1(job_1); std::thread thread_2(job_2); thread_1.join(); thread_2.join(); getchar(); return 0; }
前章解答
前一篇文章中最后的程序運行結果可能會出現某行與其他行交疊錯亂的情況,主要是由於不止一個線程並發訪問了std::cout顯示終端資源導致的,解決方案就是對cout << “somethings” << endl語句加鎖,保證多個線程對cout資源的訪問同步。為了盡可能降低互斥鎖對性能的影響,應使用微粒鎖,即只對cout資源訪問語句進行加鎖保護,cout資源訪問完畢盡快解鎖以供其他線程訪問該資源。添加互斥鎖保護后的代碼如下:
//thread2.cpp 增加對cout顯示終端資源並發訪問的互斥鎖保護 #include <iostream> #include <thread> #include <chrono> #include <mutex> using namespace std; std::mutex mutex1; void thread_function(int n) { std::thread::id this_id = std::this_thread::get_id(); //獲取線程ID for (int i = 0; i < 5; i++) { mutex1.lock(); cout << "子線程" << this_id << " 運行 : " << i + 1 << endl; mutex1.unlock(); std::this_thread::sleep_for(std::chrono::seconds(n)); //進程睡眠n秒 } } class Thread_functor { public: // functor行為類似函數,C++中的仿函數是通過在類中重載()運算符實現,使你可以像使用函數一樣來創建類的對象 void operator()(int n) { std::thread::id this_id = std::this_thread::get_id(); for (int i = 0; i < 5; i++) { { std::lock_guard<std::mutex> lockg(mutex1); cout << "子仿函數線程" << this_id << " 運行: " << i + 1 << endl; } std::this_thread::sleep_for(std::chrono::seconds(n)); //進程睡眠n秒 } } }; int main() { thread mythread1(thread_function, 1); // 傳遞初始函數作為線程的參數 if (mythread1.joinable()) //判斷是否可以成功使用join()或者detach(),返回true則可以,返回false則不可以 mythread1.join(); // 使用join()函數阻塞主線程直至子線程執行完畢 Thread_functor thread_functor; thread mythread2(thread_functor, 3); // 傳遞初始函數作為線程的參數 if (mythread2.joinable()) mythread2.detach(); // 使用detach()函數讓子線程和主線程並行運行,主線程也不再等待子線程 auto thread_lambda = [](int n) { std::thread::id this_id = std::this_thread::get_id(); for (int i = 0; i < 5; i++) { mutex1.lock(); cout << "子lambad線程" << this_id << " 運行: " << i + 1 << endl; mutex1.unlock(); std::this_thread::sleep_for(std::chrono::seconds(n)); //進程睡眠n秒 } }; thread mythread3(thread_lambda, 4); // 傳遞初始函數作為線程的參數 if (mythread3.joinable()) mythread3.join(); // 使用join()函數阻塞主線程直至子線程執行完畢 unsigned int n = std::thread::hardware_concurrency(); //獲取可用的硬件並發核心數 mutex1.lock(); std::cout << n << " 支持並發線程" << endl; mutex1.unlock(); std::thread::id this_id = std::this_thread::get_id(); for (int i = 0; i < 5; i++) { { std::lock_guard<std::mutex> lockg(mutex1); cout << "主線程" << this_id << " 運行: " << i + 1 << endl; } std::this_thread::sleep_for(std::chrono::seconds(1)); } getchar(); return 0; }
多次運行結果:(已解決)