並發編程相關概念


>>返回《C# 並發編程》

1. 概念介紹

現在我們先說明幾個概念:

  • 並發
    • 就是同時做多件事情,比如:
      • 程序寫入數據庫的同時響應用戶輸入
      • 服務器處理第一個請求的同時響應第二個請求。
  • 多線程
    • 是並發的一種形式,它采用多個線程來執行程序,
      • 注意: 多線程是並發的一種形式,但並不是唯一的形式。
    • 多線程是比較基礎的技術,我們需要理解,知曉原理,但是真正使用時最好使用對多線程進行封裝的類,這樣能更好的節省資源,減少問題的產生。
  • 並行處理
    • 把正在執行的大量的任務分割成小塊,分配給多個同時運行的線程。
    • 這樣會使處理器的利用效率最大化,使用時需要注意因為控制不好的化會在短時間內極大的降低服務器的處理性能
  • 異步編程
    • 並發的一種形式,它采用未來模式(future)或回調機制(callback),以避免產生不必要的線程
      • 在 .NET 中,新版中使用 TaskTask<TResult>類型實現未來模式,在老式異步編程 API 中,采用回調或事件(event)
    • 也是關鍵字 asyncawait 解決的問題
  • 響應式編程
    • 一種聲明式的編程模式,程序在該模式中對事件做出響應。

2. 異步編程

2.1. async運行過程

  • async 方法在開始時以同步方式執行。在 async 方法內部,運行到await 關鍵字會執行一個異步等待
    • 它首先檢查操作是否已經完成,如果完成了,就繼續運行 (同步方式)。
    • 否則,它會暫停 async 方法,並返回,留下一個未完成task
    • 一段時間后,await住的操作完成,async 方法就恢復運行(不一定是原來的線程,具體看同步上下文的配置)。

2.2. async運行中同步上下文簡介

  • 最常見的情況是,用 await 語句等待一個任務完成,這時會捕捉同步上下文。
    • 如果當前 SynchronizationContext 不為空,這個上下文就是當前 SynchronizationContext
    • 如果當前 SynchronizationContext 為空,則這個上下文為當前 TaskScheduler。
  • 該方法會在這個上下文中繼續運行。
    • 運行 UI 線程時采用 UI 上下文
    • 處理 ASP.NET 請求時采用 ASP.NET 請求上下文
    • 其他很多情況下則采用線程池上下文(上下文為 null 時)

注意: 最好的做法是,在核心庫代碼中一直使用 ConfigureAwait 。在外圍的用戶界面代碼中,只在需要時才恢復上下文。

2.3. 創建Task實例

  • 兩種基本的方法可以創建 Task 實例。
    • 有些任務表示 CPU 需要實際執行的指令,創建這種計算類的任務時,使用 Task.Run
      • 如UI線程觸發的事件,如讀取目錄信息,配合 await 關鍵字,將任務交給線程池完成,解決讀取時窗體卡頓情況
    • 如需要按照特定的計划運行,則用 TaskFactory.StartNew
    • 其他的任務表示一個通知( notification 操作會在回調中完成再通知回來),創建這種基於事件的任務時,使用 TaskCompletionSource<T>
      • 大部分 I/O 型任務采用 TaskCompletionSource<T>

2.4. 捕獲異步異常類型

  1. 捕獲await拋出的異常,我們更想要
try
{
    await Task.Run(() => throw new NotSupportedException());
}
catch (Exception ex)
{
    //print: NotSupportedException
    Console.WriteLine(ex.GetType().Name);
}
  1. Wait()方法,異常類型被包裝
try
{
    Task task = Task.Run(() => throw new NotSupportedException());
    task.Wait();
}
catch (Exception ex)
{
    //print: AggregateException
    Console.WriteLine(ex.GetType().Name);
}

3. 並行編程

  • 並行編程可臨時提高 CPU 利用率,以提高吞吐量。
    • 若客戶端系統中的 CPU 經常處於空閑狀態,這個方法就非常有用
    • 通常並不適合服務器系統,將降低本身的並行處理能力,並且不會有實際的好處。

反面教材: 之前在工作中出現一起事故,實施好的項目,3個月后每天凌晨出現大量設備掉線的情況。

  1. 由於數據超時時間時3個月,而且發現出現問題的日志和數據清理發生時間有關聯關系
  2. 排查代碼發現文件清理器,清理數據使用的Parallel類,並行刪除文件,而且沒有對並發數限制
  3. 文件清理器運行時,導致服務器性能急劇下降,造成處理設備消息延遲,心跳超時導致掉線
  4. 重構了文件清理器代碼,解決了這個問題
  • 數據並行(data parallelism):
    • 是指有大量的數據需要處理,並且每一塊數據的處理過程基本上是彼此獨立的。
  • 任務並行(task parallelim):
    • 是指需要執行大量任務,並且每個任務的執行過程基本上是彼此獨立的。

3.1. Parallel

不保證順序執行。

//ForEach
int[] arr = new int[] { 1, 2, 3, 4 };
Parallel.ForEach(arr, item => Console.Write(item));
System.Console.WriteLine();

//PLINQ
var sum = arr.AsParallel().Select(item => item * 2).Sum();
System.Console.WriteLine($"sum:{sum}.");

//Invoke
int num = 10;
Parallel.Invoke(
    () => num += 2,
    () => num -= 2,
    () => num -= num,
    () => num += 2
);
System.Console.WriteLine($"num:{num}.");
/* 
print:
1243
sum:20.
num:0.
*/

3.2. 異常處理

系統會把這些異常封裝在 AggregateException 類里,在程序中拋給代碼。 這一特點對所有方法都是一樣的,包括 Parallel.ForEach、Paralle.lInvoke、Task.Wait 等。 AggregateException 類型有幾個實用的 Flatten 和 Handle 方法,用來簡化錯誤處理的代碼:

try 
{
    Parallel.Invoke(() => { throw new Exception(); },
    () => { throw new Exception(); });
}
catch (AggregateException ex)
{
    ex.Handle(exception =>
    {
        Console.WriteLine(exception);
        return true; //“已經處理” 
    });
}

3.3. 注意事項

在編寫任務並行程序時,要格外留意下閉包(closure)捕獲的變量。 記住閉包捕獲的是引用(不是值),因此可以在結束時以不明顯地方式地分享這些變量。

  • 任務不要特別短
    • 沒必要用,直接同步執行
  • 也不要特別長
    • 應采用更可控的方式,削峰設計

4. 響應式編程

如果事件中帶有參數,那么最好 采用響應式編程,而不是常規的事件處理程序。

//System.Runtime.dll namespace:System 中定義了這些接口
interface IObserver<in T>
{
    void OnNext(T item);
    void OnCompleted();
    void OnError(Exception error);
}
interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

Rx(Rx-Main)中定義了響應式編程的封裝,后面會有介紹。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM