c++11中增加了線程以及線程相關的類,很方便的支持了並發編程。
1. 線程
線程創建
使用std::thread創建線程,提供線程函數或者函數對象即可,並且可以指定線程函數的參數。
#include<thread>
void func(int a){
cout << "param= " << a << endl;
}
int main(){
std::thread t(func, 10); //創建線程使用線程函數以及函數參數
t.join(); //等待線程函數執行完畢
return 0;
}
可以通過std::bind或lambda表達式來創建線程:
void func(int a,int b){
cout << "a = " << a << ", b = " << b << endl;
}
int main(){
std::thread t1(std::bind(func, 1, 2));
std::thread t2([](int a, int b){cout << "a = " << a << ", b = " << b << endl;}, 1,2);
return 0;
}
join 和 detach
如果希望在主線程中等待其他線程終止,使用join來實現,t.join()阻塞等待線程t結束,才能繼續往下執行;
如果不希望線程被阻塞執行,可以調用 t.detach() 方法,將線程和線程對象分離,分離之后,線程單獨執行,由系統回收資源。通過detach,線程和線程對象分離,讓線程作為后台線程執行,當前線程也不會被阻塞。但需要注意,detach之后,就無法再和線程發生聯系,不能通過join再阻塞等待線程,線程何時執行完我們也無法控制。
若在創建子線程的線程中,既沒有執行對子線程進行join也沒有進行detach,則有可能出現線程對象在線程函數結束之前就失效,從而出錯。
int main(){
std::thread t(func);
t.detach();
//做其他事情....
return 0;
}
線程生命期
std::thread在出了作用域之后就會被析構,這是如果線程函數還沒有被執行完,就會發生錯誤。因此,需要保證線程函數的生命周期在線程變量 std::thread的生命周期之內,這可以通過join阻塞等待實現。
線程不能被復制,但可以移動,比如:
#include<thread>
void func(){
//do some work
}
int main(){
std::thread t(func);
std::thread t1(std::move(t));
t.join();
t1.join();
return 0;
}//線程被移動(std::move)后,線程對象t就不代表任何線程了。
線程基本用法
1. 獲取當前信息
void func(){
}
int main(){
std::thread t(func);
cout << t.get_id() << endl; //獲取線程id
cout << std::thread::hardware_concurrency() << endl; //獲取cpu核數
return 0;
}
2. 線程休眠
void f(){
std::this_thread::sleep_for(std::chrono::seconds(3)); //當前線程休眠3秒鍾
}
2. 互斥量
互斥量是一種同步原語,是一種線程同步的手段,用來保護多線程同時訪問的共享數據。
c++11中提供了四種語義的互斥量(mutex):
std::mutex 獨占的互斥量,不能遞歸使用
std::timed_mutex 帶超時的獨占互斥量,不能遞歸使用
std::recursive_mutex 遞歸互斥量,不帶超時功能
std::recursive_timed_mutex 帶超時的遞歸互斥量
std::mutex
std::mutex g_lock;
void func(){
g_lock.lock();
std::cout << "entered thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "leave thread " << std::this_thread::get_id() << endl;
g_lock.unlock();
}
std::recursive_mutex
遞歸鎖允許同一線程多次獲得該互斥鎖,可以用來解決同一線程需要多次獲取互斥量時死鎖的問題。但盡量不要使用遞歸鎖,因為:
(1)需要用到遞歸鎖定的多線程往往本身是可以簡化的
(2)遞歸鎖比起非遞歸鎖,效率會低一些
(3)遞歸鎖雖然允許同一線程多次獲得同一個互斥量,可重復獲得的最大次數並未具體說明,一旦超過次數,再對lock進行調用就會拋出std::system錯誤。
std::timed_mutex
std::timed_mutex是超時的獨占鎖,主要用於在獲取鎖時增加超時等待功能,在超時時間之內如果獲得鎖,則繼續,否則,返回超時,並不再阻塞。
std::time_mutex 使用 try_for_lock(timexx)來嘗試獲取鎖,在超過timexx時間仍未獲得鎖,則返回超時。
std::time_mutex mut;
void work(){
std::chrono::milliseconds timeout(100);
while(true){
if(mut.try_lock_for(time_out)){
std::cout << std::this_thread::get_id() << " :do work with the mutex" << endl;
std::chrono::milliseconds sleepDuration(250);
std::this_thread::sleep_for(sleepDuration);
mut.unlock();
}else{
std::cout << "time out for get lock" << endl;
}
}
}
lock_guard
lock_guard利用RAII機制,防止忘記unlock或者發生異常時無法unlock造成程序死鎖。
std::mutex mut;
void func(){
std::lock_guard<std::mutex> guard(mut);
xxxx
}
3. 條件變量
條件變量能阻塞一個或多個線程,直到收到另外一個線程發出的通知或者超時,才會喚醒當前阻塞的線程。條件變量需要和互斥量配合起來使用一。c++11提供了兩種條件變量: condition_variable, 配合std::unique_lock<std::mutex>進行wait操作 condition_variable_any,和任意帶有lock,unlock語義的mutex搭配使用,比較靈活,
但效率比condition_varaible略低
void Put(const T& x){
std::lock_guard<std::mutex> locker(m_mutex);
while(IsFull()){
cout << "緩沖區滿,需要等待..." << endl;
m_notFull.wait(m_mutex);
}
m_queue.push_back(x);
m_notEmpty.notify_one();
}
void Take(T& x){
std::lock_guard<std::mutex> locker(m_mutex);
while(IsEmpty()){
cout << "緩沖區空,需要等待...." << endl;
m_notEmpty.wait(m_mutex);
}
x = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
}
std::list<T> m_queue;
std::mutex m_mutex;
std::condition_variable_any m_notEmpty;
std::condition_variable_any m_notFull;
4. 原子變量
c++11提供了一個原子類型 std::atomic< T>,可以使用任意類型作為模板參數,c++11內置了整型的原子變量,可以更方便的使用原子變量,使用原子變量就不需要使用互斥量來保護該變量了。
#include<atomic>
struct AtomicCounter{
std::atomic<int> value;
void inc(){
++value;
}
void dec(){
--value;
}
int get(){
return value.load();
}
};
5. call_once/once_flag使用
為了保證在多線程環境中某個函數僅被調用一次,或者某個變量僅僅被初始化一次,可以使用 std::call_once來保證函數在多線程環境中只被調用一次。使用std::call_once時,需要一個once_flag作為call_once的入參。
std::once_flag flag;
void do_once(){
std::call_once(flag, [](){ std::cout << "called once" << endl;});
}
6. 異步操作
c++11中提供了異步操作相關的類,主要有 std::future, std::promise, std::package_task. std::future 作為異步結果的傳輸通道,很方便的獲取線程函數的返回值; std::promise用來包裝一個值,將數據和future綁定起來,方便線程賦值;std::package_task用來包裝一個可調用對象,將函數和future幫頂起來,以便異步調用。
std::future 獲取線程函數返回值
c++ thread庫提供了future用來訪問異步操作的結果,future提供了獲取異步操作結果的通道,可以以同步等待的方式來獲取結果,可以通過查詢future的狀態(future_status)來獲取異步操作的結果。
future_status == Deferred 異步操作還沒開始
future_status == Ready 異步操作已經完成
future_status == Timeout 異步操作超時
std::future_status st;
do{
st = future.wait_for(std::chrono::seconds(1));
if(st == std::future_status::deferred){
//還沒開始
}else if(st == std::future_status::timeout){
//超時
}else if(st == std::future_status::ready){
//ready
}
}while(st != std::future_status::Ready);
獲得future結果的三種方式: get, wait, wait_for, 其中get等待異步操作的結果並返回結果;wait只是等待異步操作完成,沒有返回值;wait_for是超時等待返回的結果。
std::promise 協助線程賦值的類
std::promise 將數據和future綁定起來,為獲取線程函數中的某個值提供便利,在線程函數中為外面傳進來的promise賦值,在線程函數執行完成之后就可以通過promise的future獲取該值。
std::promise<int> pr;
std::thread t([](std::promise<int>& p)
{ p.set_value_at_thread_exit(9);}, //在線程函數中為promise賦值
std::ref(pr)); //線程函數的參數,使用引用
std::future<int> f = pr.get_future(); //獲取promise的future
auto r = f.get(); //利用get等待異步操作的結果(阻塞)
std::packaged_task 可調用對象的包裝類
std::packaged_task可以將函數等可調用對象和future綁定起來,以便異步調用。
std::packaged_task<int()> task([]{return 7;});
std::thread t1(std::ref(task));
std::future<int> f1 = task.get_future();
auto r1 = f1.get();
std::promise, std::packaged_task, std::future三者之間的關系
std::future提供了一個訪問異步操作結果的機制,它和線程是一個級別的,屬於低層次的對象。std::future之上的高一層為 std::packaged_task和std::promise, 他們內部都有future以便訪問異步操作的結果,std::packaged_task包裝的是一個異步操作,而std::promise包裝的是一個值,都是為了方便異步操作。
future被promise和packaged_task用來作為異步操作或異步結果的連接通道,用std::future和std::shared_future來獲取異步調用的結果。future不可拷貝,只能移動,shared_future可以拷貝,當需要將future放到容器中則需要用shared_future。
#include<iostream>
#include<utility>
#include<future>
#include<vector>
#include<thread>
int func(int x){
return x*x;
}
int main(){
std::packaged_task<int(int)> tsk(func);
std::future<int> fut = tsk.get_future();
std::thread(std::move(tsk), 2).detach();
int value = fut.get();
std::cout << "The result is " << value << ".\n";
//std::future不可復制,無法放到容器中,需要用shared_future
std::vector<std::shared_future<int>> v;
std::shared_future<int> f = std::async(std::launch::async, [](int a, int b){ return a + b; }, 2, 3);
v.push_back(f);
std::cout << "The shared future result is " << v[0].get() << "\n";
return 0;
}
std::async 線程異步操作函數
std::async比std::promise, std::packaged_task, std::thread更高一層,可以用來直接創建異步的task, 異步任務返回的結果也保存在future中,當需要獲取異步任務的結果時,只需要調用future.get() 方法即可,如果不關注異步任務的結果,只是簡單的等待任務完成的話,則調用 future.wait()方法。
std::async(std::launch::async | std::launch::deferred, f, args....),第一個參數是線程創建的策略,默認策略為 std::launch::async。
std::launch::async 在調用 async時就開始創建線程
std::launch::deferred 延遲加載方式創建線程。調用 async時不創建線程,直到調用了future的get或者wait方法時才創建線程。
#include<iostream>
#include<thread>
#include<future>
#include<utility>
int main(){
std::future<int> f1 = std::async(std::launch::async, [](){
return 8;
});
std::cout << f1.get() << std::endl;
std::future<void> f2 = std::async(std::launch::async, [](){
std::cout << 8 << std::endl;
});
f2.wait();
std::future<int> future = std::async(std::launch::async, [](){
std::this_thread::sleep_for(std::chrono::seconds(3));
return 8;
});
std::cout << "waiting ... " << std::endl;
std::future_status status;
do{
status = future.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred){
std::cout << "deferred..\n";
}else if(status == std::future_status::timeout){
std::cout << "time out...\n";
}
else if (status == std::future_status::ready){
std::cout << "ready!\n";
}
} while (status != std::future_status::ready);
std::cout << "result is " << future.get() << std::endl;
return 0;
}
執行結果為: 
