1. 簡介
C#並發編程經典實例 是一本關於使用C#進行並發編程的入門參考書,使用“問題-解決方案-討論”的模式講解了以下這些概念:
- 面向異步編程的async和await
- 使用TPL(任務並行庫)
- 創建數據流管道的TPL Dataflow庫
- 基於LINQ的Reactive Extensions
- 為並發代碼編寫單元測試
- 並發方法之間的互操作
- 不可變、線程安全和生產者/消費者集合
- 並發代碼中的取消功能支持
- 支持異步的面向對象編程
- 線程同步訪問數據
我還挺喜歡這本書的,只有短短的170頁卻提供了大量的最佳實踐,介紹了當時最新的C#平台並發開發技術,作為參考書時至今日依然很有推薦價值。不過篇幅所限,從入門知識到最佳實踐之間往往缺乏過渡。例如第四章《數據流基礎》,前一頁還在介紹要安裝哪個Nuget包才可以使用數據流,下一頁突然討論《鏈接數據流塊》、《傳遞出錯信息》,至於數據流有哪些類型各自的使用場景都沒介紹到,於是我只好配合博客園上的這篇文章 TPL DataFlow初探 來學習數據流的知識。
2. 實現一個下載工具的UI
為什么這篇文章放在UWP板塊下面?
這本書2015年在國內出版,讀了這本書后感覺很有用。最近重讀了這本書,試着用UWP復習一下書上的知識,除了有些Nuget包的名字變了其它內容都適用於UWP開發,最終成果是一個(十分陽春的)下載工具UI,所以就放在UWP板塊下了。
2.1 基礎的async/await
private async void OnAddLinks(object sender, RoutedEventArgs e)
{
var dialog = new AddDownloadDialog();
await dialog.ShowAsync();
if (dialog.Downloads == null)
return;
…
…
}
基礎的用法沒什么好說的。
微軟的文檔提到“應將“‘Async’作為后綴添加到所編寫的每個異步方法名稱中。”,但即使沒這樣做VS和R#也沒有提示。
2.2 同時開始一組任務並等待它們完成
private async Task<IEnumerable<Downloader>> AddNewDownloadAsync(IEnumerable<Uri> links, CancellationToken cancellationToken)
{
var downlodTasks = links.Select(Downloader.CreateAsync);
var downlodTasksArray = downlodTasks.ToArray();
var downloads = await Task.WhenAll(downlodTasksArray);
return downloads;
}
反正就是使用Task<TResult[]> WhenAll
。
2.3 一組任務中任一任務完成時的處理
Task<Downloader> Selector(Uri link) => Downloader.CreateAsync(link, cancellationToken);
var downlodTasks = links.Select(Selector);
var progressTasks = downlodTasks.Select(async t =>
{
var result = await t.ToObservable().Timeout(TimeSpan.FromSeconds(6));
await _mutex.WaitAsync(cancellationToken);
try
{
if (cancellationToken.IsCancellationRequested == false)
{
FinishedTasks++;
_downloads.Add(t.Result);
}
}
finally
{
_mutex.Release();
}
return result;
}).ToArray();
var downloads = await Task.WhenAll(progressTasks);
2.4 發出取消請求
由CancellationTokenSource發出取消請求,CancellationToken則讓代碼能夠響應取消請求。
try
{
_cancellationTokenSource = new CancellationTokenSource();
await AddNewDownloadAsync(_cancellationTokenSource.Token);
}
catch (OperationCanceledException ex)
{
InAppNotification.Show("Task Paused:" + ex.Message, 5000);
}
catch (Exception ex)
{
ProgressControl.State = ProgressState.Faulted;
InAppNotification.Show("Task Error:" + ex.Message, 5000);
}
_cancellationTokenSource.Cancel();
上面代碼演示了如何通過CancellationTokenSource發出取消請求,被取消的代碼應該會拋出OperationCanceledException。也有可能被取消的代碼還來不及響應取消就完成或報錯了。
2.5 通過輪詢響應取消請求
while (ReceivedBytes < TotalBytes)
{
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
var bytesReceived = random.Next(1024 * 1024);
ReceivedBytes += bytesReceived;
cancellationToken.ThrowIfCancellationRequested();
}
被取消的代碼可以通過ThrowIfCancellationRequested()
拋出OperationCanceledException。也可以通過檢查IsCancellationRequested再做其它處理,但拋出OperationCanceledException是標准處理方式。
如果再下一層代碼里支持取消,則應該將CancellationToken傳遞給它,例如這里的Task.Delay。
2.6 超時后取消
var downlodTasks = links.Select(link =>
{
var cts = new CancellationTokenSource();
var token = cts.Token;
cts.CancelAfter(TimeSpan.FromSeconds(5));
return Downloader.CreateAsync(link, token);
});
var downlodTasksArray = downlodTasks.ToArray();
var downloads = await Task.WhenAll(downlodTasksArray);
CancellationTokenSource調用CancelAfter(TimeSpan delay)
或者使用構造函數CancellationTokenSource(TimeSpan delay)
設置取消前等待的時間間隔都可以實現超時后取消。
2.7 使用Rx實現超時
上面的方法實現超時其實相當於發出了一個取消請求,最終會拋出一個OperationCanceledException,有時會難以區分用戶的取消操作和超時后被取消。我有時會用Rx來實現超時。
var result = await t.ToObservable().Timeout(TimeSpan.FromSeconds(6));
這段代碼會拋出TimeoutException,更加有超時的感覺。但是CancellationTokenSource沒有被取消,所以原本以為被取消的代碼仍會繼續偷偷摸摸地執行下去。
2.8 報告進度
public async Task StartDownloadAsync(IProgress<int> progress, CancellationToken cancellationToken)
{
_cancellationToken = cancellationToken;
var random = new Random();
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
while (ReceivedBytes < TotalBytes)
{
await Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
var bytesReceived = random.Next(1024 * 1024);
ReceivedBytes += bytesReceived;
progress?.Report(bytesReceived);
cancellationToken.ThrowIfCancellationRequested();
}
}
}
var progress = new Progress<int>();
progress.ProgressChanged += (s, e) =>
{
DownloadedData?.Invoke(this, e);
OnPropertyChanged(nameof(Downloader));
};
_cancellationTokenSource = new CancellationTokenSource();
await Downloader.StartDownloadAsync(progress, _cancellationTokenSource.Token);
使用IProgress
event EventHandler
接收進度。
IProgress
可以是異步的,所以T最好定義為一個不可變類型或者至少是值類型。
2.9 限制每次只開始5個下載
_semaphore = new SemaphoreSlim(5);
var tasks = dialog.Downloads.Select(async item =>
{
var model = new DownloaderModel { Downloader = item };
Downloads.Add(model);
model.DownloadedData += OnDownloadData;
await _semaphore.WaitAsync();
try
{
await model.StartDownloadAsync();
}
catch (OperationCanceledException)
{
//do nothing
}
finally
{
_semaphore.Release();
}
}).ToArray();
await Task.WhenAll(tasks);
雖然有幾種方法實現,但SemaphoreSlim看着挺好理解的。
2.10 使用Rx的緩沖統計下載速度
private void OnDownloadData(object sender, int e)
{
_progress.Report(e);
}
當下載進度更新時使用IProgress
報告進度。
var progress = new Progress<int>();
_progress = progress;
var reports = Observable.FromEventPattern<int>(handler => progress.ProgressChanged += handler, handler => progress.ProgressChanged -= handler);
reports.Buffer(TimeSpan.FromSeconds(1)).Subscribe(async x =>
{
await CoreApplication.MainView.CoreWindow.Dispatcher.RunAsync(Windows.UI.Core.CoreDispatcherPriority.Normal, () =>
{
SpeedElement.Text = string.Format("{0} Bytes/S", x.Sum(s => s.EventArgs).ToString("N0"));
});
});
這段代碼收集ProgressChanged事件,並每一秒鍾把收集到的事件作為一個集合發布。
3. 書中的其它建議
一旦你輸入new Thread(),那就糟糕了,說明項目中的代碼太過時了。
比起老式的多線程機制,采用高級的抽象機制會讓程序功能更加強大、效率更高。事實上UWP好像只能使用線程池,不能直接訪問及控制線程(因為習慣用Task沒關心線程,也許有我不知道的方式),看起來微軟希望開發者使用Task這個更合理的抽象而不是直接使用線程。
在編寫任務並行程序時,要格外留意下閉包(closure)捕獲的變量。
這是個常見的錯誤,幸好很多情況下R#都會提示這個錯誤。
基本的lock語句就可以很好地處理99%的情況了。
經常在Code Review時看到Monitor或ReaderWriterLockSlim之類的。但是,我明白的,比起直接用lock這樣寫比較帥氣(但我還是會要求改過來)。
應該把lock語句使用的對象設為私有變量,並且永遠不要暴露給非本類的方法。
lock一個屬性,或者直接lock(this)都十分危險。我真的CodeReview過因為習慣性地lock(this)而產生死鎖的代碼。
另外鎖對象的使用范圍盡量小,不要在多個語句中使用同一個鎖對象。
在UI線程上執行代碼時,永遠不要使用針對特定平台的類型。WPF、Silverlight、iOS、Android都有Dispatcher類,Windows應用商店平台使用CoreDispatcher、Windows Forms有ISynchronizeInvoke接口。不要在新寫的代碼中使用這些類型,就當它們不存在吧。使用這些類型會使代碼無所謂綁定到某個特定平台上。SynchronizationContext是通用的,基於上述類型的抽象類。
在UWP中,在線程中調用UI元素通常如下:
await Task.Run(async () =>
{
await CoreApplication.MainView.CoreWindow.Dispatcher.RunAsync(Windows.UI.Core.CoreDispatcherPriority.Normal, () =>
{
Header.Text = "some message";
});
});
如果使用SynchronizationContext,則代碼如下:
var synchronizationContext = SynchronizationContext.Current;
await Task.Run(() =>
{
synchronizationContext.Post(a =>
{
Header.Text = "some message";
}, null);
});
看起來SynchronizationContext確實更通用一些。
4. 延伸閱讀
本書只介紹了使用技術,很少深入講解內部機制,需要深入理解異步編程可以參考微軟的官方文檔:
異步編程
使用 Async 和 Await 的異步編程
異步概述
基於任務的異步模式 (TAP)