1、概要
本文是無鎖同步系列文章的第一篇,主要探討C++11中的Atomic。
我們知道在C++11中引入了mutex和方便優雅的lock_guard。但是有時候我們想要的是性能更高的無鎖實現,下面我們來討論C++11中新增的原子操作類Atomic,我們可以利用它巧妙地實現無鎖同步。
2、傳統的線程同步
1 #include <thread>
2 #include <mutex>
3
4 #include <iostream>
5
6 using namespace std; 7
8 mutex g_mutex; 9 int g_count = 0; 10
11 int main() 12 { 13 thread thr1([]() { 14 for (int i = 0;i < 5;i++) { 15 lock_guard<mutex> lock(g_mutex); //①
16 g_count += 10; 17 } 18 }); 19
20 thread thr2([]() { 21 for (int i = 0;i < 5;i++) { 22 lock_guard<mutex> lock(g_mutex); //②
23 g_count += 20; 24 } 25 }); 26
27 thr1.join(); 28 thr2.join(); 29
30 cout << g_count << endl; 31
32
33 }
在上述例子中,如果把①②的鎖注釋后,我們可能無法得到正確的結果。原因是C++並沒有給我們保證+=操作具有原子性(其本質應該是讀-加-寫3個操作)。
3、Atomic
C++11給我們帶來的Atomic一系列原子操作類,它們提供的方法能保證具有原子性。這些方法是不可再分的,獲取這些變量的值時,永遠獲得修改前的值或修改后的值,不會獲得修改過程中的中間數值。
這些類都禁用了拷貝構造函數,原因是原子讀和原子寫是2個獨立原子操作,無法保證2個獨立的操作加在一起仍然保證原子性。
這些類中,最簡單的是atomic_flag(其實和atomic<bool>相似),它只有test_and_set()和clear()方法。其中,test_and_set會檢查變量的值是否為false,如果為false則把值改為true。
除了atomic_flag外,其他類型可以通過atomic<T>獲得。atomic<T>提供了常見且容易理解的方法:
- store
- load
- exchange
- compare_exchange_weak
- compare_exchange_strong
store是原子寫操作,而load則是對應的原子讀操作。
exchange允許2個數值進行交換,並保證整個過程是原子的。
而compare_exchange_weak和compare_exchange_strong則是著名的CAS(compare and set)。參數會要求在這里傳入期待的數值和新的數值。它們對比變量的值和期待的值是否一致,如果是,則替換為用戶指定的一個新的數值。如果不是,則將變量的值和期待的值交換。
weak版本的CAS允許偶然出乎意料的返回(比如在字段值和期待值一樣的時候卻返回了false),不過在一些循環算法中,這是可以接受的。通常它比起strong有更高的性能。
3、例子
下面舉個簡單的例子,使用CAS操作實現一個不帶鎖的並發棧。這個例子從《C++並發編程》摘抄而來。
Push
在非並發條件下,要實現一個棧的Push操作,我們可能有如下操作:
- 新建一個節點
- 將該節點的next指針指向現有棧頂
- 更新棧頂
但是在並發條件下,上述無保護的操作明顯可能出現問題。下面舉一個例子:
- 原棧頂為A。(此時棧狀態: A->P->Q->...,我們約定從左到右第一個值為棧頂,P->Q代表p.next = Q)
- 線程1准備將B壓棧。線程1執行完步驟2后被強占。(新建節點B,並使 B.next = A,即B->A)
- 線程2得到cpu時間片並完成將C壓棧的操作,即完成步驟1、2、3。此時棧狀態(此時棧狀態: C->A->...)
- 這時線程1重新獲得cpu時間片,執行步驟3。導致棧狀態變為(此時棧狀態: B->A->...)
結果線程2的操作丟失,這顯然不是我們想要的結果。
那么我們如何解決這個問題呢?只要保證步驟3更新棧頂時候,棧頂是我們在步驟2中獲得頂棧頂即可。因為如果有其它線程進行操作,棧頂必然改變。
我們可以利用CAS輕松解決這個問題:如果棧頂是我們步驟2中獲取頂棧頂,則執行步驟3。否則,自旋(即重新執行步驟2)。
因此,不帶鎖的壓棧Push操作比較簡單。
1 template<typename T> 2 class lock_free_stack 3 { 4 private: 5 struct node 6 { 7 T data; 8 node* next; 9 10 node(T const& data_): 11 data(data_) 12 {} 13 }; 14 15 std::atomic<node*> head; 16 public: 17 void push(T const& data) 18 { 19 node* const new_node=new node(data); 20 new_node->next=head.load(); 21 while(!head.compare_exchange_weak(new_node->next,new_node)); 22 } 23 };
我們可以注意到一個非常巧妙的設計。在push方法里,atomic_compare_exchange_weak如果失敗,證明有其他線程更新了棧頂,而這個時候被其他線程更新的新棧頂值會被更新到new_node->next中,因此循環可以直接再次嘗試壓棧而無需由程序員更新new_node->next。
Pop
2017.3.14:
發現原文Pop部分有錯誤,所以暫時刪除