聊一聊C# 8.0中的await foreach


AsyncStreamsInCShaper8.0

很開心今天能與大家一起聊聊C# 8.0中的新特性-Async Streams,一般人通常看到這個詞表情是這樣.

簡單說,其實就是C# 8.0中支持await foreach.

或者說,C# 8.0中支持異步返回枚舉類型async Task<IEnumerable<T>>.

好吧,還不懂?Good,這篇文章就是為你寫的,看完這篇文章,你就能明白它的神奇之處了.

為什么寫這篇文章

Async Streams這個功能已經發布很久了,在去年的Build 2018 The future of C#就有演示,最近VS 2019發布,在該版本的Release Notes中,我再次看到了這個新特性,因為對異步編程不太熟悉,所以借着這個機會,學習新特性的同時,把異步編程重溫一遍.
本文內容,參考了Bassam Alugili在InfoQ中發表的Async Streams in C# 8,撰寫本博客前我已聯系上該作者並得到他支持.

Async / Await

C# 5 引入了 Async/Await,用以提高用戶界面響應能力和對 Web 資源的訪問能力。換句話說,異步方法用於執行不阻塞線程並返回一個標量結果的異步操作。

微軟多次嘗試簡化異步操作,因為 Async/Await 模式易於理解,所以在開發人員當中獲得了良好的認可。

詳見The Task asynchronous programming model in C#

常規示例

要了解問什么需要Async Streams,我們先來看看這樣的一個示例,求出5以內的整數的和.

static int SumFromOneToCount(int count)
        {
            ConsoleExt.WriteLine("SumFromOneToCount called!");

            var sum = 0;
            for (var i = 0; i <= count; i++)
            {
                sum = sum + i;
            }
            return sum;
        }

調用方法.

static void Main(string[] args)
        {
            const int count = 5;
            ConsoleExt.WriteLine($"Starting the application with count: {count}!");
            ConsoleExt.WriteLine("Classic sum starting.");
            ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
            ConsoleExt.WriteLine("Classic sum completed.");
            ConsoleExt.WriteLine("################################################");
        }

輸出結果.

可以看到,整個過程就一個線程Id為1的線程自上而下執行,這是最基礎的做法.

Yield Return

接下來,我們使用yield運算符使得這個方法編程延遲加載,如下所示.

static IEnumerable<int> SumFromOneToCountYield(int count)
        {
            ConsoleExt.WriteLine("SumFromOneToCountYield called!");

            var sum = 0;
            for (var i = 0; i <= count; i++)
            {
                sum = sum + i;

                yield return sum;
            }
        }

主函數

static void Main(string[] args)
        {
            const int count = 5;
            ConsoleExt.WriteLine("Sum with yield starting.");
            foreach (var i in SumFromOneToCountYield(count))
            {
                ConsoleExt.WriteLine($"Yield sum: {i}");
            }
            ConsoleExt.WriteLine("Sum with yield completed.");

            ConsoleExt.WriteLine("################################################");
            ConsoleExt.WriteLine(Environment.NewLine);
        }

運行結果如下.

正如你在輸出窗口中看到的那樣,結果被分成幾個部分返回,而不是作為一個值返回。以上顯示的累積結果被稱為惰性枚舉。但是,仍然存在一個問題,即 sum 方法阻塞了代碼的執行。如果你查看線程ID,可以看到所有東西都在主線程1中運行,這顯然不完美,繼續改造.

Async Return

我們試着將async用於SumFromOneToCount方法(沒有yield關鍵字).

static async Task<int> SumFromOneToCountAsync(int count)
        {
            ConsoleExt.WriteLine("SumFromOneToCountAsync called!");

            var result = await Task.Run(() =>
            {
                var sum = 0;

                for (var i = 0; i <= count; i++)
                {
                    sum = sum + i;
                }
                return sum;
            });

            return result;
        }

主函數.

static async Task Main(string[] args)
        {
            const int count = 5;
            ConsoleExt.WriteLine("async example starting.");
            // Sum runs asynchronously! Not enough. We need sum to be async with lazy behavior.
            var result = await SumFromOneToCountAsync(count);
            ConsoleExt.WriteLine("async Result: " + result);
            ConsoleExt.WriteLine("async completed.");

            ConsoleExt.WriteLine("################################################");
            ConsoleExt.WriteLine(Environment.NewLine);
        }

運行結果.

我們可以看到計算過程是在另一個線程中運行,但結果仍然是作為一個值返回!任然不完美.

如果我們想把惰性枚舉(yield return)與異步方法結合起來,即返回Task<IEnumerable ,這怎么實現呢?

Task<IEnumerable >

我們根據假設把代碼改造一遍,使用Task<IEnumerable<T>>來進行計算.

可以看到,直接出現錯誤.

IAsyncEnumerable

其實,在C# 8.0中Task<IEnumerable >這種組合稱為IAsyncEnumerable 。這個新功能為我們提供了一種很好的技術來解決拉異步延遲加載的問題,例如從網站下載數據或從文件或數據庫中讀取記錄,與 IEnumerable 和 IEnumerator 類似,Async Streams 提供了兩個新接口 IAsyncEnumerable 和 IAsyncEnumerator ,定義如下:

public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        Task<bool> MoveNextAsync();
        T Current { get; }
    }

   // Async Streams Feature 可以被異步銷毀 
   public interface IAsyncDisposable
   {
      Task DiskposeAsync();
   }

AsyncStream

下面,我們就來見識一下AsyncStrema的威力,我們使用IAsyncEnumerable來對函數進行改造,如下.

static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
        {
            ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");

            await foreach (var value in sequence)
            {
                ConsoleExt.WriteLineAsync($"Consuming the value: {value}");

                // simulate some delay!
                await Task.Delay(TimeSpan.FromSeconds(1));
            };
        }

        private static async IAsyncEnumerable<int> ProduceAsyncSumSeqeunc(int count)
        {
            ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
            var sum = 0;

            for (var i = 0; i <= count; i++)
            {
                sum = sum + i;

                // simulate some delay!
                await Task.Delay(TimeSpan.FromSeconds(0.5));

                yield return sum;
            }
        }

主函數.

 static async Task Main(string[] args)
        {
            const int count = 5;
            ConsoleExt.WriteLine("Starting Async Streams Demo!");

            // Start a new task. Used to produce async sequence of data!
            IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count);

            // Start another task; Used to consume the async data sequence!
            var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));

            await Task.Delay(TimeSpan.FromSeconds(3));

            ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");

            // Just for demo! Wait until the task is finished!
            await consumingTask;

            ConsoleExt.WriteLineAsync("Async Streams Demo Done!");
        }

如果一切順利,那么就能看到這樣的運行結果了.

最后,看到這就是我們想要的結果,在枚舉的基礎上,進行了異步迭代.
可以看到,整個計算過程並沒有造成主線程的阻塞,其中,值得重點關注的是紅色方框區域的線程5!線程5!線程5!線程5在請求下一個結果后,並沒有等待結果返回,而是去了Main()函數中做了別的事情,等待請求的結果返回后,線程5又接着執行foreach中任務.

Client/Server的異步拉取

如果還沒有理解Async Streams的好處,那么我借助客戶端 / 服務器端架構是演示這一功能優勢的絕佳方法。

同步調用

客戶端向服務器端發送請求,客戶端必須等待(客戶端被阻塞),直到服務器端做出響應.

示例中Yield Return就是以這種方式執行的,所以整個過程只有一個線程即線程1在處理.

異步調用

客戶端發出數據塊請求,然后繼續執行其他操作。一旦數據塊到達,客戶端就處理接收到的數據塊並詢問下一個數據塊,依此類推,直到達到最后一個數據塊為止。這正是 Async Streams 想法的來源。

最后一個示例就是以這種方式執行的,線程5詢問下一個數據后並沒有等待結果返回,而是去做了Main()函數中的別的事情,數據到達后,線程5又繼續處理foreach中的任務.

Tips

如果你使用的是.net core 2.2及以下版本,會遇到這樣的報錯.

需要安裝.net core 3.0 preview的SDK(截至至博客撰寫日期4月9日,.net core SDK最新版本為3.0.100-preview3-010431),安裝好SDK后,如果你是VS 2019正式版,可能無法選擇.net core 3.0,vs 2019 正式版默認情況下沒有開啟對預覽版.net core 3.0 的支持.

根據網友補充,需要在VS 2019正式版本中需要開啟使用 .Net core SDK 預覽版,才能創建3.0的項目.
工具 > 選項 > 項目和解決方案 > .Net Core > 使用 .Net core SDK 預覽版

總結

我們已經討論過 Async Streams,它是一種出色的異步拉取技術,可用於進行生成多個值的異步計算。

Async Streams 背后的編程概念是異步拉取模型。我們請求獲取序列的下一個元素,並最終得到答復。Async Streams 提供了一種處理異步數據源的絕佳方法,希望對大家能夠有所幫助。

文章中涉及的所有代碼已保存在我的GitHub中,請盡情享用!
https://github.com/liuzhenyulive/AsyncStreamsInCShaper8.0

致謝

之前一直感覺國外的大師級開發者遙不可及甚至高高在上,在遇到Bassam Alugili之后,我才真正感受到技術交流沒有高低貴賤,正如他對我說的 The most important thing in this world is sharing the knowledge!
Thank you,I will keep going!!

參考文獻: Async Streams in C# 8 https://www.infoq.com/articles/Async-Streams


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM