.NET:通過 CAS 來理解數據庫樂觀並發控制,順便給出無鎖的 RingBuffer。


背景

大多數企業開發人員都理解數據庫樂觀並發控制,不過很少有人聽說過 CAS(我去年才聽說這個概念),CAS 是多線程樂觀並發控制策略的一種,一些無鎖的支持並發的數據結構都會使用到 CAS,本文對比 CAS 和 數據庫樂觀並發控制,以此達到強化記憶的目的。

CAS

CAS = Compare And Swap

多線程環境下 this.i = this.i + 1 是沒有辦法保證線程安全的,因此就有了 CAS,CAS 可以保證上面代碼的線程安全性,但是 CAS 並不會保證 Swap 的成功,只有 Compare 成功了才會 Swap,即:沒有並發發生,即:在我讀取和修改之間沒有別人修改。另外說一點,如果 i 是局部變量,即:i = i + 1,那么這段代碼是線程安全的,因為局部變量是線程獨享的。

不明白 CAS 沒關系,下面通過 CAS 的標准模式 和 一個簡單的示例來理解 CAS。

CAS 的標准模式

偽代碼

 1                     var localValue, currentValue;
 2                     do
 3                     {
 4                         localValue = this.
 5 
 6                         var newValue = 執行一些計算;
 7 
 8                         currentValue = Interlocked.CompareExchange(ref this.value, newValue, localValue);
 9                     } while (localValue != currentValue);

說明

把 this.value 看成是數據庫數據,localValue 是某個用戶讀取的數據,newValue是用戶想修改的值,這里有必要解釋一下 CompareExchange 和 currentValue,它的內部實現代碼是這樣的(想想下面代碼是線程安全的):

1 var currentValue = this.value
2 if(currentValue == localValue){
3    this.value = newValue;
4 }
5 return currentValue;

CompareExchange  用 sql 來類比就是:update xxx set value = newValue where value = localValue,只不過返回的值不同。通過 CompareExchange 的返回結果我們知道 CAS 是否成功了,即:是否出現並發了,即:是否在我讀取和修改之間別人已經修改過了,如果是,可以選擇重試。

累加示例

CAS 代碼

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using System.Threading;
 7 
 8 namespace InterlockStudy
 9 {
10     class ConcurrentIncrease
11     {
12         public static void Test()
13         {
14             var sum = 0;
15 
16             var tasks = new Task[10];
17             for (var i = 1; i <= 10; i++)
18             {
19                 tasks[i - 1] = Task.Factory.StartNew((state) =>
20                 {
21                     int localSum, currentSum;
22                     do
23                     {
24                         localSum = sum;
25 
26                         Thread.Sleep(10);
27                         var value = (int)state;
28                         var newValue = localSum + value;
29 
30                         currentSum = Interlocked.CompareExchange(ref sum, newValue, localSum);
31                     } while (localSum != currentSum);
32                 }, i);
33             }
34 
35             Task.WaitAll(tasks);
36 
37             Console.WriteLine(sum);
38         }
39     }
40 }

數據庫並發代碼

 1         public static void Test13()
 2         {
 3             var tasks = new Task[10];
 4             for (var i = 1; i <= 10; i++)
 5             {
 6                 tasks[i - 1] = Task.Factory.StartNew((state) =>
 7                 {
 8                     int localSum, result;
 9                     do
10                     {
11                         using (var con = new SqlConnection(CONNECTION_STRING))
12                         {
13                             con.Open();
14                             var cmd = new SqlCommand("select sum from Tests where Id = 1", con);
15                             var reader = cmd.ExecuteReader();
16                             reader.Read();
17                             localSum = reader.GetInt32(0);
18 
19                             System.Threading.Thread.Sleep(10);
20                             var value = (int)state;
21                             var newValue = localSum + value;
22 
23                             cmd = new SqlCommand("update Tests set sum = " + newValue + " where sum = " + localSum + "", con);
24                             result = cmd.ExecuteNonQuery();
25                         }
26                     } while (result == 0);
27                 }, i);
28             }
29 
30             Task.WaitAll(tasks);
31         }
32     }

說明

我們發現 CAS 版本的代碼和數據庫版本的代碼出奇的相似,數據庫的CAS操作是通過 update + where 來完成的。

寫着玩的 RingBuffer

代碼

  1 using System;
  2 using System.Collections.Generic;
  3 using System.Collections.Concurrent;
  4 using System.Linq;
  5 using System.Text;
  6 using System.Threading.Tasks;
  7 using System.Threading;
  8 
  9 namespace InterlockStudy
 10 {
 11     internal class Node<T>
 12     {
 13         public T Data { get; set; }
 14 
 15         public bool HasValue { get; set; }
 16     }
 17 
 18     class RingBuffer<T>
 19     {
 20         private readonly Node<T>[] _nodes;
 21         private long _tailIndex = -1;
 22         private long _headIndex = -1;
 23         private AutoResetEvent _readEvent = new AutoResetEvent(false);
 24         private AutoResetEvent _writeEvent = new AutoResetEvent(false);
 25 
 26         public RingBuffer(int maxSize)
 27         {
 28             _nodes = new Node<T>[maxSize];
 29 
 30             for (var i = 0; i < maxSize; i++)
 31             {
 32                 _nodes[i] = new Node<T>();
 33             }
 34         }
 35 
 36         public void EnQueue(T data)
 37         {
 38             while (true)
 39             {
 40                 if (this.TryEnQueue(data))
 41                 {
 42                     _readEvent.Set();
 43                     return;
 44                 }
 45 
 46                 _writeEvent.WaitOne();
 47             }
 48 
 49         }
 50 
 51         public T DeQueue()
 52         {
 53             while (true)
 54             {
 55                 T data;
 56                 if (this.TryDeQueue(out data))
 57                 {
 58                     _writeEvent.Set();
 59                     return data;
 60                 }
 61 
 62                 _readEvent.WaitOne();
 63             }
 64 
 65         }
 66 
 67         public bool TryEnQueue(T data)
 68         {
 69             long localTailIndex, newTailIndex, currentTailIndex;
 70             do
 71             {
 72                 localTailIndex = _tailIndex;
 73 
 74                 if (!this.CanWrite(localTailIndex))
 75                 {
 76                     return false;
 77                 }
 78 
 79                 newTailIndex = localTailIndex + 1;
 80 
 81                 if (_nodes[newTailIndex % _nodes.Length].HasValue)
 82                 {
 83                     return false;
 84                 }
 85 
 86                 currentTailIndex = Interlocked.CompareExchange(ref _tailIndex, newTailIndex, localTailIndex);
 87             }
 88             while (localTailIndex != currentTailIndex);
 89 
 90             _nodes[newTailIndex % _nodes.Length].Data = data;
 91             _nodes[newTailIndex % _nodes.Length].HasValue = true;
 92 
 93             return true;
 94         }
 95 
 96         public bool TryDeQueue(out T data)
 97         {
 98             long localHeadIndex, newHeadIndex, currentHeadIndex;
 99             do
100             {
101                 localHeadIndex = _headIndex;
102 
103                 if (!this.CanRead(localHeadIndex))
104                 {
105                     data = default(T);
106                     return false;
107                 }
108 
109                 newHeadIndex = localHeadIndex + 1;
110                 if (_nodes[newHeadIndex % _nodes.Length].HasValue == false)
111                 {
112                     data = default(T);
113                     return false;
114                 }
115 
116                 currentHeadIndex = Interlocked.CompareExchange(ref _headIndex, newHeadIndex, localHeadIndex);
117             }
118             while (localHeadIndex != currentHeadIndex);
119 
120             data = _nodes[newHeadIndex % _nodes.Length].Data;
121             _nodes[newHeadIndex % _nodes.Length].HasValue = false;
122 
123             return true;
124         }
125 
126         private bool CanWrite(long localTailIndex)
127         {
128             return localTailIndex - _headIndex < _nodes.Length;
129         }
130 
131         private bool CanRead(long localHeadIndex)
132         {
133             return _tailIndex - localHeadIndex > 0;
134         }
135     }
136 }

備注

倉促成文,如果有必要可以再寫篇文章,希望大家多批評。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM