背景
大多數企業開發人員都理解數據庫樂觀並發控制,不過很少有人聽說過 CAS(我去年才聽說這個概念),CAS 是多線程樂觀並發控制策略的一種,一些無鎖的支持並發的數據結構都會使用到 CAS,本文對比 CAS 和 數據庫樂觀並發控制,以此達到強化記憶的目的。
CAS
CAS = Compare And Swap
多線程環境下 this.i = this.i + 1 是沒有辦法保證線程安全的,因此就有了 CAS,CAS 可以保證上面代碼的線程安全性,但是 CAS 並不會保證 Swap 的成功,只有 Compare 成功了才會 Swap,即:沒有並發發生,即:在我讀取和修改之間沒有別人修改。另外說一點,如果 i 是局部變量,即:i = i + 1,那么這段代碼是線程安全的,因為局部變量是線程獨享的。
不明白 CAS 沒關系,下面通過 CAS 的標准模式 和 一個簡單的示例來理解 CAS。
CAS 的標准模式
偽代碼
1 var localValue, currentValue; 2 do 3 { 4 localValue = this. 5 6 var newValue = 執行一些計算; 7 8 currentValue = Interlocked.CompareExchange(ref this.value, newValue, localValue); 9 } while (localValue != currentValue);
說明
把 this.value 看成是數據庫數據,localValue 是某個用戶讀取的數據,newValue是用戶想修改的值,這里有必要解釋一下 CompareExchange 和 currentValue,它的內部實現代碼是這樣的(想想下面代碼是線程安全的):
1 var currentValue = this.value 2 if(currentValue == localValue){ 3 this.value = newValue; 4 } 5 return currentValue;
CompareExchange 用 sql 來類比就是:update xxx set value = newValue where value = localValue,只不過返回的值不同。通過 CompareExchange 的返回結果我們知道 CAS 是否成功了,即:是否出現並發了,即:是否在我讀取和修改之間別人已經修改過了,如果是,可以選擇重試。
累加示例
CAS 代碼
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using System.Threading; 7 8 namespace InterlockStudy 9 { 10 class ConcurrentIncrease 11 { 12 public static void Test() 13 { 14 var sum = 0; 15 16 var tasks = new Task[10]; 17 for (var i = 1; i <= 10; i++) 18 { 19 tasks[i - 1] = Task.Factory.StartNew((state) => 20 { 21 int localSum, currentSum; 22 do 23 { 24 localSum = sum; 25 26 Thread.Sleep(10); 27 var value = (int)state; 28 var newValue = localSum + value; 29 30 currentSum = Interlocked.CompareExchange(ref sum, newValue, localSum); 31 } while (localSum != currentSum); 32 }, i); 33 } 34 35 Task.WaitAll(tasks); 36 37 Console.WriteLine(sum); 38 } 39 } 40 }
數據庫並發代碼
1 public static void Test13() 2 { 3 var tasks = new Task[10]; 4 for (var i = 1; i <= 10; i++) 5 { 6 tasks[i - 1] = Task.Factory.StartNew((state) => 7 { 8 int localSum, result; 9 do 10 { 11 using (var con = new SqlConnection(CONNECTION_STRING)) 12 { 13 con.Open(); 14 var cmd = new SqlCommand("select sum from Tests where Id = 1", con); 15 var reader = cmd.ExecuteReader(); 16 reader.Read(); 17 localSum = reader.GetInt32(0); 18 19 System.Threading.Thread.Sleep(10); 20 var value = (int)state; 21 var newValue = localSum + value; 22 23 cmd = new SqlCommand("update Tests set sum = " + newValue + " where sum = " + localSum + "", con); 24 result = cmd.ExecuteNonQuery(); 25 } 26 } while (result == 0); 27 }, i); 28 } 29 30 Task.WaitAll(tasks); 31 } 32 }
說明
我們發現 CAS 版本的代碼和數據庫版本的代碼出奇的相似,數據庫的CAS操作是通過 update + where 來完成的。
寫着玩的 RingBuffer
代碼
1 using System; 2 using System.Collections.Generic; 3 using System.Collections.Concurrent; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7 using System.Threading; 8 9 namespace InterlockStudy 10 { 11 internal class Node<T> 12 { 13 public T Data { get; set; } 14 15 public bool HasValue { get; set; } 16 } 17 18 class RingBuffer<T> 19 { 20 private readonly Node<T>[] _nodes; 21 private long _tailIndex = -1; 22 private long _headIndex = -1; 23 private AutoResetEvent _readEvent = new AutoResetEvent(false); 24 private AutoResetEvent _writeEvent = new AutoResetEvent(false); 25 26 public RingBuffer(int maxSize) 27 { 28 _nodes = new Node<T>[maxSize]; 29 30 for (var i = 0; i < maxSize; i++) 31 { 32 _nodes[i] = new Node<T>(); 33 } 34 } 35 36 public void EnQueue(T data) 37 { 38 while (true) 39 { 40 if (this.TryEnQueue(data)) 41 { 42 _readEvent.Set(); 43 return; 44 } 45 46 _writeEvent.WaitOne(); 47 } 48 49 } 50 51 public T DeQueue() 52 { 53 while (true) 54 { 55 T data; 56 if (this.TryDeQueue(out data)) 57 { 58 _writeEvent.Set(); 59 return data; 60 } 61 62 _readEvent.WaitOne(); 63 } 64 65 } 66 67 public bool TryEnQueue(T data) 68 { 69 long localTailIndex, newTailIndex, currentTailIndex; 70 do 71 { 72 localTailIndex = _tailIndex; 73 74 if (!this.CanWrite(localTailIndex)) 75 { 76 return false; 77 } 78 79 newTailIndex = localTailIndex + 1; 80 81 if (_nodes[newTailIndex % _nodes.Length].HasValue) 82 { 83 return false; 84 } 85 86 currentTailIndex = Interlocked.CompareExchange(ref _tailIndex, newTailIndex, localTailIndex); 87 } 88 while (localTailIndex != currentTailIndex); 89 90 _nodes[newTailIndex % _nodes.Length].Data = data; 91 _nodes[newTailIndex % _nodes.Length].HasValue = true; 92 93 return true; 94 } 95 96 public bool TryDeQueue(out T data) 97 { 98 long localHeadIndex, newHeadIndex, currentHeadIndex; 99 do 100 { 101 localHeadIndex = _headIndex; 102 103 if (!this.CanRead(localHeadIndex)) 104 { 105 data = default(T); 106 return false; 107 } 108 109 newHeadIndex = localHeadIndex + 1; 110 if (_nodes[newHeadIndex % _nodes.Length].HasValue == false) 111 { 112 data = default(T); 113 return false; 114 } 115 116 currentHeadIndex = Interlocked.CompareExchange(ref _headIndex, newHeadIndex, localHeadIndex); 117 } 118 while (localHeadIndex != currentHeadIndex); 119 120 data = _nodes[newHeadIndex % _nodes.Length].Data; 121 _nodes[newHeadIndex % _nodes.Length].HasValue = false; 122 123 return true; 124 } 125 126 private bool CanWrite(long localTailIndex) 127 { 128 return localTailIndex - _headIndex < _nodes.Length; 129 } 130 131 private bool CanRead(long localHeadIndex) 132 { 133 return _tailIndex - localHeadIndex > 0; 134 } 135 } 136 }
備注
倉促成文,如果有必要可以再寫篇文章,希望大家多批評。