- 一、並行編程 - 數據並行 System.Threading.Tasks.Parallel 類
- 二、並行編程 - Task任務
- 三、並行編程 - Task同步機制。TreadLocal類、Lock、Interlocked、Synchronization、ConcurrentQueue以及Barrier等
- 四、並行編程 - 並行LINQ(PLINQ) 的使用。AsParallel
- 五、並行編程 - 信號量
任務,基於線程池。其使我們對並行編程變得更簡單,且不用關心底層是怎么實現的。
System.Threading.Tasks.Task類是Task Programming Library(TPL)中最核心的一個類。
一、任務與線程
1:任務是架構在線程之上的,也就是說任務最終還是要拋給線程去執行。
2:任務跟線程不是一對一的關系,比如開10個任務並不是說會開10個線程,這一點任務有點類似線程池,但是任務相比線程池有很小的開銷和精確的控制。
我們用VS里面的“並行任務”看一看,快捷鍵Ctrl+D,K,或者找到“調試"->"窗口“->"並行任務“,我們在WaitAll方法處插入一個斷點,最終我們發現任務確實托管給了線程。
二、初識Task
兩種構建Task的方式,只是StartNew方法直接構建出了一個Task之后又調用了其Start方法。
Task.Factory.StartNew (() => { Console.WriteLine("Hello word!"); }); Task task =new Task
(() => { Console.WriteLine("Hello,Word!"); }); task.Start();
在Task內部執行的內容我們稱作為Task的Body,Task提供了多個初始化重載的方法。
public Task(Action action); public Task(Action<object> action,object state
);給action傳參數 public Task(Action action, CancellationToken cancellationToken); public Task(Action action, TaskCreationOptions creationOptions);
例如使用了重載方法的State參數:
Task task2 = new Task((obj ) =>
{ Console.WriteLine("Message: {0}", obj); },"Say \"Hello\" from task2
"); task2.Start();
補充細節
在創建Task的時候,Task有很多的構造函數的重載,一個主要的重載就是傳入TaskCreateOptions的枚舉:
- TaskCreateOptions.None:用默認的方式創建一個Task
- TaskCreateOptions.PreferFairness:請求scheduler盡量公平的執行Task(后續文章會將是,Task和線程一樣,有優先級的)
- TaskCreateOptions.LongRunning:聲明Task將會長時間的運行。
- TaskCreateOptions.AttachToParent:因為Task是可以嵌套的,所以這個枚舉就是把一個子task附加到一個父task中。
三、任務的結果
任務結束時,它可以把一些有用的狀態信總寫到共享對象中。這個共享對象必須是線程安全的。
另一個方式是使用返回某個結果的任務。使用Task類的泛型版本,就可以定義返冋某個結果的任務的返回類型。
使用返回值的Result屬性可獲取是在一個Task運行完成才會獲取的,所以task2是在task1運行完成后,才開始運行,也就是說上面的兩個result的值不管運行多少次都是不會變的。其中我們也可以通過CurrentId來獲取當前運行的Task的編號。
var loop = 0; var task1 = new Task<int>(() => { for (var i = 0; i < 1000; i++) loop += i; return loop; }); task1.Start(); var loopResut = task1.Result; var task2 = new Task<long>(obj=> { long res = 0; var looptimes = (int)obj; for (var i = 0; i < looptimes; i++) res += i; return res; },loopResut); task2.Start(); var resultTask2 = task2.Result; Console.WriteLine("任務1的結果':{0}\n任務2的結果:{1}", loopResut,resultTask2);
.NET 4.5 :Task.Run
在 .NET Framework 4.5 及更高版本(包括 .NET Core 和 .NET Standard)中,使用靜態 Task.Run 方法作為 TaskFactory.StartNew 的快捷方式。
Task.Run的跟Task.Factory.StarNew和new Task相差不多,不同的是前兩種是放進線程池立即執行,而Task.Run則是等線程池空閑后在執行。
Run方法只接受無參的Action和Func委托,另外兩個接受一個object類型的參數。
在msdn中TaskFactory.StartNew的備注信息如下:
四、連續任務
所謂的延續的Task就是在第一個Task完成后自動啟動下一個Task。我們通過ContinueWith方法來創建延續的Task。我們假設有一個接受xml解析的服務,首先從某個地方接受文件,然后解析入庫,最后發送是否解析正確的回執。在每次調用ContinueWith方法時,每次會把上次Task的引用傳入進來,以便檢測上次Task的狀態,比如我們可以使用上次Task的Result屬性來獲取返回值。
var ReceiveTask = new Task(() => ReceiveXml()); var ResolveTask = ReceiveTask .ContinueWith <bool>((r) => ResolveXml()); var SendFeedBackTask = ResolveTask.ContinueWith <string>((s) => SendFeedBack(s.Result)); ReceiveTask.Start(); Console.WriteLine(SendFeedBackTask.Result);
上面的代碼我們也可以這么寫:
var SendFeedBackTask = Task.Factory.StartNew(() => ReceiveXml()) .ContinueWith<bool>(s => ResolveXml()) .ContinueWith<string>(r => SendFeedBack(r.Result)); Console.WriteLine(SendFeedBackTask.Result);
無論前一個任務是如何結束的,前面的連續任務總是在前一個任務結束時啟動。使用 TaskContinuationOptions枚舉中的值,可以指定,連續任務只有在起始任務成功(或失敗)結束吋啟動。可能的值是 OnlyOnFaulted、NotOoFaulted、Onl)OnCanceIed、NotOnCanceled 和 OnlyOnRanToCompletion
Task t5 = t1.ContinueWith(DoOnError,
TaskContinuationOptions.OnlyOnFaulted);
五、分離嵌套任務
有些情況下我們需要創建嵌套的Task,嵌套里面又分為分離的和不分離的。其創建的方式很簡單,就是在Task的body里面創建一個新的Task。如果新的Task未指定AttachedToParent選項,那么就是分離嵌套的。我們看下面這段代碼。下面的代碼中outTask.Wait()表示等待outTask執行完成。
var outTask = Task.Factory.StartNew(() => { Console.WriteLine("Outer task beginning..."); var childTask = Task.Factory.StartNew(() => { Thread.SpinWait(3000000); Console.WriteLine("Detached nested task completed."); }); }); outTask.Wait(); Console.WriteLine("Outer task completed."); Console.ReadKey();
我們可以看到運行結果是:
六、子任務
我們將上面的代碼加上TaskCreationOptions選項:
如果父任務在子任務之前結束,父任務的狀態就顯示為WaitingForChildrenToComplete。只要子任務也結束時,父任務的狀態就變成RanToCompletion。.、當然,如果父任務用TaskCreatiooOptions 枚舉中的DetachedFromParent創建子任務時,這就無效。
var outTask = Task.Factory.StartNew(() => { Console.WriteLine("Outer task beginning..."); var childTask = Task.Factory.StartNew(() => { Thread.SpinWait(3000000); Console.WriteLine("Detached nested task completed."); },TaskCreationOptions.AttachedToParent); }); outTask.Wait(); Console.WriteLine("Outer task completed.");
看到運行結果:
七、取消任務
在4.0中給我們提供一個“取消標記”叫做CancellationTokenSource.Token,在創建task的時候傳入此參數,就可以將主線程和任務相關聯。我們通過cancellation的tokens來取消一個Task。
有點要特別注意的,當我們調用了Cancel()方法之后,.NET Framework不會強制性的去關閉運行的Task。我們自己必須去檢測之前在創建Task時候傳入的那個CancellationToken。
一旦cancel被調用,task將會拋出OperationCanceledException來中斷此任務的執行,最后將當前task的Status的IsCanceled屬性設為true。
1、在很多Task的Body里面包含循環,我們可以在輪詢的時候判斷IsCancellationRequested屬性是否為True,如果是True的話,就可以停止循環以及釋放資源,同時拋出OperationCanceledException異常出來。
2、或者在任務中設置“取消信號“叫做ThrowIfCancellationRequested,來等待主線程使用Cancel來通知。
3、檢測task是否被cancel就是調用CancellationToken.WaitHandle屬性。CancellationToken的WaitOne()方法會阻止task的運行,只有CancellationToken的cancel()方法被調用后,這種阻止才會釋放。
var cts = new CancellationTokenSource();
var ct = cts.Token;
var task = Task.Factory.StartNew(() =>
{
for (var i = 0; i < 10000000; i++)
{
if (ct.IsCancellationRequested)
{
Console.WriteLine("任務開始取消...");
throw new OperationCanceledException(ct);
}
//或者直接在檢測到異常時,扔出異常: token.ThrowIfCancellationRequested();
//或者等待 WaitHandle: token.WaitHandle.WaitOne();
}
},ct);//傳入CancellationToken作為Task第二個參數
ct.Register(() =>
{
Console.WriteLine("已經取消");
});
Thread.Sleep(5000);
cts.Cancel();//如果想要取消一個Task的運行,只要調用CancellationToken實例的Cancel()方法就可以了。
try
{
task.Wait();
}
catch (AggregateException e)
{
foreach (var v in e.InnerExceptions)
Console.WriteLine("msg: " + v.Message);
}
八、休眠:等待時間執行
在TPL中我們可以通過三種方式進行等待,一是通過CancellTaken的WaitHanle進行等待、第二種則是通過傳統的Tread.Sleep方法、第三種則通過Thread.SpainWait方法。
1、CancellToken方式:每次我們等待十秒鍾之后,再進行下次輸出。
有一點要注意:WaitOne()方法只有在設定的時間間隔到了,或者Cancel方法被調用,此時task才會被喚醒。如果如果cancel()方法被調用而導致task被喚醒,那么CancellationToken.WaitHandle.WaitOne()方法就會返回true,如果是因為設定的時間到了而導致task喚醒,那么CancellationToken.WaitHandle.WaitOne()方法返回false。
var cts = new CancellationTokenSource(); var ct = cts.Token; var task = new Task(() => { for (var i = 0; i < 100000; i++) { var cancelled = ct.WaitHandle.WaitOne(1000 ); Console.WriteLine(" {0}. Cancelled? {1}", i, cancelled); if (cancelled) { throw new OperationCanceledException(ct); } } }, ct); task.Start();
2、上面的功能如果我們要是通過Tread.Sleep方式實現:
var task = new Task(() => { for (var i = 0; i < 100000; i++) { Thread.Sleep(10000); var cancelled =ct.IsCancellationRequested; Console.WriteLine(" {0}. Cancelled? {1}", i, cancelled); if (cancelled) { throw new OperationCanceledException(ct); } } },ct);
3、Thread.SpainWait則跟上面兩種方式完全不同,上面的兩種方式都是會在線程調度程序不考慮改線程,直等到運行結束。而Thread.SpainWait的作用實質上會將處理器置於十分緊密的循環中,主要的作用是來實現同步鎖的作用。並不常用,大部分情況下我們可以通過Lock的方式來實現。
Thread.SpinWait(10000);
九、等待任務執行
在很多時候我們也許需要等待同時開啟的幾個線程完成之后再來做其他事,在TPL中提供了幾種方式來等待任務執行。Task.Wait等待單個任務完成;Task.WaitAll等待所有的Task完成、TaskAny等在其中的任何一個或則多個任務完成。
1、Task.Wait: 等待單獨的一個Task執行完成
共有5個重載:Wait()、Wait(CancellToken)、Wait(Int32)、Wait(TimeSpan)、Wait(TimeSpan、CancellToken)。各個重載方法的含義:
- 1)Wait():等待整個任務完成或者取消或者出現異常;
- 2)Wait(CancellToken):等待任務直到CancellToken調用取消或者完成,或者出現異常;
- 3)Wait(Int32):等待任務,未完成則到指定的時間;
- 4)Wait(TimeSpan):同上;
- 5)Wait(TimeSpan、CancellToken):等待任務到指定時間,或者CancellToken調用取消或者任務完成。
static void Main(string[] args) { var tokenSource = new CancellationTokenSource(); CancellationToken token = tokenSource.Token; Task task = createTask(token,6); task.Start(); Console.WriteLine("Wait() complete."); task.Wait(); Console.WriteLine("Task Completed."); task = createTask(token,3); task.Start(); Console.WriteLine("Wait(2) secs for task to complete."); bool completed = task.Wait(2000); Console.WriteLine("Wait ended - task completed: {0}", completed); task = createTask(token,4); task.Start(); Console.WriteLine("Wait(2,token) for task to complete."); completed = task.Wait(2000, token); Console.WriteLine("Wait ended - task completed: {0} task cancelled {1}", completed, task.IsCanceled); Console.WriteLine("Main method complete. Press enter to finish."); Console.ReadLine(); } static Task createTask(CancellationToken token,int loop) { return new Task(() => { for (int i = 0; i < loop; i++) { token.ThrowIfCancellationRequested(); Console.WriteLine("Task - Int value {0}", i); token.WaitHandle.WaitOne(1000); } }, token); }
循環都會等待1秒鍾,這樣我們可以看看Wait(2000)的效果,看看運行后的效果:
從上面的例子可以看出,wait方法子task執行完成之后會返回true。
注意:當在執行的task內部拋出了異常之后,這個異常在調用wait方法時會被再次拋出。后面再"異常處理篇"會講述。
2、Task.WaitAll方法: 等待多個task
是等待所有的任務完成,也有5個重載, 也可以傳遞時間以及Token參數,進行等待時間以及取消Token的控制。
var tokenSource = new CancellationTokenSource(); CancellationToken token = tokenSource.Token; var task1 = createTask(token,2); var task2 = createTask(token, 5); task1.Start(); task2.Start(); Console.WriteLine("Waiting for tasks to complete."); Task.WaitAll(task1, task2); Console.WriteLine("Tasks Completed.");
注意:如果在等在的多個task之中,有一個task拋出了異常,那么調用WaitAll()方法時就會拋出異常。
ContinueWith結合WaitAll來玩一把
當這兩者結合起來,我們就可以玩一些復雜一點的東西,比如說現在有4個任務,其中t1需要串行,t2-t3可以並行,t4需要串行.
ConcurrentStack<int> stack = new ConcurrentStack<int>(); //t1先執行 var t1 = Task.Factory.StartNew(() => { stack.Push(1); stack.Push(2); }); //t2,t3並行執行 var t2 = t1.ContinueWith
(t => { int result; stack.TryPop(out result); }); //t2,t3並行執行 var t3 = t1.ContinueWith
(t => { int result; stack.TryPop(out result); }); //等待t2和t3執行完 Task.WaitAll(t2, t3); //t4z再執行 var t4 = Task.Factory.StartNew(() => { Console.WriteLine("當前集合元素個數:" + stack.Count); });
3、Task.WaitAny
等待任何一個任務完成,完成之后返回其完成的任務的Index:
var tokenSource = new CancellationTokenSource(); CancellationToken token = tokenSource.Token; var task1 = createTask(token,2); var task2 = createTask(token, 5); task1.Start(); task2.Start(); Console.WriteLine("Waiting for tasks to complete."); varindex
= Task.WaitAny(task1, task2); Console.WriteLine("Tasks Completed.Index is {0}",index);
十、異常處理
在TPL中,異常的觸發器主要是這幾個:
Task.Wait(), Task.WaitAll(), Task,WaitAny(),Task.Result。而在TPL出現的異常都會以AggregateException的示例拋出,我們在進行基本的異常處理時,可以通過查看AggregateException的InnerExceptions來進行內部異常的捕獲:
var tokenSource = new CancellationTokenSource(); var token = tokenSource.Token; var task1 = new Task(() => { throw new NullReferenceException() {Source
="task1"}; }); var task2 = new Task(() => { throw new ArgumentNullException("a", "a para can not be null") { Source="task2"}; }); task1.Start(); task2.Start(); try { Task.WaitAll(task1, task2); } catch(AggregateException ex) { foreach (Exception inner in ex.InnerExceptions) { Console.WriteLine("Exception type {0} from {1}", inner.GetType(), inner.Source); } }
同時,我們還可以通過Task的幾個屬性來判斷Task的狀態,如:IsCompleted, IsFaulted, IsCancelled,Exception。
另外,AggregateException中還提供了Handle方法來給我們方法來給我們處理每個內部 異常,每個異常發生時都會調用Handle傳入的delegate ,同時我們需要通過返回True,False來告訴異常是否已經被處理,比如對於OperationCanceledException我們知道是取消了Task,是肯定可以處理的:
try { Task.WaitAll(task1, task2, task3, task4); } catch(AggregateException ex) { ex.Handle((e) => { if (e is OperationCanceledException) { return true; } else { return false; } }); }
十一、執行晚加載的Task(Lazily Task)
晚加載,或者又名延遲初始化,主要的好處就是避免不必要的系統開銷。在並行編程中,可以聯合使用Lazy變量和Task<>.Factory.StartNew()做到這點。(Lazy變量時.NET 4中的一個新特性,這里大家不用知道Lazy的具體細節)。
Lazy變量只有在用到的時候才會被初始化。所以我們可以把Lazy變量和task的創建結合:只有這個task要被執行的時候才去初始化。
// do the same thing in a single statement Lazy<Task<string>> lazyData2 = new Lazy<Task<string>>( () => Task<string>.Factory.StartNew(() => { Console.WriteLine("Task body working..."); return "Task Result"; })); Console.WriteLine("Calling second lazy variable"); Console.WriteLine("Result from task: {0}", lazyData2.Value.Result);
首先我們回想一下,在之前的系列文章中我們是怎么定義一個task的:直接new,或者通過task的factory來創建,因為創建task的代碼是在main函數中的,所以只要new了一個task,那么這個task就被初始化。現在如果用了Lazy的task,那么現在我們初始化的就是那個Lazy變量了,而沒有初始化task,(初始化Lazy變量的開銷小於初始化task),只有當調用了lazyData.Value時,Lazy變量中包含的那個task才會初始化。(這里歡迎大家提出自己的理解)