Asp.Net Core Grpc使用C#對象取代Proto定義


Asp.Net Core 3.0之后,對Grpc提供了高集成度的支持,對於需要連續傳輸大批量對象數據的應用場景而言,等於多了一條高鐵線路。如果沒有Grpc,連續傳輸大批量對象數據是一個很糾結的問題。用TCP的話,可以達到最高速度,但是傳輸過程的斷線續傳,對象數據的序列化和反序列化都要自己處理,開發效率低效。用HTTP的話,要頻繁調用POST,反復建立連接,傳輸性能差。Grpc能夠一次建立傳輸通道,多次傳輸對象數據,自動序列化和反序列化,並且采用ProtoBuf協議序列化對象數據,壓縮率接近二進制byte數組,實現了TCP的性能優勢和HTTP POST的使用方便性的完美結合。

 

但是Asp.Net Core使用proto文件定義傳輸對象比較費事,對於已經存在的Asp.Net Core Web項目,已經定義了很多DTO類,服務端和客戶端還有其他數據傳輸方式,例如MQTT,HTTP等等,為了Grpc重新寫一大堆代碼,非常麻煩。所以決定尋找能夠復用C#對象的Grpc解決方案。最終找到了這篇文章,使用protobuf-net.Grpc.AspNetCore解決了我的需求,非常感謝作者ElderJames。

https://www.cnblogs.com/ElderJames/p/code-first-generate-gRPC-services-and-clients-in-dotnet-core-3_0.html

《_NET Core 3.0中用 Code-First 方式創建 gRPC 服務與客戶端 - Elder_James - 博客園.html》

 

決定寫了一個Demo做練習。實現的需求是,客戶端連續發送帶有byte數組的對象到服務端,服務端保存對象,服務端會返回保存成功標志,客戶端可以根據服務器的響應動態改變發送內容。

新建Net Standar類庫GrpcShare,NuGet安裝庫

<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.0" />

<PackageReference Include="System.ServiceModel.Primitives" Version="4.7.0" />

定義DTO對象,服務類接口

    /// <summary>
    /// 文本消息
    /// </summary>
    [DataContract]
    public class Message
    {
        /// <summary>
        /// 內容
        /// </summary>
        [DataMember(Order = 1)]
        public string Context { get; set; }
    }

    /// <summary>
    /// 上傳數據包請求
    /// </summary>
    [DataContract]
    public class UploadRequest
    {
        /// <summary>
        /// 數據包索引
        /// </summary>
        [DataMember(Order = 1)]
        public int Index { get; set; }

        /// <summary>
        /// 采樣時間
        /// </summary>
        [DataMember(Order = 2)]
        public DateTime SampleTime { get; set; }

        /// <summary>
        /// 內容
        /// </summary>
        [DataMember(Order = 3)]
        public byte[] Content { get; set; }

        public override string ToString()
        {
            return $"發送第{Index}包數據, {SampleTime}, 長度={Content.Length}";
        }
    }

    /// <summary>
    /// 上傳數據包應答
    /// </summary>
    [DataContract]
    public class UploadReply
    {
        /// <summary>
        /// 數據包索引
        /// </summary>
        [DataMember(Order = 1)]
        public int Index { get; set; }

        /// <summary>
        /// 保存到數據庫成功標志
        /// </summary>
        [DataMember(Order = 2)]
        public bool ArchiveSuccess { get; set; }

        public override string ToString()
        {
            return $"收到第{Index}包數據, 保存成功標志={ArchiveSuccess}";
        }
    }

    /// <summary>
    /// 上傳數據包接口
    /// </summary>
    [ServiceContract]
    public interface IUpload
    {
        /// <summary>
        /// 簡單測試
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        [OperationContract]
        ValueTask<string> Hi(string message);

        /// <summary>
        /// 測試
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        [OperationContract]
        ValueTask<Message> Hello(Message message);

        /// <summary>
        /// 雙向流式上傳數據包
        /// 注意IAsyncEnumerable需要NuGet安裝Microsoft.Bcl.AsyncInterfaces,不是System.Interactive.Async
        /// </summary>
        /// <param name="stream"></param>
        /// <returns></returns>
        [OperationContract]
        IAsyncEnumerable<UploadReply> Upload(IAsyncEnumerable<UploadRequest> stream);
    }

新建Asp.net Core Web Api項目GrpcDemo,NuGet安裝庫

<PackageReference Include="protobuf-net.Grpc.AspNetCore" Version="1.0.22" />

 

Program定義Grpc服務端口

public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder
                        .ConfigureKestrel(options =>
                        {
                            options.ListenLocalhost(9988, listenOptions =>
                            {
                                listenOptions.Protocols = HttpProtocols.Http2;
                            });
                        })
                        .UseStartup<Startup>();
                });

Startup添加Grpc服務和路由

public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();

            //添加Grpc服務
            services.AddCodeFirstGrpc();
        }

app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();

                //添加Grpc路由
                endpoints.MapGrpcService<UploadService>();
            });

實現服務類UploadService

/// <summary>
    /// 上傳數據包服務
    /// </summary>
    public class UploadService : IUpload
    {
        /// <summary>
        /// 簡單測試
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public ValueTask<string> Hi(string message)
        {
            Console.WriteLine($"收到客戶端問候={message}");

            return new ValueTask<string>("Hi,我是UploadService");
        }

        /// <summary>
        /// 測試
        /// </summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public ValueTask<Message> Hello(Message message)
        {
            Console.WriteLine($"收到客戶端問候={message.Context}");

            var reply = new Message()
            {
                Context = "Hello,我是UploadService",
            };

            return new ValueTask<Message>(reply);
        }

        /// <summary>
        /// 雙向流式上傳數據包
        /// </summary>
        /// <param name="stream"></param>
        /// <returns></returns>
        public async IAsyncEnumerable<UploadReply> Upload(IAsyncEnumerable<UploadRequest> stream)
        {
            await foreach (var request in stream)
            {
                Console.WriteLine(request);

                await Task.Delay(TimeSpan.FromSeconds(1));

                var reply = new UploadReply
                {
                    Index = request.Index,
                    //模擬保存失敗
                    ArchiveSuccess = (DateTime.Now.Second % 3 < 2),
                };

                yield return reply;
            }

            Console.WriteLine($"客戶端關閉連接");
        }
    }

新建Net Core控制台項目UploadClient,NuGet安裝庫

    <PackageReference Include="Grpc.Net.Client" Version="2.26.0" />

    <PackageReference Include="protobuf-net.Grpc" Version="1.0.22" />

 

簡單測試很容易

            //如果服務端沒有加密傳輸,客戶端必須設置
            GrpcClientFactory.AllowUnencryptedHttp2 = true;

            using var http = GrpcChannel.ForAddress("http://localhost:9988");
            var client = http.CreateGrpcService<IUpload>();

            //簡單測試
            string request1 = "Hi, 我是UploadClient";
            Console.WriteLine(request1);

            var result1 = await client.Hi(request1);
            Console.WriteLine($"收到服務端回應={result1}");

在實現雙向流式,交互式傳輸時,遇到一個問題,客戶端如果需要根據服務端的響應,動態調整發送內容,該怎么辦呢?客戶端發送的參數是一個IAsyncEnumerable函數,它怎么把服務端響應作為參數再輸入進入?

await foreach (var reply in client.Upload(SendPackage()))

 

我沒有找到現成的例子,從ElderJames的例子受到啟發,把服務端響應放到一個Queue中,客戶端定期讀取隊列,算是勉強解決了這個問題。當然還有其他很多辦法,例如收到服務端響應發布一個消息事件,在事件處理函數中修改客戶端發送內容等。但是總覺得不夠簡便。

//流式上傳數據包
            await foreach (var reply in client.Upload(SendPackage()))
            {
                //收到服務端回應后,丟到FIFO
                replyQueue.Enqueue(reply);
            }

private static async IAsyncEnumerable<UploadRequest> SendPackage()
        {
            //上傳第一包數據
            var request = new UploadRequest
            {
                Index = 1,
                SampleTime = DateTime.Now,
                Content = Encoding.UTF8.GetBytes(DateTime.Now.ToString()),
            };

            Console.WriteLine(request);

            yield return request;

            while (request.Index < 10)
            {
                await Task.Delay(TimeSpan.FromSeconds(1));

                //從FIFO取出服務端回應
                if (!replyQueue.TryDequeue(out UploadReply reply))
                    continue;

                Console.WriteLine($"收到服務端回應={reply}");

                if (reply.ArchiveSuccess)
                {
                    //如果服務端存檔成功,上傳下一包數據
                    request = new UploadRequest
                    {
                        Index = reply.Index + 1,
                        SampleTime = DateTime.Now,
                        Content = Encoding.UTF8.GetBytes(DateTime.Now.ToString()),
                    };
                }
                else
                {
                    //如果服務端存檔失敗,重傳上一包數據
                }

                Console.WriteLine(request);

                yield return request;
            }
        }

這個DEMO的代碼地址:

https://github.com/woodsun2018/AspNetCoreGrpcDemo

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM