8天玩轉並行開發——第一天 Parallel的使用


 

      隨着多核時代的到來,並行開發越來越展示出它的強大威力,像我們這樣的碼農再也不用過多的關注底層線程的實現和手工控制,

要了解並行開發,需要先了解下兩個概念:“硬件線程”和“軟件線程”。

 

1. 硬件線程

    相信大家手頭的電腦都是雙核以上的,像我這樣古董的電腦都是雙核的,這樣的雙核叫做物理內核。

 

硬件線程又叫做邏輯內核,我們可以在”任務管理器“中查看”性能“標簽頁,如下圖,我們知道有2個硬件線程。

 

 

一般情況下,一個物理內核對應一個邏輯內核,比如我這里的2對2。當然如果你的cpu采用的是超線程技術,那么可能就會有4個物理內核對應

8個硬件線程,現在有很多服務器都有8個硬件線程,上午在公司的服務器上截了個圖。

我們要知道並行開發要做的事情就是將任務分攤給這些硬件線程去並行執行來達到負載和加速。

 

2. 軟件線程

    相信這個大家最熟悉了,我們知道傳統的代碼都是串行的,就一個主線程,當我們為了實現加速而開了很多工作線程,這些工作線程

也就是軟件線程。

 

好,我們知道了基本概念就ok了,在.net 4.0中,微軟給我們提供了一個新的命名空間:System.Threading.Tasks。這里面有很多好玩

的東西,作為第一篇就介紹下最基礎,最簡單的Parallel的使用。

 

 

一: Parallel的使用

在Parallel下面有三個常用的方法invoke,for和forEach。

1:  Parallel.Invoke

    這是最簡單,最簡潔的將串行的代碼並行化。

 1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var watch = Stopwatch.StartNew();
6
7 watch.Start();
8
9 Run1();
10
11 Run2();
12
13 Console.WriteLine("我是串行開發,總共耗時:{0}\n", watch.ElapsedMilliseconds);
14
15 watch.Restart();
16
17 Parallel.Invoke(Run1, Run2);
18
19 watch.Stop();
20
21 Console.WriteLine("我是並行開發,總共耗時:{0}", watch.ElapsedMilliseconds);
22
23 Console.Read();
24 }
25
26 static void Run1()
27 {
28 Console.WriteLine("我是任務一,我跑了3s");
29 Thread.Sleep(3000);
30 }
31
32 static void Run2()
33 {
34 Console.WriteLine("我是任務二,我跑了5s");
35 Thread.Sleep(5000);
36 }
37 }

在這個例子中可以獲取二點信息:

第一:一個任務是可以分解成多個任務,采用分而治之的思想。

第二:盡可能的避免子任務之間的依賴性,因為子任務是並行執行,所以就沒有誰一定在前,誰一定在后的規定了。

 

2:Parallel.for

 我們知道串行代碼中也有一個for,但是那個for並沒有用到多核,而Paraller.for它會在底層根據硬件線程的運行狀況來充分的使用所有的可

利用的硬件線程,注意這里的Parallel.for的步行是1。

這里我們來演示一下,向一個線程安全的集合插入數據,當然這個集合采用原子性來實現線程同步,比那些重量級的鎖機制更加的節省消耗。

 1  class Program
2 {
3 static void Main(string[] args)
4 {
5 for (int j = 1; j < 4; j++)
6 {
7 Console.WriteLine("\n第{0}次比較", j);
8
9 ConcurrentBag<int> bag = new ConcurrentBag<int>();
10
11 var watch = Stopwatch.StartNew();
12
13 watch.Start();
14
15 for (int i = 0; i < 20000000; i++)
16 {
17 bag.Add(i);
18 }
19
20 Console.WriteLine("串行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);
21
22 GC.Collect();
23
24 bag = new ConcurrentBag<int>();
25
26 watch = Stopwatch.StartNew();
27
28 watch.Start();
29
30 Parallel.For(0, 20000000, i =>
31 {
32 bag.Add(i);
33 });
34
35 Console.WriteLine("並行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);
36
37 GC.Collect();
38
39 }
40 }
41 }

 

可以看的出,加速的效果還是比較明顯的。

 

3:Parallel.forEach
    forEach的獨到之處就是可以將數據進行分區,每一個小區內實現串行計算,分區采用Partitioner.Create實現。

 class Program
{
static void Main(string[] args)
{
for (int j = 1; j < 4; j++)
{
Console.WriteLine("\n第{0}次比較", j);

ConcurrentBag<int> bag = new ConcurrentBag<int>();

var watch = Stopwatch.StartNew();

watch.Start();

for (int i = 0; i < 3000000; i++)
{
bag.Add(i);
}

Console.WriteLine("串行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);

GC.Collect();

bag = new ConcurrentBag<int>();

watch = Stopwatch.StartNew();

watch.Start();

Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
{
for (int m = i.Item1; m < i.Item2; m++)
{
bag.Add(m);
}
});

Console.WriteLine("並行計算:集合有:{0},總共耗時:{1}", bag.Count, watch.ElapsedMilliseconds);

GC.Collect();

}
}
}

這里還是要說一下:Partitioner.Create(0, 3000000)。

第一:我們要分區的范圍是0-3000000。

第二:我們肯定想知道系統給我們分了幾個區? 很遺憾,這是系統內部協調的,無權告訴我們,當然系統也不反對我們自己指定分區個數,

        這里可以使用Partitioner.Create的第六個重載,比如這樣:Partitioner.Create(0, 3000000, Environment.ProcessorCount),

        因為 Environment.ProcessorCount能夠獲取到當前的硬件線程數,所以這里也就開了2個區。

 

下面分享下並行計算中我們可能有的疑惑?

<1> 如何中途退出並行循環?

      是的,在串行代碼中我們break一下就搞定了,但是並行就不是這么簡單了,不過沒關系,在並行循環的委托參數中提供了一個

ParallelLoopState,該實例提供了Break和Stop方法來幫我們實現。

Break: 當然這個是通知並行計算盡快的退出循環,比如並行計算正在迭代100,那么break后程序還會迭代所有小於100的。

Stop:這個就不一樣了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。

 

下面舉個例子,當迭代到1000的時候退出循環

 1   class Program
2 {
3 static void Main(string[] args)
4 {
5 var watch = Stopwatch.StartNew();
6
7 watch.Start();
8
9 ConcurrentBag<int> bag = new ConcurrentBag<int>();
10
11 Parallel.For(0, 20000000, (i, state) =>
12 {
13 if (bag.Count == 1000)
14 {
15 state.Break();
16 return;
17 }
18 bag.Add(i);
19 });
20
21 Console.WriteLine("當前集合有{0}個元素。", bag.Count);
22
23 }
24 }

 

<2> 並行計算中拋出異常怎么處理?

 首先任務是並行計算的,處理過程中可能會產生n多的異常,那么如何來獲取到這些異常呢?普通的Exception並不能獲取到異常,然而為並行誕生的AggregateExcepation就可以獲取到一組異常。

class Program
{
static void Main(string[] args)
{
try
{
Parallel.Invoke(Run1, Run2);
}
catch (AggregateException ex)
{
foreach (var single in ex.InnerExceptions)
{
Console.WriteLine(single.Message);
}
}

Console.Read();
}

static void Run1()
{
Thread.Sleep(3000);
throw new Exception("我是任務1拋出的異常");
}

static void Run2()
{
Thread.Sleep(5000);

throw new Exception("我是任務2拋出的異常");
}
}

 

<3> 並行計算中我可以留一個硬件線程出來嗎?

  默認的情況下,底層機制會盡可能多的使用硬件線程,然而我們使用手動指定的好處是我們可以在2,4,8個硬件線程的情況下來進行測量加速比。

 class Program
{
static void Main(string[] args)
{
var bag = new ConcurrentBag<int>();

ParallelOptions options = new ParallelOptions();

//指定使用的硬件線程數為1
options.MaxDegreeOfParallelism = 1;

Parallel.For(0, 300000, options, i =>
{
bag.Add(i);
});

Console.WriteLine("並行計算:集合有:{0}", bag.Count);

}
}

 

————————————————————————————————————————————————————————————

————————————————————————————————————————————————————————————

友情提示:如果不喜歡看文章,可以移步本系列的  C#IL解讀完整視頻 【一把傘的錢哦

————————————————————————————————————————————————————————————

————————————————————————————————————————————————————————————


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM