在.NET Core中使用Channel(二)


在我們之前的文章中,看了一些非常簡單的例子來說明Channel是如何工作的,我們看到了一些非常漂亮的特性,但大多數情況下它與其他某某Queue實現非常相似。讓我們進入一些更高級的話題。我說的是高級,但其中很多都非常簡單。

讀/寫分離

如果你曾經在兩個類之間共享隊列,你就會知道任何一個類都可以讀/寫,即使它們本不應該這樣做。例如:

class MyProducer
{
    private readonly Queue<int> _queue;

    public MyProducer(Queue<int> queue)
    {
        _queue = queue;
    }
}

class MyConsumer
{
    private readonly Queue<int> _queue;

    public MyConsumer(Queue<int> queue)
    {
        _queue = queue;
    }
}

因此,生產者應該只寫隊列,消費者應該只讀隊列,在這兩種情況下,它們可以對隊列執行所有操作。雖然你可能在自己的腦海中希望消費者只讀取,但另一個開發人員可能會出現調用Enqueue,除了代碼審查之外,沒有什么可以阻止他們犯這個錯誤。

但是有了Channel,我們可以做不同的事情。

class Program
{
    static async Task Main(string[] args)
    {
        var myChannel = Channel.CreateUnbounded<int>();
        var producer = new MyProducer(myChannel.Writer);
        var consumer = new MyConsumer(myChannel.Reader);
    }
}

class MyProducer
{
    private readonly ChannelWriter<int> _channelWriter;

    public MyProducer(ChannelWriter<int> channelWriter)
    {
        _channelWriter = channelWriter;
    }
}

class MyConsumer
{
    private readonly ChannelReader<int> _channelReader;

    public MyConsumer(ChannelReader<int> channelReader)
    {
        _channelReader = channelReader;
    }
}

在這個例子中,我添加了一個main方法來向你展示如何創建writer/reader,但它非常簡單。這里我們可以看到,對於我們的生產者,我只傳遞給它一個ChannelWriter,所以它只能做寫操作。對於我們的消費者,我們傳遞給它一個ChannelReader,所以它只能讀取。

當然,這並不意味着其他開發人員不能修改代碼並開始注入根Channel對象,或者同時傳入ChannelWriter/ChannelReader,但這至少比之前的情況要好得多。

完成一個Channel

我們在前面看到,當在通道上調用ReadAsync()時,它實際上會在那里等待消息,但是如果沒有更多的消息到來呢?對於.net中的其他隊列,我們通常需要傳遞某種共享的布爾值或一個CancellationToken。但有了Channel,就更容易了。

考慮以下幾點:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    try
    {
        while (true)
        {
            var item = await myChannel.Reader.ReadAsync();
            Console.WriteLine(item);
            await Task.Delay(1000);
        }
    }catch(ChannelClosedException e)
    {
        Console.WriteLine("Channel was closed!");
    }
}

我讓第二個線程盡可能快地寫入我們的Channel,然后完成它。然后我們的讀取器緩慢讀取,每次讀取之間有1秒的延遲。注意,我們捕獲了ChannelClosedExecption,當你嘗試從關閉通道讀取最后消息之后時將調用它。

我只是想說清楚。在Channel上調用Complete()不會立即關閉通道並殺死讀取該通道的所有人。而是通知所有服務,一旦最后一條消息被讀取,我們就完成了。這很重要,因為這意味着當我們等待新條目時,當隊列是空的時,當隊列是滿的時,是否調用Complete()都無關緊要。我們可以肯定,我們將完成所有可得到的工作。

在Channel中使用IAsyncEnumerable

以我們試圖關閉一個Channel為例,有兩件事引起了我的注意。

我們有一個while(true)循環。這並不是真的那么糟糕,但它有點礙眼。

為了打破這個循環,並知道Channel已經完成,我們必須捕獲異常並將其吞下。

使用命令“ReadAllAsync()”來解決這些問題,它返回一個IAsyncEnumerable。代碼看起來有點像這樣:

static async Task Main(string[] args)
{
    var myChannel = Channel.CreateUnbounded<int>();

    _ = Task.Factory.StartNew(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await myChannel.Writer.WriteAsync(i);
        }

        myChannel.Writer.Complete();
    });

    await foreach(var item in myChannel.Reader.ReadAllAsync())
    {
        Console.WriteLine(item);
        await Task.Delay(1000);
    }
}

現在代碼讀起來好多了,並且去掉了捕獲異常的一些多余的東西。因為我們使用的是IAsyncEnumerable,所以我們仍然可以像以前那樣等待每一項,但是我們不再需要捕獲異常,因為當Channel完成時,它只是簡單地說沒有其他東西了,然后循環退出。

同樣,這消除了在處理隊列時必須編寫的一些混亂代碼。以前你必須編寫某種無限循環,而現在它只是一個真正整潔的循環,可以處理底層的所有東西。

接下來是什么

到目前為止,我們一直在使用“無限的”通道。你可能已經猜到了,當然也可以選擇使用BoundedChannel。查看本系列的下一部分,更好地理解這些東西。

 歡迎關注我的公眾號,如果你有喜歡的外文技術文章,可以通過公眾號留言推薦給我。

 

原文鏈接:https://dotnetcoretutorials.com/2020/11/24/using-channels-in-net-core-part-2-advanced-channels/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM