為了讓共享的數組,集合能夠被多線程更新,我們現在(.net4.0之后)可以使用並發集合來實現這個功能。而System.Collections和System.Collections.Generic命名空間中所提供的經典列表,集合和數組都不是線程安全的,如果要使用,還需要添加代碼來同步。
先看一個例子,通過並行循環向一個List<string>集合添加元素。因為List不是線程安全的,所以必須對Add方法加鎖來串行化。
任務開始:

private static int NUM_AES_KEYS =80000; static void Main(string[] args) { Console.WriteLine("任務開始..."); var sw = Stopwatch.StartNew(); for (int i = 0; i < 4; i++) { ParallelGennerateMD5Keys(); Console.WriteLine(_keyList.Count); } Console.WriteLine("結束時間:" + sw.Elapsed); Console.ReadKey(); }
private static List<string> _keyList; private static void ParallelGennerateMD5Keys() { _keyList=new List<string>(NUM_AES_KEYS); Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range => { var md5M = MD5.Create(); for (int i = range.Item1; i < range.Item2; i++) { byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); lock (_keyList) { _keyList.Add(hexString); } } }); }
但如果我們去掉lock,得到的結果如下:
沒有一次是滿80000的。lock關鍵字創建了一個臨界代碼區,當一個任務進入之后,其他任務會被阻塞並等待進入。lock關鍵字引入了一定的開銷,而且會降低可擴展性。對於這個問題,.Net4.0提供了System.Collections.Concurrent命名空間用於解決線程安全問題,它包含了5個集合:ConcurrentQueue<T>,ConcurrentStack<T>,ConcurrentBag<T>,BlockingCollection<T>,ConcurrentDictionary<TKey,TValue>。這些集合都在某種程度上使用了無鎖技術,性能得到了提升。
ConcurrentQueue
一個FIFO(先進先出)的集合。支持多任務進並發行入隊和出隊操作。
ConcurrentQueue是完全無鎖的,它是System.Collections.Queue的並發版本。提供三個主要的方法:
- Enqueue--將元素加入到隊列尾部。
- TryDequeue--嘗試刪除隊列頭部元素。並將元素通過out參數返回。返回值為bool型,表示是否執行成功。
- TryPeek--嘗試將隊列頭部元素通過out參數返回,但不會刪除這個元素。返回值bool型,表示操作是否成功。
修改上面的代碼:
private static ConcurrentQueue<string> _keyQueue; private static void ParallelGennerateMD5Keys() { _keyQueue = new ConcurrentQueue<string>(); Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range => { var md5M = MD5.Create(); for (int i = range.Item1; i < range.Item2; i++) { byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); _keyQueue.Enqueue(hexString); } }); }
結果如下:
可以看見,它的使用很簡單,不用擔心同步問題。接下我們通過生產者-消費者模式,對上面的問題進行改造,分解成兩個任務。使用兩個共享的ConcurrentQueue實例。_byteArraysQueue 和 _keyQueue ,ParallelGennerateMD5Keys 方法生產byte[],ConverKeysToHex方法去消費並產生key。
private static ConcurrentQueue<string> _keyQueue; private static ConcurrentQueue<byte[]> _byteArraysQueue; private static void ParallelGennerateMD5Keys(int maxDegree) { var parallelOptions = new ParallelOptions{MaxDegreeOfParallelism = maxDegree}; var sw = Stopwatch.StartNew(); _keyQueue = new ConcurrentQueue<string>(); Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1),parallelOptions, range => { var md5M = MD5.Create(); for (int i = range.Item1; i < range.Item2; i++) { byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i); byte[] result = md5M.ComputeHash(data); _byteArraysQueue.Enqueue(result); } }); Console.WriteLine("MD5結束時間:" + sw.Elapsed); } private static void ConverKeysToHex(Task taskProducer) { var sw = Stopwatch.StartNew(); while (taskProducer.Status == TaskStatus.Running || taskProducer.Status == TaskStatus.WaitingToRun || _byteArraysQueue.Count > 0) { byte[] result; if (_byteArraysQueue.TryDequeue(out result)) { string hexString = ConverToHexString(result); _keyQueue.Enqueue(hexString); } } Console.WriteLine("key結束時間:" + sw.Elapsed); }
這次我修改了執行次數為180000
private static int NUM_AES_KEYS =180000; static void Main(string[] args) { Console.WriteLine("任務開始..."); var sw = Stopwatch.StartNew(); _byteArraysQueue=new ConcurrentQueue<byte[]>(); _keyQueue=new ConcurrentQueue<string>();
//生產key 和 消費key的兩個任務 var taskKeys = Task.Factory.StartNew(()=>ParallelGennerateMD5Keys(Environment.ProcessorCount - 1)); var taskHexString = Task.Factory.StartNew(()=>ConverKeysToHex(taskKeys)); string lastKey;
//隔半秒去看一次。 while (taskHexString.Status == TaskStatus.Running || taskHexString.Status == TaskStatus.WaitingToRun) { Console.WriteLine("_keyqueue的個數是{0},_byteArraysQueue的個數是{1}", _keyQueue.Count,_byteArraysQueue.Count); if (_keyQueue.TryPeek(out lastKey)) { // Console.WriteLine("第一個Key是{0}",lastKey); } Thread.Sleep(500); } //等待兩個任務結束 Task.WaitAll(taskKeys, taskHexString); Console.WriteLine("結束時間:" + sw.Elapsed); Console.WriteLine("key的總數是{0}" , _keyQueue.Count); Console.ReadKey(); }
從結果可以發現,_bytaArraysQueue里面的byte[] 幾乎是生產一個,就被消費一個。
理解生產者和消費者
使用ConcurrentQueue可以很容易的實現並行的生產者-消費者模式或多階段的線性流水線。如下:
我們可以改造上面的main方法,讓一半的線程用於生產,一半的線程用於消費。
static void Main(string[] args) { Console.WriteLine("任務開始..."); var sw = Stopwatch.StartNew(); _byteArraysQueue=new ConcurrentQueue<byte[]>(); _keyQueue=new ConcurrentQueue<string>(); var taskKeyMax = Environment.ProcessorCount/2; var taskKeys = Task.Factory.StartNew(() => ParallelGennerateMD5Keys(taskKeyMax)); var taskHexMax = Environment.ProcessorCount - taskKeyMax; var taskHexStrings=new Task[taskHexMax]; for (int i = 0; i < taskHexMax; i++) { taskHexStrings[i] = Task.Factory.StartNew(() => ConverKeysToHex(taskKeys)); } Task.WaitAll(taskHexStrings); Console.WriteLine("結束時間:" + sw.Elapsed); Console.WriteLine("key的總數是{0}" , _keyQueue.Count); Console.ReadKey(); }
而這些消費者的結果又可以繼續作為生產者,繼續串聯下去。
ConcurrentStack
一個LIFO(后進先出)的集合,支持多任務並發進行壓入和彈出操作。它是完全無鎖的。是System.Collections.Stack的並發版本。
它和ConcurrentQueue非常相似,區別在於使用了不同的方法名,更好的表示一個棧。ConcurrentStack主要提供了下面五個重要方法。
- Push:將元素添加到棧頂。
- TryPop:嘗試刪除棧頂部的元素,並通過out返回。返回值為bool,表示操作是否成功。
- TryPeek:嘗試通過out返回棧頂部的元素,返回值為bool,表示是否成功。
- PushRange:一次將多個元素插入棧頂。
- TryPopRange:一次將多個元素從棧頂移除。
為了判斷棧是否包含任意項,可以使用IsEmpty屬性判斷。
if(!_byteArraysStack.IsEmpty)
而使用Count方法,開銷相對較大。另外我們可以將不安全的集合或數組轉化為並發集合。下例將數組作為參數傳入。操作上和List一樣。
private static string[] _HexValues = {"AF", "BD", "CF", "DF", "DA", "FE", "FF", "FA"}; static void Main(string[] args) { var invalidHexStack = new ConcurrentStack<string>(_HexValues); while (!invalidHexStack.IsEmpty) { string value; invalidHexStack.TryPop(out value); Console.WriteLine(value); } }
反之,可以用CopyTo和ToArray方法將並發集合創建一個不安全集合。
ConcurrentBag
一個無序對象集合,在同一個線程添加元素(生產)和刪除元素(消費)的場合下效率特別高,ConcurrentBag最大程度上減少了同步的需求以及同步帶來的開銷。然而它在生產線程和消費線程完全分開的情況下,效率低下。
它提供了3個重要方法
- Add--添加元素到無序組
- TryTake--嘗試從無序組中刪除一個元素,out返回。返回值bool 表示操作是否成功。
- TryPeek--嘗試通過out返回一個參數。返回值bool 表示操作是否成功。
下面的實例中Main方法通過Parallel.Invoke並發的加載三個方法。有多個生產者和消費者。對應三個ConcurrentBag<string>:_sentencesBag,_capWrodsInSentenceBag和_finalSentencesBag。
- ProduceSentences 隨機生產句子 (消費者)
- CapitalizeWordsInSentence 改造句子 (消費者/生產者)
- RemoveLettersInSentence 刪除句子 (消費者)
static void Main(string[] args) { Console.WriteLine("任務開始..."); var sw = Stopwatch.StartNew(); _sentencesBag=new ConcurrentBag<string>(); _capWrodsInSentenceBag=new ConcurrentBag<string>(); _finalSentencesBag=new ConcurrentBag<string>(); _producingSentences = true; Parallel.Invoke(ProduceSentences,CapitalizeWordsInSentence,RemoveLettersInSentence); Console.WriteLine("_sentencesBag的總數是{0}", _sentencesBag.Count); Console.WriteLine("_capWrodsInSentenceBag的總數是{0}", _capWrodsInSentenceBag.Count); Console.WriteLine("_finalSentencesBag的總數是{0}", _finalSentencesBag.Count); Console.WriteLine("總時間:{0}",sw.Elapsed); Console.ReadKey(); }

private static ConcurrentBag<string> _sentencesBag; private static ConcurrentBag<string> _capWrodsInSentenceBag; private static ConcurrentBag<string> _finalSentencesBag; private static volatile bool _producingSentences = false; private static volatile bool _capitalWords = false; private static void ProduceSentences() { string[] rawSentences = { "並發集合你可知", "ConcurrentBag 你值得擁有", "stoneniqiu", "博客園", ".Net並發編程學習", "Reading for you", "ConcurrentBag 是個無序集合" }; try { Console.WriteLine("ProduceSentences..."); _sentencesBag = new ConcurrentBag<string>(); var random = new Random(); for (int i = 0; i < NUM_AES_KEYS; i++) { var sb = new StringBuilder(); sb.Append(rawSentences[random.Next(rawSentences.Length)]); sb.Append(' '); _sentencesBag.Add(sb.ToString()); } } finally { _producingSentences = false; } } private static void CapitalizeWordsInSentence() { SpinWait.SpinUntil(() => _producingSentences); try { Console.WriteLine("CapitalizeWordsInSentence..."); _capitalWords = true; while ((!_sentencesBag.IsEmpty)||_producingSentences) { string sentence; if (_sentencesBag.TryTake(out sentence)) { _capWrodsInSentenceBag.Add(sentence.ToUpper()+"stoneniqiu"); } } } finally { _capitalWords = false; } } private static void RemoveLettersInSentence() { SpinWait.SpinUntil(() => _capitalWords); Console.WriteLine("RemoveLettersInSentence..."); while (!_capWrodsInSentenceBag.IsEmpty || _capitalWords) { string sentence; if (_capWrodsInSentenceBag.TryTake(out sentence)) { _finalSentencesBag.Add(sentence.Replace("stonenqiu","")); } } }
在CapitalizeWordsInSentence 方法中,使用SpinUntil方法並傳入共享bool變量_producingSentences,當其為true的時候,SpinUnit方法會停止自旋。但協調多個生產者和消費者自旋並非最好的解決方案,我們可以使用BlockingCollection(下面會講)來提升性能。
SpinWait.SpinUntil(() => _producingSentences);
另外兩個用作標志的共享bool變量在聲明的時候使用了volatile關鍵字。這樣可以確保在不同的線程中進行訪問的時候,可以得到這些變量的最新值。
private static volatile bool _producingSentences = false; private static volatile bool _capitalWords = false;
BlockingCollection
與經典的阻塞隊列數據結構類似,適用於多個任務添加和刪除數據的生產者-消費者的情形。提供了阻塞和界限的能力。
BlockingCollection是對IProducerConsumerCollection<T>實例的一個包裝。而這個接口繼承於ICollection,IEnumerable<T>。前面的並發集合都繼承了這個接口。因此這些集合都可以封裝在BlockingCollection中。
將上面的例子換成BlockingCollection

static void Main(string[] args) { Console.WriteLine("任務開始..."); var sw = Stopwatch.StartNew(); _sentencesBC = new BlockingCollection<string>(NUM_SENTENCE); _capWrodsInSentenceBC = new BlockingCollection<string>(NUM_SENTENCE); _finalSentencesBC = new BlockingCollection<string>(NUM_SENTENCE); Parallel.Invoke(ProduceSentences,CapitalizeWordsInSentence,RemoveLettersInSentence); Console.WriteLine("_sentencesBag的總數是{0}", _sentencesBC.Count); Console.WriteLine("_capWrodsInSentenceBag的總數是{0}", _capWrodsInSentenceBC.Count); Console.WriteLine("_finalSentencesBag的總數是{0}", _finalSentencesBC.Count); Console.WriteLine("總時間:{0}",sw.Elapsed); Console.ReadKey(); } private static int NUM_SENTENCE = 2000000; private static BlockingCollection<string> _sentencesBC; private static BlockingCollection<string> _capWrodsInSentenceBC; private static BlockingCollection<string> _finalSentencesBC; private static volatile bool _producingSentences = false; private static volatile bool _capitalWords = false; private static void ProduceSentences() { string[] rawSentences = { "並發集合你可知", "ConcurrentBag 你值得擁有", "stoneniqiu", "博客園", ".Net並發編程學習", "Reading for you", "ConcurrentBag 是個無序集合" }; Console.WriteLine("ProduceSentences..."); _sentencesBC = new BlockingCollection<string>(); var random = new Random(); for (int i = 0; i < NUM_SENTENCE; i++) { var sb = new StringBuilder(); sb.Append(rawSentences[random.Next(rawSentences.Length)]); sb.Append(' '); _sentencesBC.Add(sb.ToString()); } //讓消費者知道,生產過程已經完成 _sentencesBC.CompleteAdding(); } private static void CapitalizeWordsInSentence() { Console.WriteLine("CapitalizeWordsInSentence..."); //生產者是否完成 while (!_sentencesBC.IsCompleted) { string sentence; if (_sentencesBC.TryTake(out sentence)) { _capWrodsInSentenceBC.Add(sentence.ToUpper() + "stoneniqiu"); } } //讓消費者知道,生產過程已經完成 _capWrodsInSentenceBC.CompleteAdding(); } private static void RemoveLettersInSentence() { //SpinWait.SpinUntil(() => _capitalWords); Console.WriteLine("RemoveLettersInSentence..."); while (!_capWrodsInSentenceBC.IsCompleted) { string sentence; if (_capWrodsInSentenceBC.TryTake(out sentence)) { _finalSentencesBC.Add(sentence.Replace("stonenqiu","")); } } }
無需再使用共享的bool變量來同步。在操作結束后,調用CompeteAdding方法來告之下游的消費者。這個時候IsAddingComplete屬性為true。
_sentencesBC.CompleteAdding();
而在生產者中也無需使用自旋了。可以判斷IsCompleted屬性。而當IsAddingComplete屬性為true且集合為空的時候,IsCompleted才為true。這個時候就表示,生產者的元素已經被使用完了。這樣代碼也更簡潔了。
while (!_sentencesBC.IsCompleted)
最后的結果要比使用ConcurrentBag快了0.8秒。一共是200w條數據,處理三次。
ConcurrentDictionary
與經典字典類似,提供了並發的鍵值訪問。它對讀操作是完全無鎖的,在添加和修改的時候使用了細粒度的鎖。是IDictionary的並發版本。
它提供最重要方法如下:
- AddOrUpdate--如果鍵不存在就添加一個鍵值對。如果鍵已經存在,就更新鍵值對。可以使用函數來生成或者更新鍵值對。需要在委托內添加同步代碼來確保線程安全。
- GetEnumerator--返回遍歷整個ConcurrentDictionary的枚舉器,而且是線程安全的。
- GetOrAdd--如果鍵不存在就添加一個新鍵值對,如果存在就返回這個鍵現在的值,而不添加新值。
- TryAdd
- TryOrGetVaule
- TryRemove
- TryUpdate
下面的例子創建一個ConcurrentDictionary,然后不斷的更新。lock關鍵字確保一次只有一個線程運行Update方法。
static void Main(string[] args) { Console.WriteLine("任務開始..."); var sw = Stopwatch.StartNew(); rectangInfoDic=new ConcurrentDictionary<string, RectangInfo>(); GenerateRectangles(); foreach (var keyValue in rectangInfoDic) { Console.WriteLine("{0},{1},更新次數{2}",keyValue.Key,keyValue.Value.Size,keyValue.Value.UpdateTimes); } Console.WriteLine("總時間:{0}",sw.Elapsed); Console.ReadKey(); } private static ConcurrentDictionary<string, RectangInfo> rectangInfoDic; private const int MAX_RECTANGLES = 2000; private static void GenerateRectangles() { Parallel.For(1, MAX_RECTANGLES + 1, (i) => { for (int j = 0; j < 50; j++) { var newkey = string.Format("Rectangle{0}", i%5000); var rect = new RectangInfo(newkey, i, j); rectangInfoDic.AddOrUpdate(newkey, rect, (key, existRect) => { if (existRect != rect) { lock (existRect) { existRect.Update(rect.X,rect.Y); } return existRect; } return existRect; }); } }); }
Rectangle:

public class RectangInfo:IEqualityComparer<RectangInfo> { public string Name { get; set; } public int X { get; set; } public int Y { get; set; } public int UpdateTimes { get; set; } public int Size { get { return X*Y; } } public DateTime LastUpdate { get; set; } public RectangInfo(string name,int x,int y) { Name = name; X = x; Y = y; LastUpdate = DateTime.Now; } public RectangInfo(string key) : this(key, 0, 0) { } public void Update(int x,int y) { X = x; Y = y; UpdateTimes++; } public bool Equals(RectangInfo x, RectangInfo y) { return (x.Name == y.Name && x.Size == y.Size); } public int GetHashCode(RectangInfo obj) { return obj.Name.GetHashCode(); } }
本章學習了五種並發集合,熟悉了生產者-消費者的並發模型,我們可以使用並發集合來設計並優化流水線。希望本文對你有幫助。
閱讀書籍:《C#並行編程高級教程》 。