摘要
本篇文章圍繞以下幾個問題展開:
- 進程和線程的區別
- 何為並發?C++中如何解決並發問題?C++中多線程的基本操作 淺談C++11中的多線程(一) - 唯有自己強大 - 博客園 (cnblogs.com)
- 同步互斥原理以及如何處理數據競爭 淺談C++11中的多線程(二) - 唯有自己強大 - 博客園 (cnblogs.com)
- 條件變量和原子操作
條件變量
一、何為條件變量
在前一篇文章淺談C++11中的多線程(二) - 唯有自己強大 - 博客園 (cnblogs.com)中解釋了線程同步的原理和實現,使用互斥鎖解決數據競爭訪問問題。我們在使用mutex時,一般都會期望加鎖不要阻塞,總是能立刻拿到鎖,然后盡快訪問數據,用完之后盡快解鎖,這樣才能不影響並發性和性能。
如果需要等待某個條件的成立,我們就該使用條件變量(condition variable)了,那什么是條件變量呢?
C++11提供了condition_variable類。使用時需要include頭文件<condition_variable>。
簡單理解來說:如果把變量區看成是一座房子,那么前面兩篇頻繁用到的mutex可以看成是房門的鎖,正常來說是房門常年打開的,鎖並用不上。但是有了多線程以后,為了防止多個線程一窩蜂胡亂篡改里面的數據,所以就有了鎖的概念。
現在假設每個線程都有一個管理鎖的人,叫lock_guard,或者unique_lock,但是一次只能有一個人能夠去操作鎖(鎖上或者是解鎖)。一般來說他們是輪流去操作鎖。而condition_variable則可以看做是門童,如果沒有滿足條件,門童就會通知線程的管鎖人必須要休眠而不可以操作鎖,可是一旦條件滿足,他就會喚醒某些線程的管鎖人可以去操作鎖了。
二,為何要用條件變量
下面給出一個簡單的程序示例:一個線程往隊列中放入數據,一個線程從隊列中提取數據,取數據前需要判斷一下隊列中確實有數據,由於這個隊列是線程間共享的,所以,需要使用互斥鎖進行保護,一個線程在往隊列添加數據的時候,另一個線程不能取,反之亦然。程序實現代碼如下:
//cond_var1.cpp用互斥鎖實現一個生產者消費者模型 #include <iostream> #include <deque> #include <thread> #include <mutex> std::deque<int> q; //雙端隊列標准容器全局變量 std::mutex mu; //互斥鎖全局變量 //生產者,往隊列放入數據 void function_1() { int count = 10; while (count > 0) { std::unique_lock<std::mutex> locker(mu); q.push_front(count); //數據入隊鎖保護 locker.unlock(); std::this_thread::sleep_for(std::chrono::seconds(1)); //延時1秒 count--; } } //消費者,從隊列提取數據 void function_2() { int data = 0; while ( data != 1) { std::unique_lock<std::mutex> locker(mu); if (!q.empty()) { //判斷隊列是否為空 data = q.back(); q.pop_back(); //數據出隊鎖保護 locker.unlock(); std::cout << "t2 got a value from t1: " << data << std::endl; } else { locker.unlock(); } } } int main() { std::thread t1(function_1); std::thread t2(function_2); t1.join(); t2.join(); getchar(); return 0; }
從代碼中不難看出:在生產過程中,因每放入一個數據有1秒延時,所以這個生產的過程是很慢的;在消費過程中,存在着一個while循環,只有在接收到表示結束的數據的時候,才會停止,每次循環內部,都是先加鎖,判斷隊列不空,然后就取出一個數,最后解鎖。所以說,在1s內,做了很多無用功!這樣的話,CPU占用率會很高,可能達到100%(單核)。
這就引入了條件變量來解決該問題:條件變量使用“通知—喚醒”模型,生產者生產出一個數據后通知消費者使用,消費者在未接到通知前處於休眠狀態節約CPU資源;當消費者收到通知后,趕緊從休眠狀態被喚醒來處理數據,使用了事件驅動模型,在保證不誤事兒的情況下盡可能減少無用功降低對資源的消耗。
三,如何使用條件變量
C++標准庫在< condition_variable >中提供了條件變量,借由它,一個線程可以喚醒一個或多個其他等待中的線程。原則上,條件變量的運作如下:
- 你必須同時包含< mutex >和< condition_variable >,並聲明一個mutex和一個condition_variable變量;
- 那個通知“條件已滿足”的線程(或多個線程之一)必須調用notify_one()或notify_all(),以便條件滿足時喚醒處於等待中的一個條件變量;
- 那個等待"條件被滿足"的線程必須調用wait(),可以讓線程在條件未被滿足時陷入休眠狀態,當接收到通知時被喚醒去處理相應的任務;
//cond_var2.cpp用條件變量解決輪詢間隔難題 #include <iostream> #include <deque> #include <thread> #include <mutex> #include <condition_variable> std::deque<int> q; //雙端隊列標准容器全局變量 std::mutex mu; //互斥鎖全局變量 std::condition_variable cond; //全局條件變量 //生產者,往隊列放入數據 void function_1() { int count = 10; while (count > 0) { std::unique_lock<std::mutex> locker(mu); q.push_front(count); //數據入隊鎖保護 locker.unlock(); cond.notify_one(); // 向一個等待線程發出“條件已滿足”的通知 std::this_thread::sleep_for(std::chrono::seconds(1)); //延時1秒 count--; } } //消費者,從隊列提取數據 void function_2() { int data = 0; while (data != 1) { std::unique_lock<std::mutex> locker(mu); while (q.empty()) //判斷隊列是否為空 cond.wait(locker); // 解鎖互斥量並陷入休眠以等待通知被喚醒,被喚醒后加鎖以保護共享數據 data = q.back(); q.pop_back(); //數據出隊鎖保護 locker.unlock(); std::cout << "t2 got a value from t1: " << data << std::endl; } } int main() { std::thread t1(function_1); std::thread t2(function_2); t1.join(); t2.join(); getchar(); return 0; }
上面的代碼有四個注意事項:
- 在function_2中,在判斷隊列是否為空的時候,使用的是while(q.empty()),而不是if(q.empty()),因為wait的喚醒可能由於系統的原因被喚醒,這個的時機是不確定的。這個過程也被稱作偽喚醒。如果在錯誤的時候被喚醒了,執行后面的語句就會錯誤,所以需要再次判斷隊列是否為空,如果還是為空,就繼續wait()阻塞;
- 在管理互斥鎖的時候,使用的是std::unique_lock而不是std::lock_guard,而且事實上也不能使用std::lock_guard。這需要先解釋下wait()函數所做的事情,可以看到,在wait()函數之前,使用互斥鎖保護了,如果wait的時候什么都沒做,豈不是一直持有互斥鎖?那生產者也會一直卡住,不能夠將數據放入隊列中了。所以,wait()函數會先調用互斥鎖的unlock()函數,然后再將自己睡眠,在被喚醒后,又會繼續持有鎖,保護后面的隊列操作。lock_guard沒有lock和unlock接口,而unique_lock提供了,這就是必須使用unique_lock的原因;
- 使用細粒度鎖,盡量減小鎖的范圍,在notify_one()的時候,不需要處於互斥鎖的保護范圍內,所以在喚醒條件變量之前可以將鎖unlock()。
- cv.notify_one()指的是通知其中某一個線程,cv.notify_all()指的是通知全部線程。
下面給出條件變量支持的操作函數表:
值得注意的是:
- 所有通知(notification)都會被自動同步化,所以並發調用notify_one()和notify_all()不會帶來麻煩;
- 所有等待某個條件變量(condition variable)的線程都必須使用相同的mutex,當wait()家族的某個成員被調用時該mutex必須被unique_lock鎖定,否則會發生不明確的行為;
- wait()函數會執行“解鎖互斥量–>陷入休眠等待–>被通知喚醒–>再次鎖定互斥量–>檢查條件判斷式是否為真”幾個步驟,這意味着傳給wait函數的判斷式總是在鎖定情況下被調用的,可以安全的處理受互斥量保護的對象;但在"解鎖互斥量–>陷入休眠等待"過程之間產生的通知(notification)會被遺失。
原子操作
一、何為原子操作(atomic)
所謂的原子操作,取的就是“原子是最小的、不可分割的最小個體”的意義,它表示在多個線程訪問同一個全局資源的時候,能夠確保所有其他的線程都不在同一時間內訪問相同的資源。也就是他確保了在同一時刻只有唯一的線程對這個資源進行訪問。這有點類似互斥對象對共享資源的訪問的保護,但是原子操作更加接近底層,因而效率更高。
在新標准C++11,引入了原子操作的概念,並通過這個新的頭文件提供了多種原子操作數據類型,例如,atomic_bool,atomic_int等等,如果我們在多個線程中對這些類型的共享資源進行操作,編譯器將保證這些操作都是原子性的,也就是說,確保任意時刻只有一個線程對這個資源進行訪問,編譯器將保證,多個線程訪問這個共享資源的正確性。從而避免了鎖的使用,提高了效率。
二、atomic高效體現
使用atomic可以避免使用鎖,而且更加底層,比mutex效率更高。為了方便使用,c++11為模版函數提供了別名(即原子類型)。
💛我們先來看一個例子:(加鎖不使用atomic)
#include <iostream> #include <ctime> #include <mutex> #include <vector> #include <thread> using namespace std; mutex mtx; size_t Count = 0; void threadFun() { for (int i = 0; i < 10000; i++) { // 上鎖(防止多個線程同時訪問同一資源) unique_lock<mutex> lock(mtx); Count++; } } int main(void) { clock_t start_time = clock(); // 啟動多個線程 vector<thread> threads; for (int i = 0; i < 10; i++) threads.push_back(thread(threadFun)); for (auto& thad : threads) thad.join(); // 檢測count是否正確 10000*10 = 100000 cout << "count number:" << Count << endl; clock_t end_time = clock(); std::cout << "耗時:" << end_time - start_time << "ms" << std::endl; return 0; }
輸出結果:
🧡使用atomic:
#include <iostream> #include <ctime> #include <vector> #include <thread> #include <atomic> using namespace std; atomic<size_t> Count(0);//創建原子類型,將Cout初始化為0 void threadFun() { for (int i = 0; i < 10000; i++) Count++; } int main(void) { clock_t start_time = clock(); // 啟動多個線程 vector<thread> threads; for (int i = 0; i < 10; i++) threads.push_back(thread(threadFun)); for (auto& thad : threads) thad.join(); // 檢測count是否正確 10000*10 = 100000 cout << "count number:" << Count << endl; clock_t end_time = clock(); cout << "耗時:" << end_time - start_time << "ms" <<endl; return 0; }
總結:從上面的截圖可以發現,第一張圖用時33ms,第二張圖用時19ms,使用原子操作能提高程序的運行效率。
三,原子操作中的內存訪問模型
原子操作保證了對數據的訪問只有未開始和已完成兩種狀態,不會訪問到中間狀態,但我們訪問數據一般是需要特定順序的,比如想讀取寫入后的最新數據,原子操作函數是支持控制讀寫順序的,即帶有一個數據同步內存模型參數std::memory_order,用於對同一時間的讀寫操作進行排序。C++11定義的6種類型如下:
typedef enum memory_order { memory_order_relaxed, // 不對執行順序做保證 memory_order_acquire, // 本線程中,所有后續的讀操作必須在本條原子操作完成后執行 memory_order_release, // 本線程中,所有之前的寫操作完成后才能執行本條原子操作 memory_order_acq_rel, // 同時包含 memory_order_acquire 和 memory_order_release memory_order_consume, // 本線程中,所有后續的有關本原子類型的操作,必須在本條原子操作完成之后執行 memory_order_seq_cst // 全部存取都按順序執行 } memory_order;
內存訪問模型屬於比較底層的控制接口,如果對編譯原理和CPU指令執行過程不了解的話,容易引入bug。內存模型不是本章重點,這里不再展開介紹,后續的代碼都使用默認的順序一致性模型或比較穩妥的Release-Acquire模型,如果想了解更多,可以參考鏈接:std::memory_order - cppreference.com
四,使用原子類型實現自旋鎖
自旋鎖(spinlock)與互斥鎖(mutex)類似,在任一時刻最多只能有一個持有者,但如果資源已被占用,互斥鎖會讓資源申請者進入睡眠狀態,而自旋鎖不會引起調用者睡眠,會一直循環判斷該鎖是否成功獲取。
自旋鎖是專為防止多處理器並發而引入的一種鎖,它在內核中大量應用於中斷處理等部分。對於多核處理器來說,檢測到鎖可用與設置鎖狀態兩個動作需要實現為一個原子操作。
標准庫還專門提供了一個原子布爾類型std::atomic_flag,不同於所有 std::atomic 的特化,它保證是免鎖的,不提供load()與store(val)操作,但提供了test_and_set()與clear()操作:
- test_and_set,如果atomic_flag 對象已經被設置了,就返回True,如果未被設置,就設置之然后返回False(等價於上鎖)
- clear,把atomic_flag對象清掉(等價於解鎖)
注意這個所謂atomic_flag對象其實就是當前的線程。可用std::atomic_flag實現自旋鎖的功能,代碼如下:
//atomic2.cpp 使用原子布爾類型實現自旋鎖的功能 #include <thread> #include <vector> #include <iostream> #include <atomic> using namespace std; atomic_flag lock = ATOMIC_FLAG_INIT; //初始化原子布爾類型 void f(int n) { for (int cnt = 0; cnt < 10; ++cnt) { while (lock.test_and_set(memory_order_acquire));// 獲得鎖(自旋) cout << n << " thread Output: " << cnt << '\n'; lock.clear(memory_order_release); // 釋放鎖 } } int main() { vector<thread> v; //實例化一個元素類型為thread的向量 for (int n = 0; n < 10; ++n) { v.emplace_back(f, n); //以參數(f,n)為初值的元素放到向量末尾,相當於啟動新線程f(n) } for (auto& t : v) { //遍歷向量v中的元素,基於范圍的for循環,auto&自動推導變量類型並引用指針指向的內容 t.join(); //阻塞主線程直至子線程執行完畢 } getchar(); return 0; }