[轉]Lock-Free 編程


 

 

原文:http://www.cnblogs.com/gaochundong/p/lock_free_programming.html

 

Lock-Free 編程

 

文章索引

Lock-Free 編程是什么?

當談及 Lock-Free 編程時,我們常將其概念與 Mutex 或 Lock 聯系在一起,描述要在編程中盡量少使用這些鎖結構,降低線程間互相阻塞的機會,以提高應用程序的性能。類同的概念還有 "Lockless" 和 "Non-Blocking" 等。實際上,這樣的描述只涵蓋了 Lock-Free 編程的一部分內容。本質上說,Lock-Free 編程僅描述了代碼所表述的性質,而沒有限定或要求代碼該如何編寫。

基本上,如果程序中的某一部分符合下面的條件判定描述,則我們稱這部分程序是符合 Lock-Free的。反過來說,如果某一部分程序不符合下面的條件描述,則稱這部分程序是不符合 Lock-Free 的。

從這個意義上來說,Lock-Free 中的 "Lock" 並沒有直接涉及 Mutex 或 Lock 等互斥量結構,而是描述了應用程序因某種原因被鎖定的可能性,例如可能因為死鎖(DeadLock)、活鎖(LiveLock)或線程調度(Thread Scheduling)導致優先級被搶占等。

Lock-Free 編程的一個重要效果就是,在一系列訪問 Lock-Free 操作的線程中,如果某一個線程被掛起,那么其絕對不會阻止其他線程繼續運行(Non-Blocking)。

下面的這段簡單的程序片段中,沒有使用任何互斥量結構,但卻不符合 Lock-Free 的性質要求。如果用兩個線程同時執行這段代碼,在線程以某種特定的調度方式運行時,非常有可能兩個線程同時陷入死循環,也就是互相阻塞了對方。

  while (x == 0)
  {
    x = 1 - x;
  }

所以說,Lock-Free 編程所帶來的挑戰不僅來自於其任務本身的復雜性,還要始終着眼於對事物本質的洞察。

通常,應該沒有人會期待一個大型的應用程序中全部采用 Lock-Free 技術,而都是在有特定需求的類的設計上采用 Lock-Free 技術。例如,如果需要一個 Stack 類應對多線程並發訪問的場景,可以使用 Lock-Free 相關技術實現 ConcurrentStack 類,在其 Push 和 Pop 操作中進行具體的實現。所以,在使用 Lock-Free 技術前,需要預先考慮一些軟件工程方面的成本:

  • Lock-Free 技術很容易被錯誤的使用,代碼后期的維護中也不容易意識到,所以非常容易引入 Bug,而且這樣的 Bug 還非常難定位。
  • Lock-Free 技術的細節上依賴於內存系統模型、編譯器優化、CPU架構等,而這在使用 Lock 機制時是不相關的,所以也增加了理解和維護的難度。

Lock-Free 編程技術

當我們准備要滿足 Lock-Free 編程中的非阻塞條件時,有一系列的技術和方法可供使用,如原子操作(Atomic Operations)、內存柵欄(Memory Barrier)、避免 ABA 問題(Avoiding ABA Problem)等。那么我們該如何抉擇在何時使用哪種技術呢?可以根據下圖中的引導來判斷。

讀改寫原子操作(Atomic Read-Modify-Write Operations)

原子操作(Atomic Operations)在操作內存時可以被看做是不可分割的(Indivisible),其他線程不會打斷該操作,沒有操作只被完成一部分之說。在現代的 CPU 處理器上,很多操作已經被設計為原子的,例如對齊讀(Aligned Read)和對齊寫(Aligned Write)等。

Read-Modify-Write(RMW)操作的設計則讓執行更復雜的事務操作變成了原子的,使得當有多個寫入者想對相同的內存進行修改時,保證一次只執行一個操作。

例如,常見的對整型值進行加法操作的 RMW 操作:

  • 在 Win32 中有 _InterlockedIncrement
  • 在 iOS 中有 OSAtomicAdd32
  • 在 C++11 中有 std::atomic<int>::fetch_add

RMW 操作在不同的 CPU 家族中是通過不同的方式來支持的。

例如在 x86 架構下,通過 LOCK 指令前綴可以使許多指令操作(ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, and XCHG)變成原子操作,其中 CMPXCHG 指令可用於實現 CAS 操作

下面是使用 LOCK 和 CMPXCHG 來實現 CAS 操作的代碼示例。

復制代碼
__inline int CAS(volatile int & destination, int exchange, int comperand)
{
  __asm {
    MOV eax, comperand
    MOV ecx, exchange
    MOV edx, destination
    LOCK CMPXCHG[edx], ecx /* 如果eax與edx相等, 則ecx送edx且ZF置1;
                              否則edx送ecx, 且ZF清0.*/
  }
}

/* Accumulator = AL, AX, EAX, or RAX depending on 
   whether a byte, word, doubleword, or
   quadword comparison is being performed */

IF accumulator = DEST
  THEN
    ZF ← 1;
    DEST ← SRC;
  ELSE
    ZF ← 0;
    accumulator ← DEST;
FI;
復制代碼

Compare-And-Swap 循環(CAS Loops)

在 Win32 平台上,CAS 操作有一組原生的實現,例如 _InterlockedCompareExchange 等。對 RMW 操作最常見的討論可能就是,如何通過 CAS Loops 來完成對事務的原子處理。

通常,開發人員會設計在一個循環中重復地執行 CAS 操作以試圖完成一個事務操作。這個過程分為 3 步:

  1. 從指定的內存位置讀取原始的值;
  2. 根據讀取到的原始的值計算出新的值;
  3. 檢測如果內存位置仍然是原始的值時,則將新值寫入該內存位置;

例如,向 LockFreeStack 中壓入新的節點:

復制代碼
 1 void LockFreeStack::Push(Node* newHead)
 2 {
 3   for (;;)
 4   {
 5     // Read the original value from a memory location.
 6     // Copy a shared variable (m_Head) to a local.
 7     Node* oldHead = m_Head;
 8 
 9     // Compute the new value to be set.
10     // Do some speculative work, not yet visible to other threads.
11     newHead->next = oldHead;
12 
13     // Set the new value only if the memory location is still the original value.
14     // Next, attempt to publish our changes to the shared variable.
15     // If the shared variable hasn't changed, the CAS succeeds and we return.
16     // Otherwise, repeat.
17     if (_InterlockedCompareExchange(&m_Head, newHead, oldHead) == oldHead)
18       return;
19   }
20 }
復制代碼

上面代碼中的循環操作仍然符合 Lock-Free 條件要求,因為如果 _InterlockedCompareExchange 條件測試失敗,也就意味着另外的線程已經成功修改了值,而當前線程可以再下一個循環周期內繼續判斷以完成操作。

ABA 問題(ABA Problem)

在實現 CAS Loops 時,當存在多個線程交錯地對共享的內存地址進行處理時,如果實現設計的不正確,將有可能遭遇 ABA 問題

若線程對同一內存地址進行了兩次讀操作,而兩次讀操作得到了相同的值,通過判斷 "值相同" 來判定 "值沒變"。然而,在這兩次讀操作的時間間隔之內,另外的線程可能已經修改了該值,這樣就相當於欺騙了前面的線程,使其認為 "值沒變",實際上值已經被篡改了。

下面是 ABA 問題發生的過程:

  1. T1 線程從共享的內存地址讀取值 A;
  2. T1 線程被搶占,線程 T2 開始運行;
  3. T2 線程將共享的內存地址中的值由 A 修改成 B,然后又修改回 A;
  4. T1 線程繼續執行,讀取共享的內存地址中的值仍為 A,認為沒有改變然后繼續執行;

由於 T1 並不知道在兩次讀取的值 A 已經被 "隱性" 的修改過,所以可能產生無法預期的結果。

例如,使用 List 來存放 Item,如果將一個 Item 從 List 中移除並釋放了其內存地址,然后重新創建一個新的 Item,並將其添加至 List 中,由於優化等因素,有可能新創建的 Item 的內存地址與前面刪除的 Item 的內存地址是相同的,導致指向新的 Item 的指針因此也等同於指向舊的 Item 的指針,這將引發 ABA 問題。

舉個更生活化的例子:

土豪拿了一個裝滿錢的 Hermes 黑色錢包去酒吧喝酒,將錢包放到吧台上后,轉頭和旁邊的朋友聊天,小偷趁土豪轉頭之際拿起錢包,將錢包里的錢取出來並放入餐巾紙保持錢包厚度,然后放回原處,小偷很有職業道德,只偷錢不偷身份證,土豪轉過頭后發現錢包還在,並且還是他熟悉的 Hermes 黑色錢包,厚度也沒什么變化,所以土豪認為什么都沒發生,繼續和朋友聊天,直到結賬時發現錢包中的錢已經被調包成餐巾紙。

所以,我覺得 ABA 問題還可以被俗稱為 "調包問題"。那么怎么解決 "調包問題" 呢?土豪開始想辦法了。

土豪想的第一個辦法是,找根繩子將錢包綁在手臂上,要打開錢包就得先把繩子割斷,割繩子就會被發現。這種做法實際上就是 Load-Link/Store-Conditional (LL/SC) 架構中所做的工作。

土豪想的另一個辦法是,在錢包上安個顯示屏,每次打開錢包顯示屏上的數字都會 +1,這樣當土豪在轉頭之前可以先記錄下顯示屏上的數字,在轉過頭后可以確認數字是否有變化,也就知道錢包是否被打開過。這種做法實際上就是 x86/64 架構中 Double-Word CAS Tagging 所做的工作。

土豪還擔心小偷下次會不會買一個一模一樣的錢包,直接調包整個錢包,這樣連銀行卡和身份證都丟了怎么辦,土豪決定買一個宇宙獨一無二的錢包,除非把它給燒了,否則就不會再有相同的錢包出現。這種做法實際上就是 Garbage Collection (GC) 所做的工作。

內存模型(Memory Model)對細粒度鎖的影響

在多線程系統中,當多個線程同時訪問共享的內存時,就需要一個規范來約束不同的線程該如何與內存交互,這個規范就稱之為內存模型(Memory Model)。

順序一致性內存模型(Sequential Consistency Memory Model)則是內存模型規范中的一種。在這個模型中,內存與訪問它的線程保持獨立,通過一個控制器(Memory Controller)來保持與線程的聯系,以進行讀寫操作。在同一個線程內的,讀寫操作的順序也就是代碼指定的順序。但多個線程時,讀寫操作就會與其他線程中的讀寫操作發生交錯。

如上圖中所示,Thread 1 中在寫入 Value 和 Inited 的值,而 Thread 2 中在讀取 Inited 和 Value 的值到 Ri 和 Rv 中。由於在內存控制器中發生重排(Memory Reordering),最終的結果可能有很多種情況,如下表所示。

順序一致性內存模型非常的直觀,也易於理解。但實際上,由於該模型在內存硬件實現效率上的限制,導致商用的 CPU 架構基本都沒有遵循該模型。一個更貼近實際的多處理器內存模型更類似於下圖中的效果。

也就是說,每個 CPU 核都會有其自己的緩存模型,例如上圖中的 Level 1 Cache 和 Level 2 Cache,用以緩存最近使用的數據,以提升存取效率。同時,所有的寫入數據都被緩沖到了 Write Buffer 緩沖區中,在數據在被刷新至緩存前,處理器可以繼續處理其他指令。這種架構提升了處理器的效率,但同時也意味着我們不僅要關注 Memory,同時也要關注 Buffer 和 Cache,增加了復雜性。

上圖所示為緩存不一致問題(Incoherent Caches),當主存(Main Memory)中存儲着 Value=5,Inited=0 時,Processor 1 就存在着新寫入 Cache 的值沒有被及時刷新至 Memory 的問題,而 Processor 2 則存在着讀取了 Cache 中舊值的問題。

顯然,上面介紹着內存重排和緩存機制會導致混亂,所以實際的內存模型中會引入鎖機制(Locking Protocol)。通常內存模型會遵循以下三個規則:

  • Rule 1:當線程在隔離狀態運行時,其行為不會改變;
  • Rule 2:讀操作不能被移動到獲取鎖操作之前;
  • Rule 3:寫操作不能被移動到釋放鎖操作之后;

Rule 3 保證了在釋放鎖之前,所有寫入操作已經完成。Rule 2 保證要讀取內存就必須先獲取鎖,不會再有其他線程修改內存。Rule 1 則保證了獲得鎖之后的操作行為是順序的。

在體現鎖機制(Locking Protocol)的價值的同時,我們也會意識到它所帶來的限制,也就是限制了編譯器和 CPU 對程序做優化的自由。

我們知道,.NET Framework 遵循 ECMA 標准,而 ECMA 標准中則定義了較為寬松的內存訪問模型,將內存訪問分為兩類:

  • 常規內存訪問(Ordinary Memory Access)
  • 易變內存訪問(Volatile Memory Access)

其中,易變內存訪問是特意為 "volatile" 設計,它包含如下兩個規則:

  1. 讀和寫操作不能被移動到 volatile-read 之前;
  2. 讀和寫操作不能被移動到 volatile-write 之后;

對於那些沒有使用 "lock" 和 "volatile" 的程序片段,編譯器和硬件可以對常規內存訪問做任何合理的優化。反過來講,內存系統僅需在應對 "lock" 和 "volatile" 時采取緩存失效和刷新緩沖區等措施,這極大地提高了性能。

順序一致性(Sequential Consistency)的要求描述了程序代碼描述的順序與內存操作執行的順序間的關系。多數編程語言都提供順序一致性的支持,例如在 C# 中可以將變量標記為 volatile。

A volatile read has "acquire semantics" meaning that the read is guaranteed to occur prior to any references to memory that occur after the read instruction in the CIL instruction sequence. 
A volatile write has "release semantics" meaning that the write is guaranteed to happen after any memory references prior to the write instruction in the CIL instruction sequence.

下面的列表展示了 .NET 中內存讀寫操作的效果。

Construct

 Refreshes 

Thread

Cache

Before?

 Flushes 

Thread

Cache

After?

Notes

 Ordinary Read

 No

No

Read of a non-volatile field

 Ordinary Write 

 No 

Yes

Write of a non-volatile field

 Volatile Read

 Yes

No

Read of volatile field,

or Thread.VolitelRead

 Volatile Write

 No

Yes

Write of volatile field

 Thread.MemoryBarrier 

 Yes

Yes

Special memory barrier method

 Interlocked Operations

 Yes

Yes

Increment, Add, Exchange, etc.

 Lock Acquire

 Yes

No

Monitor.Enter

or entering a lock {} region

 Lock Release

 No

Yes

Monitor.Exit

or exiting a lock {} region

代碼實踐

我們需要在實踐中體會 Lock-Free 編程,方能洞察機制的本質,加深理解。下面用實現棧 Stack 類的過程來完成對 Lock-Free 編程的探索。

棧結構實際上就是 FILO 先入后出隊列,通常包括兩個操作:

  • Push:向棧頂壓入一個元素(Item);
  • Pop:從棧頂彈出一個元素(Item);

這里我們選用單鏈表結構(Singly Linked List)來實現 FILO 棧,每次入棧的都是新的鏈表頭,每次出棧的也是鏈表頭。

實現普通的棧 SimpleStack 類

構建一個內部類 Node 用於存放 Item,並包含 Next 引用以指向下一個節點。

復制代碼
1     private class Node<TNode>
2     {
3       public Node<TNode> Next;
4       public TNode Item;
5       public override string ToString()
6       {
7         return string.Format("{0}", Item);
8       }
9     }
復制代碼

這樣,實現 Push 操作就是用新壓入的節點作為新的鏈表頭部,而實現 Pop 操作則是將鏈表頭部取出后將所指向的下一個節點作為新的鏈表頭。

復制代碼
 1   public class SimpleStack<T>
 2   {
 3     private class Node<TNode>
 4     {
 5       public Node<TNode> Next;
 6       public TNode Item;
 7       public override string ToString()
 8       {
 9         return string.Format("{0}", Item);
10       }
11     }
12 
13     private Node<T> _head;
14 
15     public SimpleStack()
16     {
17       _head = new Node<T>();
18     }
19 
20     public void Push(T item)
21     {
22       Node<T> node = new Node<T>();
23       node.Item = item;
24 
25       node.Next = _head.Next;
26       _head.Next = node;
27     }
28 
29     public T Pop()
30     {
31       Node<T> node = _head.Next;
32       if (node == null)
33         return default(T);
34 
35       _head.Next = node.Next;
36 
37       return node.Item;
38     }
39   }
復制代碼

