ConcurrentDictionary並發字典知多少?


背景

在上一篇文章你真的了解字典嗎?一文中我介紹了Hash Function和字典的工作的基本原理.
有網友在文章底部評論,說我的Remove和Add方法沒有考慮線程安全問題.
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.dictionary-2?redirectedfrom=MSDN&view=netframework-4.7.2
查閱相關資料后,發現字典.net中Dictionary本身時不支持線程安全的,如果要想使用支持線程安全的字典,那么我們就要使用ConcurrentDictionary了.
在研究ConcurrentDictionary的源碼后,我覺得在ConcurrentDictionary的線程安全的解決思路很有意思,其對線程安全的處理對對我們項目中的其他高並發場景也有一定的參考價值,在這里再次分享我的一些學習心得和體會,希望對大家有所幫助.

Concurrent

ConcurrentDictionary是Dictionary的線程安全版本,位於System.Collections.Concurrent的命名空間下,該命名空間下除了有ConcurrentDictionary,還有以下Class都是我們常用的那些類庫的線程安全版本.

BlockingCollection :為實現 IProducerConsumerCollection 的線程安全集合提供阻塞和限制功能。

ConcurrentBag :表示對象的線程安全的無序集合.

ConcurrentQueue :表示線程安全的先進先出 (FIFO) 集合。

如果讀過我上一篇文章你真的了解字典嗎?的小伙伴,對這個ConcurrentDictionary的工作原理應該也不難理解,它是簡簡單單地在讀寫方法加個lock嗎?

工作原理

Dictionary

如下圖所示,在字典中,數組entries用來存儲數據,buckets作為橋梁,每次通過hash function獲取了key的哈希值后,對這個哈希值進行取余,即hashResult%bucketsLength=bucketIndex,余數作為buckets的index,而buckets的value就是這個key對應的entry所在entries中的索引,所以最終我們就可以通過這個索引在entries中拿到我們想要的數據,整個過程不需要對所有數據進行遍歷,的時間復雜度為1.

Alt text

ConcurrentDictionary

ConcurrentDictionary的數據存儲類似,只是buckets有個更多的職責,它除了有dictionary中的buckets的橋梁的作用外,負責了數據存儲.

Alt text

key的哈希值與buckets的length取余后hashResult%bucketsLength=bucketIndex,余數作為buckets的索引就能找到我們要的數據所存儲的塊,當出現兩個key指向同一個塊時,即上圖中的John Smith和Sandra Dee他同時指向152怎么辦呢?存儲節點Node具有Next屬性執行下個Node,上圖中,node 152的Next為154,即我們從152開始找Sandra Dee,發現不是我們想要的,再到154找,即可取到所需數據.

由於官方原版的源碼較為復雜,理解起來有所難度,我對官方源碼做了一些精簡,下文將圍繞這個精簡版的ConcurrentDictionary展開敘述.
https://github.com/liuzhenyulive/DictionaryMini

數據結構

Node

ConcurrentDictionary中的每個數據存儲在一個Node中,它除了存儲value信息,還存儲key信息,以及key對應的hashcode

private class Node
        {
            internal TKey m_key;   //數據的key
            internal TValue m_value;  //數據值
            internal volatile Node m_next;  //當前Node的下級節點
            internal int m_hashcode;  //key的hashcode

            //構造函數
            internal Node(TKey key, TValue value, int hashcode, Node next)
            {
                m_key = key;
                m_value = value;
                m_next = next;
                m_hashcode = hashcode;
            }
        }

Table

而整個ConcurrentDictionary的數據存儲在這樣的一個Table中,其中m_buckets的Index負責映射key,m_locks是線程鎖,下文中會有詳細介紹,m_countPerLock存儲每個lock鎖負責的node數量.


 private class Tables
        {
            internal readonly Node[] m_buckets;   //上文中提到的buckets
            internal readonly object[] m_locks;   //線程鎖
            internal volatile int[] m_countPerLock;  //索格鎖所管理的數據數量
            internal readonly IEqualityComparer<TKey> m_comparer;  //當前key對應的type的比較器

            //構造函數
            internal Tables(Node[] buckets, object[] locks, int[] countPerlock, IEqualityComparer<TKey> comparer)
            {
                m_buckets = buckets;
                m_locks = locks;
                m_countPerLock = countPerlock;
                m_comparer = comparer;
            }
        }

ConcurrentDictionary會在構造函數中創建Table,這里我對原有的構造函數進行了簡化,通過默認值進行創建,其中DefaultConcurrencyLevel默認並發級別為當前計算機處理器的線程數.

        //構造函數
        public ConcurrentDictionaryMini() : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true,
            EqualityComparer<TKey>.Default)
        {
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="concurrencyLevel">並發等級,默認為CPU的線程數</param>
        /// <param name="capacity">默認容量,31,超過31后會自動擴容</param>
        /// <param name="growLockArray">時否動態擴充鎖的數量</param>
        /// <param name="comparer">key的比較器</param>
        internal ConcurrentDictionaryMini(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
        {
            if (concurrencyLevel < 1)
            {
                throw new Exception("concurrencyLevel 必須為正數");
            }

            if (capacity < 0)
            {
                throw new Exception("capacity 不能為負數.");
            }

            if (capacity < concurrencyLevel)
            {
                capacity = concurrencyLevel;
            }

            object[] locks = new object[concurrencyLevel];
            for (int i = 0; i < locks.Length; i++)
            {
                locks[i] = new object();
            }

            int[] countPerLock = new int[locks.Length];
            Node[] buckets = new Node[capacity];
            m_tables = new Tables(buckets, locks, countPerLock, comparer);

            m_growLockArray = growLockArray;
            m_budget = buckets.Length / locks.Length;
        }

方法

ConcurrentDictionary中較為基礎重點的方法分別位Add,Get,Remove,Grow Table方法,其他方法基本上是建立在這四個方法的基礎上進行的擴充.

Add

向Table中添加元素有以下亮點值得我們關注.

  • 開始操作前會聲明一個tables變量來存儲操作開始前的m_tables,在正式開始操作后(進入lock)的時候,會檢查tables在准備工作階段是否別的線程改變,如果改變了,則重新開始准備工作並從新開始.

  • 通過GetBucketAndLockNo方法獲取bucket索引以及lock索引,其內部就是取余操作.

 private void GetBucketAndLockNo(
            int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)
        {
            //0x7FFFFFFF 是long int的最大值 與它按位與數據小於等於這個最大值
            bucketNo = (hashcode & 0x7fffffff) % bucketCount;
            lockNo = bucketNo % lockCount;
        }
  • 對數據進行操作前會從m_locks取出第lockNo個對象最為lock,操作完成后釋放該lock.多個lock一定程度上減少了阻塞的可能性.

  • 在對數據進行更新時,如果該Value的Type為允許原子性寫入的,則直接更新該Value,否則創建一個新的node進行覆蓋.

        /// <summary>
        /// Determines whether type TValue can be written atomically
        /// </summary>
        private static bool IsValueWriteAtomic()
        {
            Type valueType = typeof(TValue);

            //
            // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without
            // the risk of tearing.
            //
            // See http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf
            //
            if (valueType.IsClass)
            {
                return true;
            }
            switch (Type.GetTypeCode(valueType))
            {
                case TypeCode.Boolean:
                case TypeCode.Byte:
                case TypeCode.Char:
                case TypeCode.Int16:
                case TypeCode.Int32:
                case TypeCode.SByte:
                case TypeCode.Single:
                case TypeCode.UInt16:
                case TypeCode.UInt32:
                    return true;

                case TypeCode.Int64:
                case TypeCode.Double:
                case TypeCode.UInt64:
                    return IntPtr.Size == 8;

                default:
                    return false;
            }
        }

該方法依據CLI規范進行編寫,簡單來說,32位的計算機,對32字節以下的數據類型寫入時可以一次寫入的而不需要移動內存指針,64位計算機對64位以下的數據可一次性寫入,不需要移動內存指針.保證了寫入的安全.
詳見12.6.6 http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf


 private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
        {
            while (true)
            {
                int bucketNo, lockNo;
                int hashcode;

                //https://www.cnblogs.com/blurhkh/p/10357576.html
                //需要了解一下值傳遞和引用傳遞
                Tables tables = m_tables;
                IEqualityComparer<TKey> comparer = tables.m_comparer;
                hashcode = comparer.GetHashCode(key);

                GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);

                bool resizeDesired = false;
                bool lockTaken = false;

                try
                {
                    if (acquireLock)
                        Monitor.Enter(tables.m_locks[lockNo], ref lockTaken);

                    //如果表剛剛調整了大小,我們可能沒有持有正確的鎖,必須重試。
                    //當然這種情況很少見
                    if (tables != m_tables)
                        continue;

                    Node prev = null;
                    for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next)
                    {
                        if (comparer.Equals(node.m_key, key))
                        {
                            //key在字典里找到了。如果允許更新,則更新該key的值。
                            //我們需要為更新創建一個node,以支持不能以原子方式寫入的TValue類型,因為free-lock 讀取可能同時發生。
                            if (updateIfExists)
                            {
                                if (s_isValueWriteAtomic)
                                {
                                    node.m_value = value;
                                }
                                else
                                {
                                    Node newNode = new Node(node.m_key, value, hashcode, node.m_next);
                                    if (prev == null)
                                    {
                                        tables.m_buckets[bucketNo] = newNode;
                                    }
                                    else
                                    {
                                        prev.m_next = newNode;
                                    }
                                }

                                resultingValue = value;
                            }
                            else
                            {
                                resultingValue = node.m_value;
                            }

                            return false;
                        }

                        prev = node;
                    }

                    //key沒有在bucket中找到,則插入該數據
                    Volatile.Write(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo]));
                    //當m_countPerLock超過Int Max時會拋出OverflowException
                    checked
                    {
                        tables.m_countPerLock[lockNo]++;
                    }

                    //
                    // 如果m_countPerLock[lockNo] > m_budget,則需要調整buckets的大小。
                    // GrowTable也可能會增加m_budget,但不會調整bucket table的大小。.
                    // 如果發現bucket table利用率很低,也會發生這種情況。
                    //
                    if (tables.m_countPerLock[lockNo] > m_budget)
                    {
                        resizeDesired = true;
                    }
                }
                finally
                {
                    if (lockTaken)
                        Monitor.Exit(tables.m_locks[lockNo]);
                }

                if (resizeDesired)
                {
                    GrowTable(tables, tables.m_comparer, false, m_keyRehashCount);
                }

                resultingValue = value;
                return true;
            }
        }

Get

從Table中獲取元素的的流程與前文介紹ConcurrentDictionary工作原理時一致,但有以下亮點值得關注.

  • 讀取bucket[i]在Volatile.Read()方法中進行,該方法會自動對讀取出來的數據加鎖,避免在讀取的過程中,數據被其他線程remove了.
  • Volatile讀取指定字段時,在讀取的內存中插入一個內存屏障,阻止處理器重新排序內存操作,如果在代碼中此方法之后出現讀取或寫入,則處理器無法在此方法之前移動它。

 public bool TryGetValue(TKey key, out TValue value)
        {
            if (key == null) throw new ArgumentNullException("key");

            // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize.
            Tables tables = m_tables;
            IEqualityComparer<TKey> comparer = tables.m_comparer;
            GetBucketAndLockNo(comparer.GetHashCode(key), out var bucketNo, out _, tables.m_buckets.Length, tables.m_locks.Length);

            // We can get away w/out a lock here.
            // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i].
            Node n = Volatile.Read(ref tables.m_buckets[bucketNo]);

            while (n != null)
            {
                if (comparer.Equals(n.m_key, key))
                {
                    value = n.m_value;
                    return true;
                }
                n = n.m_next;
            }

            value = default(TValue);
            return false;
        }

Remove

Remove方法實現其實也並不復雜,類似我們鏈表操作中移除某個Node.移除節點的同時,還要對前后節點進行鏈接,相信一塊小伙伴們肯定很好理解.

 private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
        {
            while (true)
            {
                Tables tables = m_tables;

                IEqualityComparer<TKey> comparer = tables.m_comparer;

                int bucketNo, lockNo;

                GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);

                lock (tables.m_locks[lockNo])
                {
                    if (tables != m_tables)
                        continue;

                    Node prev = null;
                    for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next)
                    {
                        if (comparer.Equals(curr.m_key, key))
                        {
                            if (matchValue)
                            {
                                bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value);
                                if (!valuesMatch)
                                {
                                    value = default(TValue);
                                    return false;
                                }
                            }
                            if (prev == null)
                                Volatile.Write(ref tables.m_buckets[bucketNo], curr.m_next);
                            else
                            {
                                prev.m_next = curr.m_next;
                            }

                            value = curr.m_value;
                            tables.m_countPerLock[lockNo]--;
                            return true;
                        }

                        prev = curr;
                    }
                }

                value = default(TValue);
                return false;
            }
        }

Grow table

當table中任何一個m_countPerLock的數量超過了設定的閾值后,會觸發此操作對Table進行擴容.

private void GrowTable(Tables tables, IEqualityComparer<TKey> newComparer, bool regenerateHashKeys,
            int rehashCount)
        {
            int locksAcquired = 0;
            try
            {
                //首先鎖住第一個lock進行resize操作.
                AcquireLocks(0, 1, ref locksAcquired);

                if (regenerateHashKeys && rehashCount == m_keyRehashCount)
                {
                    tables = m_tables;
                }
                else
                {
                    if (tables != m_tables)
                        return;

                    long approxCount = 0;
                    for (int i = 0; i < tables.m_countPerLock.Length; i++)
                    {
                        approxCount += tables.m_countPerLock[i];
                    }

                    //如果bucket數組太空,則將預算加倍,而不是調整表的大小
                    if (approxCount < tables.m_buckets.Length / 4)
                    {
                        m_budget = 2 * m_budget;
                        if (m_budget < 0)
                        {
                            m_budget = int.MaxValue;
                        }

                        return;
                    }
                }

                int newLength = 0;
                bool maximizeTableSize = false;
                try
                {
                    checked
                    {
                        newLength = tables.m_buckets.Length * 2 + 1;
                        while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)
                        {
                            newLength += 2;
                        }
                    }
                }
                catch (OverflowException)
                {
                    maximizeTableSize = true;
                }

                if (maximizeTableSize)
                {
                    newLength = int.MaxValue;

                    m_budget = int.MaxValue;
                }

                AcquireLocks(1, tables.m_locks.Length, ref locksAcquired);

                object[] newLocks = tables.m_locks;

                //Add more locks
                if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER)
                {
                    newLocks = new object[tables.m_locks.Length * 2];
                    Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length);

                    for (int i = tables.m_locks.Length; i < newLocks.Length; i++)
                    {
                        newLocks[i] = new object();
                    }
                }

                Node[] newBuckets = new Node[newLength];
                int[] newCountPerLock = new int[newLocks.Length];

                for (int i = 0; i < tables.m_buckets.Length; i++)
                {
                    Node current = tables.m_buckets[i];
                    while (current != null)
                    {
                        Node next = current.m_next;
                        int newBucketNo, newLockNo;
                        int nodeHashCode = current.m_hashcode;

                        if (regenerateHashKeys)
                        {
                            //Recompute the hash from the key
                            nodeHashCode = newComparer.GetHashCode(current.m_key);
                        }

                        GetBucketAndLockNo(nodeHashCode, out newBucketNo, out newLockNo, newBuckets.Length,
                            newLocks.Length);

                        newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, nodeHashCode,
                            newBuckets[newBucketNo]);
                        checked
                        {
                            newCountPerLock[newLockNo]++;
                        }

                        current = next;
                    }
                }

                if (regenerateHashKeys)
                {
                    unchecked
                    {
                        m_keyRehashCount++;
                    }
                }

                m_budget = Math.Max(1, newBuckets.Length / newLocks.Length);

                m_tables = new Tables(newBuckets, newLocks, newCountPerLock, newComparer);
            }
            finally
            {
                ReleaseLocks(0, locksAcquired);
            }
        }

學習感悟

  • lock[]:在以往的線程安全上,我們對數據的保護往往是對數據的修改寫入等地方加上lock,這個lock經常上整個上下文中唯一的,這樣的設計下就可能會出現多個線程,寫入的根本不是一塊數據,卻要等待前一個線程寫入完成下一個線程才能繼續操作.在ConcurrentDictionary中,通過哈希算法,從數組lock[]中找出key的准確lock,如果不同的key,使用的不是同一個lock,那么這多個線程的寫入時互不影響的.

  • 寫入要考慮線程安全,讀取呢?不可否認,在大部分場景下,讀取不必去考慮線程安全,但是在我們這樣的鏈式讀取中,需要自上而下地查找,是不是有種可能在查找個過程中,鏈路被修改了呢?所以ConcurrentDictionary中使用Volatile.Read來讀取出數據,該方法從指定字段讀取對象引用,在需要它的系統上,插入一個內存屏障,阻止處理器重新排序內存操作,如果在代碼中此方法之后出現讀取或寫入,則處理器無法在此方法之前移動它。

  • 在ConcurrentDictionary的更新方法中,對數據進行更新時,會判斷該數據是否可以原子寫入,如果時可以原子寫入的,那么就直接更新數據,如果不是,那么會創建一個新的node覆蓋原有node,起初看到這里時候,我百思不得其解,不知道這么操作的目的,后面在jeo duffy的博客中Thread-safety, torn reads, and the like中找到了答案,這樣操作時為了防止torn reads(撕裂讀取),什么叫撕裂讀取呢?通俗地說,就是有的數據類型寫入時,要分多次寫入,寫一次,移動一次指針,那么就有可能寫了一半,這個結果被另外一個線程讀取走了.比如說我把 劉振宇三個字改成周傑倫的過程中,我先把劉改成周了,正在我准備去把振改成傑的時候,另外一個線程過來讀取結果了,讀到的數據是周振宇,這顯然是不對的.所以對這種,更安全的做法是先把周傑倫三個字寫好在一張紙條上,然后直接替換掉劉振宇.更多信息在CLI規范12.6.6有詳細介紹.

  • checkedunckecked關鍵字.非常量的運算(non-constant)運算在編譯階段和運行時下不會做溢出檢查,如下這樣的代碼時不會拋出異常的,算錯了也不會報錯。

int ten = 10;
int i2 = 2147483647 + ten;

但是我們知道,int的最大值是2147483647,如果我們將上面這樣的代碼嵌套在checked就會做溢出檢查了.

checked
{
int ten = 10;
int i2 = 2147483647 + ten;
}

相反,對於常量,編譯時是會做溢出檢查的,下面這樣的代碼在編譯時就會報錯的,如果我們使用unckeck標簽進行標記,則在編譯階段不會做移除檢查.

int a = int.MaxValue * 2;

那么問題來了,我們當然知道checked很有用,那么uncheck呢?如果我們只是需要那么一個數而已,至於溢出不溢出的關系不大,比如說生成一個對象的HashCode,比如說根據一個算法計算出一個相對隨機數,這都是不需要准確結果的,ConcurrentDictionary中對於m_keyRehashCount++這個運算就使用了unchecked,就是因為m_keyRehashCount是用來生成哈希值的,我們並不關心它有沒有溢出.

  • volatile關鍵字,表示一個字段可能是由在同一時間執行多個線程進行修改。出於性能原因,編譯器\運行時系統甚至硬件可以重新排列對存儲器位置的讀取和寫入。聲明的字段volatile不受這些優化的約束。添加volatile修飾符可確保所有線程都能按照執行順序由任何其他線程執行的易失性寫入,易失性寫入是一件瘋狂的事情的事情:普通玩家慎用.

本博客所涉及的代碼都保存在github中,Take it easy to enjoy it!
https://github.com/liuzhenyulive/DictionaryMini/blob/master/DictionaryMini/DictionaryMini/ConcurrentDictionaryMini.cs


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM