AsyncStreamsInCShaper 8.0
C# 8.0中支持異步返回枚舉類型async Task<IEnumerable<T>>
sync Streams這個功能已經發布很久了,在去年的Build 2018 The future of C#就有演示
C# 5引入了 Async/Await,用以提高用戶界面響應能力和對 Web 資源的訪問能力。換句話說,異步方法用於執行不阻塞線程並返回一個標量結果的異步操作。
常規示例
要了解問什么需要Async Streams,我們先來看看這樣的一個示例,求出5以內的整數的和.
static int SumFromOneToCount(int count)
{
ConsoleExt.WriteLine("SumFromOneToCount called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
}
調用方法.
static void Main(string[] args)
{
const int count = 5;
ConsoleExt.WriteLine($"Starting the application with count: {count}!");
ConsoleExt.WriteLine("Classic sum starting.");
ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
ConsoleExt.WriteLine("Classic sum completed.");
ConsoleExt.WriteLine("################################################");
}
輸出結果.
可以看到,整個過程就一個線程Id為1的線程自上而下執行,這是最基礎的做法.
Yield Return
接下來,我們使用yield運算符使得這個方法編程延遲加載,如下所示.
static IEnumerable<int> SumFromOneToCountYield(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountYield called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
yield return sum;
}
}
主函數
static void Main(string[] args)
{
const int count = 5;
ConsoleExt.WriteLine("Sum with yield starting.");
foreach (var i in SumFromOneToCountYield(count))
{
ConsoleExt.WriteLine($"Yield sum: {i}");
}
ConsoleExt.WriteLine("Sum with yield completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);
}
運行結果如下.
正如你在輸出窗口中看到的那樣,結果被分成幾個部分返回,而不是作為一個值返回。以上顯示的累積結果被稱為惰性枚舉。但是,仍然存在一個問題,即 sum 方法阻塞了代碼的執行。如果你查看線程ID,可以看到所有東西都在主線程1中運行,這顯然不完美,繼續改造.
Async Return
我們試着將async用於SumFromOneToCount方法(沒有yield關鍵字).
static async Task<int> SumFromOneToCountAsync(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountAsync called!");
var result = await Task.Run(() =>
{
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
});
return result;
}
主函數
static async Task Main(string[] args)
{
const int count = 5;
ConsoleExt.WriteLine("async example starting.");
// Sum runs asynchronously! Not enough. We need sum to be async with lazy behavior.
var result = await SumFromOneToCountAsync(count);
ConsoleExt.WriteLine("async Result: " + result);
ConsoleExt.WriteLine("async completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);
}
運行結果.
我們可以看到計算過程是在另一個線程中運行,但結果仍然是作為一個值返回!任然不完美.
如果我們想把惰性枚舉(yield return)與異步方法結合起來,即返回Task<IEnumerable,這怎么實現呢?
IAsyncEnumerable
其實,在C# 8.0中Task<IEnumerable>這種組合稱為IAsyncEnumerable。
這個新功能為我們提供了一種很好的技術來解決拉異步延遲加載的問題,例如從網站下載數據或從文件或數據庫中讀取記錄,與 IEnumerable 和 IEnumerator 類似,Async Streams 提供了兩個新接口 IAsyncEnumerable 和 IAsyncEnumerator,定義如下:
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator();
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
Task<bool> MoveNextAsync();
T Current { get; }
}
// Async Streams Feature 可以被異步銷毀
public interface IAsyncDisposable
{
Task DiskposeAsync();
}
AsyncStream
下面,我們就來見識一下AsyncStrema的威力,我們使用IAsyncEnumerable來對函數進行改造,如下.
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
{
ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");
await foreach (var value in sequence)
{
ConsoleExt.WriteLineAsync($"Consuming the value: {value}");
// simulate some delay!
await Task.Delay(TimeSpan.FromSeconds(1));
};
}
private static async IAsyncEnumerable<int> ProduceAsyncSumSeqeunc(int count)
{
ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
// simulate some delay!
await Task.Delay(TimeSpan.FromSeconds(0.5));
yield return sum;
}
}
主函數
static async Task Main(string[] args)
{
const int count = 5;
ConsoleExt.WriteLine("Starting Async Streams Demo!");
// Start a new task. Used to produce async sequence of data!
IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count);
// Start another task; Used to consume the async data sequence!
var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));
await Task.Delay(TimeSpan.FromSeconds(3));
ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");
// Just for demo! Wait until the task is finished!
await consumingTask;
ConsoleExt.WriteLineAsync("Async Streams Demo Done!");
}
如果一切順利,那么就能看到這樣的運行結果了.
最后,看到這就是我們想要的結果,在枚舉的基礎上,進行了異步迭代.
可以看到,整個計算過程並沒有造成主線程的阻塞,其中,值得重點關注的是紅色方框區域的線程5!線程5!線程5!線程5在請求下一個結果后,並沒有等待結果返回,而是去了Main()函數中做了別的事情,等待請求的結果返回后,線程5又接着執行foreach中任務.
我們已經討論過 Async Streams,它是一種出色的異步拉取技術,可用於進行生成多個值的異步計算。
Async Streams 背后的編程概念是異步拉取模型。我們請求獲取序列的下一個元素,並最終得到答復。
Async Streams 提供了一種處理異步數據源的絕佳方法,希望對大家能夠有所幫助。