一. std::atomic_flag和std::atomic
(一)std::atomic_flag
1. std::atomic_flag是一個bool類型的原子變量,它有兩個狀態set和clear,對應着flag為true和false。
2. std::atomic_flag使用前必須被ATOMIC_FLAG_INIT初始化,此時的flag為clear狀態,相當於靜態初始化。
3. 三個原子化操作
(1)test_and_set():檢查當前flag是否被設置。若己設置直接返回true,若沒設置則將flag置為true ,並返回false。
(2)clear();清除flag標志,即flag=false。
(3)析構函數
4. 和所有atomic類型一樣,std::atomic_flag不支持拷貝和賦值等操作。因為賦值和拷貝調用了兩個對象,從第一個對象中讀值,然后再寫入另一個。對於兩個獨立的對象,這里就有兩個獨立的操作,合並這兩個操作必定不是原子的。
5. std::atomic_flag 類型不提供is_lock_free()。 該類型是一個簡單的布爾標志, 並且在這種類型上的操作都是無鎖的。但atomic_flag的可操作性不強,導致其應用局限性,還不如std::atomic<bool>。
(二)std::atomic<T>模板類
1. std::atomic_flag是無鎖類型的,但是atomic<bool>不一定是lock free的,可以用atomic<T>::is_lock_free()來判斷。通常情況下,編譯器不會為std::atomic<UDT>生成無鎖代碼,所有操作使用一個內部鎖(UDT為用戶自定義類型,如果其類型大小如同int或void*時,大多數平台仍會使用原子指令)。
2. fetch_系列函數返回的是舊值,復合賦值運算返回的是新值,但它們返回的都不是引用類型。因為任何依賴與這個結果的代碼都需要顯式加載該值。潛在的問題是,結果可能會被其他線程修改。而通過非原子值進行賦值,可以避免多余的加載過程,並且得到實際存儲的值。
3. compare_exchange_weak/strong函數可以保證“比較-交換”的原子化。compare_exchange_weak可能失敗,即此函數可能與expected值相等的情形下atomic的T值沒有替換為disired(atomic值未變)且返回false,這可能發生在缺少單條CAS操作(“比較-交換”指令)的機器上,所以通常使用一個循環中。

bool compare_exchange_strong(T& expected, const T desired) volatile noexcept { if (*this == expected) *this = desired; else expected = *this; return (*this == expected) }
4. 整數原子類型沒有乘法、除法、位移操作,因為整數原子類型通常用於計數或者掩碼等,如果需要,可以將compare_change_weak()放入循環中完成。
5. std::atomic<UDT>,UDT類不能有任何虛函數或虛基類,以及必須使用編譯器創建的拷貝賦值操作。所有的基類和非靜態數據成員也都需要支持拷貝賦值操作。其比較-交換操作就類似於memcmp使用按位比較,而非為UDT類定義一個比較操作符。
6. 每個函數的操作都有一個內存排序參數。這個參數可以用來指定存儲的順序。
(1)store操作,可選如下順序:memory_order_relaxed、memory_order_release、memory_order_seq_cst。
(2)load操作,可選順序:memory_order_relaxed、memory_order_consume、memory_order_acquire和memory_order_seq_cst。
(3)讀-改-寫操作(RMW):memory_order_relaxed、memory_order_consume、memory_order_acquire、memory_order_release、memory_order_acq_rel和memory_order_seq_cst。
【編程實驗】std::atomic<T> 初體驗
#include <atomic> #include <thread> #include <iostream> #include <vector> #include <sstream> #include <mutex> #include <cassert> std::mutex io_mutex; class Foo {}; //自己封裝的自旋鎖 class spinlock { std::atomic_flag flag = ATOMIC_FLAG_INIT; public: void lock() { while (flag.test_and_set(std::memory_order_acquire)) { //std::this_thread::yield(); //這里應注釋掉,否則會引起線程切換,成本較高達不到自 //旋鎖要的效率 } } void unlock() { flag.clear(std::memory_order_release); } }; spinlock g_lock; std::stringstream stream; void append_number(int x) { g_lock.lock(); stream << "thread(" << std::this_thread::get_id() << "): " << x << "\n"; g_lock.unlock(); } std::vector<int> data; std::atomic<bool> data_read(false); void reader_thread() { while (!data_read.load()) { std::this_thread::sleep_for(std::chrono::microseconds(1)); } std::cout << "The answer=" << data[0] << std::endl; //1 } void writer_thread() { data.push_back(42); //2。 由於1和2處發生了data race,所以使用atomic<boo>進行同步。 data_read = true; } std::atomic<int> g_count = 0; void test() { for (int i = 0; i < 100000; ++i) { //count++; //原子操作 //g_count += 1; //原子操作 g_count = g_count + 1; //注意,不是原子操作 } } //線程安全的簡單鏈表 struct linkList { struct Node { int value; Node* next; }; void append(int val) { Node* newNode = new Node{ val,list_head }; //將頭結點移動到newNode的后面(即next下) //注意,由於線程的data race,這里可能出現多個線程 //同時創建新結點,並將newNode->next = list_head // 頭插法 //這里需要用while循環,因為第1個搶到compare_exchange_strong的線程,會成功將list_head改為 //newNode,使得newNode成為新的list_head(頭插法) 。而其余線程第1次調用compare_exchange_strong //時會因為newNode->next不等於新的list_head而失敗,該函數會將新的list_head掛在newNode->next下面, //如此當再次循環時,就能夠成功將newNode設置為新的list_head(這里可能再次遇到data race,會重復上述 //過程,直到成功)。 while (!(list_head.compare_exchange_strong(newNode->next, newNode))); //函數體為空 } void print() { for (Node* it = list_head; it != nullptr; it = it->next) { std::cout << " " << it->value; } std::cout << std::endl; } void clear() { Node* it; while (it = list_head) { list_head = it->next; delete it; } } std::atomic<Node*> list_head = nullptr; }; //實現一個無鎖的線程安全棧 template<typename T> class stack { struct node { std::shared_ptr<T> data; node* next; node(const T& d) :next(nullptr), data(std::make_shared<T>(d)) {} //用shared_ptr指向新分配出來的T }; private: std::atomic<node*> head; private: std::atomic<unsigned> threads_in_pop; //正在調用pop函數的線程數量 std::atomic<node*> toDeleted; public: void push(const T& data) { node* newNode = new node(data); //創建新節點 newNode->next = head.load(); //將原head移到新節點的后驅節點。這里可能出現多線程 //同時將head節點作為其后驅 //compare_exchange_weak是原子操作。先進行比較再交換。由於多線程的競爭,每次只允許 //一個線程compared_exchange_weak,第1個搶到的線程會交換成功,成為新的head。而其它 //線程則會失敗,並將newNode的后驅改為新的head並繼續循環比較-交換操作,直至所有 //線程的push操作結束。 while (!head.compare_exchange_weak(newNode->next, newNode)); //新節點變為新的head } std::shared_ptr<T> pop() { ++threads_in_pop; //引用計數,統計有多少個線程正在使用該函數 node* oldHead = head.load(); //① 多線程時,會出現head被多個線程搶到。 //每次只有一個線程可以執行compare_exchange_weak操作,並將head的后驅變成新的head //其它線程會返回失敗,並重置oldhead為新的head,然后開始另一輪的循環。 while (oldHead && !head.compare_exchange_weak(oldHead, oldHead->next)); //以下存在內存泄漏,雖然oldHead->data被取走了,但oldHead這個結點本身沒被釋放! //return oldHead ? oldHead->data : std::shared_ptr<T>();//由於①處oldHead的競爭,其他線程可能正在使 //用該節點,這里如果直接delete oldHead可能造 //成未定義行為,怎么辦? //解決方案: std::shared_ptr<T> res; if (oldHead) res.swap(oldHead->data); //從節點中提取數據 //當多線程同時調用pop時,先將這個要刪除的結點加入到待刪除節點鏈表中,然后等 //最后一個線程pop時,再將這個鏈表delete。 try_reclaim(oldHead); //嘗試回收節點 return res; } //刪除某個節點 static void delete_nodes(node* nodes) { while (nodes) { node* next = nodes->next; delete nodes; nodes = next; } } void try_reclaim(node* old_head) { if (threads_in_pop == 1) //只有當前線程使用。沒有競爭,可以直接刪除old_head { node* nodes_to_delete = toDeleted.exchange(nullptr);//返回toDeleted舊值 if (!--threads_in_pop) //雙重檢查,因為在上面的賦值時,可能另一個線程再pop()進來 { delete_nodes(nodes_to_delete); //確實只剩一個線程,就刪除待刪除節點鏈表 } else if (nodes_to_delete) { chain_pending_nodes(nodes_to_delete);//將已存在的鏈表鏈接到刪除鏈表后面 } delete old_head; //刪除節點本身 } else { chain_pending_node(old_head); //將節點加入到待刪除鏈表中(頭插法) --threads_in_pop; } } //將已存在的鏈表鏈接到刪除鏈表后面 void chain_pending_nodes(node* nodes) { node* last = nodes; while (node * next = last->next) { //讓next指針指向鏈表的末尾 last = next; } chain_pending_nodes(nodes, last); } void chain_pending_nodes(node* first, node* last) { last->next = toDeleted; //將原鏈表放在last->next下。 while (!toDeleted.compare_exchange_weak(last->next, first)); //first成為新的鏈表頭部 } void chain_pending_node(node* n) { chain_pending_nodes(n, n); } }; int main() { //1. 自旋瑣的實現 std::vector<std::thread> threads; for (int i = 1; i < 10; ++i) { threads.push_back(std::thread(append_number, i)); } for (auto& th : threads) th.join(); std::cout << stream.str(); //2. std::atomic<bool>的使用 std::thread reader(reader_thread); std::thread writer(writer_thread); reader.join(); writer.join(); //3. std::atomic<T*>的使用(指針的原子相加減操作) Foo fArr[5]; std::atomic<Foo*> pF(fArr); Foo* x = pF.fetch_add(2); //fetch_add返回舊值,即x==fArr assert(x == fArr); x = (pF -= 1); //x ==pF == &fArr[1]; assert(x == &fArr[1]); assert(pF.load() == &fArr[1]); //4. 線程安全的無鎖結構 //4.1 無鎖鏈表 std::vector<std::thread> ths; linkList lst; for (int i = 0; i < 10; ++i) ths.push_back(std::thread(&linkList::append, &lst, i)); for (auto& th : ths) th.join(); //打印內容 lst.print(); lst.clear(); //4.2 無鎖棧結構 stack<int> st; std::vector<std::thread> producers; for (int i = 0; i < 10; ++i) { producers.emplace_back([&st, i]() {st.push(i); }); } for (auto& p : producers) p.join(); std::vector<std::thread> consumers; for (int i = 0; i < 10; ++i) { consumers.emplace_back([&st]() { std::lock_guard<std::mutex> lck(io_mutex); std::cout << *st.pop() << " "; }); } for (auto& c : consumers) c.join(); std::cout << std::endl; //5.原子和非原子的加減操作 std::thread t1(test); std::thread t2(test); t1.join(); t2.join(); std::cout << g_count << std::endl; return 0; } /*輸出結果 thread(2200): 2 thread(13492): 3 thread(19072): 4 thread(11624): 5 thread(16712): 6 thread(14632): 1 thread(14124): 7 thread(2092): 8 thread(2820): 9 The answer=42 8 9 7 6 5 4 3 1 2 0 9 8 7 6 5 4 3 2 1 0 106675 */
二. 內存屏障與同步
(一)亂序的原因
從源碼變成可以被機器識別的程序,至少要經過編譯期和運行期。重排序分為兩類:編譯期重排序和運行期重排序。由於重排序的存在,指令實際的執行順序,並不是源碼中看到的順序。
1. 編譯器出於優化的目的,在編譯階段將源碼的順序進行交換。
2. 對計算機來說,通常內存的寫操作相對於讀操作要昂貴很多。因此,寫操作一般會在 CPU 內部的store buffer緩存。這就導致在一個 CPU 里執行一個寫操作之后,不一定馬上就被另一個 CPU 所看到。這從另一個角度講,效果上其實就是讀寫亂序了。
(二)CPU cache 工作機制及內存屏障【非權威,僅供參考】
1. 為了提高CPU效率,每個CPU中都有兩塊私有的特殊內存:store buffer和Invalidate隊列。其中,store buffer,用於CPU臨時寫入數據的緩沖區。Invalidate隊列為該CPU cache即將無效數據項的請求隊列。只有位於cache或memory中的數據才會被CPUs共享,並且他們通過MESI協議保證了數據的一致性。即不論哪個CPU發生了cache miss,它們從其他CPU嗅探到的或從內存加載的數據都是一樣的。
2. 存在於store buffer中的數據是最新的(因為執行了寫操作),該CPU下次的讀操作往往優先從這里而不是cache中獲取。但store buffer中的數據是該CPU私有的,不能被其他CPU共享。而且經常也不會立即刷新到cache或memory中,一般需要等到寫這個數據的CPU接收到來自其他CPU的invalidate acknowledgement回復后才會進行刷新。即store buffer和cache中數據可能是不一樣的,store buffer的較新,而cache中是舊值。此外,讀操作時,只有該項數據是無效的才會從其他CPU或memory中讀取,如果仍處於有效狀態則會從本地cache中讀取。因此,如果invalidate請求還保存在隊列中的話,就可能讀到舊值。只有等該CPU處理完隊列中的這條invalidate消息,才會從cache或memory中讀取。因此,store buffer和Invalidate隊列都可能引起memory oder問題。
3. 內存屏障的作用
(1)寫屏障指令:作用於store buffer,它只是約束執行CPU上的store操作的順序。該指令有兩個作用:A. 確保寫屏障之前的store操作不會被重排到指令之后。B.執行當遇到該指令時首先flush store buffer(也就是將指令之前store操作寫入於store buffer中的值刷新到cacheline中)。在C++11中可以使用std::atomic_thread_fence(std::memory_order_release)強制加入寫屏障指令。
(2)讀屏障指令:作用於Invalidate queue上,它只是約束執行CPU上的load操作的順序。該指令有兩個作用:A. 確保讀屏障之后的load操作不會被重排到指令之前。B.執行后續的load操作之前,先將Invalidate Queue隊列中相關項執行完,確保后續的load操作觸發cache miss,從而讀到最新的數據。讀屏障指令就象一道柵欄,嚴格區分了之前和之后的load操作。在C++11中可以使用std::atomic_thread_fence(std::memory_order_acquire) 強制加入讀屏障指令。
(三)C++11的6種memory order
內存順序 |
作用 |
memory_order_relexed |
①只保證當前操作的原子性。 ②不考慮線程間的同步,其它線程可能讀到舊值(因為不指定內存屏障,所以內存操作執行時可能是亂序的) |
memory_order_acquire |
①對讀取(load)實施acquire語義。相當於插入一個內存讀屏障,保證之后的讀操作不會被重排到該操作之前。 ②類似於mutex的lock操作,但不會阻塞。只是保證如果其它線程的release己經發生,則本線程其后對該原子變量的load操作一定會獲取到該變量release之前發生的寫入。因此,acuquire一般需要用循環,等待其他線程release的發生。 |
memory_order_release |
①對寫入(store)實施release語義(類似於mutex的unlock操作)。保證之前的寫操作不會被重排到該操作之后,同時確保該操作之前的store操作將數據寫入cache或memory中,確保cache或memory是最新數據。(相當於插入一個寫屏障)。 ②該操作之前當前線程內的所有寫操作,對於其他對這個原子變量進行acquire或assume的線程可見。(對於assume,寫操作指對依賴變量的寫操作) ③該操作本身是原子操作。 |
memory_order_consume |
類似memory_order_acquire,但只對與這塊內存有關的讀寫操作起作用。 |
memory_order_acq_rel |
①對讀取和寫入施加acquire-release語義,無法被重排(相當於同時插入讀寫兩種內存屏障)。 ②可以看見其他線程施加release語義的所有寫入,同時自己release結束后所有寫入對其他acquire線程可見。 |
memory_order_seq_cst |
①如果是讀取就是acquire語義,寫入就是release語義,讀寫就施加acquire-release語義。 ②同時會對所有使用此memory-order的原子操作建立一個全局順序。這樣,所有線程都將看到同樣一個的內存操作順序。 |
【編程實驗】內存順序
#include <iostream> #include <thread> #include <atomic> #include <cassert> using namespace std; //Acuqire-Release、relaxed語義 class CAcqRel { int m; std::atomic<bool> x, y; std::atomic<int> z; public: CAcqRel():x(false),y(false),z(0),m(0){} void write_x_then_y() { m = 1; //非原子變量 x.store(true, std::memory_order_relaxed); //relaxed語義,無法保證a和x寫入的先后順序。 y.store(true, std::memory_order_release); //對x實施release語義,保證a/x一定在y之前被寫入 } void read_y_then_x() { while (!y.load(std::memory_order_acquire)); //對y實施acuqire語義,同時使用循 //等待y原子變量的release的發生。 if (x.load(std::memory_order_relaxed)) //acquire線程可見到release之前的寫操作,因此x為true。 ++z; } void test() { std::thread a(&CAcqRel::write_x_then_y, this); std::thread b(&CAcqRel::read_y_then_x, this); a.join(); b.join(); assert(z.load() != 0); //條件成立 assert(m == 1); //條件成立 } }; //Consume語義 class CConsume { std::atomic<std::string*> ptr; int data; std::atomic<int> atData; public: CConsume():data(0),atData(0),ptr(nullptr){} void producer() { std::string* p = new std::string("Hello"); data = 42; atData.store(2019, std::memory_order_relaxed); ptr.store(p, std::memory_order_release); } void consumer() { std::string* p2; while (!(p2 = ptr.load(std::memory_order_consume))); //consume語義,只能保證額ptr //依賴的變量p己被存儲,但不保證 //data和atData的值。 assert(*p2 == "Hello"); //條件一定成立。 assert(data == 42); //無法保證data一定等於42。因為ptr對其無依賴。 assert(atData == 2019); //無法保證atData一定等於2019,因為ptr對其無依賴 } void test() { std::thread t1(&CConsume::consumer, this); std::thread t2(&CConsume::producer, this); t1.join(); t2.join(); } }; //seq_cst語義 class CSeqCst { std::string work; std::atomic<bool> ready; std::atomic<int> data; public: CSeqCst():ready(false),data(0){} void write() { //以下的寫操作由於采用memory_order_seq_cst語義,因此當寫入時會產生一個全局 //的寫入順序,即先work再data,最后寫入ready。這個順序對所有使用該語義的 //線程可見,即 work = "done"; data.store(2019, std::memory_order_seq_cst); ready = true; //默認采用memory_order_seq_cst語義 } void read() { //默認采用memory_order_seq_cst語義。當ready發生時,由於全局順序必然知道data //和work的存儲己發生 while (!ready.load()); std::cout << work << std::endl; //done,全局順序,work一定為done std::cout << data << std::endl; //2019,全局順序,data一定等於2019 } void test() { std::thread t1(&CSeqCst::write, this); std::thread t2(&CSeqCst::read, this); t1.join(); t2.join(); } }; class CMemoryBarriers { bool x = false; //x為一個非原子變量 std::atomic<bool> y; std::atomic<int> z; public: CMemoryBarriers():x(false),y(false),z(0){} void write_x_then_y() { x = true; //①在柵欄前存儲x std::atomic_thread_fence(std::memory_order_release); //②“釋放柵欄” y.store(true, std::memory_order_relaxed); //③在柵欄之后存儲y } void read_y_then_x() { while (!y.load(std::memory_order_relaxed)); //④ 在③寫入前持續等待 std::atomic_thread_fence(std::memory_order_acquire); //⑤“讀取柵欄”。②與⑤是同步關系 //②與⑤兩個柵欄都是必要的,這樣才 //能在兩個線程間建立同步關系。 if (x) ++z; //⑥。這里的x讀取是在⑤之后,由於柵欄之間的同步關系,也發生在①之后。所以 //這里讀取到的值是①寫入的。 } void test() { std::thread a(&CMemoryBarriers::write_x_then_y, this); std::thread b(&CMemoryBarriers::read_y_then_x, this); a.join(); b.join(); assert(z.load() != 0); //條件成立 } }; int main() { //測試memory-order CAcqRel ar; ar.test(); CConsume cs; cs.test(); CSeqCst sc; sc.test(); CMemoryBarriers mb; mb.test(); return 0; } /*輸出結果 done 2019 */