關鍵要點
- 異步編程技術提供了一種提高程序響應能力的方法。
- Async/Await模式在C# 5中首次亮相,但只能返回單個標量值。
- C# 8添加了異步流(Async Streams),允許異步方法返回多個值,從而擴展了其可用性。
- 異步流提供了一種用於表示異步數據源的絕佳方法。
- 異步流是Java和JavaScript中使用的反應式編程模型的替代方案。
C# 5引入了Async/Await,用以提高用戶界面響應能力和對Web資源的訪問能力。換句話說,異步方法用於執行不阻塞線程並返回一個標量結果的異步操作。
微軟多次嘗試簡化異步操作,因為Async/Await模式易於理解,所以在開發人員當中獲得了良好的認可。
現有異步方法的一個重要不足是它必須提供一個標量返回結果(一個值)。比如這個方法async Task<int> DoAnythingAsync(),DoAnythingAsync的結果是一個整數(一個值)。
由於存在這個限制,你不能將這個功能與yield關鍵字一起使用,並且也不能將其與async IEnumerable<int>(返回異步枚舉)一起使用。
如果可以將Async/Await特性與yield操作符一起使用,我們就可以使用非常強大的編程模型(如異步數據拉取或基於拉取的枚舉,在F#中被稱為異步序列)。
C# 8中新提出的Async Streams去掉了標量結果的限制,並允許異步方法返回多個結果。
這個變更將使異步模式變得更加靈活,這樣就可以按照延遲異步序列的方式從數據庫中獲取數據,或者按照異步序列的方式下載數據(這些數據在可用時以塊的形式返回)。
例如:
foreach await (var streamChunck in asyncStreams) { Console.WriteLine($“Received data count = {streamChunck.Count}”); }
Reactive Extensions(Rx)是解決異步編程問題的另一種方法。Rx越來越受到開發人員的歡迎。很多其他編程語言(如Java和JavaScript)已經實現了這種技術(RxJava、RxJS)。Rx基於推送式編程模型(Push Programming Model),也稱為反應式編程。反應式編程是事件驅動編程的一種類型,它處理的是數據而不是通知。
通常,在推送式編程模型中,你不需要控制Publisher。數據被異步推送到隊列中,消費者在數據到達時消費數據。與Rx不同,Async Streams可以按需被調用,並生成多個值,直到達到枚舉的末尾。
在本文中,我將對拉取模型和推送模型進行比較,並演示每一種技術各自的適用場景。我將使用很多代碼示例向你展示整個概念和它們的優點,最后,我將討論Async Streams功能,並向你展示示例代碼。
拉取式編程模型與推送式編程模型
圖-1-拉取式編程模型與推送式編程模型
我使用的例子是著名的生產者和消費者問題,但在我們的場景中,生產者不是生成食物,而是生成數據,消費者消費的是生成的數據,如圖-1所示。拉取模型很容易理解。消費者詢問並拉取生產者的數據。另一種方法是使用推送模型。生產者將數據發布到隊列中,消費者通過訂閱隊列來接收所需的數據。
拉取模型更合適“快生產者和慢消費者”的場景,因為消費者可以從生產者那里拉取其所需的數據,避免消費者出現溢出。推送模型更適合“慢生產者和快消費者”的場景,因為生產者可以將數據推送給消費者,避免消費者不必要的等待時間。
Rx和Akka Streams(流式編程模型)使用了回壓技術(一種流量控制機制)。它使用拉取模型或推送模型來解決上面提到的生產者和消費者問題。
在下面的示例中,我使用了一個慢消費者從快生產者那里異步拉取數據序列。消費者在處理完一個元素后,會向生產者請求下一個元素,依此類推,直到到達序列的末尾。
動機和背景
要了解我們為什么需要Async Streams,讓我們來看下面的代碼。
// 對參數(count)進行循環相加操作 static int SumFromOneToCount(int count) { ConsoleExt.WriteLine("SumFromOneToCount called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }
方法調用:
const int count = 5; ConsoleExt.WriteLine($"Starting the application with count: {count}!"); ConsoleExt.WriteLine("Classic sum starting."); ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}"); ConsoleExt.WriteLine("Classic sum completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
我們可以通過使用yield運算符讓這個方法變成惰性的,如下所示。
static IEnumerable<int> SumFromOneToCountYield(int count) { ConsoleExt.WriteLine("SumFromOneToCountYield called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; yield return sum; } }
調用方法:
const int count = 5; ConsoleExt.WriteLine("Sum with yield starting."); foreach (var i in SumFromOneToCountYield(count)) { ConsoleExt.WriteLine($"Yield sum: {i}"); } ConsoleExt.WriteLine("Sum with yield completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
正如你在輸出窗口中看到的那樣,結果被分成幾個部分返回,而不是作為一個值返回。以上顯示的累積結果被稱為惰性枚舉。但是,仍然存在一個問題,即sum方法阻塞了代碼的執行。如果你查看線程,可以看到所有東西都在主線程中運行。
現在,讓我們將async應用於第一個方法SumFromOneToCount上(沒有yield關鍵字)。
static async Task<int> SumFromOneToCountAsync(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsync called!"); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }); return result; }
調用方法:
const int count = 5; ConsoleExt.WriteLine("async example starting."); // 相加操作是異步進行得!這樣還不夠,我們要求不僅是異步的,還必須是惰性的。 var result = await SumFromOneToCountAsync(count); ConsoleExt.WriteLine("async Result: " + result); ConsoleExt.WriteLine("async completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
我們可以看到計算過程是在另一個線程中運行,但結果仍然是作為一個值返回!
想象一下,我們可以按照命令式風格將惰性枚舉(yield return)與異步方法結合起來。這種組合稱為Async Streams。這是C# 8中新提出的功能。這個新功能為我們提供了一種很好的技術來解決拉取式編程模型問題,例如從網站下載數據或從文件或數據庫中讀取記錄。
讓我們嘗試使用當前的C# 版本。我將async關鍵字添加到SumFromOneToCountYield方法中,如下所示。
圖-2 組合使用async關鍵字和yield發生錯誤
我們試着將async添加到SumFromOneToCountYield,但直接出現錯誤,如上所示!
讓我們試試別的吧。我們可以將IEnumerable放入任務中並刪除yield關鍵字,如下所示:
static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!"); var collection = new Collection<int>(); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; collection.Add(sum); } return collection; }); return result; }
調用方法:
const int count = 5; ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!"); var scs = await SumFromOneToCountTaskIEnumerable(count); ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!"); foreach (var sc in scs) { // 這不是我們想要的,結果將作為塊返回!!!! ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}"); } ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
可以看到,我們異步計算所有的內容,但仍然存在一個問題。結果(所有結果都在集合中累積)作為一個塊返回,但這不是我們想要的惰性行為,我們的目標是將惰性行為與異步計算風格相結合。
為了實現所需的行為,你需要使用外部庫,如Ix(Rx的一部分),或者你必須使用新提出的C#特性Async Streams。
回到我們的代碼示例。我使用了一個外部庫來顯示異步行為。
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence) { ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called"); await sequence.ForEachAsync(value => { ConsoleExt.WriteLineAsync($"Consuming the value: {value}"); // 模擬延遲! Task.Delay(TimeSpan.FromSeconds(1)).Wait(); }); } static IEnumerable<int> ProduceAsyncSumSeqeunc(int count) { ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; // 模擬延遲! Task.Delay(TimeSpan.FromSeconds(0.5)).Wait(); yield return sum; } }
調用方法:
const int count = 5; ConsoleExt.WriteLine("Starting Async Streams Demo!"); // 啟動一個新任務,用於生成異步數據序列! IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable(); ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#"); // 啟動另一個新任務,用於消費異步數據序列! var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence)); // 出於演示目的,等待任務完成! consumingTask.Wait(); ConsoleExt.WriteLineAsync("Async Streams Demo Done!");
輸出:
最后,我們實現了我們想要的行為!我們可以在枚舉上進行異步迭代。
源代碼在這里。
客戶端/服務器端的異步拉取
我將使用一個更現實的例子來解釋這個概念。客戶端/服務器端架構是演示這一功能優勢的絕佳方法。
客戶端/服務器端同步調用
客戶端向服務器端發送請求,客戶端必須等待(客戶端被阻塞),直到服務器端做出響應,如圖-3所示。
圖-3 同步數據拉取,客戶端等待請求完成
異步數據拉取
客戶端發出數據請求然后繼續執行其他操作。一旦有數據到達,客戶端就繼續處理達到的數據。
圖-4 異步數據拉取,客戶端可以在請求數據時執行其他操作
異步序列數據拉取
客戶端發出數據塊請求,然后繼續執行其他操作。一旦數據塊到達,客戶端就處理接收到的數據塊並詢問下一個數據塊,依此類推,直到達到最后一個數據塊為止。這正是Async Streams想法的來源。圖-5顯示了客戶端可以在收到任何數據時執行其他操作或處理數據塊。
圖-5 異步序列數據拉取(Async Streams),客戶端未被阻塞!
Async Streams
與IEnumerable<T>和IEnumerator<T>類似,Async Streams提供了兩個新接口IAsyncEnumerable<T>和IAsyncEnumerator<T>,定義如下:
public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(); } public interface IAsyncEnumerator<out T> : IAsyncDisposable { Task<bool> MoveNextAsync(); T Current { get; } } // Async Streams Feature可以被異步銷毀 public interface IAsyncDisposable { Task DiskposeAsync(); }
Jonathan Allen已經在InfoQ網站上介紹過這個主題,我不想在這里再重復一遍,所以我建議你也閱讀一下他的文章。
關鍵在於Task<bool> MoveNextAsync()的返回值(從bool改為Task<bool>,bool IEnumerator.MoveNext())。這樣可以讓整個計算和迭代都保持異步。大多數情況下,這仍然是拉取模型,即使它是異步的。IAsyncDisposable接口可用於進行異步清理。有關異步的更多信息,請點擊此處。
語法
最終語法應如下所示:
foreach await (var dataChunk in asyncStreams) { // 處理數據塊或做一些其他的事情! }
如上所示,我們現在可以按順序計算多個值,而不只是計算單個值,同時還能夠等待其他異步操作結束。
重寫微軟的示例
我重寫了微軟的演示代碼,你可以從我的GitHub下載相關代碼。
這個例子背后的想法是創建一個大的MemoryStream(20000字節的數組),並按順序異步迭代集合中的元素或MemoryStream。每次迭代從數組中拉取8K字節。
在(1)處,我們創建了一個大字節數組並填充了一些虛擬值。在(2)處,我們定義了一個叫作checksum的變量。我們將使用checksum來確保計算的總和是正確的。數組和checksum位於內存中,並通過一個元組返回,如(3)所示。
在(4)處,AsEnumarble(或者叫AsAsyncEnumarble)是一種擴展方法,用於模擬由8KB塊組成的異步流( (6)處所示的BufferSize = 8000)。
通常,你不必繼承IAsyncEnumerable,但在上面的示例中,微軟這樣做是為了簡化演示,如(5)處所示。
(7)處是“foreach”,它從異步內存流中拉取8KB的塊數據。當消費者(foreach代碼塊)准備好接收更多數據時,拉取過程是順序進行的,然后它從生產者(內存流數組)中拉取更多的數據。最后,當迭代完成后,應用程序將’c’的校驗和與checksum進行比較,如果它們匹配,就打印出“Checksums match!”,如(8)所示!
微軟演示的輸出窗口:
概要
我們已經討論過Async Streams,它是一種出色的異步拉取技術,可用於進行生成多個值的異步計算。
Async Streams背后的編程概念是異步拉取模型。我們請求獲取序列的下一個元素,並最終得到答復。這與IObservable<T>的推送模型不同,后者生成與消費者狀態無關的值。Async Streams提供了一種表示異步數據源的絕佳方法,例如,當消費者尚未准備好處理更多數據時。示例包含了Web應用程序或從數據庫中讀取記錄。