在 DotNetty 中實現同步請求


一、背景

DotNetty 本身是一個優秀的網絡通訊框架,不過它是基於異步事件驅動來處理另一端的響應,需要在單獨的 Handler 去處理相應的返回結果。而在我們的實際使用當中,尤其是 客戶端程序 基本都是 請求-響應 模型,在發送了數據時候需要等待服務器的響應才能進行下一步操作,如果服務器返回的是錯誤信息,則需要進行特殊的處理。

類似於下面這種方式:

public async void Button1_Click()
{
    var result = await DotNettyClient.SendData("Hello");
    
    if(result == "Error")
    {
        throw new Exception("服務器返回錯誤!");
    }
    
    Console.WriteLine($"Hello {result}");
}

二、解決思路

參閱了大部分資料之后,發現在 Java 的 Netty 當中可以使用 Future / Promise 來實現,那么 C# 是否有類似的組件呢?答案是有的,他們對應的就是 TaskTaskCompletionSource,前者是給調用者的任務,而后者則是用於設置響應任務的結果。

那么我們就可以這么來處理,當客戶端發送請求時,附帶唯一的一個請求 ID,並將 TaskCompletionSource 放在一個請求字典當中,請求 ID 作為字典的 Key,值是 TaskCompletionSource,之后返回一個 Task。當客戶端接收到服務器響應的時候,通過 TaskCompletionSource 設置之前那個 Task 的結果,這樣我們接收到響應之后,就會從之前 await 的地方繼續執行。

這里我自己的需求僅僅是類似於 同步阻塞式 的操作,所以我直接使用一個隊列來做簡單處理,並沒有用唯一的請求 ID 來表示不同的請求,也沒有使用字典,因為我可以 保證在同一時間內有且僅有一個客戶端請求被發起,而且也做了響應的超時處理機制。

三、代碼實現

實現起來超級簡單,只需要在發起請求的時候,創建一個 TaskCompletionSource<TResponse> 對象。這個泛型參數指的是你想要的返回值類型,這里我以 TResponse 代替,下面的 DEMO 我會用 string 類型進行演示。

創建好一個 TaskCompletionSource<TResponse> 之后,在發送方法里面,我們可以將其對象放在一個先進先出的隊列當中,然后將其 Task 屬性作為發送方法的返回值。

我們再來到處理服務器響應的 Handler 當中,從隊列里面拿去之前存放的 TaskCompletionSource<TResponse> 對象,調用其 SetResult() 方法,將具體響應進行設置。

通過以上的操作,我們在發送數據的時候,就可以使用 await 關鍵字等待服務端的響應,但不會阻塞線程,當客戶端接收到服務端響應時,就會恢復到之前 await 的位置繼續執行。

數據發送方法:

public static class DotNettyClient
{
    static DotNettyClient()
    {
        RequestQueue = new Queue<TaskCompletionSource<string>>();
    }
    
    public static Queue<TaskCompletionSource<string>> RequestQueue { get; set; }
    
    public static async Task<string> SendData(string data)
    {
        var resultTask = new TaskCompletionSource<string>();
        
        var buffer = new Unpooled.Buffer();
        buffer.WriteBytes(Encoding.UTF8.GetBytes(data));
        await _clientChannel.WriteAndFlushAsync(buffer);
        
        RequestQueue.Enqueue(resultTask);
        
        return await resultTask.Task;
    }
}

服務端響應處理:

public class ProtocolHandler : ChannelHandlerAdapter
{
	public override void ChannelRead(IChannelHandlerContext context, object message)
	{
		if(message is string response)
		{
			if(!DotNettyClient.RequestQueue.TryDequeue(out TaskCompletionSource<string> result)) return;
			result.SetResult(response);
		}
	}
}

這里我就不再編寫解析器,主要說明一下代碼的思路,下面在使用的時候就如同第一節說的一樣,直接使用 await 關鍵字等待響應結果即可。

四、缺陷

在這里我並沒有展示多個異步請求的情況,如果是用戶同時發起多個請求的時候,你可以通過數據的唯一 ID 來標識每一個請求,讀取時根據唯一 ID 從字典獲取數據,這樣在接收服務端響應的時候就能處理這種情況了。

五、參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM