這篇是並發集合中的最后一篇,介紹一下BlockingCollection。在工作中我還沒有使用過,但是看上去應該是為了便捷使用並發集合而創建的類型。默認情況下,BlockingCollection使用的是ConcurrentQueue容器,當然我們也可以使用其他實現了IProducerConsumerCollection的類型來操作。 static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null) { var taskCollection = new BlockingCollection<CustomTask>(); if(collection != null) taskCollection= new BlockingCollection<CustomTask>(collection); var taskSource = Task.Run(() => TaskProducer(taskCollection)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = "Processor " + i; processors[i - 1] = Task.Run(() => TaskProcessor(taskCollection, processorId)); } await taskSource; await Task.WhenAll(processors); } static async Task TaskProducer(BlockingCollection<CustomTask> collection) { for (int i = 1; i <= 20; i++) { await Task.Delay(20); var workItem = new CustomTask { Id = i }; collection.Add(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } collection.CompleteAdding(); //完成工作 } static async Task TaskProcessor(BlockingCollection<CustomTask> collection, string name) { await GetRandomDelay(); foreach (CustomTask item in collection.GetConsumingEnumerable()) { Console.WriteLine("Task {0} has been processed by {1}", item.Id, name); await GetRandomDelay(); } } 首先調用默認的BlockingCollection: 然后我們傳入一個ConcurrentStack實例 Console.WriteLine("Using a Stack inside of BlockingCollection"); Console.WriteLine(); Task t = RunProgram(new ConcurrentStack<CustomTask>()); t.Wait();
C# 並行編程 之 並發集合 (.Net Framework 4.0) 2015年05月08日 10:15:29 zy__ 閱讀數 24909更多 分類專欄: C# 版權聲明:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。 本文鏈接:https://blog.csdn.net/wangzhiyu1980/article/details/45497907 此文為個人學習《C#並行編程高級教程》的筆記,總結並調試了一些文章中的代碼示例。 在以后開發過程中可以加以運用。 對於並行任務,與其相關緊密的就是對一些共享資源,數據結構的並行訪問。經常要做的就是對一些隊列進行加鎖-解鎖,然后執行類似插入,刪除等等互斥操作。 .NetFramework 4.0 中提供了一些封裝好的支持並行操作數據容器,可以減少並行編程的復雜程度。 基本信息 .NetFramework中並行集合的名字空間: System.Collections.Concurrent 並行容器: ConcurrentQueue ConcurrentStack ConcurrentBag : 一個無序的數據結構集,當不需要考慮順序時非常有用。 BlockingCollection : 與經典的阻塞隊列數據結構類似 ConcurrentDictionary 這些集合在某種程度上使用了無鎖技術(CAS Compare-and-Swap和內存屏障 Memory Barrier),與加互斥鎖相比獲得了性能的提升。但在串行程序中,最好不用這些集合,它們必然會影響性能。 關於CAS: http://www.tuicool.com/articles/zuui6z http://www.360doc.com/content/11/0914/16/7656248_148221200.shtml 關於內存屏障 http://en.wikipedia.org/wiki/Memory_barrier 用法與示例 ConcurrentQueue 其完全無鎖,但當CAS面臨資源競爭失敗時可能會陷入自旋並重試操作。 Enqueue:在隊尾插入元素 TryDequeue:嘗試刪除隊頭元素,並通過out參數返回 TryPeek:嘗試將對頭元素通過out參數返回,但不刪除該元素。 程序示例: using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_1_concurrent_queue { class Program { internal static ConcurrentQueue<int> _TestQueue; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestQueue.Enqueue(i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); for (; ; ) { IsDequeuue = _TestQueue.TryDequeue(out i); if (IsDequeuue) System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " ====="); if (i == 99) break; } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestQueue = new ConcurrentQueue<int>(); Console.WriteLine("Sample 3-1 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 3-1 Main }"); Console.ReadKey(); } } } ConcurrentStack 其完全無鎖,但當CAS面臨資源競爭失敗時可能會陷入自旋並重試操作。 Push:向棧頂插入元素 TryPop:從棧頂彈出元素,並且通過out 參數返回 TryPeek:返回棧頂元素,但不彈出。 程序示例: using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_2_concurrent_stack { class Program { internal static ConcurrentStack<int> _TestStack; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestStack.Push(i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); for (; ; ) { IsDequeuue = _TestStack.TryPop(out i); if (IsDequeuue) System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i); if (i == 99) break; } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestStack = new ConcurrentStack<int>(); Console.WriteLine("Sample 4-1 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 4-1 Main }"); Console.ReadKey(); } } } 測試中一個有趣的現象: 雖然生產者已經在棧中插入值已經到了25,但消費者第一個出棧的居然是4,而不是25。很像是出錯了。但仔細想想入棧,出棧和打印語句是兩個部分,而且並不是原子操作,出現這種現象應該也算正常。 Sample 3-1 Main { Main t1 t2 started { Main t1 t2 started } Main wait t1 t2 end { ThreadWork1 run { ThreadWork1 producer: 0 ThreadWork2 run { ThreadWork1 producer: 1 ThreadWork1 producer: 2 ThreadWork1 producer: 3 ThreadWork1 producer: 4 ThreadWork1 producer: 5 ThreadWork1 producer: 6 ThreadWork1 producer: 7 ThreadWork1 producer: 8 ThreadWork1 producer: 9 ThreadWork1 producer: 10 ThreadWork1 producer: 11 ThreadWork1 producer: 12 ThreadWork1 producer: 13 ThreadWork1 producer: 14 ThreadWork1 producer: 15 ThreadWork1 producer: 16 ThreadWork1 producer: 17 ThreadWork1 producer: 18 ThreadWork1 producer: 19 ThreadWork1 producer: 20 ThreadWork1 producer: 21 ThreadWork1 producer: 22 ThreadWork1 producer: 23 ThreadWork1 producer: 24 ThreadWork1 producer: 25 ThreadWork2 consumer: 16 =====4 ThreadWork2 consumer: 625 =====25 ThreadWork2 consumer: 576 =====24 ThreadWork2 consumer: 529 =====23 ThreadWork1 producer: 26 ThreadWork1 producer: 27 ThreadWork1 producer: 28 ConcurrentBag 一個無序的集合,程序可以向其中插入元素,或刪除元素。 在同一個線程中向集合插入,刪除元素的效率很高。 Add:向集合中插入元素 TryTake:從集合中取出元素並刪除 TryPeek:從集合中取出元素,但不刪除該元素。 程序示例: using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_3_concurrent_bag { class Program { internal static ConcurrentBag<int> _TestBag; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestBag.Add(i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0; int nCnt = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); for (;;) { IsDequeuue = _TestBag.TryTake(out i); if (IsDequeuue) { System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i); nCnt++; } if (nCnt == 99) break; } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestBag = new ConcurrentBag<int>(); Console.WriteLine("Sample 4-3 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 4-3 Main }"); Console.ReadKey(); } } } BlockingCollection 一個支持界限和阻塞的容器 Add :向容器中插入元素 TryTake:從容器中取出元素並刪除 TryPeek:從容器中取出元素,但不刪除。 CompleteAdding:告訴容器,添加元素完成。此時如果還想繼續添加會發生異常。 IsCompleted:告訴消費線程,生產者線程還在繼續運行中,任務還未完成。 示例程序: 程序中,消費者線程完全使用 while (!_TestBCollection.IsCompleted) 作為退出運行的判斷條件。 在Worker1中,有兩條語句被注釋掉了,當i 為50時設置CompleteAdding,但當繼續向其中插入元素時,系統拋出異常,提示無法再繼續插入。 using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_4_concurrent_bag { class Program { internal static BlockingCollection<int> _TestBCollection; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestBCollection.Add(i); //if (i == 50) // _TestBCollection.CompleteAdding(); } _TestBCollection.CompleteAdding(); System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0; int nCnt = 0; bool IsDequeuue = false; System.Console.WriteLine("ThreadWork2 run { "); while (!_TestBCollection.IsCompleted) { IsDequeuue = _TestBCollection.TryTake(out i); if (IsDequeuue) { System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i); nCnt++; } } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); _TestBCollection = new BlockingCollection<int>(); Console.WriteLine("Sample 4-4 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 4-4 Main }"); Console.ReadKey(); } } } 當然可以嘗試在Work1中注釋掉 CompleteAdding 語句,此時Work2陷入循環無法退出。 ConcurrentDictionary 對於讀操作是完全無鎖的,當很多線程要修改數據時,它會使用細粒度的鎖。 AddOrUpdate:如果鍵不存在,方法會在容器中添加新的鍵和值,如果存在,則更新現有的鍵和值。 GetOrAdd:如果鍵不存在,方法會向容器中添加新的鍵和值,如果存在則返回現有的值,並不添加新值。 TryAdd:嘗試在容器中添加新的鍵和值。 TryGetValue:嘗試根據指定的鍵獲得值。 TryRemove:嘗試刪除指定的鍵。 TryUpdate:有條件的更新當前鍵所對應的值。 GetEnumerator:返回一個能夠遍歷整個容器的枚舉器。 程序示例: using System; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; namespace Sample4_5_concurrent_dictionary { class Program { internal static ConcurrentDictionary<int, int> _TestDictionary; class ThreadWork1 // producer { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 producer: " + i); _TestDictionary.TryAdd(i, i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 // consumer { public ThreadWork2() { } public void run() { int i = 0, nCnt = 0; int nValue = 0; bool IsOk = false; System.Console.WriteLine("ThreadWork2 run { "); while (nCnt < 100) { IsOk = _TestDictionary.TryGetValue(i, out nValue); if (IsOk) { System.Console.WriteLine("ThreadWork2 consumer: " + i * i + " =====" + i); nValue = nValue * nValue; _TestDictionary.AddOrUpdate(i, nValue, (key, value) => { return value = nValue; }); nCnt++; i++; } } System.Console.WriteLine("ThreadWork2 run } "); } } static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); bool bIsNext = true; int nValue = 0; _TestDictionary = new ConcurrentDictionary<int, int>(); Console.WriteLine("Sample 4-5 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); foreach (var pair in _TestDictionary) { Console.WriteLine(pair.Key + " : " + pair.Value); } System.Collections.Generic.IEnumerator<System.Collections.Generic.KeyValuePair<int, int>> enumer = _TestDictionary.GetEnumerator(); while (bIsNext) { bIsNext = enumer.MoveNext(); Console.WriteLine("Key: " + enumer.Current.Key + " Value: " + enumer.Current.Value); _TestDictionary.TryRemove(enumer.Current.Key, out nValue); } Console.WriteLine("\n\nDictionary Count: " + _TestDictionary.Count); Console.WriteLine("Sample 4-5 Main }"); Console.ReadKey(); } } }