無鎖同步-C++11之Atomic和CAS


1、概要

      本文是無鎖同步系列文章的第一篇,主要探討C++11中的Atomic。

      我們知道在C++11中引入了mutex和方便優雅的lock_guard。但是有時候我們想要的是性能更高的無鎖實現,下面我們來討論C++11中新增的原子操作類Atomic,我們可以利用它巧妙地實現無鎖同步。

2、傳統的線程同步

 1 #include <thread>
 2 #include <mutex>
 3 
 4 #include <iostream>
 5 
 6 using namespace std;  7 
 8 mutex g_mutex;  9 int g_count = 0; 10 
11 int main() 12 { 13  thread thr1([]() { 14         for (int i = 0;i < 5;i++) { 15             lock_guard<mutex> lock(g_mutex);    //
16             g_count += 10; 17  } 18  }); 19 
20  thread thr2([]() { 21         for (int i = 0;i < 5;i++) { 22             lock_guard<mutex> lock(g_mutex);    //
23             g_count += 20; 24  } 25  }); 26 
27  thr1.join(); 28  thr2.join(); 29 
30     cout << g_count << endl; 31 
32 
33 }

       在上述例子中,如果把①②的鎖注釋后,我們可能無法得到正確的結果。原因是C++並沒有給我們保證+=操作具有原子性(其本質應該是讀-加-寫3個操作)。

3、Atomic

       C++11給我們帶來的Atomic一系列原子操作類,它們提供的方法能保證具有原子性。這些方法是不可再分的,獲取這些變量的值時,永遠獲得修改前的值或修改后的值,不會獲得修改過程中的中間數值。

       這些類都禁用了拷貝構造函數,原因是原子讀和原子寫是2個獨立原子操作,無法保證2個獨立的操作加在一起仍然保證原子性。

       這些類中,最簡單的是atomic_flag(其實和atomic<bool>相似),它只有test_and_set()和clear()方法。其中,test_and_set會檢查變量的值是否為false,如果為false則把值改為true。

       除了atomic_flag外,其他類型可以通過atomic<T>獲得。atomic<T>提供了常見且容易理解的方法:

  1. store
  2. load
  3. exchange
  4. compare_exchange_weak
  5. compare_exchange_strong

       store是原子寫操作,而load則是對應的原子讀操作。

       exchange允許2個數值進行交換,並保證整個過程是原子的。

       而compare_exchange_weak和compare_exchange_strong則是著名的CAS(compare and set)。參數會要求在這里傳入期待的數值和新的數值。它們對比變量的值和期待的值是否一致,如果是,則替換為用戶指定的一個新的數值。如果不是,則將變量的值和期待的值交換。

       weak版本的CAS允許偶然出乎意料的返回(比如在字段值和期待值一樣的時候卻返回了false),不過在一些循環算法中,這是可以接受的。通常它比起strong有更高的性能。

3、例子

       下面舉個簡單的例子,使用CAS操作實現一個不帶鎖的並發棧。這個例子從《C++並發編程》摘抄而來。

  Push

       在非並發條件下,要實現一個棧的Push操作,我們可能有如下操作:

    1. 新建一個節點
    2. 將該節點的next指針指向現有棧頂
    3. 更新棧頂    

       但是在並發條件下,上述無保護的操作明顯可能出現問題。下面舉一個例子:

  1. 原棧頂為A。(此時棧狀態: A->P->Q->...,我們約定從左到右第一個值為棧頂,P->Q代表p.next = Q)
  2. 線程1准備將B壓棧。線程1執行完步驟2后被強占。(新建節點B,並使 B.next = A,即B->A)
  3. 線程2得到cpu時間片並完成將C壓棧的操作,即完成步驟1、2、3。此時棧狀態(此時棧狀態: C->A->...)
  4. 這時線程1重新獲得cpu時間片,執行步驟3。導致棧狀態變為(此時棧狀態: B->A->...)

       結果線程2的操作丟失,這顯然不是我們想要的結果。

 

       那么我們如何解決這個問題呢?只要保證步驟3更新棧頂時候,棧頂是我們在步驟2中獲得頂棧頂即可。因為如果有其它線程進行操作,棧頂必然改變。

       我們可以利用CAS輕松解決這個問題:如果棧頂是我們步驟2中獲取頂棧頂,則執行步驟3。否則,自旋(即重新執行步驟2)。

       因此,不帶鎖的壓棧Push操作比較簡單。

 

 1 template<typename T>
 2 class lock_free_stack
 3 {
 4 private:
 5   struct node
 6   {
 7     T data;
 8     node* next;
 9 
10     node(T const& data_): 
11      data(data_)
12     {}
13   };
14 
15   std::atomic<node*> head;
16 public:
17   void push(T const& data)
18   {
19     node* const new_node=new node(data); 
20     new_node->next=head.load(); 
21     while(!head.compare_exchange_weak(new_node->next,new_node));
22   }
23 };

       我們可以注意到一個非常巧妙的設計。在push方法里,atomic_compare_exchange_weak如果失敗,證明有其他線程更新了棧頂,而這個時候被其他線程更新的新棧頂值會被更新到new_node->next中,因此循環可以直接再次嘗試壓棧而無需由程序員更新new_node->next

  Pop

     2017.3.14:

     發現原文Pop部分有錯誤,所以暫時刪除


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM