Asp.Net Core 3.0之后,對Grpc提供了高集成度的支持,對於需要連續傳輸大批量對象數據的應用場景而言,等於多了一條高鐵線路。如果沒有Grpc,連續傳輸大批量對象數據是一個很糾結的問題。用TCP的話,可以達到最高速度,但是傳輸過程的斷線續傳,對象數據的序列化和反序列化都要自己處理,開發效率低效。用HTTP的話,要頻繁調用POST,反復建立連接,傳輸性能差。Grpc能夠一次建立傳輸通道,多次傳輸對象數據,自動序列化和反序列化,並且采用ProtoBuf協議序列化對象數據,壓縮率接近二進制byte數組,實現了TCP的性能優勢和HTTP POST的使用方便性的完美結合。
但是Asp.Net Core使用proto文件定義傳輸對象比較費事,對於已經存在的Asp.Net Core Web項目,已經定義了很多DTO類,服務端和客戶端還有其他數據傳輸方式,例如MQTT,HTTP等等,為了Grpc重新寫一大堆代碼,非常麻煩。所以決定尋找能夠復用C#對象的Grpc解決方案。最終找到了這篇文章,使用protobuf-net.Grpc.AspNetCore解決了我的需求,非常感謝作者ElderJames。
https://www.cnblogs.com/ElderJames/p/code-first-generate-gRPC-services-and-clients-in-dotnet-core-3_0.html
《_NET Core 3.0中用 Code-First 方式創建 gRPC 服務與客戶端 - Elder_James - 博客園.html》
決定寫了一個Demo做練習。實現的需求是,客戶端連續發送帶有byte數組的對象到服務端,服務端保存對象,服務端會返回保存成功標志,客戶端可以根據服務器的響應動態改變發送內容。
新建Net Standar類庫GrpcShare,NuGet安裝庫
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.0" />
<PackageReference Include="System.ServiceModel.Primitives" Version="4.7.0" />
定義DTO對象,服務類接口
/// <summary> /// 文本消息 /// </summary> [DataContract] public class Message { /// <summary> /// 內容 /// </summary> [DataMember(Order = 1)] public string Context { get; set; } } /// <summary> /// 上傳數據包請求 /// </summary> [DataContract] public class UploadRequest { /// <summary> /// 數據包索引 /// </summary> [DataMember(Order = 1)] public int Index { get; set; } /// <summary> /// 采樣時間 /// </summary> [DataMember(Order = 2)] public DateTime SampleTime { get; set; } /// <summary> /// 內容 /// </summary> [DataMember(Order = 3)] public byte[] Content { get; set; } public override string ToString() { return $"發送第{Index}包數據, {SampleTime}, 長度={Content.Length}"; } } /// <summary> /// 上傳數據包應答 /// </summary> [DataContract] public class UploadReply { /// <summary> /// 數據包索引 /// </summary> [DataMember(Order = 1)] public int Index { get; set; } /// <summary> /// 保存到數據庫成功標志 /// </summary> [DataMember(Order = 2)] public bool ArchiveSuccess { get; set; } public override string ToString() { return $"收到第{Index}包數據, 保存成功標志={ArchiveSuccess}"; } } /// <summary> /// 上傳數據包接口 /// </summary> [ServiceContract] public interface IUpload { /// <summary> /// 簡單測試 /// </summary> /// <param name="message"></param> /// <returns></returns> [OperationContract] ValueTask<string> Hi(string message); /// <summary> /// 測試 /// </summary> /// <param name="message"></param> /// <returns></returns> [OperationContract] ValueTask<Message> Hello(Message message); /// <summary> /// 雙向流式上傳數據包 /// 注意IAsyncEnumerable需要NuGet安裝Microsoft.Bcl.AsyncInterfaces,不是System.Interactive.Async /// </summary> /// <param name="stream"></param> /// <returns></returns> [OperationContract] IAsyncEnumerable<UploadReply> Upload(IAsyncEnumerable<UploadRequest> stream); }
新建Asp.net Core Web Api項目GrpcDemo,NuGet安裝庫
<PackageReference Include="protobuf-net.Grpc.AspNetCore" Version="1.0.22" />
Program定義Grpc服務端口
public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder .ConfigureKestrel(options => { options.ListenLocalhost(9988, listenOptions => { listenOptions.Protocols = HttpProtocols.Http2; }); }) .UseStartup<Startup>(); });
Startup添加Grpc服務和路由
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); //添加Grpc服務 services.AddCodeFirstGrpc(); } app.UseEndpoints(endpoints => { endpoints.MapControllers(); //添加Grpc路由 endpoints.MapGrpcService<UploadService>(); });
實現服務類UploadService
/// <summary> /// 上傳數據包服務 /// </summary> public class UploadService : IUpload { /// <summary> /// 簡單測試 /// </summary> /// <param name="message"></param> /// <returns></returns> public ValueTask<string> Hi(string message) { Console.WriteLine($"收到客戶端問候={message}"); return new ValueTask<string>("Hi,我是UploadService"); } /// <summary> /// 測試 /// </summary> /// <param name="message"></param> /// <returns></returns> public ValueTask<Message> Hello(Message message) { Console.WriteLine($"收到客戶端問候={message.Context}"); var reply = new Message() { Context = "Hello,我是UploadService", }; return new ValueTask<Message>(reply); } /// <summary> /// 雙向流式上傳數據包 /// </summary> /// <param name="stream"></param> /// <returns></returns> public async IAsyncEnumerable<UploadReply> Upload(IAsyncEnumerable<UploadRequest> stream) { await foreach (var request in stream) { Console.WriteLine(request); await Task.Delay(TimeSpan.FromSeconds(1)); var reply = new UploadReply { Index = request.Index, //模擬保存失敗 ArchiveSuccess = (DateTime.Now.Second % 3 < 2), }; yield return reply; } Console.WriteLine($"客戶端關閉連接"); } }
新建Net Core控制台項目UploadClient,NuGet安裝庫
<PackageReference Include="Grpc.Net.Client" Version="2.26.0" />
<PackageReference Include="protobuf-net.Grpc" Version="1.0.22" />
簡單測試很容易
//如果服務端沒有加密傳輸,客戶端必須設置 GrpcClientFactory.AllowUnencryptedHttp2 = true; using var http = GrpcChannel.ForAddress("http://localhost:9988"); var client = http.CreateGrpcService<IUpload>(); //簡單測試 string request1 = "Hi, 我是UploadClient"; Console.WriteLine(request1); var result1 = await client.Hi(request1); Console.WriteLine($"收到服務端回應={result1}");
在實現雙向流式,交互式傳輸時,遇到一個問題,客戶端如果需要根據服務端的響應,動態調整發送內容,該怎么辦呢?客戶端發送的參數是一個IAsyncEnumerable函數,它怎么把服務端響應作為參數再輸入進入?
await foreach (var reply in client.Upload(SendPackage()))
我沒有找到現成的例子,從ElderJames的例子受到啟發,把服務端響應放到一個Queue中,客戶端定期讀取隊列,算是勉強解決了這個問題。當然還有其他很多辦法,例如收到服務端響應發布一個消息事件,在事件處理函數中修改客戶端發送內容等。但是總覺得不夠簡便。
//流式上傳數據包 await foreach (var reply in client.Upload(SendPackage())) { //收到服務端回應后,丟到FIFO replyQueue.Enqueue(reply); } private static async IAsyncEnumerable<UploadRequest> SendPackage() { //上傳第一包數據 var request = new UploadRequest { Index = 1, SampleTime = DateTime.Now, Content = Encoding.UTF8.GetBytes(DateTime.Now.ToString()), }; Console.WriteLine(request); yield return request; while (request.Index < 10) { await Task.Delay(TimeSpan.FromSeconds(1)); //從FIFO取出服務端回應 if (!replyQueue.TryDequeue(out UploadReply reply)) continue; Console.WriteLine($"收到服務端回應={reply}"); if (reply.ArchiveSuccess) { //如果服務端存檔成功,上傳下一包數據 request = new UploadRequest { Index = reply.Index + 1, SampleTime = DateTime.Now, Content = Encoding.UTF8.GetBytes(DateTime.Now.ToString()), }; } else { //如果服務端存檔失敗,重傳上一包數據 } Console.WriteLine(request); yield return request; } }
這個DEMO的代碼地址:
https://github.com/woodsun2018/AspNetCoreGrpcDemo