上一篇簡單的介紹了TDF提供的一些Block,通過對這些Block配置和組合,可以滿足很多的數據處理的場景。這一篇將繼續介紹與這些Block配置的相關類,和挖掘一些高級功能。
在一些Block的構造函數中,我們常常可以看見需要你輸入DataflowBlockOptions 類型或者它的兩個派生類型ExecutionDataflowBlockOptions 和 GroupingDataflowBlockOptions。
DataflowBlockOptions
DataflowBlockOptions有五個屬性:BoundedCapacity,CancellationToken,MaxMessagesPerTask,NameFormat和TaskScheduler。
用BoundedCapacity來限定容量
這個屬性用來限制一個Block中最多可以緩存數據項的數量,大多數Block都支持這個屬性,這個值默認是DataflowBlockOptions.Unbounded = -1,也就是說沒有限制。開發人員可以制定這個屬性設置數量的上限。那后面的新數據將會延遲。比如說用一個BufferBlock連接一個ActionBlock,如果在ActionBlock上面設置了上限,ActionBlock處理的操作速度比較慢,留在ActionBlock中的數據到達了上限,那么余下的數據將留在BufferBlock中,直到ActionBlock中的數據量低於上限。這種情況常常會發生在生產者生產的速度大於消費者速度的時候,導致的問題是內存越來越大,數據操作越來越延遲。我們可以通過一個BufferBlock連接多個ActionBlock來解決這樣的問題,也就是負載均衡。一個ActionBlock滿了,就會放到另外一個ActionBlock中去了。
用CancellationToken來取消操作
TPL中常用的類型。在Block的構造函數中放入CancellationToken,Block將在它的整個生命周期中全程監控這個對象,只要在這個Block結束運行(調用Complete方法)前,用CancellationToken發送取消請求,該Block將會停止運行,如果Block中還有沒有處理的數據,那么將不會再被處理。
用MaxMessagesPerTask控制公平性
每一個Block內部都是異步處理,都是使用TPL的Task。TDF的設計是在保證性能的情況下,盡量使用最少的任務對象來完成數據的操作,這樣效率會高一些,一個任務執行完成一個數據以后,任務對象並不會銷毀,而是會保留着去處理下一個數據,直到沒有數據處理的時候,Block才會回收掉這個任務對象。但是如果數據來自於多個Source,公平性就很難保證。從其他Source來的數據必須要等到早前的那些Source的數據都處理完了才能被處理。這時我們就可以通過MaxMessagesPerTask來控制。這個屬性的默認值還是DataflowBlockOptions.Unbounded=-1,表示沒有上限。假如這個數值被設置為1的話,那么單個任務只會處理一個數據。這樣就會帶來極致的公平性,但是將帶來更多的任務對象消耗。
用NameFormat來定義Block名稱
MSDN上說屬性NameFormat用來獲取或設置查詢塊的名稱時要使用的格式字符串。
Block的名字Name=string.format(NameFormat, block.GetType ().Name, block.Completion.Id)。所以當我們輸入”{0}”的時候,名字就是block.GetType ().Name,如果我們數據的是”{1}”,那么名字就是block.Completion.Id。如果是“{2}”,那么就會拋出異常。
用TaskScheduler來調度Block行為
TaskScheduler是非常重要的屬性。同樣這個類型來源於TPL。每個Block里面都使用TaskScheduler來調度行為,無論是源Block和目標Block之間的數據傳遞,還是用戶自定義的執行數據方法委托,都是使用的TaskScheduler。如果沒有特別設置的話,將使用TaskScheduler.Default(System.Threading.Tasks.ThreadPoolTaskScheduler)來調度。我們可以使用其他的一些繼承於TaskScheduler的類型來設置這個調度器,一旦設置了以后,Block中的所有行為都會使用這個調度器來執行。.Net Framework 4中內建了兩個Scheduler,一個是默認的ThreadPoolTaskScheduler,另一個是用於UI線程切換的SynchronizationContextTaskScheduler。如果你使用的Block設計到UI的話,那可以使用后者,這樣在UI線程切換上面將更加方便。
.Net Framework 4.5 中,還有一個類型被加入到System.Threading.Tasks名稱空間下:ConcurrentExclusiveSchedulerPair。這個類是兩個TaskScheduler的組合。它提供兩個TaskScheduler:ConcurrentScheduler和ExclusiveScheduler;我們可以把這兩個TaskScheduler構造進要使用的Block中。他們保證了在沒有排他任務的時候(使用ExclusiveScheduler的任務),其他任務(使用ConcurrentScheduler)可以同步進行,當有排他任務在運行的時候,其他任務都不能運行。其實它里面就是一個讀寫鎖。這在多個Block操作共享資源的問題上是一個很方便的解決方案。
public ActionBlock<int> readerAB1; public ActionBlock<int> readerAB2; public ActionBlock<int> readerAB3; public ActionBlock<int> writerAB1; public BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; }); public void Test() { ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair(); readerAB1 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB1 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); readerAB2 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB2 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); readerAB3 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB3 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); writerAB1 = new ActionBlock<int>((i) => { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("WriterAB1 begin handling." + " Execute Time:" + DateTime.Now); Console.ResetColor(); Thread.Sleep(3000); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler }); bb.LinkTo(readerAB1); bb.LinkTo(readerAB2); bb.LinkTo(readerAB3); Task.Run(() => { while (true) { bb.Post(1); Thread.Sleep(1000); } }); Task.Run(() => { while (true) { Thread.Sleep(6000); writerAB1.Post(1); } }); }
用MaxDegreeOfParallelism來並行處理
通常,Block中處理數據都是單線程的,一次只能處理一個數據,比如說ActionBlock中自定義的代理。使用MaxDegreeOfParallelism可以讓你並行處理這些數據。屬性的定義是最大的並行處理個數。如果定義成-1的話,那就是沒有限制。用戶需要在實際情況中選擇這個值的大小,並不是越大越好。如果是平行處理的話,還應該考慮是否有共享資源。
TDF中的負載均衡
我們可以使用Block很方便的構成一個生產者消費者的模式來處理數據。當生產者產生數據的速度快於消費者的時候,消費者Block的Buffer中的數據會越來越多,消耗大量的內存,數據處理也會延時。這時,我們可以用一個生產者Block連接多個消費者Block來解決這個問題。由於多個消費者Block一定是並行處理,所以對共享資源的處理一定要做同步處理。
使用BoundedCapacity屬性來實現
當連接多個ActionBlock的時候,可以通過設置ActionBlock的BoundedCapacity屬性。當第一個滿了,就會放到第二個,第二個滿了就會放到第三個。
public BufferBlock<int> bb = new BufferBlock<int>(); public ActionBlock<int> ab1 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab1 handle data" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public ActionBlock<int> ab2 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab2 handle data" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public ActionBlock<int> ab3 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab3 handle data:" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public void Test() { bb.LinkTo(ab1); bb.LinkTo(ab2); bb.LinkTo(ab3); for (int i = 0; i < 9; i++) { bb.Post(i); } }
測試代碼可以從這里下載。


![PNQFKIK2OK)}SOWA]RF(~$M PNQFKIK2OK)}SOWA]RF(~$M](/image/aHR0cHM6Ly9pbWFnZXMwLmNuYmxvZ3MuY29tL2Jsb2cvMTU3MDAvMjAxMzAzLzAxMTYxNTA0LWU5YzI2NTk4MmNiNTRkMGViODQ0ZWUxYTFiZTFlOTJkLmpwZw==.png)