C#8.0: 在 LINQ 中支持異步的 IAsyncEnumerable


C# 8.0中,提供了一種新的IAsyncEnumerable<T>接口,在對集合進行迭代時,支持異步操作。比如在讀取文本中的多行字符串時,如果讀取每行字符串的時候使用同步方法,那么會導致線程堵塞。IAsyncEnumerable<T>可以解決這種情況,在迭代的時候支持使用異步方法。也就是說,之前我們使用foreach來對IEnumerable進行迭代,現在可以使用await foreach來對IAsyncEnumerable<T>進行迭代,每個項都是可等待的。這種新的接口稱為async-streams,將會隨.NET Core 3發布。我們來看一下如何在LINQ中實現異步的迭代。

使用常規的IEnumerable<T>

首先我們創建一個新的Console項目,基於.NET Core 3

namespace AsyncLinqDemo
{
   class Program
  {
       static void Main(string[] args)
      {
           Console.WriteLine("Input the file path:");
           var file = Console.ReadLine();
           var lines = ReadAllLines(file);
           foreach (var line in lines)
          {
               Console.WriteLine(line);
          }
      }

       static IEnumerable<string> ReadAllLines(string file)
      {
           using (var fs = File.OpenRead(file))
          {
               using (var sr = new StreamReader(fs))
              {
                   while (true)
                  {
                       string line = sr.ReadLine();
                       if(line == null)
                      {
                           break;
                      }
                       yield return line;
                  }
              }
          }
      }
  }
}

 

這是一個很簡單的Console程序,實現了一個簡單的返回類型為IEnumerable<string>ReadAllLines(string file)方法,從文本文件中逐行讀取文本,並逐行輸出。如果文本內容較少的話,沒什么問題。但如果我們使用過aync/await,就會了解,在IO操作如讀取或寫入文件的時候,最好使用異步方法以避免線程阻塞。讓我們來改進一下。

使用異步的IAsyncEnumerable<T>

可以優化的是下面這句:

string line = sr.ReadLine();

 

對於IO操作,最好使用異步方式。這里可使用相應的異步方法:

string line = await sr.ReadLineAsync();

 

我們說“異步是傳染的”,如果這里使用異步,那么相應的該方法的返回值也要使用異步,所以需要將返回值改為static async Task<IEnumerable<string>>,但這樣會得到一個錯誤:

ErrorCS1624The body of 'Program.ReadAllLines(string)' cannot be an iterator block because 'Task<IEnumerable<string>>' is not an iterator interface typeAsyncLinqDemoC:\Source\Workspaces\Console\AsyncLinqDemo\AsyncLinqDemo\Program.cs23Active

 

因為Task<IEnumerable<string>>並不是一個可以迭代的接口類型,所以我們無法在方法內部使用yield關鍵字。解決問題的辦法是使用新的IAsyncEnumerable接口:

static async IAsyncEnumerable<string> ReadAllLines(string file)
{
   using (var fs = File.OpenRead(file))
  {
       using (var sr = new StreamReader(fs))
      {
           while (true)
          {
               string line = await sr.ReadLineAsync();
               if(line == null)
              {
                   break;
              }
               yield return line;
          }

      }
  }
}

 

F12查看該接口的定義:

namespace System.Collections.Generic
{
   public interface IAsyncEnumerable<out T>
  {
       IAsyncEnumerator<T> GetAsyncEnumerator(CancellationTokencancellationToken = default);
  }
}

 

這是一個異步的迭代器,並提供了CancellationToken。再按F12查看IAsyncEnumerator<T>的定義,可發現里面是這樣的:

namespace System.Collections.Generic
{
   public interface IAsyncEnumerator<out T> : IAsyncDisposable
  {
       T Current { get; }
       ValueTask<bool> MoveNextAsync();
  }
}

 

這里MoveNextAsync()方法實際是返回了一個結果類型為boolTask,每次迭代都是可等待的,從而實現了迭代器的異步。

使用await foreach消費IAsyncEnumerable<T>

當我們做了以上改動之后,ReadAllLines()方法返回的是一個支持異步的IAsyncEnumerable,那么在使用的時候,也不能簡單的使用foreach了。修改Main方法如下:

static async Task Main(string[] args)
{
   Console.WriteLine("Input the file path:");
   var file = Console.ReadLine();
   var lines = ReadAllLines(file);
   await foreach (var line in lines)
  {
       Console.WriteLine(line);
  }
}

 

首先在foreach之前添加await關鍵字,還要需要將Main方法由void改為async Task。這樣整個程序都是異步執行了,不會再導致堵塞了。這個例子只是一個簡單的demo,是否使用異步並不會感覺到明顯的區別。如果在迭代內部需要比較重的操作,如從網絡獲取大量數據或讀取大量磁盤文件,異步的優勢還是會比較明顯的。

使用LINQ消費IAsyncEnumerable<T>

使用LINQ來操作集合是常用的功能。如果使用IEnumberable,在Main方法中可以做如下改動:

var lines = ReadAllLines(file);
var res = from line in lines where line.StartsWith("ERROR: ") selectline.Substring("ERROR: ".Length);
foreach (var line in res)
{
   Console.WriteLine(line);
}

 

或:

var res = lines.Where(x => x.StartsWith("ERROR: ")).Select(x => x.Substring("ERROR: ".Length));

 

如果使用了新的IAsyncEnumerable,你會發現無法使用Where等操作符了:

ErrorCS1936Could not find an implementation of the query pattern for source type 'IAsyncEnumerable<string>'. 'Where' not found.AsyncLinqDemoC:\Source\Workspaces\Console\AsyncLinqDemo\AsyncLinqDemo\Program.cs16Active

 

目前LINQ還沒有提供對IAsyncEnumerable的原生支持,不過微軟提供了一個Nuget包來實現此功能。在項目中打開Nuget Package Manger搜索安裝System.Linq.Async,注意該包目前還是預覽版,所以要勾選include prerelease才能看到。安裝該Nuget包后,Linq查詢語句中的錯誤就消失了。

System.Linq.Async這個包中,對每個同步的LINQ方法都做了相應的擴展。所以基本上代碼無需什么改動即可正常編譯。

對於LINQ中的條件語句,也可以使用WhereAwait()方法來支持await

public static IAsyncEnumerable<TSource> WhereAwait<TSource>(thisIAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>>predicate);

 

如需要在條件語句中進行IO或網絡請求等異步操作,可以這樣用:

var res = lines.WhereAwait(async x => await DoSomeHeavyOperationsAsync(x));

 

DoSomeHeavyOperationsAsync方法的簽名如下:

private static ValueTask<bool> DoSomeHeavyOperationsAsync(string x)
{
   //Do some works...
}

 

小結

通過以上的示例,我們簡要了解了如何使用IAsyncEnumerable接口以及如何在LINQ中實現異步查詢。在使用該接口時,我們需要創建一個自定義方法返回IAsyncEnumerable<T>來代替IEnumberable<T>,這個方法可稱為async-iterator方法,需要注意以下幾點:

  • 該方法應該被聲明為async

  • 返回IAsyncEnumerable<T>

  • 同時使用awaityield。如await foreachyield returnyield break等。

例如:

async IAsyncEnumerable<int> GetValuesFromServer()
{
   while (true)
  {
       IEnumerable<int> batch = await GetNextBatch();
       if (batch == null) yield break;

       foreach (int item in batch)
      {
           yield return item;
      }
  }
}

 

此外還有一些限制:

  • 無法在tryfinally塊中使用任何形式的yield語句。

  • 無法在包含任何catch語句的try語句中使用yield return語句。

     

期待.NET Core 3的正式發布!

 

了解新西蘭IT行業真實碼農生活

請長按上方二維碼關注“程序員在新西蘭”


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM