Asynchronous Synchronization Constructs
I am not fond of any thread synchronization construct that uses kernel mode, because every one of these primitives blocks a thread. Threads are expensive to create, and creating one only to have it sit idle doing nothing makes no sense.
Consider a reader-writer lock: if the writer lock is held for a long time, every incoming read-request thread blocks. As more and more client requests arrive, the server creates more threads, and the only purpose these threads serve is to sit stopped on the lock. Worse, once the writer lock is released, all the blocked reader threads unblock at the same time and start executing. Now a large number of threads are trying to run on a comparatively small number of CPUs, so Windows spends its time context-switching between the threads while very little real work gets done.
Locks are popular, but holding one for a long time causes enormous scalability problems. It would be very useful if code could express its desire for a lock through an asynchronous synchronization construct: if a thread cannot get the lock, it can simply return and do other work instead of sitting there blocked.
SemaphoreSlim implements this idea through its WaitAsync method:
public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken);
Writing await asyncLock.WaitAsync() achieves exactly the scenario just described.
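For instance, a SemaphoreSlim constructed with a count of 1 acts as an asynchronous mutual-exclusion lock. A minimal sketch (the class and field names here are illustrative):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLockDemo
{
    // A SemaphoreSlim with a count of 1 serves as an async mutual-exclusion lock.
    private static readonly SemaphoreSlim s_asyncLock = new SemaphoreSlim(1, 1);
    private static int s_counter;

    private static async Task IncrementAsync()
    {
        await s_asyncLock.WaitAsync(); // awaits the lock; never blocks the thread
        try
        {
            s_counter++; // protected region
        }
        finally
        {
            s_asyncLock.Release();
        }
    }

    public static async Task Main()
    {
        // 100 concurrent increments; the async lock serializes access to s_counter.
        var tasks = new Task[100];
        for (int i = 0; i < tasks.Length; i++) tasks[i] = IncrementAsync();
        await Task.WhenAll(tasks);
        Console.WriteLine(s_counter); // 100
    }
}
```

While a task waits for the semaphore, no thread is blocked; the continuation simply runs once Release signals the semaphore.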
But what about reader-writer semantics? The .NET Framework provides the ConcurrentExclusiveSchedulerPair class. Sample code:
private static void ConcurrentExclusiveSchedulerDemo()
{
    var cesp = new ConcurrentExclusiveSchedulerPair();
    var tfExclusive = new TaskFactory(cesp.ExclusiveScheduler);
    var tfConcurrent = new TaskFactory(cesp.ConcurrentScheduler);

    for (int i = 0; i < 5; i++)
    {
        var exclusive = i < 2;
        (exclusive ? tfExclusive : tfConcurrent).StartNew(() =>
        {
            Console.WriteLine("{0} access", exclusive ? "exclusive" : "concurrent");
            // Perform exclusive writing or concurrent reading here
        });
    }
}
Unfortunately, the Framework supplies no asynchronous lock offering reader-writer semantics, so we can build one ourselves:

// Indicates whether the lock is wanted for exclusive (writer) or shared (reader) access
public enum OneManyMode { Exclusive, Shared }

public sealed class AsyncOneManyLock
{
    #region Lock code
    // A SpinLock must not be readonly (it is a mutable struct)
    private SpinLock m_lock = new SpinLock(true);
    private void Lock() { bool taken = false; m_lock.Enter(ref taken); }
    private void Unlock() { m_lock.Exit(); }
    #endregion

    #region Lock state and helper methods
    private Int32 m_state = 0;
    private bool IsFree { get { return m_state == 0; } }
    private bool IsOwnedByWriter { get { return m_state == -1; } }
    private bool IsOwnedByReader { get { return m_state > 0; } }
    private Int32 AddReaders(Int32 count) { return m_state += count; }
    private Int32 SubtractReader() { return --m_state; }
    private void MakeWriter() { m_state = -1; }
    private void MakeFree() { m_state = 0; }
    #endregion

    // Purpose: improve performance and reduce memory consumption in the uncontended case
    private readonly Task m_noContentionAccessGranter;

    // Each waiting writer wakes up via its own TaskCompletionSource queued here
    private readonly Queue<TaskCompletionSource<Object>> m_qWaitingWriters =
        new Queue<TaskCompletionSource<Object>>();

    // When this one TaskCompletionSource is signaled, all waiting readers wake up
    private TaskCompletionSource<Object> m_waitingReaderSignal =
        new TaskCompletionSource<Object>();
    private Int32 m_numWaitingReaders = 0;

    public AsyncOneManyLock()
    {
        // Create a task that returns null
        m_noContentionAccessGranter = Task.FromResult<Object>(null);
    }

    public Task WaitAsync(OneManyMode mode)
    {
        Task accessGranter = m_noContentionAccessGranter; // Assume no contention
        Lock();
        switch (mode)
        {
            case OneManyMode.Exclusive:
                if (IsFree)
                {
                    MakeWriter(); // No contention
                }
                else
                {
                    // Contention: queue a TaskCompletionSource for this writer
                    var tcs = new TaskCompletionSource<Object>();
                    m_qWaitingWriters.Enqueue(tcs);
                    accessGranter = tcs.Task;
                }
                break;

            case OneManyMode.Shared:
                if (IsFree || (IsOwnedByReader && m_qWaitingWriters.Count == 0))
                {
                    AddReaders(1); // No contention
                }
                else
                {
                    // Contention: increment the waiting-reader count and return
                    // the reader task so the reader waits
                    m_numWaitingReaders++;
                    accessGranter = m_waitingReaderSignal.Task.ContinueWith(t => t.Result);
                }
                break;
        }
        Unlock();
        return accessGranter;
    }

    public void Release()
    {
        // Assume there is no code to be released
        TaskCompletionSource<Object> accessGranter = null;
        Lock();
        if (IsOwnedByWriter) MakeFree();
        else SubtractReader();

        if (IsFree)
        {
            // If the lock is free, wake one waiting writer or all waiting readers
            if (m_qWaitingWriters.Count > 0)
            {
                MakeWriter();
                accessGranter = m_qWaitingWriters.Dequeue();
            }
            else if (m_numWaitingReaders > 0)
            {
                AddReaders(m_numWaitingReaders);
                m_numWaitingReaders = 0;
                accessGranter = m_waitingReaderSignal;

                // Create a new TCS for readers that need to wait in the future
                m_waitingReaderSignal = new TaskCompletionSource<Object>();
            }
        }
        Unlock();

        // Wake the writer/readers outside the lock to reduce the chance of
        // contention and improve performance
        if (accessGranter != null) accessGranter.SetResult(null);
    }
}
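A hypothetical usage sketch of the class above (the file name and method names are illustrative): readers request Shared access, writers request Exclusive access, and no thread ever blocks while waiting.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class AsyncOneManyLockUsage
{
    private static readonly AsyncOneManyLock s_lock = new AsyncOneManyLock();

    private static async Task<string> ReadDataAsync()
    {
        await s_lock.WaitAsync(OneManyMode.Shared);   // many readers may enter together
        try { return await File.ReadAllTextAsync("data.txt"); }
        finally { s_lock.Release(); }
    }

    private static async Task WriteDataAsync(string data)
    {
        await s_lock.WaitAsync(OneManyMode.Exclusive); // a writer gets the lock alone
        try { await File.WriteAllTextAsync("data.txt", data); }
        finally { s_lock.Release(); }
    }
}
```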
The code above never blocks a thread, because internally it uses no kernel-mode constructs at all. It does use a SpinLock, which is built on user-mode constructs, but a spin lock should only guard code sections that execute very quickly, and that holds here: WaitAsync performs nothing more than a few integer assignments and comparisons. So protecting access to the queue with a SpinLock is quite appropriate.
Concurrent Collection Classes
The FCL ships with four thread-safe collection classes, all defined in the System.Collections.Concurrent namespace: ConcurrentStack, ConcurrentQueue, ConcurrentDictionary, and ConcurrentBag.
All of these collections are "non-blocking". In other words, if a thread tries to extract an element (item) that does not exist, the thread returns immediately; it does not block waiting for an element to appear. This is why methods like TryDequeue, TryPop, TryTake, and TryGetValue all return true if they obtained an item, and false otherwise.
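A small sketch of this non-blocking behavior using ConcurrentQueue:

```csharp
using System;
using System.Collections.Concurrent;

public static class NonBlockingDemo
{
    public static void Main()
    {
        var queue = new ConcurrentQueue<int>();

        // Taking from an empty collection returns immediately with false.
        bool got = queue.TryDequeue(out int item);
        Console.WriteLine(got);              // False

        queue.Enqueue(42);
        got = queue.TryDequeue(out item);
        Console.WriteLine($"{got} {item}");  // True 42
    }
}
```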
A collection being "non-blocking" does not mean it needs no locks. The ConcurrentDictionary class uses Monitor internally, but the lock is held for only a very short time while operating on items in the collection. ConcurrentQueue and ConcurrentStack truly need no locks; both manipulate the collection internally with Interlocked methods. A ConcurrentBag object consists of numerous mini-collections, one per thread. When a thread adds an item to the bag, an Interlocked method adds that item to the calling thread's mini-collection. When a thread tries to extract an element from the bag, the bag checks the calling thread's mini-collection first; if the item is there, an Interlocked method extracts it. If it is not there, the bag internally acquires a Monitor to extract an item from another thread's mini-collection. This is called one thread "stealing" an item from another.
Note that all the concurrent collection classes provide a GetEnumerator method, which is typically used by C#'s foreach statement but can also be used from LINQ. For the ConcurrentQueue, ConcurrentStack, and ConcurrentBag classes, GetEnumerator takes a "snapshot" of the collection's contents and returns elements from that snapshot; the actual contents may change while the snapshot is being enumerated. ConcurrentDictionary's GetEnumerator does not take a snapshot of its contents, so the dictionary's contents may change while it is being enumerated. Also note that the Count property returns the number of elements in the collection at the moment of the query; if other threads are simultaneously adding or removing elements, the count may immediately become incorrect.
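The snapshot semantics can be observed directly (a small sketch using ConcurrentQueue):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class SnapshotDemo
{
    public static void Main()
    {
        var queue = new ConcurrentQueue<int>();
        queue.Enqueue(1);
        queue.Enqueue(2);

        int sum = 0;
        foreach (int i in queue)   // enumerates a snapshot of the contents
        {
            // Mutating the collection during enumeration is safe; the
            // snapshot being iterated does not change.
            queue.Enqueue(i + 10);
            sum += i;
        }
        Console.WriteLine(sum);          // 3: only the snapshot's items (1 + 2)
        Console.WriteLine(queue.Count);  // 4: the live collection did change
    }
}
```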
ConcurrentStack, ConcurrentQueue, and ConcurrentBag all implement the IProducerConsumerCollection interface. Any class implementing this interface can be turned into a blocking collection, though it is best to avoid blocking collections where possible.
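The conversion works through BlockingCollection<T>, which wraps any IProducerConsumerCollection<T>. A small sketch:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingDemo
{
    public static void Main()
    {
        // Wrap a ConcurrentQueue in a BlockingCollection for blocking semantics.
        using (var bc = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            var consumer = Task.Run(() =>
            {
                // GetConsumingEnumerable blocks until an item is available
                // or adding is marked complete.
                foreach (int i in bc.GetConsumingEnumerable())
                    Console.WriteLine(i);
            });

            bc.Add(1);
            bc.Add(2);
            bc.CompleteAdding(); // lets the consumer loop end once the queue drains
            consumer.Wait();     // prints 1 then 2
        }
    }
}
```

This blocking style is exactly what the text advises avoiding where possible: the consumer thread is parked inside the foreach whenever the collection is empty.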
Here we will focus on ConcurrentDictionary.
ConcurrentDictionary
I analyzed the ConcurrentDictionary source code in .NET Core. It uses Volatile.Read and Volatile.Write, along with the hybrid lock construct (lock), and it defines locks at a finer granularity than one lock for the whole table. This makes ConcurrentDictionary a good choice for multithreaded use.
TryRemove
This method calls the internal private method TryRemoveInternal:

private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
{
    int hashcode = _comparer.GetHashCode(key);
    while (true)
    {
        Tables tables = _tables;
        int bucketNo, lockNo;

        // Get the bucket index and the lock index. Note that the lock index is
        // not necessarily the same value as the bucket index; see the source
        // for the exact algorithm.
        GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);

        // Only the lock this index points to is taken, not all the locks.
        lock (tables._locks[lockNo])
        {
            // The table may have been reallocated, so fetch it again and check
            // whether we are still looking at the same table.
            // If the table just got resized, we may not be holding the right lock, and must retry.
            // This should be a rare occurrence.
            if (tables != _tables)
            {
                continue;
            }

            Node prev = null;
            // Because of chaining, the same bucket may hold many values, so the
            // keys must be compared.
            for (Node curr = tables._buckets[bucketNo]; curr != null; curr = curr._next)
            {
                Debug.Assert((prev == null && curr == tables._buckets[bucketNo]) || prev._next == curr);

                // Check whether this is the element to be removed
                if (hashcode == curr._hashcode && _comparer.Equals(curr._key, key))
                {
                    if (matchValue)
                    {
                        bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr._value);
                        if (!valuesMatch)
                        {
                            value = default(TValue);
                            return false;
                        }
                    }

                    // Perform the removal: check whether there is a previous
                    // node, then fix up the node pointers.
                    if (prev == null)
                    {
                        Volatile.Write<Node>(ref tables._buckets[bucketNo], curr._next);
                    }
                    else
                    {
                        prev._next = curr._next;
                    }

                    value = curr._value;
                    tables._countPerLock[lockNo]--;
                    return true;
                }
                prev = curr;
            }
        }

        value = default(TValue);
        return false;
    }
}
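From the caller's side, the public TryRemove behaves like this (a small sketch):

```csharp
using System;
using System.Collections.Concurrent;

public static class TryRemoveDemo
{
    public static void Main()
    {
        var dict = new ConcurrentDictionary<string, int>();
        dict["a"] = 1;

        // Removal succeeds and hands back the removed value.
        Console.WriteLine(dict.TryRemove("a", out int removed)); // True
        Console.WriteLine(removed);                              // 1

        // A second removal finds nothing and returns false immediately.
        Console.WriteLine(dict.TryRemove("a", out _));           // False
    }
}
```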
TryAdd
This method calls the internal private method TryAddInternal:
TryAddInternal(key, _comparer.GetHashCode(key), value, false, true, out dummy);

/// <summary>
/// Shared internal implementation for inserts and updates.
/// If key exists, we always return false; and if updateIfExists == true we force update with value;
/// If key doesn't exist, we always add value and return true;
/// </summary>
private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
{
    Debug.Assert(_comparer.GetHashCode(key) == hashcode);
    while (true)
    {
        int bucketNo, lockNo;
        Tables tables = _tables;

        // Same as before: compute the bucket index and the lock index
        GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);

        bool resizeDesired = false;
        bool lockTaken = false;
        try
        {
            // acquireLock is true on this path, so the lock is taken
            if (acquireLock)
                Monitor.Enter(tables._locks[lockNo], ref lockTaken);

            // If the table just got resized, we may not be holding the right lock, and must retry.
            // This should be a rare occurrence.
            if (tables != _tables)
            {
                continue;
            }

            // Try to find this key in the bucket
            Node prev = null;
            for (Node node = tables._buckets[bucketNo]; node != null; node = node._next)
            {
                Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node);

                // If an equal key is found, return false
                if (hashcode == node._hashcode && _comparer.Equals(node._key, key))
                {
                    // The key was found in the dictionary. If updates are allowed, update the value for that key.
                    // We need to create a new node for the update, in order to support TValue types that cannot
                    // be written atomically, since lock-free reads may be happening concurrently.
                    // This path serves AddOrUpdate: if the key exists, update it.
                    if (updateIfExists)
                    {
                        if (s_isValueWriteAtomic)
                        {
                            node._value = value;
                        }
                        else
                        {
                            Node newNode = new Node(node._key, value, hashcode, node._next);
                            if (prev == null)
                            {
                                Volatile.Write(ref tables._buckets[bucketNo], newNode);
                            }
                            else
                            {
                                prev._next = newNode;
                            }
                        }
                        resultingValue = value;
                    }
                    else
                    {
                        resultingValue = node._value;
                    }
                    return false;
                }
                prev = node;
            }

            // The key was not found in the bucket. Insert the key-value pair.
            Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo]));

            // checked detects arithmetic overflow here
            checked
            {
                tables._countPerLock[lockNo]++;
            }

            // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table.
            // It is also possible that GrowTable will increase the budget but won't resize the bucket table.
            // That happens if the bucket table is found to be poorly utilized due to a bad hash function.
            // _budget is the maximum number of elements per lock before a resize operation is triggered.
            if (tables._countPerLock[lockNo] > _budget)
            {
                resizeDesired = true;
            }
        }
        finally
        {
            if (lockTaken)
                Monitor.Exit(tables._locks[lockNo]);
        }

        // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table.
        //
        // Concurrency notes:
        // - Notice that we are not holding any locks when calling GrowTable. This is necessary to prevent deadlocks.
        //   As a result, it is possible that GrowTable will be called unnecessarily. But GrowTable will obtain lock 0
        //   and then verify that the table we passed to it as the argument is still the current table.
        if (resizeDesired)
        {
            GrowTable(tables);
        }

        // Hand the inserted value back to the caller
        resultingValue = value;
        return true;
    }
}
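From the caller's side, the "return false if the key exists" rule looks like this (a small sketch):

```csharp
using System;
using System.Collections.Concurrent;

public static class TryAddDemo
{
    public static void Main()
    {
        var dict = new ConcurrentDictionary<string, int>();

        Console.WriteLine(dict.TryAdd("a", 1)); // True: key was absent, pair inserted
        Console.WriteLine(dict.TryAdd("a", 2)); // False: key exists, value left unchanged
        Console.WriteLine(dict["a"]);           // 1
    }
}
```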
TryGetValue
TryGetValueInternal(key, _comparer.GetHashCode(key), out value);
private bool TryGetValueInternal(TKey key, int hashcode, out TValue value)
{
    Debug.Assert(_comparer.GetHashCode(key) == hashcode);

    // Save a snapshot of the table in a local variable.
    // We must capture the _buckets field in a local variable. It is set to a new table on each table resize.
    Tables tables = _tables;
    int bucketNo = GetBucket(hashcode, tables._buckets.Length);

    // We can get away w/out a lock here.
    // The Volatile.Read ensures that we have a copy of the reference to tables._buckets[bucketNo].
    // This protects us from reading fields ('_hashcode', '_key', '_value' and '_next') of different instances.
    Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]);

    // If the key matches, return the value; otherwise keep walking the chain.
    while (n != null)
    {
        if (hashcode == n._hashcode && _comparer.Equals(n._key, key))
        {
            value = n._value;
            return true;
        }
        n = n._next;
    }
    value = default(TValue); // Not found: assign the default value
    return false;
}
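Because this read path takes no lock at all, lookups stay cheap even under contention. A small usage sketch:

```csharp
using System;
using System.Collections.Concurrent;

public static class TryGetDemo
{
    public static void Main()
    {
        var dict = new ConcurrentDictionary<string, int>();
        dict["x"] = 10;

        // A lock-free read: present key yields true and the value.
        if (dict.TryGetValue("x", out int v))
            Console.WriteLine(v);                         // 10

        // A missing key returns false immediately, without blocking.
        Console.WriteLine(dict.TryGetValue("y", out _));  // False
    }
}
```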