承接上一篇,我們繼續說下.net4.0中的同步機制,是的,當出現了並行計算的時候,輕量級別的同步機制應運而生,在信號量這一塊
出現了一系列的輕量級,今天繼續介紹下面的3個信號量 CountdownEvent,SemaphoreSlim,ManualResetEventSlim。
一:CountdownEvent
這種采用信號狀態的同步基元非常適合在動態的fork,join的場景,它采用“信號計數”的方式,就比如這樣,一個麻將桌只能容納4個
人打麻將,如果后來的人也想搓一把碰碰運氣,那么他必須等待直到麻將桌上的人走掉一位。好,這就是簡單的信號計數機制,從技術角
度上來說它是定義了最多能夠進入關鍵代碼的線程數。
但是CountdownEvent更牛X之處在於我們可以動態的改變“信號計數”的大小,比如一會兒能夠容納8個線程,一下又4個,一下又10個,
這樣做有什么好處呢?還是承接上一篇文章所說的,比如一個任務需要加載1w條數據,那么可能出現這種情況。
加載User表: 根據user表的數據量,我們需要開5個task。
加載Product表: 產品表數據相對比較多,計算之后需要開8個task。
加載order表: 由於我的網站訂單豐富,計算之后需要開12個task。
先前的文章也說了,我們需要協調task在多階段加載數據的同步問題,那么如何應對這里的5,8,12,幸好,CountdownEvent給我們提供了
可以動態修改的解決方案。
1 using System.Collections.Concurrent; 2 using System.Threading.Tasks; 3 using System; 4 using System.Diagnostics; 5 using System.Collections.Generic; 6 using System.Linq; 7 using System.Threading; 8 9 class Program 10 { 11 //默認的容納大小為“硬件線程“數 12 static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount); 13 14 static void Main(string[] args) 15 { 16 //加載User表需要5個任務 17 var userTaskCount = 5; 18 19 //重置信號 20 cde.Reset(userTaskCount); 21 22 for (int i = 0; i < userTaskCount; i++) 23 { 24 Task.Factory.StartNew((obj) => 25 { 26 LoadUser(obj); 27 }, i); 28 } 29 30 //等待所有任務執行完畢 31 cde.Wait(); 32 33 Console.WriteLine("\nUser表數據全部加載完畢!\n"); 34 35 //加載product需要8個任務 36 var productTaskCount = 8; 37 38 //重置信號 39 cde.Reset(productTaskCount); 40 41 for (int i = 0; i < productTaskCount; i++) 42 { 43 Task.Factory.StartNew((obj) => 44 { 45 LoadProduct(obj); 46 }, i); 47 } 48 49 cde.Wait(); 50 51 Console.WriteLine("\nProduct表數據全部加載完畢!\n"); 52 53 //加載order需要12個任務 54 var orderTaskCount = 12; 55 56 //重置信號 57 cde.Reset(orderTaskCount); 58 59 for (int i = 0; i < orderTaskCount; i++) 60 { 61 Task.Factory.StartNew((obj) => 62 { 63 LoadOrder(obj); 64 }, i); 65 } 66 67 cde.Wait(); 68 69 Console.WriteLine("\nOrder表數據全部加載完畢!\n"); 70 71 Console.WriteLine("\n(*^__^*) 嘻嘻,恭喜你,數據全部加載完畢\n"); 72 73 Console.Read(); 74 } 75 76 static void LoadUser(object obj) 77 { 78 try 79 { 80 Console.WriteLine("當前任務:{0}正在加載User部分數據!", obj); 81 } 82 finally 83 { 84 cde.Signal(); 85 } 86 } 87 88 static void LoadProduct(object obj) 89 { 90 try 91 { 92 Console.WriteLine("當前任務:{0}正在加載Product部分數據!", obj); 93 } 94 finally 95 { 96 cde.Signal(); 97 } 98 } 99 100 static void LoadOrder(object obj) 101 { 102 try 103 { 104 Console.WriteLine("當前任務:{0}正在加載Order部分數據!", obj); 105 } 106 finally 107 { 108 cde.Signal(); 109 } 110 } 111 }
我們看到有兩個主要方法:Wait和Signal。每調用一次Signal相當於麻將桌上走了一個人,直到所有人都搓過麻將wait才給放行,這里同樣要
注意也就是“超時“問題的存在性,尤其是在並行計算中,輕量級別給我們提供了”取消標記“的機制,這是在重量級別中不存在的,比如下面的
重載public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken),具體使用可以看前一篇文章的介紹。
二:SemaphoreSlim
在.net 4.0之前,framework中有一個重量級的Semaphore,人家可以跨進程同步,咋輕量級不行,msdn對它的解釋為:限制可同時訪問
某一資源或資源池的線程數。關於它的重量級demo,我的上一個系列有演示,你也可以理解為CountdownEvent是 SemaphoreSlim的功能加
強版,好了,舉一個輕量級使用的例子。
1 using System.Collections.Concurrent; 2 using System.Threading.Tasks; 3 using System; 4 using System.Diagnostics; 5 using System.Collections.Generic; 6 using System.Linq; 7 using System.Threading; 8 9 class Program 10 { 11 static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12); 12 13 static void Main(string[] args) 14 { 15 for (int i = 0; i < 12; i++) 16 { 17 Task.Factory.StartNew((obj) => 18 { 19 Run(obj); 20 }, i); 21 } 22 23 Console.Read(); 24 } 25 26 static void Run(object obj) 27 { 28 slim.Wait(); 29 30 Console.WriteLine("當前時間:{0}任務 {1}已經進入。", DateTime.Now, obj); 31 32 //這里busy3s中 33 Thread.Sleep(3000); 34 35 slim.Release(); 36 } 37 }
同樣,防止死鎖的情況,我們需要知道”超時和取消標記“的解決方案,像SemaphoreSlim這種定死的”線程請求范圍“,其實是降低了擴展性,
所以說,試水有風險,使用需謹慎,在覺得有必要的時候使用它。
三: ManualResetEventSlim
相信它的重量級別大家都知道是ManualReset,而這個輕量級別采用的是"自旋等待“+”內核等待“,也就是說先采用”自旋等待的方式“等待,
直到另一個任務調用set方法來釋放它。如果遲遲等不到釋放,那么任務就會進入基於內核的等待,所以說如果我們知道等待的時間比較短,采
用輕量級的版本會具有更好的性能,原理大概就這樣,下面舉個小例子。
1 using System.Collections.Concurrent; 2 using System.Threading.Tasks; 3 using System; 4 using System.Diagnostics; 5 using System.Collections.Generic; 6 using System.Linq; 7 using System.Threading; 8 9 class Program 10 { 11 //2047:自旋的次數 12 static ManualResetEventSlim mrs = new ManualResetEventSlim(false, 2047); 13 14 static void Main(string[] args) 15 { 16 17 for (int i = 0; i < 12; i++) 18 { 19 Task.Factory.StartNew((obj) => 20 { 21 Run(obj); 22 }, i); 23 } 24 25 Console.WriteLine("當前時間:{0}我是主線程{1},你們這些任務都等2s執行吧:\n", 26 DateTime.Now, 27 Thread.CurrentThread.ManagedThreadId); 28 Thread.Sleep(2000); 29 30 mrs.Set(); 31 32 Console.Read(); 33 } 34 35 static void Run(object obj) 36 { 37 mrs.Wait(); 38 39 Console.WriteLine("當前時間:{0}任務 {1}已經進入。", DateTime.Now, obj); 40 } 41 }