在並行計算中,不可避免的會碰到多個任務共享變量,實例,集合。雖然task自帶了兩個方法:task.ContinueWith()和Task.Factory
.ContinueWhenAll()來實現任務串行化,但是這些簡單的方法遠遠不能滿足我們實際的開發需要,從.net 4.0開始,類庫給我們提供了很多
的類來幫助我們簡化並行計算中復雜的數據同步問題。
大體上分為二種:
① 並發集合類: 這個在先前的文章中也用到了,他們的出現不再讓我們過多的關注同步細節。
② 輕量級同步機制: 相對於老版本中那些所謂的重量級同步機制而言,新的機制更加節省cpu的額外開銷。
關於並發集合類沒什么好講的,如果大家熟悉非線程安全的集合,那么這些並發的集合對你來說小菜一碟,這一篇和下一篇我們仔細來玩玩這
些輕量級的同步機制。
一:Barrier(屏障同步)
1:基本概念
msdn對它的解釋是:使多個任務能夠采用並行方式依據某種算法在多個階段中協同工作。乍一看有點不懂,沒關系,我們采取提干法。
”多個任務“,”多個階段”,“協同”,仔細想想知道了,下一階段的執行必須等待上一個階段中多task全部執行完,那么我們實際中有這樣
的需求嗎?當然有的,比如我們數據庫中有100w條數據需要導入excel,為了在數據庫中加速load,我們需要開多個任務去跑,比如這
里的4個task,要想load產品表,必須等4個task都跑完用戶表才行,那么你有什么辦法可以讓task為了你兩肋插刀呢?它就是Barrier。
好,我們知道barrier叫做屏障,就像下圖中的“紅色線”,如果我們的屏障設為4個task就認為已經滿了的話,那么執行中先到的task必須等待
后到的task,通知方式也就是barrier.SignalAndWait(),屏障中線程設置操作為new Barrier(4,(i)=>{})。
啰嗦了半天,還是上下代碼說話:
1 using System.Collections.Concurrent;
2 using System.Threading.Tasks;
3 using System;
4 using System.Diagnostics;
5 using System.Collections.Generic;
6 using System.Linq;
7 using System.Threading;
8
9 class Program
10 {
11 //四個task執行
12 static Task[] tasks = new Task[4];
13
14 static Barrier barrier = null;
15
16 static void Main(string[] args)
17 {
18 barrier = new Barrier(tasks.Length, (i) =>
19 {
20 Console.WriteLine("**********************************************************");
21 Console.WriteLine("\n屏障中當前階段編號:{0}\n", i.CurrentPhaseNumber);
22 Console.WriteLine("**********************************************************");
23 });
24
25 for (int j = 0; j < tasks.Length; j++)
26 {
27 tasks[j] = Task.Factory.StartNew((obj) =>
28 {
29 var single = Convert.ToInt32(obj);
30
31 LoadUser(single);
32 barrier.SignalAndWait();
33
34 LoadProduct(single);
35 barrier.SignalAndWait();
36
37 LoadOrder(single);
38 barrier.SignalAndWait();
39 }, j);
40 }
41
42 Task.WaitAll(tasks);
43
44 Console.WriteLine("指定數據庫中所有數據已經加載完畢!");
45
46 Console.Read();
47 }
48
49 static void LoadUser(int num)
50 {
51 Console.WriteLine("當前任務:{0}正在加載User部分數據!", num);
52 }
53
54 static void LoadProduct(int num)
55 {
56 Console.WriteLine("當前任務:{0}正在加載Product部分數據!", num);
57 }
58
59 static void LoadOrder(int num)
60 {
61 Console.WriteLine("當前任務:{0}正在加載Order部分數據!", num);
62 }
63 }
2:死鎖問題
先前的例子我們也知道,屏障必須等待4個task通過SignalAndWait()來告知自己已經到達,當4個task全部達到后,我們可以通過
barrier.ParticipantsRemaining來獲取task到達狀態,那么如果有一個task久久不能到達那會是怎樣的情景呢?好,我舉個例子。
1 using System.Collections.Concurrent;
2 using System.Threading.Tasks;
3 using System;
4 using System.Diagnostics;
5 using System.Collections.Generic;
6 using System.Linq;
7 using System.Threading;
8
9 class Program
10 {
11 //四個task執行
12 static Task[] tasks = new Task[4];
13
14 static Barrier barrier = null;
15
16 static void Main(string[] args)
17 {
18 barrier = new Barrier(tasks.Length, (i) =>
19 {
20 Console.WriteLine("**********************************************************");
21 Console.WriteLine("\n屏障中當前階段編號:{0}\n", i.CurrentPhaseNumber);
22 Console.WriteLine("**********************************************************");
23 });
24
25 for (int j = 0; j < tasks.Length; j++)
26 {
27 tasks[j] = Task.Factory.StartNew((obj) =>
28 {
29 var single = Convert.ToInt32(obj);
30
31 LoadUser(single);
32 barrier.SignalAndWait();
33
34 LoadProduct(single);
35 barrier.SignalAndWait();
36
37 LoadOrder(single);
38 barrier.SignalAndWait();
39
40 }, j);
41 }
42
43 Task.WaitAll(tasks);
44
45 barrier.Dispose();
46
47 Console.WriteLine("指定數據庫中所有數據已經加載完畢!");
48
49 Console.Read();
50 }
51
52 static void LoadUser(int num)
53 {
54 Console.WriteLine("\n當前任務:{0}正在加載User部分數據!", num);
55
56 if (num == 0)
57 {
58 //num=0:表示0號任務
59 //barrier.ParticipantsRemaining == 0:表示所有task到達屏障才會退出
60 // SpinWait.SpinUntil: 自旋鎖,相當於死循環
61 SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0);
62 }
63 }
64
65 static void LoadProduct(int num)
66 {
67 Console.WriteLine("當前任務:{0}正在加載Product部分數據!", num);
68 }
69
70 static void LoadOrder(int num)
71 {
72 Console.WriteLine("當前任務:{0}正在加載Order部分數據!", num);
73 }
74 }
我們發現程序在加載User表的時候卡住了,出現了類似死循環,這句SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0)中
的ParticipantsRemaining==0 永遠也不能成立,導致task0永遠都不能退出,然而barrier還在一直等待task0調用SignalAndWait來結束屏障。
結果就是造成了相互等待的尷尬局面,我們下個斷點看看情況。
3:超時機制
當我們coding的時候遇到了這種問題還是很糾結的,所以我們必須引入一種“超時機制”,如果在指定的時候內所有的參與者(task)都
沒有到達屏障的話,我們就需要取消這些參與者的后續執行,幸好SignalAndWait給我們提供了超時的重載,為了能夠取消后續執行,我們
還要采用CancellationToken機制。
1 using System.Collections.Concurrent;
2 using System.Threading.Tasks;
3 using System;
4 using System.Diagnostics;
5 using System.Collections.Generic;
6 using System.Linq;
7 using System.Threading;
8
9 class Program
10 {
11 //四個task執行
12 static Task[] tasks = new Task[4];
13
14 static Barrier barrier = null;
15
16 static void Main(string[] args)
17 {
18 CancellationTokenSource cts = new CancellationTokenSource();
19
20 CancellationToken ct = cts.Token;
21
22 barrier = new Barrier(tasks.Length, (i) =>
23 {
24 Console.WriteLine("**********************************************************");
25 Console.WriteLine("\n屏障中當前階段編號:{0}\n", i.CurrentPhaseNumber);
26 Console.WriteLine("**********************************************************");
27 });
28
29 for (int j = 0; j < tasks.Length; j++)
30 {
31 tasks[j] = Task.Factory.StartNew((obj) =>
32 {
33 var single = Convert.ToInt32(obj);
34
35 LoadUser(single);
36
37 if (!barrier.SignalAndWait(2000))
38 {
39 //拋出異常,取消后面加載的執行
40 throw new OperationCanceledException(string.Format("我是當前任務{0},我拋出異常了!", single), ct);
41 }
42
43 LoadProduct(single);
44 barrier.SignalAndWait();
45
46 LoadOrder(single);
47 barrier.SignalAndWait();
48
49 }, j, ct);
50 }
51
52 //等待所有tasks 4s
53 Task.WaitAll(tasks, 4000);
54
55 try
56 {
57 for (int i = 0; i < tasks.Length; i++)
58 {
59 if (tasks[i].Status == TaskStatus.Faulted)
60 {
61 //獲取task中的異常
62 foreach (var single in tasks[i].Exception.InnerExceptions)
63 {
64 Console.WriteLine(single.Message);
65 }
66 }
67 }
68
69 barrier.Dispose();
70 }
71 catch (AggregateException e)
72 {
73 Console.WriteLine("我是總異常:{0}", e.Message);
74 }
75
76 Console.Read();
77 }
78
79 static void LoadUser(int num)
80 {
81 Console.WriteLine("\n當前任務:{0}正在加載User部分數據!", num);
82
83 if (num == 0)
84 {
85 //自旋轉5s
86 if (!SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0, 5000))
87 return;
88 }
89
90 Console.WriteLine("當前任務:{0}正在加載User數據完畢!", num);
91 }
92
93 static void LoadProduct(int num)
94 {
95 Console.WriteLine("當前任務:{0}正在加載Product部分數據!", num);
96 }
97
98 static void LoadOrder(int num)
99 {
100 Console.WriteLine("當前任務:{0}正在加載Order部分數據!", num);
101 }
102 }
二:spinLock(自旋鎖)
我們初識多線程或者多任務時,第一個想到的同步方法就是使用lock或者Monitor,然而在4.0 之后給我們提供了另一把武器spinLock,
如果你的任務持有鎖的時間非常短,具體短到什么時候msdn也沒有給我們具體的答案,但是有一點值得確定的時,如果持有鎖的時候比較
短,那么它比那些重量級別的Monitor具有更小的性能開銷,它的用法跟Monitor很相似,下面舉個例子,Add2方法采用自旋鎖。
1 using System.Collections.Concurrent;
2 using System.Threading.Tasks;
3 using System;
4 using System.Diagnostics;
5 using System.Collections.Generic;
6 using System.Linq;
7 using System.Threading;
8
9 class Program
10 {
11 static SpinLock slock = new SpinLock(false);
12
13 static int sum1 = 0;
14
15 static int sum2 = 0;
16
17 static void Main(string[] args)
18 {
19 Task[] tasks = new Task[100];
20
21 for (int i = 1; i <= 100; i++)
22 {
23 tasks[i - 1] = Task.Factory.StartNew((num) =>
24 {
25 Add1((int)num);
26
27 Add2((int)num);
28
29 }, i);
30 }
31
32 Task.WaitAll(tasks);
33
34 Console.WriteLine("Add1數字總和:{0}", sum1);
35
36 Console.WriteLine("Add2數字總和:{0}", sum2);
37
38 Console.Read();
39 }
40
41 //無鎖
42 static void Add1(int num)
43 {
44 Thread.Sleep(100);
45
46 sum1 += num;
47 }
48
49 //自旋鎖
50 static void Add2(int num)
51 {
52 bool lockTaken = false;
53
54 Thread.Sleep(100);
55
56 try
57 {
58 slock.Enter(ref lockTaken);
59 sum2 += num;
60 }
61 finally
62 {
63 if (lockTaken)
64 slock.Exit(false);
65 }
66 }
67 }