在.NET Core中使用Channel(一)


我最近一直在熟悉.net Core中引入的新Channel<T>類型。我想在它第一次發布的時候我了解過它,但是有關文章非常非常少,我不能理解它們與其他隊列有什么不同。

在使用了一段時間后,我終於看到了它們的吸引力和真正的力量。最值得注意的是大型異步后台操作,這些操作幾乎需要雙向通信來同步它們正在做的事情。這句話有點拗口,但希望在本系列文章結束時,你會清楚什么時候應該使用Channel<T>,什么時候應該使用一些更基本的東西,比如Queue<T>。

Channel是什么?

從核心來說,Channel本質上是.net中的一種新的集合類型,它與現有的Queue<T>類型非常相似,但有額外的好處。在真正嘗試研究這個主題時,我發現的問題是,許多現有的外部隊列技術(IBM MQ、Rabbit MQ等)都有“channel”的概念,它們的范圍從完全抽象的思維過程,到系統中實際的物理類型。

現在也許我大錯特錯,但如果你認為.net中的Channel就好比是允許等待新消息的一個隊列,並告訴生產者要保持隊列越來越大,消費者無法跟上,我認為這很難出錯。

這里我提到了一個關鍵詞,生產者/消費者。你可能還聽說過Pub/Sub。但它們是不可互換的。

Pub/Sub描述的是某人發布信息,一個或多個“訂閱者”監聽該信息並對其采取一定的響應行為。這里不存在負載平衡,因為當添加訂閱服務器時,它們本質上與其他所有人獲得相同消息的副本。

在圖表形式中,Pub/Sub看起來有點像這樣:

生產者/消費者描述生產者發布消息的行為,並且有一個或多個消費者可以對該消息進行操作,但是每個消息只讀取一次。它不會分發到每個訂閱者。

當然,用圖表的形式:

另一種思考生產者/消費者的方式是想象你去超市結賬。當顧客想結帳時,排隊的隊伍變長了,你可以簡單地打開更多的收銀台來處理這些顧客。這個小小的思考過程實際上是很重要的,因為如果你不能打開更多的收銀台怎么辦?排隊的隊伍應該越來越長嗎?如果收銀台操作員坐在那里,但沒有顧客怎么辦?他們是應該當天就打包回家呢,還是應該被告知坐着等客人來了再說。

這通常被稱為生產者-消費者問題,這是Channel要解決的問題。

基礎Channel示例

與Channel有關的所有東西都在System.Threading.Channels中。在以后的版本中,這似乎是與標准的.net Core項目捆綁在一起的,但如果不是,這里有一個nuget包:https://www.nuget.org/packages/System.Threading.Channels。

一個極其簡單的Channel示例是這樣的:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded();

    for (int i = 0; i < 10; i++)
    {
        await myChannel.Writer.WriteAsync(i);
    }

    while (true)
    {
        var item = await myChannel.Reader.ReadAsync();
        Console.WriteLine(item);
    }
}

這里沒有太多可談的。我們創建了一個“無限的”通道(這意味着它可以容納無限項,但在本系列的后續內容中會有更多內容)。我們寫10項,讀10項,在這一點上,它與我們在.net中見過的任何其他隊列沒有太大區別。

Channel是線程安全的

沒錯,通道是線程安全的。這意味着多個線程可以讀寫同一個通道而不會出現問題。如果我們看一下這里的Channel源代碼,我們可以看到它是線程安全的,因為它使用鎖和內部“隊列”的組合來同步讀/寫器,一個接一個地讀/寫。

實際上,Channel的預期用例是多線程場景。例如,如果我們使用上面的基本代碼,當我們實際上不需要線程安全性時,維護線程安全性實際上會有一些開銷。所以在那個例子中,我們可能只使用Queue<T>更好。但是這段代碼呢?

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
            await Task.Delay(1000);
        }
    });

    while (true)
    {
        var item = await myChannel.Reader.ReadAsync();
        Console.WriteLine(item);
    }
}

在這里,我們有一個單獨的線程寫入消息,而我們的主線程讀取消息。你會注意到有趣的事情是,我們添加了消息之間的延遲。怎么能調用ReadAsync()?沒有TryDequeue或Dequeue,如果隊列中沒有消息,它就運行null,對嗎?

答案是Channel Reader的“ReadAsync()”方法實際上會“等待”一個消息。因此,不需要在等待消息時執行一些荒謬的循環,也不需要在等待時完全阻塞線程。我們將在以后的文章中進一步討論這個問題,但是你要知道你可以使用ReadAsync來等待新的消息,而不是編寫一些自定義的代碼來做同樣的事情。

接下來是什么?

現在你已經掌握了基礎知識,下一篇讓我們看看使用Channel一些更高級的場景。

 歡迎關注我的公眾號,如果你有喜歡的外文技術文章,可以通過公眾號留言推薦給我。

 

原文鏈接:https://dotnetcoretutorials.com/2020/11/24/using-channels-in-net-core-part-1-getting-started/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM