多線程編程學習筆記——使用並發集合(一)


 

接上文 多線程編程學習筆記——async和await(一)

 

接上文 多線程編程學習筆記——async和await(二)

 

接上文 多線程編程學習筆記——async和await(三)

 

        編程需要對基本的數據結構和算法有所了解。程序員為並發情況 選擇最合適 的數據結構,那就需要知道算法運行時間、空間復雜度等。

        對於並行計算,我們需要使用適當的數據結構,這些數據結構需要具備可伸縮性,盡可能地避免死鎖,同時還能提供線程安全的訪問。.Net Framework 4.0引入了System.Collections.Concurrent命名空間,其中包含了一些適合並發計算的數據結構。

  1. ConcurrentQueue,這個集合使用了原子的比較和交換(CAS)操作,使用SpinWait來保證線程安全。它實現了一個先進先出(FIFO)的集合,就是說元素出隊列的順序與加入隊列的順序是一樣的。可以調用enqueue文獻向隊列中加入元素,使用TryDequeue方法試圖取出隊列中的第一個元素,使用TryPeek方法則試圖得到第一個元素但並不從隊列中刪除此元素。
  2. ConCurrentStack的實現也沒有使用任何鎖,只采用了CAS操作。它是一個后進先出(LIFO)的集合,這意味着最近添加的元素會先返回。可以使用Push和PushRange方法添加元素,使用TryPop和TryPopRange方法獲取元素,以及使用TryPeek方法檢查元素。
  3. ConcurrentBag是一個支持重復元素的無序集合。它針對這樣以下情況 進行了優化,即多個線程以這樣的方式工作:每個線程產生和消費自己的任務,極少與其他線程的任務交互(如果要交互則使用鎖)。添加元素使用add方法,檢查元素使用TryPeek方法,獲取元素使用TryTake方法。
  4. ConcurrentDictionary是一個線程安全的字典集合的實現。對於讀操作無需使用鎖。但是對於寫操作則需要鎖。這個並發字典使用多個鎖,在字典桶之上實現了一個細粒度的鎖模型。使用參數 concurrencyLevel可以在構造函數 中定義鎖的數量,這意味着預估的線程數量將並發地更新這個字典。

         注:由於並發字典使用鎖,所以沒有必要請避免使用以下操作:Count、IsEmpty、Keys、Values、CopyTo及ToArray。

       5. BlockingCollection是對IProducerConsumerCollection泛型接口的實現 的一個高級封裝。它有很多先進的功能來實現管道場景,即當你有一些步驟需要使用之前步驟運行的結果時。BlockingCollectione類支持如下功能:分場 、調整內部集合容量、取消集合操作、從多個塊集合中獲取元素。

          並行算法有可能非常復雜,並且或多或少涵蓋了這些並行集合。線程安全並不是沒有代價的。比起System.Collections和System.Collections.Generic命名空間中的經典列表 、集合和數組來說,並發集合會有更大的開銷,因此,應該只在需要從多個任務中並發訪問集合的時候才使用並發集合。在中等代碼中使用並發集合沒有意義,因為它們會增加無謂的開銷。下面我們使用最簡單的例子來學習這些並行集合。

 

一、   使用ConcurrentDictionary

     本示例展示了一個非常簡單的場景,比較在單線程環境中使用通常的字典集合與使用並發字典的性能。

 1.程序示例代碼,如下。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Diagnostics; 

namespace ThreadCollectionDemo
{
    class Program
    {

        const string item = "Dict Name";
        public static string CurrentItem;
        static double time1;
        static void Main(string[] args)
        {
            Console.WriteLine(string.Format("-----  ConcurrentDictionary 操作----"));
            var concurrentDict = new ConcurrentDictionary<int, string>();
            var dict = new Dictionary<int, string>();
            var sw = new Stopwatch();
            sw.Start();

            //普通字典,100萬次循環
            for (int i = 0; i < 1000000; i++)
            {
                lock (dict)
                {
                    dict[i] = string.Format("{0} {1}", item, i);
                }
            }

            sw.Stop();
            Console.WriteLine(string.Format("對普通字典集合(Dictionary) 進行100萬次寫操作共用時間:{0}----",sw.Elapsed));
            time1 = sw.Elapsed.TotalMilliseconds;
            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                concurrentDict[i] = string.Format("{0} {1}", item, i);          

            }

            sw.Stop();
            Console.WriteLine(string.Format("對並行字典集合(ConcurrentDictionary) 進行100萬次寫操作共用時間:{0}", sw.Elapsed));

            Console.WriteLine(string.Format("寫操作  普通字典/並行字典 = {0}", time1/1.0/sw.Elapsed.TotalMilliseconds)); 

            Console.WriteLine();
           sw.Restart();

            for (int i = 0; i < 1000000; i++)
            {
                lock (dict)

                {
                    CurrentItem = dict[i];

                }

            }

            sw.Stop();
            Console.WriteLine(string.Format("對普通字典集合(Dictionary) 進行100萬次讀操作共用時間:{0}----", sw.Elapsed));
            time1 = sw.Elapsed.TotalMilliseconds;
            sw.Restart();
 

            for (int i = 0; i < 1000000; i++)

            {           
    CurrentItem= concurrentDict[i];
            }

            sw.Stop();
            Console.WriteLine(string.Format("對並行字典集合(ConcurrentDictionary) 進行100萬次讀操作共用時間:{0}----", sw.Elapsed));

            Console.WriteLine(string.Format("讀操作  普通字典/並行字典 = {0}", time1 / 1.0 / sw.Elapsed.TotalMilliseconds));
            //多線程並行讀取數據   

            sw.Restart();
            Task[] process = new Task[4];
            for (int i = 0; i < 4; i++)
            {               
                process[i ] = Task.Run(() => Get(concurrentDict, i));

            }

            Task.WhenAll(process);
            sw.Stop();
            Console.WriteLine(string.Format("多線程對並行字典集合(ConcurrentDictionary) 進行100萬次讀操作共用時間:{0}", sw.Elapsed));

            Console.WriteLine(string.Format("讀操作  普通字典/多線程讀並行字典 = {0}", time1 / 1.0 / sw.Elapsed.TotalMilliseconds));
            Console.Read();

        }

        private static void Get(ConcurrentDictionary<int,string> dict,int index)
        {
            for (int i = 0; i < 1000000; i += 4)
            {
                if (i%4==index)
                {
                    string s = dict[i];
                }             

            }   

        }
    }

}

2.單線程情況下,程序運行結果,如下圖。

 

       當程序啟動時我們創建了兩個集合,其中一個是標准的字典集合,另一個是新的並發字典集合。然后采用鎖的機制向標准的字典中添加元素,並測量完成100萬次的時間。同時采用同樣的場景來測量ConcurrentDictionary的性能 ,最后 比較從兩個集合中獲取值 的性能。

      通過上面的比較,我們發現ConcurrentDictionary寫操作比使用鎖的通常字典要慢的多,而讀操作則要快些。因此對字典需要大量的線程安全的讀操作,ConcurrentDictionary是最好的選擇。

      最后,我們使用4個線程同時讀ConcurrentDictionary,則會發現這個字典的性能更好。

     在main方法中添加一個四線程的讀字典的代碼。如下。

 //多線程並行讀取數據        

            sw.Restart();
            Task[] process = new Task[4];
            for (int i = 0; i < 4; i++)
            {             

                process[i ] = Task.Run(() => Get(concurrentDict, i));
            }
            Task.WhenAll(process); 
            sw.Stop();
            Console.WriteLine(string.Format("多線程對並行字典集合(ConcurrentDictionary) 進行100萬次讀操作共用時間:{0}", sw.Elapsed));
            Console.WriteLine(string.Format("讀操作  普通字典/多線程讀並行字典 = {0}", time1 / 1.0 / sw.Elapsed.TotalMilliseconds));



  

        private static void Get(ConcurrentDictionary<int,string> dict,int index)
        {

            for (int i = 0; i < 1000000; i += 4)
            {
                if (i%4==index)
                {
                    string s = dict[i];

                }             

            }  

        }

3.多線程情況下,程序運行結果,如下圖。從圖中可以看出,普通字典的操作是多線程讀並行字典的30多倍。這是我機器上的情況。各人的機器配置不一,請自行測試。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM