一、概述:
System.Collections.Concurrent 命名空間提供多個線程安全集合類。
當有多個線程並發訪問集合時,應使用這些類代替 System.Collections 和 System.Collections.Generic 命名空間中的對應類型。
為了對集合進行線程安全的訪問,定義了 IProducerConsumerCollection
- TryAdd()方法嘗試給集合添加一項,但如果集合禁止添加項,這個操作就可能失敗。為了給出相關信息,TryAdd()方法返回一個布爾值,以說明操作是成功還是失敗。
- TryTake()方法也以這種方式工作,以通知調用者操作是成功還是失敗,並在操作成功時返回集合中的項。
二、空間中包含的類
ConcurrentXXX :這些集合是線程安全的,如果某個動作不適用於線程的當前狀態,它們就返回false。在繼續之前,總是霈要確認添加或提取元素是否成功。不能相信集合會完成任務。
1、ConcurrentQueue
隊列
這個集合類用一種免鎖定的算法實現,使用在內部合並到一個鏈表中的32項數組。
訪問隊列元素的方法有Enqueue()、TryDequeue()和TryPeek()。這些方法的命名非常類似於前面Queue
因為這個類實現了IProducerConsumerCollection
2、ConcurrentStack
堆棧
非常類似於ConcurrentQueue
ConcurrcntStack
3、ConcurrentBag
包
該類沒有定義添加或提取項的任何順序。這個類使用一個把線程映射到內部使用的數組上的概念,因此嘗試減少鎖定。
訪問元素的方法有Add()、TryPeek()和 TryTake()。
4、ConcurrentDictionary
字典
——這是一個線程安全的鍵值集合。
TryAdd()、TryGetValue()、TryRemove()和TryUpdate()方法以非阻塞的方式訪問成員。
因為元素基於鍵和值, 所以 ConcurrentDictionary
5、BlockingCollection
集合
這個集合在可以添加或提取元素之前,會阻塞線程並一直等待。
BlockingCollection
Add()方法有一個重載版本,其中可以給該重載版本傳遞一個CancellationToken令牌。
這個令牌允許取消被阻塞的調用。如果不希望線程無限期地等待下去,且不希望從外部取消調用,就可以使用TryAdd()和 TryTake()方法,在這些方法中,也可以指定一個超時值,它表示在調用失敗之前應阻塞線程和等待的最長時間。
6、BlockingCollection
阻塞式集合
這是對實現了 IProducerConsumerCollection
還可以給構造函數傳遞任何其他實現了 IProducerConsumerCollection
下面的代碼示例簡單演示了使用BlockingCollection
一個線程是生成器,它使用Add()方法給集合寫入元素,另一個線程是使用者,它使用Take()方法從集合中提取元素:
internal static BlockingCollection<int> _TestBCollection;
class ThreadWork1 // 生產者 { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestBCollection.Add(i); } _TestBCollection.CompleteAdding(); System.Console.WriteLine("ThreadWork1 run } "); } }
class ThreadWork2 // 消費者 { public ThreadWork2() { } public void run() { int i = 0; int nCnt = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); while (!_TestBCollection.IsCompleted) { IsDequeuue = _TestBCollection.TryTake(out i); if (IsDequeuue) { System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i); nCnt++; } } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestBCollection = new BlockingCollection<int>();//可以跟容量 Console.WriteLine("Sample 4-4 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 4-4 Main }"); }