說起Parallel.For大家都不會陌生,很簡單,不就是一個提供並行功能的for循環嗎? 或許大家平時使用到的差不多就是其中最簡單的那個重載方法,而真實情況
下Parallel.For里面有14個重載,而其中那些比較復雜的重載方法,或許還有同學還不知道怎么用呢~~~ 剛好我最近我有應用場景了,給大家介紹介紹,廢話不多說,
先給大家看一下這個並行方法的重載一覽表吧。。。
一:遇到的場景
我遇到的場景是這樣的,項目中有這樣一個功能,這個功能需要根據多個維度對一組customerIDList進行篩選,最后求得多個維度所篩選出客戶的並集,我舉個
例子:現有8個維度:
1. 交易行為
2.營銷活動
3.地區
4.新老客戶
5.營銷渠道
6.客戶屬性
7.客戶分組
8.商品
每個維度都能篩選出一批customerid出來,然后對8組customerid求並集,這種場景很明顯要提升性能的話,你必須要做並行處理,當然能夠實現的方式有很多種,
比如我定義8個task<T>,然后使用WaitAll等待一下,最后再累計每個Result的結果就可以了,代碼如下:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 List<string> rankList = Enum.GetNames(typeof(FilterType)).ToList(); 6 7 Task<HashSet<int>>[] tasks = new Task<HashSet<int>>[rankList.Count]; 8 9 var hashCustomerIDList = new HashSet<int>(); //求customerid的並集 10 11 for (int i = 0; i < tasks.Length; i++) 12 { 13 tasks[i] = Task.Factory.StartNew<HashSet<int>>((obj) => 14 { 15 //業務方法,耗損性能中。。。 16 var smallCustomerIDHash = GetXXXMethod(rankList[(int)obj]); 17 18 return smallCustomerIDHash; 19 }, i); 20 } 21 22 Task.WaitAll(tasks); 23 24 foreach (var task in tasks) 25 { 26 foreach (var item in task.Result) 27 { 28 hashCustomerIDList.Add(item); 29 } 30 } 31 } 32 33 static HashSet<int> GetXXXMethod(string rank) 34 { 35 return new HashSet<int>(); 36 } 37 38 public enum FilterType 39 { 40 交易行為 = 1, 41 營銷活動 = 2, 42 地區 = 4, 43 新老客戶 = 8, 44 營銷渠道 = 16, 45 客戶屬性 = 32, 46 客戶分組 = 64, 47 商品 = 128 48 } 49 }
上面的代碼的邏輯還是很簡單的,我使用的是Task<T>的模式,當然你也可以用void形式的Task,然后在里面lock代碼的時候對hashCustomerIDList進行
插入,實現起來也是非常簡單的,我就不演示了,那下面的問題來了,有沒有更爽更直接的方式,看人家看上去更有檔次一點的方法,而且還要達到這種效果呢?
二:Parallel.For復雜重載
回到文章開頭的話題,首先我們仔細分析一下下面這個復雜的重載方法。
1 // 2 // 摘要: 3 // 執行具有線程本地數據的 for(在 Visual Basic 中為 For)循環,其中可能會並行運行迭代,而且可以監視和操作循環的狀態。 4 // 5 // 參數: 6 // fromInclusive: 7 // 開始索引(含)。 8 // 9 // toExclusive: 10 // 結束索引(不含)。 11 // 12 // localInit: 13 // 用於返回每個任務的本地數據的初始狀態的函數委托。 14 // 15 // body: 16 // 將為每個迭代調用一次的委托。 17 // 18 // localFinally: 19 // 用於對每個任務的本地狀態執行一個最終操作的委托。 20 // 21 // 類型參數: 22 // TLocal: 23 // 線程本地數據的類型。 24 // 25 // 返回結果: 26 // 包含有關已完成的循環部分的信息的結構。 27 // 28 // 異常: 29 // T:System.ArgumentNullException: 30 // body 參數為 null。- 或 -localInit 參數為 null。- 或 -localFinally 參數為 null。 31 // 32 // T:System.AggregateException: 33 // 包含在所有線程上引發的全部單個異常的異常。 34 public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
從上面的代碼區域中看,你可以看到上面提供了5個參數,而最后意思的就是后面三個,如果你對linq的擴展方法比較熟悉的話,你會發現這個其實就是一個並行版本
的累加器(Aggregate)操作,因為他們都是具有三個區域:第一個區域就是初始化區域(localInit),就是累積之前的一個初始化操作,第二個區域其實就是一個迭代
區域,說白了就是foreach/for循環,for循環之中,會把計算結果累計到當初初始化區域設置的變量中,第三個區域就是foreach/for之后的一個最終計算區,三者合起
來就是一個並行累加器,為了方便大家更好的理解,我就扒一下源碼給大家看看:
由於圖太大,就截兩張圖了,大家一定要仔細體會一下這里面的tlocal變量,因為這個tlocal的使用貫穿着三個區域,所以大家一定要好好體會下面這幾句代碼
1 TLocal tLocal = default(TLocal); 2 3 tLocal = localInit(); 4 5 while(xxx<xxx){ 6 tLocal = bodyWithLocal(num5, parallelLoopState, tLocal); 7 } 8 localFinally(tLocal);
當你理解了tLocal具有累積foreach中的item結果之后,你就應該很明白下面這個body=>(item, loop, total) 和 finally => (total) 中total的含義了,
對吧,當你明白了,然后大家可以看看下面這段代碼,是不是用一個方法就搞定了原來需要分階段實現的一個業務邏輯呢?
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 List<string> rankList = Enum.GetNames(typeof(FilterType)).ToList(); 6 7 var hashCustomerIDList = new HashSet<int>(); //求customerid的並集 8 9 //並行計算 7個 維度的 總和 10 Parallel.For(0, rankList.Count, () => { return new List<int>(); }, (item, loop, total) => 11 { 12 //業務方法,耗損性能中。。。 13 var smallCustomerIDHash = GetXXXMethod(rankList[item]); 14 15 total.AddRange(smallCustomerIDHash); 16 17 return total; 18 }, (total) => 19 { 20 lock (hashCustomerIDList) 21 { 22 foreach (var customerID in total) 23 { 24 hashCustomerIDList.Add(customerID); 25 } 26 } 27 }); 28 } 29 30 static HashSet<int> GetXXXMethod(string rank) 31 { 32 return new HashSet<int>(); 33 } 34 35 public enum FilterType 36 { 37 交易行為 = 1, 38 營銷活動 = 2, 39 地區 = 4, 40 新老客戶 = 8, 41 營銷渠道 = 16, 42 客戶屬性 = 32, 43 客戶分組 = 64, 44 商品 = 128 45 } 46 }
好了,本篇就先說這么多,希望這個具有並行累加器效果的Parallel.For能夠給你帶來一絲靈感~~~