使用如下代碼,先 Push 入棧 1000 個元素,然后在多線程中 Pop 元素。

復制代碼
 1   class Program
 2   {
 3     static void Main(string[] args)
 4     {
 5       SimpleStack<int> stack = new SimpleStack<int>();
 6 
 7       for (int i = 1; i <= 1000; i++)
 8       {
 9         stack.Push(i);
10       }
11 
12       bool[] poppedItems = new bool[10000];
13 
14       Parallel.For(0, 1000, (i) =>
15         {
16           int item = stack.Pop();
17           if (poppedItems[item])
18           {
19             Console.WriteLine(
20               "Thread [{0:00}] : Item [{1:0000}] was popped before!",
21               Thread.CurrentThread.ManagedThreadId, item);
22           }
23           poppedItems[item] = true;
24         });
25 
26       Console.WriteLine("Done.");
27       Console.ReadLine();
28     }
29   }
復制代碼

運行效果如下圖所示。

由上圖運行結果可知,當多個線程同時 Pop 數據時,可能發生看起來像同一個數據項 Item 被 Pop 出兩次的現象。

實現普通的加鎖的棧 SimpleLockedStack 類

那么為了保持一致性和准確性,首先想到的辦法就是加鎖。lock 不僅可以保護代碼區域內的指令不會被重排,還能在獲取鎖之后阻止其他線程修改數據。

復制代碼
 1   public class SimpleLockedStack<T>
 2   {
 3     private class Node<TNode>
 4     {
 5       public Node<TNode> Next;
 6       public TNode Item;
 7       public override string ToString()
 8       {
 9         return string.Format("{0}", Item);
10       }
11     }
12 
13     private Node<T> _head;
14     private object _sync = new object();
15 
16     public SimpleLockedStack()
17     {
18       _head = new Node<T>();
19     }
20 
21     public void Push(T item)
22     {
23       lock (_sync)
24       {
25         Node<T> node = new Node<T>();
26         node.Item = item;
27 
28         node.Next = _head.Next;
29         _head.Next = node;
30       }
31     }
32 
33     public T Pop()
34     {
35       lock (_sync)
36       {
37         Node<T> node = _head.Next;
38         if (node == null)
39           return default(T);
40 
41         _head.Next = node.Next;
42 
43         return node.Item;
44       }
45     }
46   }
復制代碼

加鎖之后,顯然運行結果就不會出錯了。

實現 Lock-Free 的棧 LockFreeStack 類

但顯然我們更關注性能問題,當有多個線程在交錯 Push 和 Pop 操作時,

  • 首先我們不希望發生等待鎖現象,如果線程取得鎖后被更高優先級的操作調度搶占,則所有等待鎖的線程都被阻塞;
  • 其次我們不希望線程等待鎖的時間過長;

所以准備采用 Lock-Free 技術,通過引入 CAS 操作通過細粒度鎖來實現。此處 CAS 使用 C# 中的Interlocked.CompareExchange 方法,該操作是原子的,並且有很多重載方法可供使用。

復制代碼
1     private static bool CAS(
2       ref Node<T> location, Node<T> newValue, Node<T> comparand)
3     {
4       return comparand ==
5         Interlocked.CompareExchange<Node<T>>(
6           ref location, newValue, comparand);
7     }
復制代碼

在實現 CAS Loops 時,我們使用 do..while.. 語法來完成。

復制代碼
 1     public void Push(T item)
 2     {
 3       Node<T> node = new Node<T>();
 4       node.Item = item;
 5 
 6       do
 7       {
 8         node.Next = _head.Next;
 9       }
10       while (!CAS(ref _head.Next, node, node.Next));
11     }
復制代碼

這樣,新的 LockFreeStack 類就誕生了。

復制代碼
 1   public class LockFreeStack<T>
 2   {
 3     private class Node<TNode>
 4     {
 5       public Node<TNode> Next;
 6       public TNode Item;
 7       public override string ToString()
 8       {
 9         return string.Format("{0}", Item);
10       }
11     }
12 
13     private Node<T> _head;
14 
15     public LockFreeStack()
16     {
17       _head = new Node<T>();
18     }
19 
20     public void Push(T item)
21     {
22       Node<T> node = new Node<T>();
23       node.Item = item;
24 
25       do
26       {
27         node.Next = _head.Next;
28       }
29       while (!CAS(ref _head.Next, node, node.Next));
30     }
31 
32     public T Pop()
33     {
34       Node<T> node;
35 
36       do
37       {
38         node = _head.Next;
39 
40         if (node == null)
41           return default(T);
42       }
43       while (!CAS(ref _head.Next, node.Next, node));
44 
45       return node.Item;
46     }
47 
48     private static bool CAS(
49       ref Node<T> location, Node<T> newValue, Node<T> comparand)
50     {
51       return comparand ==
52         Interlocked.CompareExchange<Node<T>>(
53           ref location, newValue, comparand);
54     }
55   }
復制代碼

這個新的類的測試結果正如我們想象也是正確的。

實現 ConcurrentStack 類

那實現 LockFreeStack 類之后,實際已經滿足了 Lock-Free 的條件要求,我們還能不能做的更好呢?

我們來觀察下上面實現的 Push 方法:

復制代碼
 1     public void Push(T item)
 2     {
 3       Node<T> node = new Node<T>();
 4       node.Item = item;
 5 
 6       do
 7       {
 8         node.Next = _head.Next;
 9       }
10       while (!CAS(ref _head.Next, node, node.Next));
11     }
復制代碼

發現當 CAS 操作判定失敗時,立即進入下一次循環判定。而在實踐中,當 CAS 判定失敗時,是因為其他線程正在更改相同的內存數據,如果立即再進行 CAS 判定則失敗幾率會更高,我們需要給那些正在修改數據的線程時間以完成操作,所以這里當前線程最好能 "休息" 一會。

"休息" 操作我們選用 .NET 中提供的輕量級(4 Bytes)線程等待機制 SpinWait 類。

復制代碼
 1     public void Push(T item)
 2     {
 3       Node<T> node = new Node<T>();
 4       node.Item = item;
 5 
 6       SpinWait spin = new SpinWait();
 7 
 8       while (true)
 9       {
10         Node<T> oldHead = _head;
11         node.Next = oldHead.Next;
12 
13         if (Interlocked.CompareExchange(ref _head, node, oldHead) == oldHead)
14           break;
15 
16         spin.SpinOnce();
17       }
18     }
復制代碼

實際上 SpinOnce() 方法調用了 Thread.SpinWait() 等若干操作,那么這些操作到底做了什么並且耗時多久呢?

首先,Thread.SpinWait(N) 會在當前 CPU 上緊湊的循環 N 個周期,每個周期都會發送 PAUSE 指令給 CPU,告訴 CPU 當前正在執行等待,不要做其他工作了。所以,重點是 N 的值是多少。在 .NET 中實現的 SpinOne 中根據統計意義的度量,將此處的 N 根據調用次數來變化。

  • 第一次調用,N = 4;
  • 第二次調用,N = 8;
  • ...
  • 第十次調用,N = 2048;

那么在 10 次調用之后呢?

10 次之后 SpinOnce 就不再進行 Spin 操作了,它根據情況選擇進入不同的 Yield 流程。

  • Thread.Yield:調用靜態方法 Thread.Yield(),如果在相同的 CPU Core 上存在相同或較低優先級的線程正在等待執行,則當前線程讓出時間片。如果沒有找到這樣的線程,則當前線程繼續運行。
  • Thread.Sleep(0):將 0 傳遞給 Thread.Sleep(),產生的行為與 Thread.Yield() 類似,唯一的區別就是要在所有的 CPU Core 上查找的相同或較低優先級的線程,而不僅限於當前的 CPU Core。如果沒有找到這樣的線程,則當前線程繼續運行。
  • Thread.Sleep(1):當前線程此時真正的進入了睡眠狀態(Sleep State)。雖然指定的是 1 毫秒,但依據不同系統的時間精度不同,這個操作可能花費 10-15 毫秒。

上面三種情況在 SpinOnce 中是根據如下的代碼來判斷執行的。

復制代碼
 1   int yieldsSoFar = (m_count >= 10 ? m_count - 10 : m_count);
 2  
 3   if ((yieldsSoFar % 20) == (20 - 1))
 4   {
 5     Thread.Sleep(1);
 6   }
 7   else if ((yieldsSoFar % 5) == (5 - 1))
 8   {
 9     Thread.Sleep(0);
10   }
11   else
12   {
13     Thread.Yield();
14   }
復制代碼

這樣,我們就可以通過添加失敗等待來進一步優化,形成了新的 ConcurrentStack 類。

復制代碼
  1   // A stack that uses CAS operations internally to maintain 
  2   // thread-safety in a lock-free manner. Attempting to push 
  3   // or pop concurrently from the stack will not trigger waiting, 
  4   // although some optimistic concurrency and retry is used, 
  5   // possibly leading to lack of fairness and/or live-lock. 
  6   // The stack uses spinning and back-off to add some randomization, 
  7   // in hopes of statistically decreasing the possibility of live-lock.
  8   // 
  9   // Note that we currently allocate a new node on every push. 
 10   // This avoids having to worry about potential ABA issues, 
 11   // since the CLR GC ensures that a memory address cannot be
 12   // reused before all references to it have died.
 13 
 14   /// <summary>
 15   /// Represents a thread-safe last-in, first-out collection of objects.
 16   /// </summary>
 17   public class ConcurrentStack<T>
 18   {
 19     // A volatile field should not normally be passed using a ref or out parameter, 
 20     // since it will not be treated as volatile within the scope of the function. 
 21     // There are exceptions to this, such as when calling an interlocked API. 
 22     // As with any warning, you may use the #pragma warning to disable this warning 
 23     // in those rare cases where you are intentionally using a volatile field 
 24     // as a reference parameter.
 25 #pragma warning disable 420
 26 
 27     /// <summary>
 28     /// A simple (internal) node type used to store elements 
 29     /// of concurrent stacks and queues.
 30     /// </summary>
 31     private class Node
 32     {
 33       internal readonly T m_value; // Value of the node.
 34       internal Node m_next;        // Next pointer.
 35 
 36       /// <summary>
 37       /// Constructs a new node with the specified value and no next node.
 38       /// </summary>
 39       /// <param name="value">The value of the node.</param>
 40       internal Node(T value)
 41       {
 42         m_value = value;
 43         m_next = null;
 44       }
 45     }
 46 
 47     // The stack is a singly linked list, and only remembers the head.
 48     private volatile Node m_head;
 49 
 50     /// <summary>
 51     /// Inserts an object at the top of the stack.
 52     /// </summary>
 53     /// <param name="item">The object to push onto the stack. 
 54     /// The value can be a null reference for reference types.
 55     /// </param>
 56     public void Push(T item)
 57     {
 58       // Pushes a node onto the front of the stack thread-safely. 
 59       // Internally, this simply swaps the current head pointer 
 60       // using a (thread safe) CAS operation to accomplish lock freedom. 
 61       // If the CAS fails, we add some back off to statistically 
 62       // decrease contention at the head, and then go back around and retry.
 63 
 64       Node newNode = new Node(item);
 65       newNode.m_next = m_head;
 66       if (Interlocked.CompareExchange(
 67         ref m_head, newNode, newNode.m_next) == newNode.m_next)
 68       {
 69         return;
 70       }
 71 
 72       // If we failed, go to the slow path and loop around until we succeed.
 73       SpinWait spin = new SpinWait();
 74 
 75       // Keep trying to CAS the existing head with 
 76       // the new node until we succeed.
 77       do
 78       {
 79         spin.SpinOnce();
 80         // Reread the head and link our new node.
 81         newNode.m_next = m_head;
 82       }
 83       while (Interlocked.CompareExchange(
 84         ref m_head, newNode, newNode.m_next) != newNode.m_next);
 85     }
 86 
 87     /// <summary>
 88     /// Attempts to pop and return the object at the top of the stack.
 89     /// </summary>
 90     /// <param name="result">
 91     /// When this method returns, if the operation was successful, 
 92     /// result contains the object removed. 
 93     /// If no object was available to be removed, the value is unspecified.
 94     /// </param>
 95     /// <returns>true if an element was removed and returned 
 96     /// from the top of the stack successfully; otherwise, false.</returns>
 97     public bool TryPop(out T result)
 98     {
 99       // Capture the original value from memory
100       Node head = m_head;
101 
102       // Is the stack empty?
103       if (head == null)
104       {
105         result = default(T);
106         return false;
107       }
108 
109       if (Interlocked.CompareExchange(
110         ref m_head, head.m_next, head) == head)
111       {
112         result = head.m_value;
113         return true;
114       }
115 
116       // Fall through to the slow path.
117       SpinWait spin = new SpinWait();
118 
119       // Try to CAS the head with its current next.  
120       // We stop when we succeed or when we notice that 
121       // the stack is empty, whichever comes first.
122       int backoff = 1;
123 
124       // avoid the case where TickCount could return Int32.MinValue
125       Random r = new Random(Environment.TickCount & Int32.MaxValue);
126 
127       while (true)
128       {
129         // Capture the original value from memory
130         head = m_head;
131 
132         // Is the stack empty?
133         if (head == null)
134         {
135           result = default(T);
136           return false;
137         }
138 
139         // Try to swap the new head.  If we succeed, break out of the loop.
140         if (Interlocked.CompareExchange(
141           ref m_head, head.m_next, head) == head)
142         {
143           result = head.m_value;
144           return true;
145         }
146 
147         // We failed to CAS the new head.  Spin briefly and retry.
148         for (int i = 0; i < backoff; i++)
149         {
150           spin.SpinOnce();
151         }
152 
153         // Arbitrary number to cap back-off.
154         backoff = spin.NextSpinWillYield ? r.Next(1, 8) : backoff * 2;
155       }
156     }
157 
158     /// <summary>
159     /// Gets a value that indicates whether the stack is empty.
160     /// </summary>
161     /// <value>true if the stack is empty; otherwise, false.</value>
162     public bool IsEmpty
163     {
164       // Checks whether the stack is empty. Clearly the answer 
165       // may be out of date even prior to
166       // the function returning (i.e. if another thread 
167       // concurrently adds to the stack). It does
168       // guarantee, however, that, if another thread 
169       // does not mutate the stack, a subsequent call
170       // to TryPop will return true 
171       // -- i.e. it will also read the stack as non-empty.
172       get { return m_head == null; }
173     }
174 
175     /// <summary>
176     /// Gets the number of elements contained in the stack.
177     /// </summary>
178     /// <value>The number of elements contained in the stack.</value>
179     public int Count
180     {
181       // Counts the number of entries in the stack. 
182       // This is an O(n) operation. The answer may be out of date before 
183       // returning, but guarantees to return a count that was once valid. 
184       // Conceptually, the implementation snaps a copy of the list and 
185       // then counts the entries, though physically this is not 
186       // what actually happens.
187       get
188       {
189         int count = 0;
190 
191         // Just whip through the list and tally up the number of nodes. 
192         // We rely on the fact that
193         // node next pointers are immutable after being en-queued 
194         // for the first time, even as
195         // they are being dequeued. If we ever changed this 
196         // (e.g. to pool nodes somehow),
197         // we'd need to revisit this implementation.
198 
199         for (Node curr = m_head; curr != null; curr = curr.m_next)
200         {
201           // we don't handle overflow, to be consistent with existing 
202           // generic collection types in CLR
203           count++;
204         }
205 
206         return count;
207       }
208     }
209 
210     /// <summary>
211     /// Removes all objects from the this stack.
212     /// </summary>
213     public void Clear()
214     {
215       // Clear the list by setting the head to null. 
216       // We don't need to use an atomic operation for this: 
217       // anybody who is mutating the head by pushing or popping 
218       // will need to use an atomic operation to guarantee they 
219       // serialize and don't overwrite our setting of the head to null.
220       m_head = null;
221     }
222 
223 #pragma warning restore 420
224   }
復制代碼

實際上,上面的 ConcurrentStack<T> 類就是 .NET Framework 中System.Collections.Concurrent.ConcurrentStack<T> 類的基本實現過程。

參考資料

本文《Lock Free 編程》由作者 Dennis Gao 發表自博客園博客,任何未經作者本人允許的人為或爬蟲轉載均為耍流氓。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM