- 一、並行編程 - 數據並行 System.Threading.Tasks.Parallel 類
- 二、並行編程 - Task任務
- 三、並行編程 - Task同步機制。TreadLocal類、Lock、Interlocked、Synchronization、ConcurrentQueue以及Barrier等
- 四、並行編程 - 並行LINQ(PLINQ) 的使用。AsParallel
- 五、並行編程 - 信號量
用於對內存中的數據做並行運算,也就是說其只支持 LINQ to Object 的並行運算
一、AsParallel(並行化)
就是在集合后加個AsParallel()。
例如:
var numbers = Enumerable.Range(0, 100); var result = numbers.AsParallel().AsOrdered().Where(i => i % 2 == 0); foreach (var i in result) Console.WriteLine(i);
下面我們模擬給ConcurrentDictionary灌入1500w條記錄,看看串行和並行效率上的差異,注意我的老爺機是2個硬件線程。
static void Main(string[] args) { var dic = LoadData(); Stopwatch watch = new Stopwatch(); watch.Start(); //串行執行 var query1 = (from n in dic.Values where n.Age > 20 && n.Age < 25 select n).ToList(); watch.Stop(); Console.WriteLine("串行計算耗費時間:{0}", watch.ElapsedMilliseconds); watch.Restart(); var query2 = (from n in dic.Values.AsParallel() where n.Age > 20 && n.Age < 25 select n).ToList(); watch.Stop(); Console.WriteLine("並行計算耗費時間:{0}", watch.ElapsedMilliseconds); Console.Read(); } public static ConcurrentDictionary<int, Student> LoadData() { ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>(); //預加載1500w條記錄 Parallel.For(0, 15000000, (i) => { var single = new Student() { ID = i, Name = "hxc" + i, Age = i % 151, CreateTime = DateTime.Now.AddSeconds(i) }; dic.TryAdd(i, single); }); return dic; } public class Student { public int ID { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime CreateTime { get; set; } }
orderby,sum(),average()等等這些聚合函數都是實現了並行化。
二、指定並行度
這個我在前面文章也說過,為了不讓並行計算占用全部的硬件線程,或許可能要留一個線程做其他事情。
var query2 = (from n in dic.Values.AsParallel().WithDegreeOfParallelism(Environment.ProcessorCount - 1)
where n.Age > 20 && n.Age < 25 orderby n.CreateTime descending select n).ToList();
三、了解ParallelEnumerable類
首先這個類是Enumerable的並行版本,提供了很多用於查詢實現的一組方法,下圖為ParallelEnumerable類的方法,記住他們都是並行的。
ConcurrentBag<int> bag = new ConcurrentBag<int>(); var list = ParallelEnumerable.Range
(0, 10000); list.ForAll((i) => { bag.Add(i); }); Console.WriteLine("bag集合中元素個數有:{0}", bag.Count); Console.WriteLine("list集合中元素個數總和為:{0}", list.Sum()); Console.WriteLine("list集合中元素最大值為:{0}", list.Max()); Console.WriteLine("list集合中元素第一個元素為:{0}", list.FirstOrDefault());
四、plinq實現MapReduce算法
mapReduce是一個非常流行的編程模型,用於大規模數據集的並行計算,非常的牛X啊,記得mongodb中就用到了這個玩意。
- map: 也就是“映射”操作,可以為每一個數據項建立一個鍵值對,映射完后會形成一個鍵值對的集合。
- reduce:“化簡”操作,我們對這些巨大的“鍵值對集合“進行分組,統計等等。
下面我舉個例子,用Mapreduce來實現一個對age的分組統計。
static void Main(string[] args) { List<Student> list = new List<Student>() { new Student(){ ID=1, Name="jack", Age=20}, new Student(){ ID=1, Name="mary", Age=25}, new Student(){ ID=1, Name="joe", Age=29}, new Student(){ ID=1, Name="Aaron", Age=25}, }; //這里我們會對age建立一組鍵值對 var map = list.AsParallel().ToLookup(i => i.Age, count => 1); //化簡統計 var reduce = from IGrouping<int, int> singleMap in map.AsParallel() select new { Age = singleMap.Key, Count = singleMap.Count() }; ///最后遍歷 reduce.ForAll(i => { Console.WriteLine("當前Age={0}的人數有:{1}人", i.Age, i.Count); }); } public class Student { public int ID { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime CreateTime { get; set; } }
考慮一個簡單的例子,現有一個容量為1000000的單詞集,需要我們以降序列出其中出現次數超過100000的單詞(和其次數)。Map過程,使用PLINQ將集合按單詞分組,這里使用了Lookup容器接口,它與Dictionary類似,但是提供的是鍵-值集映射;Reduce過程,使用PLINQ歸約查詢即可。
某一次運行結果如下:
Word: you, Count: 142416
Word: van, Count: 115816
Word: next, Count: 110228