線程的管理
啟動線程
為了讓編譯器識別 std::thread 類,這個簡單的例子也要包含 <thread> 頭文件。
如同大多數C++標准庫一樣
線程在std::thread對象創建(為線程指定任務)啟動
無參任務
最簡單的任務,通常是無參數無返回(void-returning)的函數,這種函數在其所屬線程上運行,直到函數執行完畢,線程也就結束了。
例如:
#include<iostream>
#include<thread>
using namespace std;
void go()
{
cout << "Welcome to Thread!";
}
void main()
{
thread t1(go);
cin.get();
}
運行結果
有參任務
std::thread 可以用可調用(callable)類型構造,將帶有函數調用符類型
的實例傳入 std::thread 類中,替換默認的構造函數。
#include <iostream>
#include <thread>
#include <string>
using namespace std;
void run(int num)
{
cout << "線程" << num << endl;
}
void main()
{
thread p(run,1);
cin.get();
}
執行結果!
等待線程
啟動了線程,你需要明確是要等待線程結束(加入式joined),還是讓其自主運行(分
離式——detached)。如果 std::thread 對象銷毀之前還沒有做出決定,程序就會終止
( std::thread 的析構函數會調用 std::terminate() )。因此,即便是有異常存在,也需要確保線程能夠正確的加入(joined)或分離(detached)
例如
#include <iostream>
#include <thread>
#include <string>
#include <chrono> //c++時間庫
using namespace std;
void run(int num)
{
chrono::seconds(3); //c++標准庫休眠3秒鍾
std::cout << "線程" << num << endl;
}
void main()
{
thread t(run, 1);
t.join(); //阻塞主函數等待等待線程結束
cin.get();
}
joinable()查看當前線程是否被join true沒有flase成功
#include <iostream>
#include <thread>
#include <string>
#include <chrono> //c++時間庫
using namespace std;
void run(int num)
{
chrono::seconds(3); //c++標准庫休眠
std::cout << "線程" << num << endl;
}
void main()
{
thread t(run, 1);
if(t.joinable())
t.join();
cin.get();
}
分離線程
使用detach()會讓線程在后台運行,這就意味着主線程不能與之產生直接交互。也就是說,不會等待這個線程結束;
如果線程分離,那么就不可能有 std::thread 對象能引用它,分離線程
的確在后台運行,所以分離線程不能被加入。不過C++運行庫保證,當線程退出時,相關資源的能夠正確回收,后台線程的歸屬和控制C++運行庫都會處理。
例如
#include <iostream>
#include <thread>
#include <string>
#include <chrono> //c++時間庫
using namespace std;
void run(int num)
{
chrono::seconds(3); //c++標准庫休眠
std::cout << "線程" << num << endl;
}
void main()
{
thread t(run, 1);
t.detach(); //脫離當前主線程自由執行
cin.get();
}
轉移線程所有權
假設要寫一個在后台啟動線程的函數,想通過新線程返回的所有權去調用這個函數,而不是
等待線程結束再去調用;或完全與之相反的想法:創建一個線程,並在函數中轉移所有權,
都必須要等待線程結束。總之,新線程的所有權都需要轉移。
線程的所有權可以在 std::thread 實例中移動,下面將展示一個例子。
例如:
#include<iostream>
#include<thread>
using namespace std;
void run1()
{
cout << "run1" << endl;
}
void run2()
{
cout << "run2" << endl;
}
void main()
{
std::thread t1(run1); // 1
std::thread t2 = std::move(t1); // 2當顯式使用 std::move() 創建t2后,t1的所有權就轉移給了t2
t1 = std::thread(run2);
cin.get();
}
std::thread 支持移動,就意味着線程的所有權可以在函數外進行轉移,就如下面程序一樣。
#include<iostream>
#include<thread>
using namespace std;
std::thread run1()
{
void some_function();
return std::thread(some_function);
}
void main()
{
void some_function();
thread t1(std::thread(run1));
}
運行時決定線程數量
std::thread::hardware_concurrency()
這個函數將返回能同時並發在一個程序中的線程數量。
例如,多核系統中,返回值就可以能是CPU核芯的數量。
返回值也僅僅是一個提示,當系統信息無法獲取時,函數也會返回0。但是,這也無
法掩蓋這個函數對啟動線程數量的幫助。
使用線程組來分割任。
//如下
//將100個任務分片,分成4片
#include <iostream>
#include <thread>
#include <vector>
#include <string>
#include <iterator>
#include <numeric>
#include <algorithm>
using namespace std;
template<typename Iterator, typename T>
struct accumulate_block
{
void operator()(Iterator first, Iterator last, T& result) //迭代器頭,迭代器尾,線程的數量 (重載)
{
result = std::accumulate(first, last, result);
//累加 開始 結束 累加的初值
}
};
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
// 若輸入數據為空,則返回初始值
if (!length)
return init;
// 計算所需要的最大線程數量,每個線程至少計算25個數據
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
// 獲取硬件可並發線程數量
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
// 計算實際要創建的線程數量
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
// 根據線程數量,拆分數據
unsigned long const block_size = length / num_threads;
// 創建用於存放每個線程計算結果的容器和線程
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i<(num_threads - 1); ++i)
{
Iterator block_end = block_start;
// 移動迭代器
std::advance(block_end, block_size);
// 啟動新線程,對一塊數據進行處理
threads[i] = std::thread(
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]));
// 為下一個線程准備數據
block_start = block_end;
}
// 當啟動了所有的子線程對數據進行計算,本線程就對數據的最后一塊進行計算
accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
// 使用fore_each對所有的線程執行join操作,等待它們執行結束
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
// 最后對所有的計算結果求和
return std::accumulate(results.begin(), results.end(), init);
}
int main()
{
std::cout << "threads: " << std::thread::hardware_concurrency() << std::endl;
std::vector<int> vi;
for (int i = 0; i<100; ++i)
{
vi.push_back(1);
}
int sum = parallel_accumulate(vi.begin(), vi.end(), 5);
std::cout << "sum=" << sum << std::endl;
cin.get();
return 0;
}
識別線程
線程標識類型是 std::thread::id ,可以通過兩種方式進行檢索。
第一種,可以通過調用 std::thread 對象的成員函數 get_id() 來直接獲取。
如果 std::thread 對象沒有與任何執行線程相關聯, get_id() 將返回 std::thread::type 默認構造值,這個值表示“沒有線程”。
第
二種,當前線程中調用 std::this_thread::get_id() (這個函數定義在 <thread> 頭文件中)也可
以獲得線程標識
std::thread::id 實例常用作檢測線程是否需要進行一些操作,比如:當用線程來分割一項工
主線程可能要做一些與其他線程不同的工作。這種情況下,啟動其他線程
前,它可以將自己的線程ID通過 std::this_thread::get_id() 得到,並進行存儲。
就是算法核心部分(所有線程都一樣的),每個線程都要檢查一下,其擁有的線程ID是否與初始線程的ID相同。
std::thread::id master_thread;
void some_core_part_of_algorithm()
{
if(std::this_thread::get_id()==master_thread)
{
do_master_thread_work();
}
do_common_work();
}
總結
討論了C++標准庫中基本的線程管理方式:啟動線程,等待結束和不等待結束(因為需要它們運行在后台)。
並了解應該如何在線程啟動前,向線程函數中傳遞參數,如何轉移線程的
所有權,如何使用線程組來分割任務。
最后使用線程標識來確定關聯數據,以及特殊線程的特殊解決方案
線程間共享數據
當線程在訪問共享數據的時候,必須定一些規矩,用來限定線程可訪問的數據。
還有,一個線程更新了共享數據,需要對其他線程進行通知。
從易用性的角度,同一進程中的多個線程進行數據共享,有利有弊。
錯誤的共享數據使用是產生並發bug的一個主要原因。
共享數據帶來的問題
當涉及到共享數據時,問題很可能是因為共享數據修改所導致。
如果共享數據是只讀的,那么只讀操作不會影響到數據,更不會涉及對數據的修改,所以所有線程都會獲得同樣的數據。
但是,當一個或多個線程要修改共享數據時,就會產生很多麻煩。這種情況下,就必須
小心謹慎,才能確保一切所有線程都工作正常
例如破壞一個鏈表
如圖:
條件競爭
良心競爭:
並發中競爭條件的形成,取決於一個以上線程的相對執行順序,每個線程都搶着完成自己的任務。大多數情況下,即使改變執行順序,也是良性競爭,其結果可以接受。
惡心競爭:
例如,有兩個線程同時向一個處理隊列中添加任務,因為系統提供的變量保持不變,所以誰先誰后都不會有什么影響。當變量遭到破壞時,才會產生條件競爭。
並發中對數據的條件競爭通常表示為“惡性”(problematic)條件競爭,我們對不產生問題的良性條件競爭不感興趣。
C++標准中也定義了數據競爭(data race)這個術語,一種特殊的條件競爭:並發的
去修改一個獨立對象,數據競爭是(可怕的)定義行為(undefine behavior)的起
因。
避免惡性條件競爭
這里提供一些方法來解決惡性條件競爭,最簡單的辦法就是對數據結構采用某種保護機制,確保只有進行修改的線程才能看到不變量被破壞時的中間狀態。
從其他訪問線程的角度來看,修改不是已經完成了,就是還沒開始。
另一個選擇是對數據結構和不變量的設計進行修改,修改完的結構必須能完成一系列不可分
割的變化,也就是保證每個不變量保持穩定的狀,這就是無鎖編程
另一種處理條件競爭的方式是,使用事務(transacting)的方式去處理數據結構的更新,這里的"處理"就如同對數據庫進行更新一樣。
所需的一些數據和讀取都存儲在事務日志中,然后將之前的操作合為一步,再進行提交。
當數據結構被另一個線程修改后,或處理已經重啟的情況下,提交就會無法進行,這稱作為“軟件事務內存”(software transactional memory
(STM))。理論研究中,這是一個很熱門的研究領域。這個概念將不會在本書中再進行介紹,
因為在C++中沒有對STM進行直接支持。
保護共享數據結構的最基本的方式,是使用C++標准庫提供的互斥量(mutex)。
使用互斥量保護共享數據
當程序中有共享數據,肯定不想讓其陷入條件競爭,或是不變量被破壞。
那么,將所有訪問共享數據結構的代碼都標記為互斥豈不是更好?這樣任何一個線程在執行這些代碼時,其他任何線程試圖訪問共享數據結構,就必須等到那一段代碼執行結束。
於是,一個線程就不可能會看到被破壞的不變量,除非它本身就是修改共享數據的線程。
當訪問共享數據前,使用互斥量將相關數據鎖住,再當訪問結束后,再將數據解鎖。線程庫需要保證,當一個線程使用特定互斥量鎖住共享數據時,其他的線程想要訪問鎖住的數據,
都必須等到之前那個線程對數據進行解鎖后,才能進行訪問。這就保證了所有線程能看到共享數據,而不破壞不變量。
互斥量是C++中一種最通用的數據保護機制,但它不是“銀蛋”;精心組織代碼來保護正確的數據,並在接口內部避免競爭條件是非常重要的。但互斥量自身也有問
題,也會造成死鎖,或是對數據保護的太多(或太少)。
C++中使用互斥量
C++中通過實例化 srd::mutex 創建互斥量,通過調用成員函數lock()進行上鎖,unlock()進行解鎖。
不推薦實踐中直接去調用成員函數,因為調用成員函數就意味着,必須記住在每個函數出口都要去調用unlock(),也包括異常的情況。
C++標准庫為互斥量提供了一個RAII語法的模板類 std::lack_guard ,其會在構造的時候提供已鎖的互斥量,並在析構的時候進行解鎖,從而保證了一個已鎖的互斥量總是會被正確的解鎖。
std::mutex 和 std::lock_guard 都在 <mutex> 頭文件中聲明。
實踐調用成員函數
//進程的鎖定
#include <iostream>
#include <thread>
#include <string>
#include<windows.h>
#include<mutex>
using namespace std;
//兩個線程並行訪問一個變量
int g_num = 20;//找到或者找不到的標識
mutex g_mutex;
void goA(int num)
{
g_mutex.lock();//你訪問的變量,在你訪問期間,別人訪問不了
for (int i = 0; i < 15; i++)
{
g_num = 10;
std::cout << "線程" << num << " " << g_num << endl;
}
g_mutex.unlock();
}
void goB(int num)
{
for (int i = 0; i < 15; i++)
{
g_num = 11;
std::cout << "線程" << num << " " << g_num << endl;
}
}
void main()
{
thread t1(goA, 1);
thread t2(goB, 2);
t1.join();
t2.join();
std::cin.get();
}
運行結果
RAII語法的模板類lack_guard()
RAII語法實現自動解鎖
//進程的鎖定
#include <iostream>
#include <thread>
#include<mutex>
using namespace std;
//兩個線程並行訪問一個變量
int g_num = 20;//找到或者找不到的標識
mutex g_mutex;
void goA(int num)
{
lock_guard<std::mutex>guard(g_mutex);//自動解鎖
for (int i = 0; i < 15; i++)
{
g_num = 10;
std::cout << "線程" << num << " " << g_num << endl;
}
}
void goB(int num)
{
for (int i = 0; i < 15; i++)
{
g_num = 11;
std::cout << "線程" << num << " " << g_num << endl;
}
}
void main()
{
thread t1(goA, 1);
thread t2(goB, 2);
t1.join();
t2.join();
std::cin.get();
}
精心組織代碼來保護共享數據
用互斥量來保護數據,並不是僅僅在每一個成員函數中都加入一個 std::lock_guard 對象那么簡單。
一個迷失的指針或引用,將會讓這種保護形同虛設。
函數可能沒在互斥量保護的區域內,存儲着指針或者引用,這樣就很危險。
更危險的是:將保護數據作為一個運行時參數.
如同下面:
#include <iostream>
#include <thread>
#include<mutex>
class some_data
{
public :
int a;
std::string b;
public:
void do_something()
{
std::cout << a;
}
};
class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func) //通過傳遞的函數將,保護的數據傳遞出去,跳過保護
{
std::lock_guard<std::mutex> l(m);
data.a = 10;
func(data); // 1 傳遞“保護”數據給用戶函數
}
};
some_data* unprotected;
void malicious_function(some_data& protected_data)
{
unprotected = &protected_data;
}
data_wrapper x;
void foo()
{
x.process_data(malicious_function); // 2 傳遞一個惡意函數
unprotected->do_something(); // 3 在無保護的情況下訪問保護數據
}
void main()
{
foo();
std::cin.get();
}
例子中process_data看起來沒有任何問題, std::lock_guard 對數據做了很好的保護,但調用
用戶提供的函數func①,就意味着foo能夠繞過保護機制將函數 malicious_function 傳遞進去
在沒有鎖定互斥量的情況下調用 do_something() 。
這段代碼的問題在於,它根本沒有做到保護:只是將所有可訪問的數據結構代碼標記為互斥。
發現接口內在的條件競爭
因為使用了互斥量或其他機制保護了共享數據,就不必再為條件競爭所擔憂嗎?並不是這樣,你依舊需要確定特定的數據受到了保護。
回想之前雙鏈表的例子,為了能讓線程安全地刪除一個節點,需要確保防止對這三個節點(待刪除的節點及其前后相鄰的節點)的並發訪問。
如果只對指向每個節點的指針進行訪問保護,那就和沒有使用互斥量一樣,條件競爭仍會發生——整個數據結構和整個刪除操作需要保護,但指針不需要保護。
這種情況下最簡單的解決方案就是使用互斥量來保護整個鏈表,盡管對鏈表的個別操作是安全的,但不意味着你就能走出困境;即使在一個很簡單的接口中,依舊可能遇到條件競爭
例如,構建一個類似於 std::stack 結構的棧除了構造函數和swap()以外,需要對 std::stack 提供五個操作:push()一個新元素進棧,pop()一個元素出棧,top()查看棧頂元素,empty()判斷棧是否是空棧,size()了解棧中有多少個元素。
即使修改了top(),使其返回一個拷貝而非引用,對內部數據使用一個互斥量進行保護,不過這個接口仍存在條件競爭。
這個問題不僅存在於基於互斥量實現的接口中,在無鎖實現的接口中,條件競爭依舊會產生。
這是接口的問題,與其實現方式無關。
一個給定操作需要兩個或兩個以上的互斥量時,另一個潛在的問題將出現:死鎖(deadlock)。
與條件競爭完全相反——不同於兩個線程會互相等待,從而什么都沒做。
死鎖
但線程有對鎖的競爭:一對線程需要對他們所有的互斥量做一些操作,其中每個線程都有一個互斥量,且等待另一個解鎖。
這樣沒有線程能工作,因為他們都在等待對方釋放互斥量。這種情況就是死鎖,它的最大問題就是由兩個或兩個以上的互斥量來鎖定一個操作。
避免死鎖的一般建議,就是讓兩個互斥量總以相同的順序上鎖:總在互斥量B之前鎖住互斥量A,就永遠不會死鎖。某些情況下是可以這樣用,因為不同的互斥量用於不同的地方。
不過,事情沒那么簡單,比如:當有多個互斥量保護同一個類的獨立實例時,一個操作對同一個類的兩個不同實例進行數據的交換操作,為了保證數據交換操作的正確性,就要避免數據被並發修改,並確保每個實例上的互斥量都能鎖住自己要保護的區域。
不過,選擇一個固定的順序(例如,實例提供的第一互斥量作為第一個參數,提供的第二個互斥量為第二個參數),可能會適得其反:在參數交換了之后,兩個線程試圖在相同的兩個實例間進行數據交換時,程序又死鎖了!
std::lock ——可以一次性鎖住多個(兩個以上)的互斥量,並且沒有副作用(死鎖風險)
交換操作中使用 std::lock() 和 std::lock_guard
#include <iostream>
#include <thread>
#include<mutex>
#include <string>
class some_big_object
{
};
void swap(some_big_object& lhs, some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd) :some_detail(sd) {}
friend void swap(X& lhs, X& rhs)
{
if (&lhs == &rhs)
return;
std::lock(lhs.m, rhs.m); // 1
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // 2
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); // 3
swap(lhs.some_detail, rhs.some_detail);
}
};
① 鎖住兩個互斥量,並且兩個 std:lock_guard 實例已經創建好②③,還有一個
互斥量。提供 std::adopt_lock 參數除了表示 std::lock_guard 對象已經上鎖外,還表示現成的鎖,而非嘗試創建新的鎖。
這樣,就能保證在大多數情況下,函數退出時互斥量能被正確的解鎖(保護操作可能會拋出一個異常),也允許使用一個簡單的“return”作為返回。還有,需要注意的是,當使用 std::lock 去鎖lhs.m或rhs.m時,可能會拋出異常;這種情況下,異常會傳播到 std::lock 之外。
當 std::lock 成功的獲取一個互斥量上的鎖,並且當其嘗試從另一個互斥量上再獲取鎖時,就會有異常拋出,第一個鎖也會隨着異常的產生而自動釋放,所以 std::lock 要么將兩個鎖都鎖住,要不一個都不鎖。
避免死鎖的進階
雖然鎖是產生死鎖的一般原因,但也不排除死鎖出現在其他地方。
無鎖的情況下,僅需要每個 std::thread 對象調用join(),兩個線程就能產生死鎖。這種情況下,沒有線程可以繼續運行,因為他們正在互相等待。這種情況很常見,一個線程會等待另一個線程,其他線程同時也會等待第一個線程結束,所以三個或更多線程的互相等待也會發生死鎖。
為了避免死鎖,這里的指導意見為:當機會來臨時,不要拱手讓人(don’t wait for another thread if there’s achance it’s waiting for you)。
以下提供一些的指導建議,如何識別死鎖,並消除其他線程的等待。
避免嵌套鎖第一個建議往往是最簡單的:
一個線程已獲得一個鎖時,再別去獲取第二個(don’t acquire alock if you already hold one)。
如果能堅持這個建議,因為每個線程只持有一個鎖,鎖上就不會產生死鎖。即使互斥鎖造成死鎖的最常見原因,也可能會在其他方面受到死鎖的困擾(比如:線程間的互相等待)。
當你需要獲取多個鎖,使用一個 std::lock 來做這件事(對獲取鎖的操作上鎖),避免產生死鎖。
使用固定順序獲取鎖當硬性條件要求你獲取兩個以上(包括兩個)的鎖,並且不能使用 std::lock 單獨操作來獲取它們;那么最好在每個線程上,用固定的順序獲取它們獲取它們(鎖)。
獲取兩個互斥量時,避免死鎖的方法:關鍵是如何在線程之間,以一致性的順序獲取鎖。一些情況下,這種方式相對簡單。
unique_lock——靈活的鎖
unique_lock 介紹
std::unqiue_lock 通過對不變量的放松(by relaxing the invariants),會比 std:lock_guard 更加靈活;一個 std::unique_lock 實現不會總是擁有與互斥量相關的數據類型。
首先,就像你能將 std::adopt_lock 作為第二個參數傳入到構造函數,對互斥所進行管理,你也可以把 std::defer_lock 作為第二個參數傳遞進去,為了表明互斥量在結構上應該保持解鎖狀態。
這樣,就可以被后面調用lock()函數的 std::unique_lock 對象(不是互斥量)所獲取,或傳遞 std::unique_lock 對象本身到 std::lock() 中。清單3.6可以很容易被改寫為清單3.9中的代
碼,使用 std::unique_lock 和 std::defer_lock ,而非 std::lock_guard 和 std::adopt_lock 。
代碼長度相同,且幾乎等價,唯一不同的就是: std::unique_lock 會占用比較多的空間,並且比 std::lock_guard 運行的稍慢一些。保證靈活性是要付出代價的,這個代價就允許 std::unique_lock 實例不攜帶互斥量:該信息已被存儲,且已被更新。
unique_lock 構造函數
default 構造函數
新創建的 unique_lock 對象不管理任何 Mutex 對象。
locking 初始化
新創建的 unique_lock 對象管理 Mutex 對象 m,並嘗試調用 m.lock() 對 Mutex 對象進行上鎖,如果此時另外某個 unique_lock 對象已經管理了該 Mutex 對象 m,則當前線程將會被阻塞。
try-locking 初始化
新創建的 unique_lock 對象管理 Mutex 對象 m,並嘗試調用 m.try_lock() 對 Mutex 對象進行上鎖,但如果上鎖不成功,並不會阻塞當前線程。
deferred 初始化
新創建的 unique_lock 對象管理 Mutex 對象 m,但是在初始化的時候並不鎖住 Mutex 對象。 m 應該是一個沒有當前線程鎖住的 Mutex 對象。
adopting 初始化
新創建的 unique_lock 對象管理 Mutex 對象 m, m 應該是一個已經被當前線程鎖住的 Mutex 對象。(並且當前新創建的 unique_lock 對象擁有對鎖(Lock)的所有權)。
locking 一段時間(duration)
新創建的 unique_lock 對象管理 Mutex 對象 m,並試圖通過調用 m.try_lock_for(rel_time) 來鎖住 Mutex 對象一段時間(rel_time)。
locking 直到某個時間點(time point)
新創建的 unique_lock 對象管理 Mutex 對象m,並試圖通過調用 m.try_lock_until(abs_time) 來在某個時間點(abs_time)之前鎖住 Mutex 對像。
copy [deleted]
unique_lock 對象不能被拷貝構造。
移動(move)構造
新創建的 unique_lock 對象獲得了由 x 所管理的 Mutex 對象的所有權(包括當前 Mutex 的狀態)。調用 move 構造之后, x 對象如同通過默認構造函數所創建的,就不再管理任何 Mutex 對象了。
unique_lock 的構造函數參考
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock, std::unique_lock
using namespace std;
// std::adopt_lock, std::defer_lock
std::mutex foo, bar;
void task_a() {
std::lock(foo, bar); // simultaneous lock (prevents deadlock)
std::unique_lock<std::mutex> lck1(foo, std::adopt_lock);
std::unique_lock<std::mutex> lck2(bar, std::adopt_lock);
std::cout << "task a\n";
// (unlocked automatically on destruction of lck1 and lck2)
}
void task_b() {
// foo.lock(); bar.lock(); // replaced by:
std::unique_lock<std::mutex> lck1, lck2;
lck1 = std::unique_lock<std::mutex>(bar, std::defer_lock);
lck2 = std::unique_lock<std::mutex>(foo, std::defer_lock);
std::lock(lck1, lck2); // simultaneous lock (prevents deadlock)
std::cout << "task b\n";
// (unlocked automatically on destruction of lck1 and lck2)
}
int main()
{
std::thread th1(task_a);
std::thread th2(task_b);
th1.join();
th2.join();
cin.get();
return 0;
}
unique_lock 移動(move assign)賦值操作
移動情況是鎖的所有權需要從一個域轉到另一個
移動賦值(move assignment)之后,由 A所管理的 Mutex 對象及其狀態將會被新的 std::unique_lock 對象取代。
如果被賦值的對象之前已經獲得了它所管理的 Mutex 對象的鎖,則在移動賦值(move assignment)之前會調用 unlock 函數釋放它所占有的鎖。
調用移動賦值(move assignment)之后, A對象如同通過默認構造函數所創建的,也就不再管理任何 Mutex 對象了
例如
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include<string>
std::mutex mtx; // mutex for critical section
void print_fifty(std::string c) {
std::unique_lock<std::mutex> lck; // default-constructed
lck = std::unique_lock<std::mutex>(mtx); // move-assigned
std::cout << c;
std::cout << '\n';
}
int main()
{
std::thread th1(print_fifty, "Move OK !");
th1.join();
std::cin.get();
return 0;
}
unique_lock 主要成員函數
1、 上鎖/解鎖操作:lock,try_lock,try_lock_for,try_lock_until 和 unlock
2、 修改操作:移動賦值(move assignment),交換(swap)(與另一個 std::unique_lock 對象交換它們所管理的 Mutex 對象的所有權),釋放(release)(返回指向它所管理的 Mutex 對象的指針,並釋放所有權)
3、 獲取屬性操作:owns_lock(返回當前 std::unique_lock 對象是否獲得了鎖)、operator bool()(與 owns_lock 功能相同,返回當前 std::unique_lock 對象是否獲得了鎖)、mutex(返回當前 std::unique_lock 對象所管理的 Mutex 對象的指針)
std::unique_lock::lock
上鎖操作,調用它所管理的 Mutex 對象的 lock 函數。如果在調用 Mutex 對象的 lock 函數時該 Mutex 對象已被另一線程鎖住,則當前線程會被阻塞,直到它獲得了鎖。
該函數返回時,當前的 unique_lock 對象便擁有了它所管理的 Mutex 對象的鎖。如果上鎖操作失敗,則拋出 system_error 異常。
// unique_lock::lock/unlock
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock, std::defer_lock
std::mutex mtx; // mutex for critical section
void print_thread_id(int id) {
std::unique_lock<std::mutex> lck(mtx, std::defer_lock);
// critical section (exclusive access to std::cout signaled by locking lck):
lck.lock();
std::cout << "thread #" << id << '\n';
lck.unlock();
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i<10; ++i)
threads[i] = std::thread(print_thread_id, i + 1);
for (auto& th : threads) th.join();
std::cin.get();
return 0;
}
std::unique_lock::try_lock
上鎖操作,調用它所管理的 Mutex 對象的 try_lock 函數,如果上鎖成功,則返回 true,否則返回 false。
#include <iostream> // std::cout
#include <vector> // std::vector
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock, std::defer_lock
std::mutex mtx; // mutex for critical section
void print_star() {
std::unique_lock<std::mutex> lck(mtx, std::defer_lock);
// print '*' if successfully locked, '#' otherwise:
if (lck.try_lock())
std::cout << '*';
else
std::cout << '#';
}
int main()
{
std::vector<std::thread> threads;
for (int i = 0; i<500; ++i)
threads.emplace_back(print_star);
for (auto& x : threads) x.join();
std::cin.get();
return 0;
}
std::unique_lock::try_lock_for
上鎖操作,調用它所管理的 Mutex 對象的 try_lock_for 函數,如果上鎖成功,則返回 true,否則返回 false。
#include <iostream> // std::cout
#include <chrono> // std::chrono::milliseconds
#include <thread> // std::thread
#include <mutex> // std::timed_mutex, std::unique_lock, std::defer_lock
std::timed_mutex mtx;
void fireworks() {
std::unique_lock<std::timed_mutex> lck(mtx, std::defer_lock);
// waiting to get a lock: each thread prints "-" every 200ms:
while (!lck.try_lock_for(std::chrono::milliseconds(200))) {
std::cout << "-";
}
// got a lock! - wait for 1s, then this thread prints "*"
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "*\n";
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i<10; ++i)
threads[i] = std::thread(fireworks);
for (auto& th : threads) th.join();
std::cin.get();
return 0;
}
std::unique_lock::owns_lock
返回當前 std::unique_lock 對象是否獲得了鎖。
#include <iostream> // std::cout
#include <vector> // std::vector
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock, std::try_to_lock
std::mutex mtx; // mutex for critical section
void print_star () {
std::unique_lock<std::mutex> lck(mtx,std::try_to_lock);
// print '*' if successfully locked, 'x' otherwise:
if (lck.owns_lock())
std::cout << '*';
else
std::cout << 'x';
}
int main ()
{
std::vector<std::thread> threads;
for (int i=0; i<500; ++i)
threads.emplace_back(print_star);
for (auto& x: threads) x.join();
return 0;
}
同步並發操作
當你不僅想要保護數據,還想對單獨的線程進行同步。例如,在第一個線程完成前,可能需要等待另一個線程執行完成。
通常情況下,線程會等待一個特定事件的發生,或者等待某一條件達成(為true)。這可能需要定期檢查“任務完成”標識,或將類似的東西放到共享數據中,但這與理想情況還是差很多。
像這種情況就需要在線程中進行同步,C++標准庫提供了一些工具可用於同步操作,形式上表現為
條件變量(condition variables)和期望(futures)。
等待一個事件或其他條件三種方式
當一個線程等待另一個線程完成任務時,它會有很多選擇
一、它可以持續的檢查共享數據標志(用於做保護工作的互斥量),直
到另一線程完成工作時對這個標志進行重設。
不過,就是一種浪費:線程消耗寶貴的執行時間持續的檢查對應標志,並且當互斥量被等待線程上鎖后,其他線程就沒有辦法獲取鎖,這樣線程就會持續等待。因為以上方式對等待線程限制資源,並且在完成時阻礙對標識的設置。
二、個選擇是在等待線程在檢查間隙,使用 std::this_thread::sleep_for() 進行周期性的間歇
例如
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock(); // 1 解鎖互斥量
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
lk.lock(); // 3 再鎖互斥量
}
}
這個實現就進步很多,因為當線程休眠時,線程沒有浪費執行時間,但是很難確定正確的休
眠時間。太短的休眠和沒有休眠一樣,都會浪費執行時間;太長的休眠時間,可能會讓任務
等待線程醒來。休眠時間過長是很少見的情況,因為這會直接影響到程序的行為,當在高節
奏游戲(fast-paced game)中,它意味着丟幀,或在一個實時應用中超越了一個時間片。
三、選擇(也是優先的選擇)是,使用C++標准庫提供的工具去等待事件的發生。
通過另一線程觸發等待事件的機制是最基本的喚醒方式(例如:流水線上存在額外的任務時),這種機制就稱為“條件變量”(condition variable)。
從概念上來說,一個條件變量會與多個事件或其他條件相關,並且一個或多個線程會等待條件的達成。
當某些線程被終止時,為了喚醒等待線程(允許等待線程繼續執行)終止的線程將會向等待着的線程廣播“條件達成”的信息。
等待條件達成
C++標准庫對條件變量有兩套實
現: std::condition_variable 和 std::condition_variable_any 。
這兩個實現都包含在 <condition_variable> 頭文件的聲明中。
兩者都需要與一個互斥量一起才能工作(互斥量是為了同步);前者僅限於std::mutex 一起工作,而后者可以和任何滿足最低標准的互斥量一起工作,從而加上了_any的后綴。
因為 std::condition_variable_any 更加通用,這就可能從體積、性能,以及系統資源的使用方面產生額外的開銷,所以 std::condition_variable 一般作為首選的類型,當對靈活性有硬性要求時,我們才會去考慮 std::condition_variable_any 。
所以,如何使用 std::condition_variable 去處理之前提到的情況——當有數據需要處理時,
如何喚醒休眠中的線程對其進行處理?以下清單展示了一種使用條件變量做喚醒的方式。
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx; // 全局互斥鎖.
std::condition_variable cv; // 全局條件變量.
bool ready = false; // 全局標志位.
void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx);
while (!ready) // 如果標志位不為 true, 則等待...
cv.wait(lck); // 當前線程被阻塞, 當全局標志位變為 true 之后,
// 線程被喚醒, 繼續往下執行打印線程編號id.
std::cout << "thread " << id << '\n';
}
void go()
{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 設置全局標志位為 true.
cv.notify_all(); // 喚醒所有線程.
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto & th:threads)
th.join();
return 0;
}
std::condition_variable 構造函數
default (1) |
condition_variable(); |
copy [deleted] (2) |
condition_variable (const condition_variable&) = delete; |
std::condition_variable 的拷貝構造函數被禁用,只提供了默認構造函數。
std::condition_variable::wait() 介紹
unconditional (1) |
void wait (unique_lock<mutex>& lck); |
predicate (2) |
template <class Predicate> void wait (unique_lock<mutex>& lck, Predicate pred); |
std::condition_variable 提供了兩種 wait() 函數。
當前線程調用 wait() 后將被阻塞(此時當前線程應該獲得了鎖(mutex),不妨設獲得鎖 lck),直到另外某個線程調用 notify_* 喚醒了當前線程。
在線程被阻塞時,該函數會自動調用 lck.unlock() 釋放鎖
,使得其他被阻塞在鎖競爭上的線程得以繼續執行。
另外,一旦當前線程獲得通知(notified,通常是另外某個線程調用 notify_* 喚醒了當前線程),wait() 函數也是自動調用 lck.lock(),使得 lck 的狀態和 wait 函數被調用時相同。
在第二種情況下(即設置了 Predicate),只有當 pred 條件為 false 時調用 wait() 才會阻塞當前線程,並且在收到其他線程的通知后只有當 pred 為 true 時才會被解除阻塞.
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::yield
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
int cargo = 0;
bool shipment_available()
{
return cargo != 0;
}
// 消費者線程.
void consume(int n)
{
for (int i = 0; i < n; ++i) {
std::unique_lock <std::mutex> lck(mtx);
cv.wait(lck, shipment_available);
std::cout << cargo << '\n';
cargo = 0;
}
}
int main()
{
std::thread consumer_thread(consume, 10); // 消費者線程.
// 主線程為生產者線程, 生產 10 個物品.
for (int i = 0; i < 10; ++i) {
while (shipment_available())
std::this_thread::yield(); //線程調用yield()方法后,表明自己做的事已經完成,讓出自己的cpu時間給其他線程使用
std::unique_lock <std::mutex> lck(mtx);
cargo = i + 1;
cv.notify_one();
}
consumer_thread.join();
std::cin.get();
return 0;
}