我在之前一篇博文《漫談C++11 Thread庫之使寫多線程程序》中,着重介紹了<thread>頭文件中的std::thread類以及其上的一些基本操作,至此我們動手寫多線程程序已經基本沒有問題了。但是,單線程的那些"坑"我們仍還不知道怎么去避免。
多線程最主要的問題就是共享數據帶來的問題。如果共享數據都是只讀的,那么沒問題,因為只讀操作不會影響到數據,更不會涉及對數據的修改,所以所有線程都會獲得同樣的數據。但是,當一個或多個線程要修改共享數據時,就會產生很多潛在的麻煩。
#include <iostream> #include <thread> long sum = 0L; void fun() { for(int i=1;i<100000;++i) sum += i; } int main() { std::cout << "Before joining,sun = " << sum << std::endl; std::thread t1(fun); std::thread t2(fun); t1.join(); t2.join(); std::cout << "After joining,sun = " << sum << std::endl; }
程序結構很簡單,啟動兩個線程分別對變量sum加上 1-99999。其內存結構大致上是這樣的。
c++多線程程序中,每個線程都有一個線程棧,它們相互獨立,因此在線程棧中的數據,是不會被其他線程影響到的。但是在內存的數據段中的數據,是可以在全局被訪問到的。我們在上面這段代碼中定義的sum變量正是位於數據段中。
在目前來看,我們期望最后程序退出的時候,打印出sum是 9999900000。但是結果卻不盡人意,我們試着編譯運行:
[thread]g++ condition.cpp -omain -std=c++11 -lpthread [thread]main Before joining,sun = 0 After joining,sun = 5192258282 [thread]main Before joining,sun = 0 After joining,sun = 8418413412 [thread]main Before joining,sun = 0 After joining,sun = 5294478585
顯然結果還是比較意外的,運行了三次,都得到了不同的結果,而且沒有一次得到我們的期望值,這下我們精准地踩中了多線程的"坑"。試着多運行幾遍,看看會不會出現正確的結果。當然手動運行幾遍甚至幾十遍,還是可以應付得了的。但是要運行幾千遍,手動運行下來估計手就得抽筋了。這樣的機械般的操作還是交給shell腳本吧,由於我的機器配置不是很牛×,暫且先1000次看看,shell腳本如下,count.sh:
#!/bin/bash #result equal with 9999900000 cnt=0 #result more than 9999900000 cnt_more=0 #result less than 9999900000 cnt_less=0 for((i=0;i<1000;++i)) do var=$(main|tail -1) var=${var#After joining,sun = } if(($var == 9999900000)) then ((cnt++)) fi if(($var > 9999900000)) then ((cnt_more++)) fi if(($var < 9999900000)) then ((cnt_less++)) fi done echo "cnt="$cnt echo "cnt_more="$cnt_more echo "cnt_less="$cnt_less
其中變量cnt來統計1000次運行中總共得到過多少次的正確結果,用cnt_more統計偏大的結果,用cnt_less統計偏小的結果。這是該腳本的運行結果:
[thread]count.sh cnt=315 cnt_more=0 cnt_less=685
1000次運行中還是有315次得到了正確答案,有685次的結果是偏小的,卻沒有一次的結果是偏大的!那么問題出在哪里了?試着想象一下這樣一個場景:你和朋友合租在一間房子里邊,房子里面只有一間廚房,你們共用一個鍋。有一天你准備做一道西紅柿炒蛋,當你把西紅柿放入鍋中的時候,你的電話響了,你離開廚房去接電話。而這時候你的室友也要做飯,他要做一道紅燒魚,於是他把洗好的魚放入了鍋中煮,然后也離開了廚房(由於某種原因他不知道鍋里還有你的食材,在程序中線程也不會知道其他線程對共享的數據做了什么)。當你回來的時候繼續往里邊放入雞蛋,最后你得到的是一盤西紅柿炒雞蛋魚。而你的室友回來廚房的時候他要的紅燒魚就會不見了。
在上面的例子里,你和室友就代表着thread1和thread2線程,sum變量就是那個鍋。多線程中共享數據的問題,就是上面場景中你們共用一口鍋造成的問題。
要解決上面場景的問題,其中有一中可行的方案就是:你們做菜的步驟很短,短到什么程度呢,短到這個步驟不可被分割。例如你做的這道菜只有一個步驟,就是讓食材(對應於下面提到的原子數據類型)碰一下鍋(當然現實場景中基本沒有這樣的菜),這樣你們的做菜過程就不會被其他室友打斷、干擾,即使你們共同在使用一口鍋。
而上面的代碼中的 sum += i 在CPU指令的層面上是可以被分割的,我用g++的-S選項生成其匯編的指令看到了一段這樣的代碼:
movl $0, -4(%ebp) // sum = 0 movl $0, -8(%ebp) // i =0 ...... movl -8(%ebp), %eax //將i送入寄存器eax addl %eax, -4(%ebp) //將i的值加上sum的值,將結果保存到 sum中。 movl $0, %eax
匯編指令還是描述的比較清楚的,可以清楚的看到 sum += i;操作被分割成了兩條cpu指令,先是將i的值保存在eax寄存器中,然后將eax的值加上sum的值並保存在sum中。
而在c++中原子操作就是這樣的一種『小到不可分割的』操作。要使用原子操作我們需要引用c++11的一個新的頭文件<atomic>。在這個頭文件中定義了一個類模板struct atomic表示原子數據類型,在GNU的實現(/usr/include/c++/4.8.3/atomic)上如下:
template<typename _Tp> struct atomic { private: _Tp _M_i; public: atomic() noexcept = default; ~atomic() noexcept = default; atomic(const atomic&) = delete; //刪除了拷貝構造 atomic& operator=(const atomic&) = delete; atomic& operator=(const atomic&) volatile = delete; //刪除了 operator= constexpr atomic(_Tp __i) noexcept : _M_i(__i) { } operator _Tp() const noexcept { return load(); } operator _Tp() const volatile noexcept { return load(); } _Tp operator=(_Tp __i) noexcept { store(__i); return __i; } ... };
atomic模板中還實現了操作符的重載(由於篇幅,查看完整的類結構請參閱atomic頭文件),因此你可以像使用內置的數據類型那樣使用原子數據類型(c++保證這些操作是原子操作)。對應於內置的數據類型,原子數據類型都有一份對應的類型,歸納出來如下:
std::atomic_char | std::atomic<char> |
std::atomic_schar | std::atomic<signed char> |
std::atomic_uchar | std::atomic<unsigned char> |
std::atomic_short | std::atomic<short> |
std::atomic_ushort | std::atomic<unsigned short> |
std::atomic_int | std::atomic<int> |
std::atomic_uint | std::atomic<unsigned int> |
std::atomic_long | std::atomic<long> |
std::atomic_ulong | std::atomic<unsigned long> |
std::atomic_llong | std::atomic<long long> |
std::atomic_ullong | std::atomic<unsigned long long> |
更多的請見:http://en.cppreference.com/w/cpp/atomic/atomic |
我們之前的sum變量是long類型的,對應的原子數據類型是std::atomic_long,下面我們就簡單的修改一下開篇的代碼:
#include <iostream> #include <thread> #include <atomic> // modified std::atomic_long sum = {0L}; // modified void fun() { for(int i=0;i<100000;++i) sum += i; } int main() { std::cout << "Before joining,sun = " << sum << std::endl; std::thread t1(fun); std::thread t2(fun); t1.join(); t2.join(); std::cout << "After joining,sun = " << sum << std::endl; }
我們只增加了一個<atomic>頭文件,並且將 long sum = 0L; 修改成了 std::atomic_long sum {0L}; 注意不要寫成『std::atomic_long sum = 0L』的形式,因為long類型是不可以隱式轉換為std::atomic_long類型的。
為了證明不是偶然性,我們仍用上面的count.sh這個腳本運行1000次上面的修改過的程序:
[thread]g++ atomic.cpp -o main -std=c++11 -lpthread [thread]count.sh cnt=1000 cnt_more=0 cnt_less=0
可以看到原子操作還是有明顯的效果的,這1000次的運行我們都得到了正確的結果。事實證明原子操作的確可以作為解決共享數據引起的問題的一種有效的手段。
和其他的原子數據類型(包括atomic_bool)不同的是,他是鎖無關(lock-free)的一種類型,即線程對它的訪問是不需要加鎖的,因此他也沒有其他的原子類型的讀寫操作(load(),store())、運算符操作等。取而代之的是另外兩個原子操作的函數test_and_set()和clear()。atomic_flag類的結構在GNU上是這樣的:
#if __GCC_ATOMIC_TEST_AND_SET_TRUEVAL == 1 typedef bool __atomic_flag_data_type; #else typedef unsigned char __atomic_flag_data_type; #endif struct __atomic_flag_base { __atomic_flag_data_type _M_i; }; struct atomic_flag : public __atomic_flag_base {
...
bool test_and_set(memory_order __m = memory_order_seq_cst) noexcept; bool test_and_set(memory_order __m = memory_order_seq_cst) volatile noexcept; void clear(memory_order __m = memory_order_seq_cst) noexcept; void clear(memory_order __m = memory_order_seq_cst) volatile noexcept;
... private: static constexpr __atomic_flag_data_type _S_init(bool __i) { return __i ? __GCC_ATOMIC_TEST_AND_SET_TRUEVAL : 0; } };
atomic_flag::test_and_set()和其名字一樣,大致上是這樣工作的:首先檢查這atomic_flag類中的bool成員_M_i是否被設置成true,如果沒有就先設置成true,並返回之前的值(flase),如果atomic_flag中的bool成員已經是true,則直接返回true。
相比較而言atomic_flag::clear()更加簡單粗暴,它直接將atomic_flag的bool值得標志成員_M_i設置成flase,沒有返回值。
既然小標題是『自旋鎖——atomic_flag』,那么我們看看這把自旋鎖(spin lock)是怎么用的:
#include <iostream> #include <atomic> #include <unistd.h> #include <thread> std::atomic_flag lock = ATOMIC_FLAG_INIT; //初始化 void f(int n) { while(lock.test_and_set()) //獲取鎖的狀態 std::cout << "Waiting ... " << std::endl; std::cout << "Thread " << n << " is starting working." << std::endl; } void g(int n) { sleep(3); std::cout << "Thread " << n << " is going to clear the flag." << std::endl; lock.clear(); // 解鎖 } int main() { lock.test_and_set(); std::thread t1(f,1); std::thread t2(g,2); t1.join(); t2.join(); }
進入main函數后我們就先設置好atomic_flag,然后啟動了兩個線程t1和t2,其中t1中我們一直循環獲取atomic_flag的狀態,知道t2睡眠3秒后,clear()掉lock的鎖定狀態。其運行結果:
[thread]g++ atomic_flag.cpp -o main -std=c++11 -lpthread [thread]main Waiting ... Waiting ... Waiting ... Waiting ... Waiting ... // omit lager of "waiting..." thread 2 is going to clear the flag. Thread 1 is starting working.
這樣的結果正合我們的期望,實際上我們就是通過自旋鎖實現了讓t1線程一直在等待t2線程。
更進一步地我們還可以通過簡單的封裝,來實現一把鎖。MyLock.h(為了直觀我就都寫到一個文件中了):
#ifndef __MYLOCK_H_ #define __MYLOCK_H_ #include <iostream> #include <atomic> #include <thread> class MyLock { private: std::atomic_flag m_flag; public: MyLock(); void lock(); void unlock(); }; MyLock::MyLock() { m_flag.clear(); //if not do this,m_flag will be unspecified } void MyLock::lock() { while(m_flag.test_and_set()) ; } void MyLock::unlock() { m_flag.clear(); } #endif
現在我們就試着使用這把鎖,來改寫開篇的那個程序:
#include <iostream> #include <thread> #include "MyLock.h" //code above MyLock lk; long sum = 0; void add() { for(int i=0;i<100000;++i) { lk.lock(); sum += i; lk.unlock(); } } int main() { std::thread t1(add); std::thread t2(add); t1.join(); t2.join(); std::cout << "sum = " << sum << std::endl; }
運行后沒有問題,正確打印出結果sum=9999900000。
如果你點過開過上邊的atomic_flag::test_and_set()的鏈接,你會發現其實它是有參數的,其原型是這樣的:
bool test_and_set(std::memory_order order = std::memory_order_seq_cst) volatile;
bool test_and_set(std::memory_order order = std::memory_order_seq_cst);
void clear( std::memory_order order = std::memory_order_seq_cst ) volatile;
void clear( std::memory_order order = std::memory_order_seq_cst );
這兩個函數原型包含了一個新的數據類型std::memory_order,這是一個枚舉類型,其具體的定義在<bits/atomic_base.h>頭文件中(/usr/include/c++/4.8.3/bits/atomic_base.h)。所有的枚舉值得具體意義,我都查閱資料,注釋在后邊,如下:
typedef enum memory_order { memory_order_relaxed, //不對執行順序做任何保證 memory_order_consume, //本線程中所有有關本原子類型的操作,必須等到本條原子操作完成之后進行 memory_order_acquire, //本線程中,后續的讀操作必須在本條原子操作完成后進行 memory_order_release, // 本線程中,之前的寫操作完成后才執行本條原子操作 memory_order_acq_rel, //memory_order_acquire和memory_order_release 效果的合並 memory_order_seq_cst //順序一致 } memory_order;
test_and_set()和clear()的默認參數都是使用的memory_order_seq_cst這個枚舉值,其語義上是順序一致性(sequential consistent)。順序一致性是指線程執行的順序和我們程序員所寫代碼的順序是一致的。我們首次接觸這個概念的時候,可能會感到疑惑,一直以來我們都理所當然的以為我們寫的是什么,程序就怎么干。其實不然。當編譯器在編譯我們的源碼的時候會權衡我們的代碼做出適當的優化,如果編譯器認為執行順序和程序輸出結果無直接影響,那么就可能會重排序(reorder)指令以提高性能。而memory_order_seq_cst則保證了順序執行程序。如上邊memory_order定義的那樣,在C++11,並不是只支持順序一致的內存模型,因為順序一致意味着最低效。
關於內存順序個人以為這和硬件的關系跟大一些,在此不再用過多篇幅討論。了解一下應該就夠了。
最后謝謝你的閱讀,如果你能給我一點建議的話,那就更好了。