C#(99):四、並行編程 - 並行LINQ(PLINQ) 的使用。AsParallel


用於對內存中的數據做並行運算,也就是說其只支持 LINQ to Object 的並行運算

一、AsParallel(並行化)

就是在集合后加個AsParallel()。

例如:

var numbers = Enumerable.Range(0, 100);
var result = numbers.AsParallel().AsOrdered().Where(i => i % 2 == 0);
foreach (var i in result)
   Console.WriteLine(i);

image

 

下面我們模擬給ConcurrentDictionary灌入1500w條記錄,看看串行和並行效率上的差異,注意我的老爺機是2個硬件線程。

static void Main(string[] args)
{
    var dic = LoadData();

    Stopwatch watch = new Stopwatch();

    watch.Start();

    //串行執行
    var query1 = (from n in dic.Values
                  where n.Age > 20 && n.Age < 25
                  select n).ToList();

    watch.Stop();

    Console.WriteLine("串行計算耗費時間:{0}", watch.ElapsedMilliseconds);

    watch.Restart();

    var query2 = (from n in dic.Values.AsParallel()
                  where n.Age > 20 && n.Age < 25
                  select n).ToList();

    watch.Stop();

    Console.WriteLine("並行計算耗費時間:{0}", watch.ElapsedMilliseconds);

    Console.Read();
}

public static ConcurrentDictionary<int, Student> LoadData()
{
    ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();

    //預加載1500w條記錄
    Parallel.For(0, 15000000, (i) =>
    {
        var single = new Student()
        {
            ID = i,
            Name = "hxc" + i,
            Age = i % 151,
            CreateTime = DateTime.Now.AddSeconds(i)
        };
        dic.TryAdd(i, single);
    });

    return dic;
}

public class Student
{
    public int ID { get; set; }

    public string Name { get; set; }

    public int Age { get; set; }

    public DateTime CreateTime { get; set; }
}

 orderby,sum(),average()等等這些聚合函數都是實現了並行化。

二、指定並行度

這個我在前面文章也說過,為了不讓並行計算占用全部的硬件線程,或許可能要留一個線程做其他事情。

var query2 = (from n in dic.Values.AsParallel().WithDegreeOfParallelism(Environment.ProcessorCount - 1)
where n.Age > 20 && n.Age < 25
                        orderby n.CreateTime descending
                        select n).ToList();

三、了解ParallelEnumerable類

首先這個類是Enumerable的並行版本,提供了很多用於查詢實現的一組方法,下圖為ParallelEnumerable類的方法,記住他們都是並行的。

 image

ConcurrentBag<int> bag = new ConcurrentBag<int>();
 var list = ParallelEnumerable.Range
(0, 10000);
 list.ForAll((i) =>
 {
     bag.Add(i);
 });

 Console.WriteLine("bag集合中元素個數有:{0}", bag.Count);
 Console.WriteLine("list集合中元素個數總和為:{0}", list.Sum());
 Console.WriteLine("list集合中元素最大值為:{0}", list.Max());
 Console.WriteLine("list集合中元素第一個元素為:{0}", list.FirstOrDefault());

四、plinq實現MapReduce算法

mapReduce是一個非常流行的編程模型,用於大規模數據集的並行計算,非常的牛X啊,記得mongodb中就用到了這個玩意。

  • map:  也就是“映射”操作,可以為每一個數據項建立一個鍵值對,映射完后會形成一個鍵值對的集合。
  • reduce:“化簡”操作,我們對這些巨大的“鍵值對集合“進行分組,統計等等。

下面我舉個例子,用Mapreduce來實現一個對age的分組統計。

static void Main(string[] args)
{
    List<Student> list = new List<Student>()
    {
        new Student(){ ID=1, Name="jack", Age=20},
        new Student(){ ID=1, Name="mary", Age=25},
        new Student(){ ID=1, Name="joe", Age=29},
        new Student(){ ID=1, Name="Aaron", Age=25},
    };

    //這里我們會對age建立一組鍵值對
    var map = list.AsParallel().ToLookup(i => i.Age, count => 1);

    //化簡統計
    var reduce = from IGrouping<int, int> singleMap
                 in map.AsParallel()
                 select new
                 {
                     Age = singleMap.Key,
                     Count = singleMap.Count()
                 };

    ///最后遍歷
    reduce.ForAll(i =>
    {
        Console.WriteLine("當前Age={0}的人數有:{1}人", i.Age, i.Count);
    });
}

public class Student
{
    public int ID { get; set; }

    public string Name { get; set; }

    public int Age { get; set; }

    public DateTime CreateTime { get; set; }
}

image

考慮一個簡單的例子,現有一個容量為1000000的單詞集,需要我們以降序列出其中出現次數超過100000的單詞(和其次數)。Map過程,使用PLINQ將集合按單詞分組,這里使用了Lookup容器接口,它與Dictionary類似,但是提供的是鍵-值集映射;Reduce過程,使用PLINQ歸約查詢即可。

image

某一次運行結果如下:

Word: you, Count: 142416
Word: van, Count: 115816
Word: next, Count: 110228


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM