異步編程:.NET4.X 數據並行


任務並行庫 (TPL) .NET Framework 4 System.Threading System.Threading.Tasks 命名空間中的一組公共類型和 APITPL的目的在於簡化向應用程序中添加並行性和並發性的過程,從而提高開發人員的工作效率。TPL會動態地按比例調節並發程度,以便最有效地使用所有可用的處理器。此外,TPL還處理工作分區、ThreadPool 上的線程調度、取消支持、狀態管理以及其他低級別的細節操作。通過使用TPL,您可以在將精力集中於程序要完成的工作,同時最大程度地提高代碼的性能。

.NET Framework 4 開始,TPL 是編寫多線程代碼和並行代碼的首選方法。但是,並不是所有代碼都適合並行化;例如,如果某個循環在每次迭代時只執行少量工作,或它在很多次迭代時都不運行,那么並行化的開銷可能導致代碼運行更慢。此外,像任何多線程代碼一樣,並行化會增加程序執行的復雜性(容易產生bug)。盡管 TPL 簡化了多線程方案,但我們建議您對線程處理概念(例如,鎖、死鎖和爭用條件《異步編程:同步基元對象(上)》)進行基本的了解,以便能夠有效地使用 TPL

 

示例:異步編程:.NET4.X數據並行.rar

傳送門:異步編程系列目錄……

 

本主題分兩部分講解:

《異步編程:.NET4.X任務並行》

《異步編程:.NET4.X 數據並行》,本節所述內容:

 

並發與並行

       首先,我們要理解並發與並行的區別:

       在理解和設計支持多核操作時,我們可以借用甘特圖來幫助我們更清晰地知道多任務的運行情況,常用的甘特圖軟件有:GanttProject 、翰文橫道圖編制系統、Microsoft Office Project

       並發:一個處理器在“同一時段(時間間隔)”處理多個任務,各個任務之間快速交替執行。如圖:

       image

並行:多個處理器或者多核的處理器“同一時刻(時間點)”處理多個不同的任務。並行是真正的細粒度上的同時進行,既同一時間點上同時發生着多個並發。並行一定是並發,而並發不一定是並行。如圖:

  image

 

 

數據並行

數據並行是指對源集合或數組中的元素同時(即並行)執行相同操作的情況。

 

先稍微了解下ActionFunc委托,此兩委托由微軟提供;Action是一個沒有返回參數的委托,Func是一個有返回值的委托。

 

              並行循環             

當並行循環運行時,TPL會將數據源按照內置的分區算法(或者你可以自定義一個分區算法)將數據划分為多個不相交的子集,然后,從線程池中選擇線程並行地處理這些數據子集,每個線程只負責處理一個數據子集。在后台,任務計划程序將根據系統資源和工作負荷來對任務進行分區。如有可能,計划程序會在工作負荷變得不平衡的情況下在多個線程和處理器之間重新分配工作。

在對任何代碼(包括循環)進行並行化時,一個重要的目標是利用盡可能多的處理器,但不要過度並行化到使行處理的開銷讓任何性能優勢消耗殆盡的程度。比如:對於嵌套循環,只會對外部循環進行並行化,原因是不會在內部循環中執行太多工作。少量工作和不良緩存影響的組合可能會導致嵌套並行循環的性能降低。

由於循環體是並行運行的,迭代范圍的分區是根據可用的邏輯內核數、分區大小以及其他因素動態變化的,因此無法保證迭代的執行順序。

 

1.        Parallel.For

為固定數目的獨立For循環迭代提供了負載均衡的潛在並行執行。Parallel內部通過RangeManger對象實現負載均衡。

負載均衡的執行會嘗試將工作分發在不同的任務中,這樣所有的任務在大部分時間內部可以保持繁忙。負載均衡總是試圖減少任務的閑置時間。

    public static ParallelLoopResult For(int fromInclusive, int toExclusive
        , ParallelOptions parallelOptions, Action<int, ParallelLoopState> body);
        // 執行具有線程本地數據的 for 循環,泛型類型參數TLocal為本地線程數據類型。
    public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive
        , ParallelOptions parallelOptions
        , Func<TLocal> localInit
        , Func<int, ParallelLoopState, TLocal, TLocal> body
        , Action<TLocal> localFinally
        );

參數:

1)        返回ParallelLoopResult結構

System.Threading.Tasks.ParallelLoopResult是結構體,當所有線程均已完成時,For 將返回 ParallelLoopResult 對象。若你手動停止或中斷循環迭代時,此返回值特別有用,因為 ParallelLoopResult 存儲諸如完成運行的最后一個迭代等信息。

    public struct ParallelLoopResult
    {
        // 獲取該循環是否已運行完成。
        public bool IsCompleted { get; }
        // 獲取從中調用 ParallelLoopState.Break() 的最低迭代的索引。
        public long? LowestBreakIteration { get; }
}

l  如果 IsCompleted 返回 true,該循環的所有迭代均已執行,並且該循環沒有收到提前結束的請求.

l  如果 IsCompleted 返回 false

                                       i.              LowestBreakIteration 返回 null,則為調用 ParallelLoopState.Stop() 提前結束循環。

                                     ii.              LowestBreakIteration 返回非 null 整數值,則為調用 ParallelLoopState.Break() 提前結束循環。

2)        迭代范圍

對於迭代范圍(fromInclusive<= x <toExclusive)中的每個值調用一次body委托。如果 fromInclusive 大於或等於 toExclusive,則該方法立即返回,而無需執行任何迭代。

3)        ParallelOptions類型

ParallelOptions實例存儲用於配置 Parallel 類的方法的操作的選項。

public class ParallelOptions 
{
    public ParallelOptions();
    // 獲取或設置與此 ParallelOptions 實例關聯的 CancellationToken。
    public CancellationToken CancellationToken { get; set; }
    // 獲取或設置此 ParallelOptions 實例所允許的最大並行度。
    public int MaxDegreeOfParallelism { get; set; }
    // 獲取或設置與此 ParallelOptions 實例關聯的 TaskScheduler。
    public TaskScheduler TaskScheduler { get; set; }
}

a)        提供一個無參數的構造函數,此構造函數使用默認值初始化實例。MaxDegreeOfParallelism 初始化為 -1,表示並行量沒有上限設置;CancellationToken 初始化為CancellationToken.None不可取消的標記;TaskScheduler 初始化為默認計划程序 (TaskScheduler.Default)(CancellationToken取消示例請看章協作式取消)

b)        指定最大並行度

有時候,你並不希望在並行循環中使用所有的內核,因為你對剩余的內核有特定的需求和更好的使用計划。

通常指定Environment.ProcessorCount,或者是根據此值計算出來的值(egEnvironment.ProcessorCount-1)。默認情況下,如果沒有指定最大並行度,TPL就會允許通過啟發式算法提高或降低線程的數目,通常這樣會高於ProcessorCount,因為這樣可以更好地支持CPUI/O混合型的工作負荷。

4)        ParallelLoopState類型

可用來使 Tasks.Parallel 循環的迭代與其他迭代交互,並為 Parallel 類的循環提供提前退出循環的功能。此類的實例不要自行創建,它由 Parallel 類創建並提供給每個循環項,並且只應該在提供此實例的“循環內部”使用。

    public class ParallelLoopState
    {
        // 獲取循環的任何迭代是否已引發相應迭代未處理的異常。
        public bool IsExceptional { get; }
        // 獲取循環的任何迭代是否已調用 ParallelLoopState.Stop()。
        public bool IsStopped { get; }
        // 獲取在Parallel循環中調用 ParallelLoopState.Break() 的最低循環迭代。
        public long? LowestBreakIteration { get; }
        // 獲取循環的當前迭代是否應基於此迭代或其他迭代發出的請求退出。
        public bool ShouldExitCurrentIteration { get; }

        //通知Parallel循環當前迭代”之后”的其他迭代不需要運行。
        public void Break();
        //通知Parallel循環當前迭代“之外”的所有其他迭代不需要運行。
        public void Stop();
    }

a)        Break()

Break()用於通知Parallel循環當前迭代之后的其他迭代不需要運行。例如,對於從 0 1000 並行迭代的 for 循環,如果在第 100 次迭代調用 Break(),則低於 100 的所有迭代仍會運行(即使還未開始處理),並在退出循環之前處理完。從 101 1000 中還未開啟的迭代則會被放棄。

對於已經在執行的長時間運行迭代,Break()將為已運行還未結束的迭代對應ParallelLoopResult結構的LowestBreakIteration屬性設置為調用Bread()迭代項的索引。

b)        Stop()

Stop() 用於通知Parallel循環當前迭代之外的所有其他迭代不需要運行,無論它們是位於當前迭代的上方還是下方。

對於已經在執行的長時間運行迭代,可以檢查 IsStopped屬性,在觀測到是 true 時提前退出。

Stop 通常在基於搜索的算法中使用,在找到一個結果之后就不需要執行其他任何迭代。(比如在看視頻或漫畫時自動匹配響應最快的服務器)

c)        ShouldExitCurrentIteration 屬性

當循環的迭代調用 Break Stop時,或一個迭代引發異常,或取消循環時,Parallel 類將主動嘗試禁止開始執行循環的其他迭代。但是,可能有無法阻止其他迭代啟動的情況。也可能是長時間運行的迭代已經開始執行的情況。在此類情況下,迭代可以通過顯式檢查 ShouldExitCurrentIteration 屬性,在該屬性返回 true 時停止執行。

5)        委托函數:localInitbodylocalFinally(委托中注意並行訪問問題)

a)        localInit       用於返回每個線程的本地數據的初始狀態的委托。

b)        body             將為每個迭代調用一次的委托。

c)        localFinally   用於對每個線程的本地狀態執行一個最終操作的委托。

對於參與循環執行的每個線程調用一次 localInit 委托(每個分區一個線程),並返回每個線程的初始本地狀態。這些初始狀態傳遞到每個線程上的第一個 body 調用。然后,該線程的每個后續body調用返回可能修改過的狀態值,並傳遞給下一個body調用。最后,每個線程上最后body調用的返回值傳遞給 localFinally 委托。每個線程調用一次 localFinally 委托,以對每個線程的本地狀態執行最終操作。

Parallel.For中三個委托執行流程如下:

                        i.              分區依據:Parallel.For也會為集合進行分區,分區算法由FCL內部RangeManger對象提供,以提供負載平衡。

                      ii.              RangeManger根據最大並發度將集合源拆分為多個小集合,再並行訪問其對應的RangeWorkerFindNewWork() 返回當前分區中是否還有迭代元素bool值。(FindNewWork()實現為無鎖(Interlocked)循環結構)

                    iii.              三個委托之間的變量值傳遞由內部聲明的局部變量支持。

// 整體思路:依據內置RangeManger算法分區,再由多個線程“並行”執行下面委托
// 第一步:
Action action =()=&gt;
    {
        try
        {
            localInit();

            Label_00FF:
            body();
            if(RangeWorker.FindNewWork())
            {
                Goto Lable_00FF;
            }
        }
        catch(){}
        finaly
        {
            localFinally();
        }
    }
// 第二步:再將action傳遞給Task的內部派生類ParallelForReplicatingTask,
// 根據最大並發級別(ParallelOptions.MaxDegreeOfParallelism)進行並行調用

6)        示例:(ParallelTest.cs,帶本地變量的Parallel.For

        public static void Parallel_For_Local_Test()
        { 
            int[] nums = Enumerable.Range(0, 1000000).ToArray<int>();
            long total = 0;
            ParallelLoopResult result = Parallel.For<long>(0, nums.Length,
                 () => { return 0; },
                 (j, loop, subtotal) =>
                 {
                     // 延長任務時間,更方便觀察下面得出的結論
                     Thread.SpinWait(200);
                     Console.WriteLine("當前線程ID為:{0},j為{1},subtotal為:{2}。"
                         , Thread.CurrentThread.ManagedThreadId, j.ToString(), subtotal.ToString());
                     if (j == 23)
                         loop.Break();
                     if (j > loop.LowestBreakIteration)
                     {
                         Thread.Sleep(4000);
                         Console.WriteLine("j為{0},等待4s種,用於判斷已開啟且大於阻斷迭代是否會運行完。", j.ToString());
                     }
                     Console.WriteLine("j為{0},LowestBreakIteration為:{1}", j.ToString(), loop.LowestBreakIteration);
                     subtotal += nums[j];
                     return subtotal;
                 },
                 (finalResult) => Interlocked.Add(ref total, finalResult)
            );
            Console.WriteLine("total值為:{0}", total.ToString());
            if (result.IsCompleted)
                Console.WriteLine("循環執行完畢");
            else
                Console.WriteLine("{0}"
                    , result.LowestBreakIteration.HasValue ? "調用了Break()阻斷循環." : "調用了Stop()終止循環.");
        }

         運行截圖:

image

分析一下:

a)        泛型類型參數TLocal為本地線程數據類型,本示例設置為long

b)        三個委托的參數解析body(j, loop, subtotal):首先初始委托localInit中返回了0,所以body委托中參數subtotal的初始值即為0body委托的參數j對應的是當前迭代索引,參數loop為當前迭代狀態ParallelLoopState對象;localFinally委托參數為body委托的返回值。

c)        三個委托三個階段中都可能並行運行,因此您必須同步對任何共享變量的訪問,如示例中在finally委托中使用了System.Threading.Interlocked對象。

d)        在索引為23的迭代中調用Break()后:

                        i.              索引小於23的所有迭代仍會運行(即使還未開始處理),並在退出循環之前處理完。

                      ii.              索引大於 23 的迭代若還未開啟則會被放棄;若已處於運行中則會在退出循環之前處理完。

e)        對於調用Break()之后,在任何循環迭代中訪問LowestBreakIteration屬性都會返回調用Break()的迭代對應的索引。

 

2.        Parallel.Foreach

為給定數目的獨立ForEach循環迭代提供了負載均衡的潛在並行執行。這個方法還支持自定義分區程序(抽象類Partitioner<TSource>),讓你可以完全掌控數據分發。

    // 對 System.Collections.IEnumerable 執行foreach 操作。
    public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source
        , ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body);
    // 對 System.Collections.IEnumerable 執行具有 64 位索引的 foreach 操作。
    public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source
        , ParallelOptions parallelOptions, Action<TSource,     ParallelLoopState, long> body);
    // 對 System.Collections.IEnumerable 執行具有線程本地數據的 foreach 操作。
    public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source
        , ParallelOptions parallelOptions, Func<TLocal> localInit
        , Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
    // 對 System.Collections.IEnumerable 執行具有線程本地數據和 64 位索引的 foreach 操作。
    public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source
        , ParallelOptions parallelOptions, Func<TLocal> localInit
        , Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally);

1)        如果打算要在 ForEach 方法中使用分區程序,你必須支持動態數量的分區,即:

a)        Partitioner<TSource>的派生類中重寫 GetDynamicPartitions() 方法和 SupportsDynamicPartitions屬性

b)        OrderablePartitioner<TSource>派生類中重寫GetOrderableDynamicPartitions() 方法和SupportsDynamicPartitions 屬性。

分區程序能夠在循環執行過程中隨時按需為新分區提供枚舉器。基本上,每當循環添加一個新並行任務時,它都會為該任務請求一個新分區。動態數量的分區程序在本質上也是負載平衡的。

2)        Parallel.ForEach還支持集合源為Partitioner<TSource>類型的重載,此重載不提供迭代索引。其中Partitioner<TSource>表示將一個數據源拆分成多個分區的特定方式。

    public abstract class Partitioner<TSource>     // partitioner [pa:'tiʃənə]瓜分者,分割者
    {
        protected Partitioner();
        // 獲取是否可以動態創建附加分區。
        public virtual bool SupportsDynamicPartitions { get; }
        // 將基礎集合分區成給定數目的分區,參數partitionCount為要創建的分區數。
        // 返回一個包含 partitionCount 枚舉器的列表。
        public abstract IList<IEnumerator<TSource>> GetPartitions(int partitionCount);
        // 創建一個可將基礎集合分區成可變數目的分區的對象。
        // 返回一個可針對基礎數據源創建分區的對象。
        public virtual IEnumerable<TSource> GetDynamicPartitions();
    }

示例見:CustomerPartitioner.cs

3)        Parallel.ForEach還支持集合源為OrderablePartitioner<TSource>類型的重載。OrderablePartitioner<TSource>表示將一個可排序數據源拆分成多個分區的特定方式,因此次重載提供迭代索引。

    public abstract class OrderablePartitioner<TSource> : Partitioner<TSource>
    {
        // 從派生類中的構造函數進行調用以便使用索引鍵上指定的約束初始化 OrderablePartitioner<TSource>
        protected OrderablePartitioner(bool keysOrderedInEachPartition
            , bool keysOrderedAcrossPartitions, bool keysNormalized);
        // 獲取是否按鍵增加的順序生成每個分區中的元素。
        public bool KeysOrderedInEachPartition { get; }
        // 獲取前一分區中的元素是否始終排在后一分區中的元素之前。
        public bool KeysOrderedAcrossPartitions { get; }
        // 獲取是否規范化順序鍵。如果為 true,則所有順序鍵為 [0 .. numberOfElements-1]。
        // 如果為 false,順序鍵仍必須互不相同,但只考慮其相對順序,而不考慮其絕對值。
        public bool KeysNormalized { get; }

        // 將基礎集合分區成給定數目的可排序分區。
        public override IList<IEnumerator<TSource>> GetPartitions(int partitionCount);
        // 創建一個可將基礎集合分區成可變數目的分區的對象。
        public override IEnumerable<TSource> GetDynamicPartitions();
        // 創建一個可將基礎集合分區成可變數目的分區的對象。
        public virtual IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions();
        // 將基礎集合分區成指定數目的可排序分區。
        public abstract IList<IEnumerator<KeyValuePair<long, TSource>>> GetOrderablePartitions(int partitionCount);
    }

三個bool值為true所要遵循的規則:

a)        KeysOrderedInEachPartition :每個分區返回具有不斷增加的鍵索引的元素。

b)        KeysOrderedAcrossPartitions :對於返回的所有分區,分區 i 中的鍵索引大於分區 i-1 中的鍵索引。

c)        KeysNormalized :所有鍵索引將從零開始單調遞增(沒有間隔)。

示例見:CustomerOrderablePartitioner.cs

4)        ForEach中的3個委托調用流程:(委托中注意並行訪問問題)

a)         對於Parallel.ForEach()使用IEnumerable<TSource>集合重載的循環,會轉化為Parallel.For()循環調用邏輯。

b)         對於使用OrderablePartitioner<TSource>Partitioner<TSource>派生類構造的自定義分區的循環邏輯如下:

                                      i.              分區依據:由OrderablePartitioner<TSource>Partitioner<TSource>派生類提供自定義分區算法,注意要重寫動態數量分區相關方法。

                                    ii.              在各個線程中,先取緩存中的enumerator,若沒有才會獲取動態分區(即每個線程的動態分區只會獲取一次)

                                  iii.              三個委托之間的變量值傳遞由內部聲明局部變量支持。

// 總體思路:依據自定義算法分區,再由多個線程“並行”執行下面代碼
// 第一步:
Action action = ()=>
{
    try
    {
         localInit();

         // 在各個線程中,先取緩存中的enumerator,若沒有才會獲取動態分區(即每個線程的動態分區只會獲取一次)
        var enumerator = OrderablePartitioner<TSource>.GetOrderableDynamicPartitions();
        // 若為Partitioner<TSource>對象,則var enumerator =         Partitioner<TSource>.GetDynamicPartitions();
        while(enumerator.MoveNext())
        {
            body();
        }
    }
    catch(){}
    finaly
    {
         localFinally();
    }
}
// 第二步:再將action傳遞給Task的內部派生類ParallelForReplicatingTask,
// 它根據最大並發級別(ParallelOptions. MaxDegreeOfParallelism)進行並行調用. 

5)        分析一個重載

public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source
    , ParallelOptions parallelOptions, Func<TLocal> localInit
    , Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally);

a)        返回ParallelLoopResult結構;泛型參數TSource指定集合源元素的類型,泛型參數TLocal指定線程本地變量的類型。

b)        Func<TSource, ParallelLoopState, long, TLocal, TLocal> body委托參數解析:TSource為集合迭代特定項;ParallelLoopState為循環迭代項狀態;long為迭代索引;第一個TLocallocalInit委托返回的初始值;第二個TLocalbody委托自身返回值類型。

示例:(詳見:ParallelTest.cs

            int[] nums = Enumerable.Range(100, 1000000).ToArray<int>();
            long total = 0;
            Parallel.ForEach<int, long>(nums,
                 () => { return 0; },
                 (j, loop, index, subtotal) =>
                 {
                     subtotal += j;
                     Console.WriteLine("索引為{0},當前項值為{1}.", index.ToString(), j.ToString());
                     return subtotal;
                 },
                 (finalResult) => Interlocked.Add(ref total, finalResult)
            );

6)        ForEach 用於非泛型集合

可以用System.Linq命名空間中IEnumerable擴展API Cast<TResult>() 方法將集合轉換為泛型集合。

// 擴展API
public static class Enumerable
{
    ……
    public static IEnumerable<TResult> Cast<TResult>(this IEnumerable source);
}
// 示例
System.Collections.ArrayList fruits = new System.Collections.ArrayList();
fruits.Add("apple");
fruits.Add("mango");
IEnumerable<string> query = fruits.Cast<string>();

 

              Parallel.Invoke              

對給定的獨立任務提供潛在的並行執行。

public static void Invoke(params Action[] actions);
public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);

Invoke內部通過Task.Factory.StartNew()來為每個委托參數創建並開啟任務並且在最后調用Task.WaitAll(Tasks[])來等待所有任務執行完成,所以此方法在每個提供的操作都完成后才會返回,與完成是因為正常終止還是異常終止無關。

       注意點:

1)        如果使用Parallel.Invoke加載運行委托的時間迥異,那么依需要最長時間的委托才能返回控制;並且還要考慮邏輯內核的使用情況,因為可能出現有單獨一個委托被延遲到后面單獨執行。

2)        在並行可擴展方面具有一定的局限性,因為Parallel.Invoke調用的是固定數目的委托。

3)        不能保證操作的執行順序或是否並行執行。

 

分區程序

若要對數據源操作進行並行化,其中一個必要步驟是將源分區為可由多個線程同時訪問的多個部分。

1.        Parallel支持的兩種分區程序:

1)        默認分區程序:”PLINQ並行查詢並行循環提供了默認的分區程序,該分區程序將以透明方式工作,即Parallel.For() 中提到的RangeManage分區對象。

2)        自定義分區程序:在某些情況下(eg:一個自定義集合類,根據您對該類的內部結構的了解,您能夠采用比默認分區程序更有效的方式對其進行分區。或者,根據您對在源集合中的不同位置處理元素所花費時間的了解,您可能需要創建大小不同的范圍分區),可能值得通過繼承OrderablePartitioner<TSource>  Partitioner<TSource>抽象類實現您自己的分區程序。

2.        兩種分區類型

1)        按范圍分區(屬於靜態數量的分區):

a)        適用於數據和其他已建立索引的集合源(egIList集合);

b)        並行循環或PLINQ查詢中的每個線程或任務分別接收唯一的開始和結束索引,以便在處理集合源時,不會覆蓋任何其他線程或被任何其他線程覆蓋;

c)        同步開銷:涉及的唯一同步開銷是創建范圍的初始工作;

d)        缺點:如果一個線程提前完成,它將無法幫助其他線程完成它們的工作。

示例關鍵代碼:

var rangePartitioner = Partitioner.Create(0, source.Length);
double[] results = new double[source.Length];
Parallel.ForEach(rangePartitioner, (range, loopState) =>
{
    for (int i = range.Item1; i < range.Item2; i++)
    {
        results[i] = source[i] * Math.PI;
    }
});

注意這個示例用范圍還有一個優勢:因為該示例主體開銷非常小,倘若不使用范圍分區,那么頻繁調用主體委托會使並行循環效率更低。而依范圍分區后,就使得一個區只會產生一次主體委托調用的開銷。

2)        按區塊分區(屬於動態數量的分區):

a)        適用於長度未知的鏈接列表或其他集合;

b)        並行循環或PLINQ查詢中的每個線程或任務分別處理一個區塊中一定數量的源元素,然后返回檢索其他元素

c)        區塊的大小可以任意(即使大小為1)。只要區塊不是太大,這種分區在本質上是負載平衡的,原因是為線程分配元素的操作不是預先確定的;

d)        同步開銷:當線程需要獲取另一個區塊時,都會產生同步開銷;

示例關鍵代碼:(詳見MyDynamicOrderablePartitioner.cs文件)

        // 分區程序
        public override IEnumerable<KeyValuePair<long, TSource>> GetOrderableDynamicPartitions()
        {
            return new ListDynamicPartitions(m_input);
        }

        // 枚舉對象
        private class ListDynamicPartitions : IEnumerable<KeyValuePair<long, TSource>>
        {
            private IList<TSource> m_input;
            private int m_pos = 0;
            public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator()
            {
                while (true)
                {
                    // 由於使用到公共資源只有m_pos值類型索引,所以只需要保證m_pos訪問的原子性
                    int elemIndex = Interlocked.Increment(ref m_pos) - 1;
                    if (elemIndex >= m_input.Count)
                    {
                        yield break;
                    }
                    yield return new KeyValuePair<long, TSource>(elemIndex, m_input[elemIndex]);
                }
            }
            ……
         }

 

自定義分區程序

我們已經知道通過繼承OrderablePartitioner<TSource>  Partitioner<TSource>抽象類我們可以針對特定場合實現自己的分區程序。

下面展出一個示例,這個示例給我們展示了如何構建一個分區程序,這個示例為我們演示了“動態數量分區結合Parallel.ForEach()”和“靜態數量分區結合Parallel.Invoke()”的使用方式。

示例見:CustomerPartitioner.cs

        class SingleElementPartitioner<T> : Partitioner<T>
        { …… }

        public static void Test()
        {
            String[] collection = new string[]{"red", "orange", "yellow", "green", "blue", "indigo", 
                "violet", "black", "white", "grey"};
            SingleElementPartitioner<string> myPart = new SingleElementPartitioner<string>(collection);

            Console.WriteLine("示例:Parallel.ForEach");
            Parallel.ForEach(myPart, item =>
                {
                    Console.WriteLine("  item = {0}, thread id = {1}"
                        , item, Thread.CurrentThread.ManagedThreadId);
                }
            );


            Console.WriteLine("靜態數量的分區:2個分區,2個任務");
            var staticPartitions = myPart.GetPartitions(2);
            int index = 0;
            Action staticAction = () =>
                {
                    int myIndex = Interlocked.Increment(ref index) - 1;
                    var myItems = staticPartitions[myIndex];
                    int id = Thread.CurrentThread.ManagedThreadId;

                    while (myItems.MoveNext())
                    {
                        // 保證多個線程有機會執行
                        Thread.Sleep(50);
                        Console.WriteLine("  item = {0}, thread id = {1}"
                            , myItems.Current, Thread.CurrentThread.ManagedThreadId);

                    }
                    myItems.Dispose();
                };
            Parallel.Invoke(staticAction, staticAction);


            Console.WriteLine("動態分區: 3個任務 ");
            var dynamicPartitions = myPart.GetDynamicPartitions();
            Action dynamicAction = () =>
                {
                    var enumerator = dynamicPartitions.GetEnumerator();
                    int id = Thread.CurrentThread.ManagedThreadId;

                    while (enumerator.MoveNext())
                    {
                        Thread.Sleep(50);
                        Console.WriteLine("  item = {0}, thread id = {1}", enumerator.Current, id);
                    }
                };
            Parallel.Invoke(dynamicAction, dynamicAction, dynamicAction);
        }

 

快速創建可排序分區

       .NET為我們提供的System.Collections.Concurrent.Partitioner 對象可實現快速獲得可排序分區的方式。

namespace System.Collections.Concurrent
{
    // 提供針對數組、列表和可枚舉項的常見分區策略,創建一個可排序分區程序。
    public static class Partitioner
    {
        // 參數:
        // loadBalance:該值指示創建的分區程序是否應在各分區之間保持動態負載平衡,而不是靜態負載平衡。
        // EnumerablePartitionerOptions:控制分區緩沖行為的選項。
        // rangeSize:每個子范圍的大小。
        // 范圍:fromInclusive <= 范圍< toExclusive

        public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source);
        public static OrderablePartitioner<TSource> Create<TSource>(IEnumerable<TSource> source, EnumerablePartitionerOptions partitionerOptions);
        public static OrderablePartitioner<TSource> Create<TSource>(IList<TSource> list, bool loadBalance); 
        public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive);
        public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive);
        public static OrderablePartitioner<TSource> Create<TSource>(TSource[] array, bool loadBalance);
        public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize);
        public static OrderablePartitioner<Tuple<long, long>> Create(long fromInclusive, long toExclusive, long rangeSize);
}

    [Flags][Serializable]
    public enum EnumerablePartitionerOptions
    {
        None = 0,
        NoBuffering = 1,
    }
}

1.        Partitioner.Create創建的分區與負載平衡

Partitioner.Create重載

負載平衡

Create<TSource>(IEnumerable<TSource>)

始終

Create<TSource>(TSource[], Boolean)

Create<TSource>(IList<TSource>, Boolean)

將布爾型參數指定為 true

Create(Int32, Int32)

Create(Int32, Int32, Int32)

Create(Int64, Int64)

Create(Int64, Int64, Int64)

從不

2.        EnumerablePartitionerOptions

EnumerablePartitionerOptions枚舉傳遞給Partitioner.Create()用於指示在快速創建分區時是否啟用緩存提高來實現最優性能。

1)        當傳遞EnumerablePartitionerOptions.None時,指示默認為啟用緩存。在分好區后,每個線程會加鎖,在臨界區中,第一次迭代獲取該分區元素時,會獲取這一分區的所有迭代元素並緩存下來。

2)        當傳遞EnumerablePartitionerOptions.NoBuffering時,指示為不啟用緩存。每個線程會加鎖,在臨界區中,每次迭代都從同一個集合源獲取需要的一個迭代元素,因為每次只獲取一個,所以也不會再進行分區。

 

處理並行循環中的異常

ParallelFor ForEach 重載沒有任何用於處理可能引發的異常的特殊機制。並行循環中的異常處理邏輯需要處理可能在多個線程上同時引發類似異常的情況,以及一個線程上引發的異常導致在另一個線程上引發另一個異常的情況。通過將循環中的所有異常包裝在 System.AggregateException 中。eg:

        try
        {
            // 在 ProcessDataInParallel 拋出 throw new ArgumentException();
            ProcessDataInParallel(data);
        }
        catch (AggregateException ae)
        {
            foreach (var ex in ae.InnerExceptions)
            {
                if (ex is ArgumentException)
                    Console.WriteLine(ex.Message);
                else
                    throw ex;
            }
        }

 

注意事項

1.        在做性能測試時,請避免在循環內進行諸如 Console.WriteLineDebug.Write 等調用。因為同步調用共享資源(如控制台或文件系統)將大幅降低並行循環的性能

2.        將串行代碼轉化為並行代碼,需要檢查可並行化的熱點。

熱點指的是代碼中消費大量時間運行的部分,這是算法性能的瓶頸。如果熱點可以被分解為很多能並行運行的部分,那么熱點就可以獲得加速。但如果被分解為多部分代碼的單體並沒有消費大量的運行時間,那么TPL所引入的開銷就有可能會完全消減並行化帶來的加速,甚至可能導致並行化的代碼比串行化代碼運行得還慢。(TPL所引入的開銷:在轉化過程中,我們常常需要將方法的局部變量變為委托方法的內部變量以創建安全無狀態的並行化代碼,這樣的變化會讓每次迭代執行更多指令;另外還增加了大量的內存分配操作,這也會致使垃圾回收器(GC)觸發的更頻繁)

3.        避免過度並行化

倘若對操作過度並行化,那么並行循環很可能比順序循環的運行速度還慢。規則:

a)        嵌套循環中只對外部循環進行並行化。

b)        對於body委托開銷小而循環次數多的情況,可以采取按范圍分區的方式。

c)        循環中很多次迭代都不執行。

4.        不要調用非線程安全的方法。對於線程安全方法的調用也要清楚內部同步消耗,來判斷是否應該使用並行化方式。

5.        避免在UI線程上執行並行循環。應該使用任務封裝並行循環,比如:

private void button1_Click(object sender, EventArgs e)
{
    Task.Factory.StartNew(() =>
        Parallel.For(0, N, i =>
        {
            button1.Invoke((Action)delegate { DisplayProgress(i); });
        })
    );
}

6.        在由 Parallel.Invoke 調用的委托中等待時要小心

在某些情況下,當等待任務時,該任務可在正在執行等待操作的線程上以同步方式執行(詳見:局部隊列內聯機制)。這樣可提高性能,因為它利用了將以其他方式阻止的現有線程,因此不需要附加線程。但此性能優化在某些情況下可能會導致死鎖。例如,兩個任務可能運行相同的委托代碼,該代碼在事件發生時發出信號,並等待另一個任務發出信號。如果在相同線程上將第二個任務內聯到第一個,並且第一個任務進入等待狀態,則第二個任務將永遠無法發出其事件信號。為了避免發生這種情況,您可以在等待操作上指定超時,或使用 Thread ThreadPool 來確保任務不會發生內聯。

7.        不要假定 ForEachFor ForAll 的迭代始終並行執行

請務必謹記,ForForEach ForAll<TSource> 循環中的迭代不一定並行執行。因此,您應避免編寫任何依賴於並行執行的正確性或依賴於按任何特定順序執行迭代的代碼。例如,此代碼有可能會死鎖:

 

ManualResetEventSlim mre = new ManualResetEventSlim();
Enumerable.Range(0, Environment.ProcessorCount * 100)
    .AsParallel()
    .ForAll((j) =>
        {
            if (j == Environment.ProcessorCount)
            {
                Console.WriteLine("Set on {0} with value of {1}",
                    Thread.CurrentThread.ManagedThreadId, j);
                mre.Set();
            }
            else
            {
                Console.WriteLine("Waiting on {0} with value of {1}",
                    Thread.CurrentThread.ManagedThreadId, j);
                mre.Wait();
            }
        }); //deadlocks

在此示例中,一個迭代設置事件,而所有其他迭代則等待事件。  在事件設置迭代完成之前,任何等待迭代均無法完成。但是,在事件設置迭代有機會執行之前,等待迭代可能會阻止用於執行並行循環的所有線程。這將導致死鎖事件設置迭代將從不執行,並且等待迭代將從不覺醒。

 

 

 

       本節博文內容到此結束,主要是解說了Parallel處理數據並行化的方式、Parallel迭代原理、分區原理、自定義分區以及使用Parallel的注意事項。接下來我會寫一篇關於Task類的博文,敬請觀賞。若此文對你有幫助,還請園友多幫推薦、推薦……

 

 

參考資料:MSDN

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM