ConcurrentDictionary
ConcurrentDictionary一大特點是線程安全,在沒有ConcurrentDictionary前
在多線程下用Dictionary,不管讀寫都要加個鎖,不但麻煩,性能上也不是很好
微軟得出的結果是默認的鎖的數量是CPU核的個數,這個線程池默認的線程數量一樣。隨着Dictionary的擴容,鎖的個數也可以跟着增加,這個可以在構造函數中自己指定。
private sealed class Tables
{
internal readonly Node[] _buckets; // bucket成了這樣,也就是ConcurrentDictionary可以認為是一個bucket數組,每個Bucket里又由next來形成鏈表
internal readonly object[] _locks; // 這個就是鎖的數組了
internal volatile int[] _countPerLock; // 這個是每個鎖罩的元素個數
internal Tables(Node[] buckets, object[] locks, int[] countPerLock)
{
_buckets = buckets;
_locks = locks;
_countPerLock = countPerLock;
}
}
//由Dictionary里的Entry改成Node,並且把next放到Node里
private sealed class Node
{
internal readonly TKey _key;
internal TValue _value;
internal volatile Node _next; //next由volatile修飾,確保不被優化且讀寫原子性
internal readonly int _hashcode;
internal Node(TKey key, TValue value, int hashcode, Node next)
{
_key = key;
_value = value;
_next = next;
_hashcode = hashcode;
}
}
里面的變量:
private volatile Tables _tables; // 這不同於Dictionary的bucket 數組,而是整個封裝起來,而且用volatile來保證讀寫時的原子性
private readonly bool _growLockArray; // 是否在Dictionary擴容時也增加鎖的數量
private int _budget; // 單個鎖罩的元素的最大個數
private const int DefaultCapacity = 31; //ConcurrentDictionary默認大小,和List,Dictionary不一樣
private const int MaxLockNumber = 1024; //最大鎖的個數,不過也可以在構造函數中弄個更大的,不般沒必要
public bool TryGetValue(TKey key, out TValue value)
{
if (key == null) ThrowKeyNullException();
return TryGetValueInternal(key, _comparer.GetHashCode(key), out value);
}
private bool TryGetValueInternal(TKey key, int hashcode, out TValue value)
{
Debug.Assert(_comparer.GetHashCode(key) == hashcode);
//先用本地變量存一下,免得在另外一個線程擴容時變了
Tables tables = _tables;
//又是hashcode取余哈,不多說
//int bucketNo = (hashcode & 0x7fffffff) % bucketCount;
int bucketNo = GetBucket(hashcode, tables._buckets.Length);
//這個用Valatile確保讀了最新的Node,一個定義為volatile的變量是說這變量可能會被意想不到地改變,這樣,編譯器就不會去假設這個變量的值了。精確地說就是,優化器在用到這個變量時必須每次都小心地重新讀取這個變量的值,而不是使用保存在寄存器里的備份。簡單地說就是防止編譯器對代碼進行優化。比如如下程序:
1 2 3 4 |
XBYTE[2]=0x55; XBYTE[2]=0x56; XBYTE[2]=0x57; XBYTE[2]=0x58; |
對外部硬件而言,上述四條語句分別表示不同的操作,會產生四種不同的動作,但是編譯器卻會對上述四條語句進行優化,認為只有XBYTE[2]=0x58(即忽略前三條語句,只產生一條機器代碼)。如果鍵入volatile,則編譯器會逐一地進行編譯並產生相應的機器代碼(產生四條代碼)。
- 原子性:對任意單個volatile變量的讀/寫具有原子性,但類似於volatile++這種復合操作不具有原子性。
Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]);
//遍歷bucket,真懷疑這些代碼是幾個人寫的,風格都不一樣
while (n != null)
{
//找到了
if (hashcode == n._hashcode && _comparer.Equals(n._key, key))
{
//返回true和value
value = n._value;
return true;
}
n = n._next;
}
value = default(TValue);
return false;
}
public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
{
if (key == null) ThrowKeyNullException();
if (valueFactory == null) throw new ArgumentNullException(nameof(valueFactory));
int hashcode = _comparer.GetHashCode(key);
TValue resultingValue;
//先TryGet,沒有的再TryAdd
if (!TryGetValueInternal(key, hashcode, out resultingValue))
{
TryAddInternal(key, hashcode, valueFactory(key), false, true, out resultingValue);
}
return resultingValue;
}
private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
{
Debug.Assert(_comparer.GetHashCode(key) == hashcode);
while (true)
{
int bucketNo, lockNo;
Tables tables = _tables;
//GetBucketAndLockNo函數里面就是下面兩句
//bucketNo = (hashcode & 0x7fffffff) % bucketCount; 取余得bucket No.,和Dictionary一樣
//lockNo = bucketNo % lockCount; 也是取余得鎖No. 也就是一個鎖也是可能給多個Bucket用的
GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);
bool resizeDesired = false;
bool lockTaken = false;
try
{
if (acquireLock) //參數指定需要鎖的話就鎖上這個bucket的鎖,也就在構造函數初始化時不需要鎖
Monitor.Enter(tables._locks[lockNo], ref lockTaken);
//這里是做個校驗,判斷tables是否在這邊取完鎖后其他線程把元素給擴容了,擴容會生成一個新的tables,tables變了的話上面的鎖就沒意義了,需要重來,所以這整個是在while(true)里面
if (tables != _tables)
{
continue;
}
Node prev = null;
//這里就遍歷bucket里的鏈表了,和Dictionary差不多
for (Node node = tables._buckets[bucketNo]; node != null; node = node._next)
{
Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node);
if (hashcode == node._hashcode && _comparer.Equals(node._key, key))//看是否找到
{
//看是否需要更新node
if (updateIfExists)
{
if (s_isValueWriteAtomic) //這個是判斷是否是支持原子操作的值類型,比如32位上byte,int,byte,short都是原子的,而long,double就不是了,支持原子操作的直接賦值就可以了,得注意是值類型,引用類型可不能這么搞
{
node._value = value;
}
else //不是原子操作的值類型就new一個node
{
Node newNode = new Node(node._key, value, hashcode, node._next);
if (prev == null)
{
tables._buckets[bucketNo] = newNode;
}
else
{
prev._next = newNode;
}
}
resultingValue = value;
}
else//不更新就直接取值
{
resultingValue = node._value;
}
return false; //找到了返回false,表示不用Add就Get了
}
prev = node;
}
// 找了一圈沒找着,就Add吧,new一個node用Volatile的寫操作寫到bucket里
Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo]));
checked//這里如果超出int大小,拋overflow exception, 能進這里表示一個鎖罩int.MaxValue大小的Node,真成扛把子了,極端情況下只有一個鎖而且Node的大小已經是Int.MaxValue才可能會出現(還要看budget同不同意)
{
tables._countPerLock[lockNo]++;
}
//如果鎖罩的Node個數大於budget就表示差不多需要擴容了,黑社會表示地盤不夠用了
if (tables._countPerLock[lockNo] > _budget)
{
resizeDesired = true;
}
}
finally
{
if (lockTaken) //出現異常要把鎖釋放掉
Monitor.Exit(tables._locks[lockNo]);
}
if (resizeDesired)
{
GrowTable(tables); //擴容
}
resultingValue = value; //result值
return true;
}
}
擴容:
private void GrowTable(Tables tables)
{
const int MaxArrayLength = 0X7FEFFFFF;
int locksAcquired = 0;
try
{
// 先把第一個鎖鎖住,免得其他線程也要擴容走進來
AcquireLocks(0, 1, ref locksAcquired);
//如果table已經變了,也就是那些等着上面鎖的線程進來發現已經擴容完了直接返回就好了
if (tables != _tables)
{
return;
}
// 計算每個鎖罩的元素的個數總和,也就是當前元素的個數
long approxCount = 0;
for (int i = 0; i < tables._countPerLock.Length; i++)
{
approxCount += tables._countPerLock[i];
}
//如果元素總和不到Bucket大小的1/4,說明擴容擴得不是時候,歸根結底是budget小了
if (approxCount < tables._buckets.Length / 4)
{
_budget = 2 * _budget;//2倍增加budget
if (_budget < 0) //小於0說明overflow了,看看,前面用check,這里又用小於0。。
{
_budget = int.MaxValue; //直接最大值吧
}
return;
}
int newLength = 0;
bool maximizeTableSize = false;
try
{
checked
{
//2倍+1取得一個奇數作了新的容量
newLength = tables._buckets.Length * 2 + 1;
//看是否能整除3/5/7,能就+2,直到不能整除為止,也挺奇怪這算法,List是2倍,Dictionary是比2倍大的一個質數,這里又是另外一種,只能說各人有各人的算法
while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)
{
newLength += 2;
}
Debug.Assert(newLength % 2 != 0);
if (newLength > MaxArrayLength)
{
maximizeTableSize = true;
}
}
}
catch (OverflowException)
{
maximizeTableSize = true;
}
if (maximizeTableSize)//進這里表示溢出了
{
newLength = MaxArrayLength; //直接給最大值
_budget = int.MaxValue; //budget也給最大值,因為沒法再擴容了,給小了進來也沒意義
}
//擴容之后又是熟悉的重新分配元素,和Dictionary基本一致,這里要先把所有鎖鎖住,前面已經鎖了第一個,這里鎖其他的
AcquireLocks(1, tables._locks.Length, ref locksAcquired);
object[] newLocks = tables._locks;
//如果允許增加鎖並則鎖的個數還不到1024,就增加鎖
if (_growLockArray && tables._locks.Length < MaxLockNumber)
{
newLocks = new object[tables._locks.Length * 2]; //也是2倍增加
Array.Copy(tables._locks, 0, newLocks, 0, tables._locks.Length); //舊鎖復制到新數組里
for (int i = tables._locks.Length; i < newLocks.Length; i++) //再初始化增的鎖
{
newLocks[i] = new object();
}
}
//新的Node數組
Node[] newBuckets = new Node[newLength];
int[] newCountPerLock = new int[newLocks.Length];
//遍歷bucket
for (int i = 0; i < tables._buckets.Length; i++)
{
Node current = tables._buckets[i];//當前node
while (current != null)
{
Node next = current._next;
int newBucketNo, newLockNo;
//算新的bucket No.和lock No.
GetBucketAndLockNo(current._hashcode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length);
//重建個新的node,注意next指到了上一個node,和Dictionary里一樣
newBuckets[newBucketNo] = new Node(current._key, current._value, current._hashcode, newBuckets[newBucketNo]);
checked
{
newCountPerLock[newLockNo]++; //這個鎖又罩了一個小弟,加一個
}
current = next;
}
}
//調整下budget
_budget = Math.Max(1, newBuckets.Length / newLocks.Length);
//得到新的table
_tables = new Tables(newBuckets, newLocks, newCountPerLock);
}
finally
{
// 釋放鎖
ReleaseLocks(0, locksAcquired);
}
}
說完了,總結下,ConcurrentDictionary可以說是為了避免一個大鎖鎖住整個Dictionary帶來的性能損失而出來的,當然也是采用空間換時間,不過這空間換得還是很值得的,一些object而已。
原理在於Dictionary本質是是一個鏈表數組,只有在多線程同時操作到數組里同一個鏈表時才需要鎖,所以就用到一個鎖數組,每個鎖罩着幾個小弟(bucket及bucket內的鏈表元素),這樣多線程讀寫不同鎖罩的區域的時候可以同時進行而不會等待,進而提高多線程性能。
不過也凡事無絕對,不同業務場景的需求不一樣,可能Dictionary配合ReaderWriterLockSlim在某些場景(比如讀的機會遠大於寫的)可能會有更好的表現。
引自:https://www.cnblogs.com/brookshi/p/5583892.html