c++11中增加了線程以及線程相關的類,很方便的支持了並發編程。
1. 線程
線程創建
使用std::thread創建線程,提供線程函數或者函數對象即可,並且可以指定線程函數的參數。
#include<thread> void func(int a){ cout << "param= " << a << endl; } int main(){ std::thread t(func, 10); //創建線程使用線程函數以及函數參數 t.join(); //等待線程函數執行完畢 return 0; }
可以通過std::bind或lambda表達式來創建線程:
void func(int a,int b){ cout << "a = " << a << ", b = " << b << endl; } int main(){ std::thread t1(std::bind(func, 1, 2)); std::thread t2([](int a, int b){cout << "a = " << a << ", b = " << b << endl;}, 1,2); return 0; }
join 和 detach
如果希望在主線程中等待其他線程終止,使用join來實現,t.join()阻塞等待線程t結束,才能繼續往下執行;
如果不希望線程被阻塞執行,可以調用 t.detach() 方法,將線程和線程對象分離,分離之后,線程單獨執行,由系統回收資源。通過detach,線程和線程對象分離,讓線程作為后台線程執行,當前線程也不會被阻塞。但需要注意,detach之后,就無法再和線程發生聯系,不能通過join再阻塞等待線程,線程何時執行完我們也無法控制。
若在創建子線程的線程中,既沒有執行對子線程進行join也沒有進行detach,則有可能出現線程對象在線程函數結束之前就失效,從而出錯。
int main(){ std::thread t(func); t.detach(); //做其他事情.... return 0; }
線程生命期
std::thread在出了作用域之后就會被析構,這是如果線程函數還沒有被執行完,就會發生錯誤。因此,需要保證線程函數的生命周期在線程變量 std::thread的生命周期之內,這可以通過join阻塞等待實現。
線程不能被復制,但可以移動,比如:
#include<thread> void func(){ //do some work } int main(){ std::thread t(func); std::thread t1(std::move(t)); t.join(); t1.join(); return 0; }//線程被移動(std::move)后,線程對象t就不代表任何線程了。
線程基本用法
1. 獲取當前信息
void func(){ } int main(){ std::thread t(func); cout << t.get_id() << endl; //獲取線程id cout << std::thread::hardware_concurrency() << endl; //獲取cpu核數 return 0; }
2. 線程休眠
void f(){ std::this_thread::sleep_for(std::chrono::seconds(3)); //當前線程休眠3秒鍾 }
2. 互斥量
互斥量是一種同步原語,是一種線程同步的手段,用來保護多線程同時訪問的共享數據。
c++11中提供了四種語義的互斥量(mutex):
std::mutex 獨占的互斥量,不能遞歸使用
std::timed_mutex 帶超時的獨占互斥量,不能遞歸使用
std::recursive_mutex 遞歸互斥量,不帶超時功能
std::recursive_timed_mutex 帶超時的遞歸互斥量
std::mutex
std::mutex g_lock; void func(){ g_lock.lock(); std::cout << "entered thread " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "leave thread " << std::this_thread::get_id() << endl; g_lock.unlock(); }
std::recursive_mutex
遞歸鎖允許同一線程多次獲得該互斥鎖,可以用來解決同一線程需要多次獲取互斥量時死鎖的問題。但盡量不要使用遞歸鎖,因為:
(1)需要用到遞歸鎖定的多線程往往本身是可以簡化的
(2)遞歸鎖比起非遞歸鎖,效率會低一些
(3)遞歸鎖雖然允許同一線程多次獲得同一個互斥量,可重復獲得的最大次數並未具體說明,一旦超過次數,再對lock進行調用就會拋出std::system錯誤。
std::timed_mutex
std::timed_mutex是超時的獨占鎖,主要用於在獲取鎖時增加超時等待功能,在超時時間之內如果獲得鎖,則繼續,否則,返回超時,並不再阻塞。
std::time_mutex 使用 try_for_lock(timexx)來嘗試獲取鎖,在超過timexx時間仍未獲得鎖,則返回超時。
std::time_mutex mut; void work(){ std::chrono::milliseconds timeout(100); while(true){ if(mut.try_lock_for(time_out)){ std::cout << std::this_thread::get_id() << " :do work with the mutex" << endl; std::chrono::milliseconds sleepDuration(250); std::this_thread::sleep_for(sleepDuration); mut.unlock(); }else{ std::cout << "time out for get lock" << endl; } } }
lock_guard
lock_guard利用RAII機制,防止忘記unlock或者發生異常時無法unlock造成程序死鎖。
std::mutex mut; void func(){ std::lock_guard<std::mutex> guard(mut); xxxx }
3. 條件變量
條件變量能阻塞一個或多個線程,直到收到另外一個線程發出的通知或者超時,才會喚醒當前阻塞的線程。條件變量需要和互斥量配合起來使用一。c++11提供了兩種條件變量: condition_variable, 配合std::unique_lock<std::mutex>進行wait操作
condition_variable_any,和任意帶有lock,unlock語義的mutex搭配使用,比較靈活,
但效率比condition_varaible略低
void Put(const T& x){ std::lock_guard<std::mutex> locker(m_mutex); while(IsFull()){ cout << "緩沖區滿,需要等待..." << endl; m_notFull.wait(m_mutex); } m_queue.push_back(x); m_notEmpty.notify_one(); } void Take(T& x){ std::lock_guard<std::mutex> locker(m_mutex); while(IsEmpty()){ cout << "緩沖區空,需要等待...." << endl; m_notEmpty.wait(m_mutex); } x = m_queue.front(); m_queue.pop_front(); m_notFull.notify_one(); } std::list<T> m_queue; std::mutex m_mutex; std::condition_variable_any m_notEmpty; std::condition_variable_any m_notFull;
4. 原子變量
c++11提供了一個原子類型 std::atomic< T>,可以使用任意類型作為模板參數,c++11內置了整型的原子變量,可以更方便的使用原子變量,使用原子變量就不需要使用互斥量來保護該變量了。
#include<atomic> struct AtomicCounter{ std::atomic<int> value; void inc(){ ++value; } void dec(){ --value; } int get(){ return value.load(); } };
5. call_once/once_flag使用
為了保證在多線程環境中某個函數僅被調用一次,或者某個變量僅僅被初始化一次,可以使用 std::call_once來保證函數在多線程環境中只被調用一次。使用std::call_once時,需要一個once_flag作為call_once的入參。
std::once_flag flag; void do_once(){ std::call_once(flag, [](){ std::cout << "called once" << endl;}); }
6. 異步操作
c++11中提供了異步操作相關的類,主要有 std::future, std::promise, std::package_task. std::future 作為異步結果的傳輸通道,很方便的獲取線程函數的返回值; std::promise用來包裝一個值,將數據和future綁定起來,方便線程賦值;std::package_task用來包裝一個可調用對象,將函數和future幫頂起來,以便異步調用。
std::future 獲取線程函數返回值
c++ thread庫提供了future用來訪問異步操作的結果,future提供了獲取異步操作結果的通道,可以以同步等待的方式來獲取結果,可以通過查詢future的狀態(future_status)來獲取異步操作的結果。
future_status == Deferred 異步操作還沒開始
future_status == Ready 異步操作已經完成
future_status == Timeout 異步操作超時
std::future_status st; do{ st = future.wait_for(std::chrono::seconds(1)); if(st == std::future_status::deferred){ //還沒開始 }else if(st == std::future_status::timeout){ //超時 }else if(st == std::future_status::ready){ //ready } }while(st != std::future_status::Ready);
獲得future結果的三種方式: get, wait, wait_for, 其中get等待異步操作的結果並返回結果;wait只是等待異步操作完成,沒有返回值;wait_for是超時等待返回的結果。
std::promise 協助線程賦值的類
std::promise 將數據和future綁定起來,為獲取線程函數中的某個值提供便利,在線程函數中為外面傳進來的promise賦值,在線程函數執行完成之后就可以通過promise的future獲取該值。
std::promise<int> pr; std::thread t([](std::promise<int>& p) { p.set_value_at_thread_exit(9);}, //在線程函數中為promise賦值 std::ref(pr)); //線程函數的參數,使用引用 std::future<int> f = pr.get_future(); //獲取promise的future auto r = f.get(); //利用get等待異步操作的結果(阻塞)
std::packaged_task 可調用對象的包裝類
std::packaged_task可以將函數等可調用對象和future綁定起來,以便異步調用。
std::packaged_task<int()> task([]{return 7;}); std::thread t1(std::ref(task)); std::future<int> f1 = task.get_future(); auto r1 = f1.get();
std::promise, std::packaged_task, std::future三者之間的關系
std::future提供了一個訪問異步操作結果的機制,它和線程是一個級別的,屬於低層次的對象。std::future之上的高一層為 std::packaged_task和std::promise, 他們內部都有future以便訪問異步操作的結果,std::packaged_task包裝的是一個異步操作,而std::promise包裝的是一個值,都是為了方便異步操作。
future被promise和packaged_task用來作為異步操作或異步結果的連接通道,用std::future和std::shared_future來獲取異步調用的結果。future不可拷貝,只能移動,shared_future可以拷貝,當需要將future放到容器中則需要用shared_future。
#include<iostream> #include<utility> #include<future> #include<vector> #include<thread> int func(int x){ return x*x; } int main(){ std::packaged_task<int(int)> tsk(func); std::future<int> fut = tsk.get_future(); std::thread(std::move(tsk), 2).detach(); int value = fut.get(); std::cout << "The result is " << value << ".\n"; //std::future不可復制,無法放到容器中,需要用shared_future std::vector<std::shared_future<int>> v; std::shared_future<int> f = std::async(std::launch::async, [](int a, int b){ return a + b; }, 2, 3); v.push_back(f); std::cout << "The shared future result is " << v[0].get() << "\n"; return 0; }
std::async 線程異步操作函數
std::async比std::promise, std::packaged_task, std::thread更高一層,可以用來直接創建異步的task, 異步任務返回的結果也保存在future中,當需要獲取異步任務的結果時,只需要調用future.get() 方法即可,如果不關注異步任務的結果,只是簡單的等待任務完成的話,則調用 future.wait()方法。
std::async(std::launch::async | std::launch::deferred, f, args....)
,第一個參數是線程創建的策略,默認策略為 std::launch::async。
std::launch::async 在調用 async時就開始創建線程
std::launch::deferred 延遲加載方式創建線程。調用 async時不創建線程,直到調用了future的get或者wait方法時才創建線程。
#include<iostream> #include<thread> #include<future> #include<utility> int main(){ std::future<int> f1 = std::async(std::launch::async, [](){ return 8; }); std::cout << f1.get() << std::endl; std::future<void> f2 = std::async(std::launch::async, [](){ std::cout << 8 << std::endl; }); f2.wait(); std::future<int> future = std::async(std::launch::async, [](){ std::this_thread::sleep_for(std::chrono::seconds(3)); return 8; }); std::cout << "waiting ... " << std::endl; std::future_status status; do{ status = future.wait_for(std::chrono::seconds(1)); if (status == std::future_status::deferred){ std::cout << "deferred..\n"; }else if(status == std::future_status::timeout){ std::cout << "time out...\n"; } else if (status == std::future_status::ready){ std::cout << "ready!\n"; } } while (status != std::future_status::ready); std::cout << "result is " << future.get() << std::endl; return 0; }
執行結果為: