第三十四節:.Proto文件剖析、gRPC的四種傳輸模式(一元和流式)和常用配置


一. 剖析.Proto文件

先上一個proto文件 

//proto的版本
syntax = "proto3";   

//此處可以不指定
//option csharp_namespace = "GrpcService1";

package greet;
// The greeting service definition.
// 方法定義,Greeter對應Greeter+Service類,需要去GreeterService類中實現
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply);
  //下面都是自定義的一些方法
  rpc CommitUserInfor (UserInfor) returns (ReplyModel);
  //下面是流式相關的方法
  rpc TestStream1 (HelloRequest) returns (stream HelloReply);
  rpc TestStream2 (stream HelloRequest) returns (HelloReply);
  rpc TestStream3 (stream HelloRequest) returns (stream HelloReply);
}
// The request message containing the user's name.
// 此處的傳入的參數,生成的時候自動首字母大寫了,在調用的時候都是首字母大寫的
message HelloRequest {
  string userName = 1;
}
// The response message containing the greetings.
// 此處的返回的參數,生成的時候自動首字母大寫了,在調用的時候都是首字母大寫的
message HelloReply {
  string replyMsg = 1;
}
//下面是自定義的類
message UserInfor{
    string userName=1;
    string userAge=2;
    string userAddress=3;
}
message ReplyModel{
    string status=1;
    string msg=2;
}
View Code

1.service xxXX:里面聲明的基本格式,方法名、傳入參數實體、傳出參數實體。

2.message xxx:用來自定義實體類,里面的實體屬性后面需要 =1,2,3 代表的是第n個參數,沒有其它特別作用。

注:這里寫的參數在生成的時候會自動映射成大寫開頭的了,每個方法對應的實現需要去xxXXService中實現。

下面附上proto中的數據類型在各種語言中的對應:

 

更詳細的介紹可參考:

  https://www.jianshu.com/p/f6ff6381a81a
  https://www.cnblogs.com/sanshengshui/p/9739521.html

 

二. 搭建步驟(一元)

1.項目准備

 GrpcService1 服務端

 GrpcClient1 客戶端(控制台)

 GrpcClient2 客戶端(Core MVC)

2. 服務端搭建

(1).新建gRPC服務GrpcService1,會自動生產greet.proto 和GreeterService, 其中前者是用來聲明接收返回參數、服務方法的,后者是對前者方法的實現。

:  *.proto 文件中的每個"一元"服務方法將在用於調用方法的具體gRPC 客戶端類型上產生兩個.NET 方法:異步方法和同步方法。

代碼分享:

/// <summary>
    /// 方法實現類
    /// </summary>
    public class GreeterService : Greeter.GreeterBase
    {
        private readonly ILogger<GreeterService> _logger;
        public GreeterService(ILogger<GreeterService> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// 默認生成的一元方法
        /// </summary>
        /// <param name="request"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
        {
            _logger.LogInformation($"【{DateTime.Now.ToString()}】收到客戶端發送的信息為:{request.UserName}");
            return Task.FromResult(new HelloReply
            {
                ReplyMsg =  request.UserName
            });
        }

        /// <summary>
        /// 自定義的一元方法
        /// </summary>
        /// <param name="request"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override Task<ReplyModel> CommitUserInfor(UserInfor request, ServerCallContext context)
        {
            _logger.LogInformation($"【{DateTime.Now.ToString()}】收到客戶端發送的信息為:{request.UserName},{request.UserAge},{request.UserAddress}");
            return Task.FromResult(new ReplyModel
            {
                Status="ok",
                Msg=$"提交成功,{request.UserName},{request.UserAge},{request.UserAddress}"
            });
        }

        /// <summary>
        /// 服務器端流式,客戶端普通
        /// </summary>
        /// <param name="request"></param>
        /// <param name="responseStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task TestStream1(HelloRequest request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
        {
            var counter = 0;
            while (!context.CancellationToken.IsCancellationRequested)
            {
                var message = $"How are you {request.UserName}? {++counter}";
                _logger.LogInformation($"Sending greeting {message}.");
                await responseStream.WriteAsync(new HelloReply { ReplyMsg = message });
                // Gotta look busy
                await Task.Delay(1000);
            }
        }

        /// <summary>
        /// 客戶端流式,服務端普通
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task<HelloReply> TestStream2(IAsyncStreamReader<HelloRequest> requestStream, ServerCallContext context)
        {
            var counter = 0;
            await foreach (var request in requestStream.ReadAllAsync())
            {
                counter += Convert.ToInt32(request.UserName.Substring(3));
                _logger.LogInformation(request.UserName);
            }
            return new HelloReply { ReplyMsg = $"counter={counter}" };
        }

        /// <summary>
        /// 客戶端和服務端都是流式
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="responseStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task TestStream3(IAsyncStreamReader<HelloRequest> requestStream, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
        {
            var counter = 0;
            var lastSendCounter = 0;
            var cts = new CancellationTokenSource();
            _ = Task.Run(async () =>
            {
                while (!cts.IsCancellationRequested)
                {
                    if (counter != lastSendCounter)
                    {
                        await responseStream.WriteAsync(new HelloReply
                        {
                            ReplyMsg = $"counter={counter}"
                        });

                        lastSendCounter = counter;
                    }
                    await Task.Delay(TimeSpan.FromSeconds(1));
                }
            }, cts.Token);
            await foreach (var request in requestStream.ReadAllAsync())
            {
                counter += Convert.ToInt32(request.UserName.Substring(3));
                _logger.LogInformation(request.UserName);
            }
            cts.Cancel();
        }


    }
View Code

(2).為了使一個項目下server端和client端公用一個greet.proto,將Server端中greet.proto拷貝到一個本地文件中,然后通過添加鏈接的方式進行添加,這里有兩種方式:

 A. 選中依賴項→右鍵添加'添加鏈接的服務'→選中服務引用,添加新的gRPC服務(生成類型選擇‘服務端’)

 B. 選中Protos→右鍵添加現有項→找到對應的proto文件,將右下角的添加改為添加為鏈接(這種添加方式生成的類型為‘服務端和客戶端’)

 

選中該項目可以看到添加的proto路徑和模式:

 

(3).配置StartUp類

 ConfigureServices: 注冊grpc服務   services.AddGrpc();

 Configure:映射grpc服務類  endpoints.MapGrpcService<GreeterService>();

 

PS:以上兩步創建gRPC項目時候自動配置的。

(4).啟動方式

 這里使用的是默認的Kestrel啟動,並采用http2協議, Kestrel是一個跨平台的適用於 ASP.NET Core 的 Web 服務器,默認情況下,ASP.NET Core 項目模板使用 Kestrel。在“Program.cs”中,ConfigureWebHostDefaults 方法調用 UseKestrel.

詳見:appsettings.json

PS:以上創建gRPC項目時候自動配置的.

 

3. 客戶端搭建(控制台)

(1).新建控制台程序GrpcClient1,並通過Nuget安裝程序集:【Google.Protobuf 3.12.3】【Grpc.Net.Client 2.30.0】【Grpc.Tools 2.30.0】

PS:此處也可以不nuget程序集,因為在添加連接服務的時候,會自動引入(版本可能不是最新的)

(2).通過‘添加鏈接的服務'的模式添加greet.proto,生成模式選擇'客戶端',如下:

 

(3).然后創建通道,創建客戶端,調用模板默認生成的SayHelloAsync一元方法測試效果

代碼分享:

{
                using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                var client1 = new Greeter.GreeterClient(channel);
                var client2 = new Greeter.GreeterClient(channel);
                var reply = await client1.SayHelloAsync(new HelloRequest { UserName = "ypf" });
                Console.WriteLine("返回的消息為: " + reply.ReplyMsg);
                var reply2 = await client2.CommitUserInforAsync(new UserInfor() { UserName = "ypf", UserAge = "20", UserAddress = "China" });
                Console.WriteLine($"返回的信息為:status={reply2.Status},msg={reply2.Msg}");
            }
View Code

PS:創建通道是開銷高昂的操作,重用通道可帶來性能優勢。客戶端是輕型對象,無需緩存或重復使用。一個通道可以創建多個客戶端,每個客戶端是線程安全的。

 

4. 客戶端搭建(Core Mvc)

(1).新建Core Mvc程序GrpcClient2,,通過Nuget安裝程序集:【Grpc.AspNetCore 2.30.0】

(2).通過‘添加鏈接的服務'的模式添加greet.proto,生成模式選擇'客戶端',如下:

 

(3).在ConfigureService中注冊客戶端,並HomeController中進行注入。

代碼分享:

public void ConfigureServices(IServiceCollection services)
  {
            services.AddControllersWithViews();
            //注冊grpc指定客戶端
            services.AddGrpcClient<GreeterClient>(o =>
            {
                o.Address = new Uri("https://localhost:5001");
            });
   }
View Code

(4).進行一元代碼的調用測試

代碼分享:

public class HomeController : Controller
    {
        public GreeterClient _client;
        private ILoggerFactory _loggerFactory;
        public HomeController(GreeterClient client, ILoggerFactory loggerFactory)
        {
            this._client = client;
            _loggerFactory = loggerFactory;
        }

        /// <summary>
        /// 客戶端調用grpc方法
        /// </summary>
        /// <returns></returns>
        public async Task<IActionResult> Index()
        {
            #region 一元調用
            {
                var reply = await _client.SayHelloAsync(new HelloRequest { UserName = "ypf" });
                ViewBag.msg1 = $"返回的消息為:{ reply.ReplyMsg}";
                var reply2 = await _client.CommitUserInforAsync(new UserInfor() { UserName = "ypf", UserAge = "20", UserAddress = "China" });
                ViewBag.msg2 = $"返回的消息為:status={reply2.Status},msg={reply2.Msg}";
            }
            #endregion

            return View();
        }
    }
View Code

 

5. 測試

 最終將:GrpcService1、GrpcClient1、GrpcClient2,按照這個順序設置同時啟動,進行測試哦,運行結果如下:

 

 

三. 傳輸模式

1. 一元調用

指從客戶端發送請求消息開始,服務結束后,返回響應消息

如:SayHelloAsync、CommitUserInforAsync均為一元調用,只有一元調用才會同時生成異步方法和同步方法

詳細代碼和運行結果見上述二的搭建步驟。

2.客戶端普通,服務器流式處理

指客戶端向服務端發送消息,服務端拿到消息后,以流的形式回傳給客戶端.

服務器流式處理調用從客戶端發送請求消息開始,使用 C# 8 或更高版本,則可使用 await foreach 語法來讀取消息。 IAsyncStreamReader<T>.ReadAllAsync() 擴展方法讀取響應數據流中的所有消息.

客戶端代碼:

          using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                var client = new Greeter.GreeterClient(channel);
                var cts = new CancellationTokenSource();
                cts.CancelAfter(TimeSpan.FromSeconds(8));
                //8秒后變為取消標記
                using var call = client.TestStream1(new HelloRequest { UserName = "ypf" }, cancellationToken: cts.Token);
                try
                {
                    await foreach (var message in call.ResponseStream.ReadAllAsync())
                    {
                        Console.WriteLine("Greeting: " + message.ReplyMsg);
                    }
                }
                catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
                {
                    Console.WriteLine("Stream cancelled.");
                }

服務端代碼:

        /// <summary>
        /// 服務器端流式,客戶端普通
        /// </summary>
        /// <param name="request"></param>
        /// <param name="responseStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task TestStream1(HelloRequest request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
        {
            var counter = 0;
            while (!context.CancellationToken.IsCancellationRequested)
            {
                //只要標記沒有變為取消, 每隔1s向客戶端發一條消息
                var message = $"How are you {request.UserName}? {++counter}";
                _logger.LogInformation($"Sending greeting {message}.");
                await responseStream.WriteAsync(new HelloReply { ReplyMsg = message });           
                await Task.Delay(1000);
            }
        }

運行結果:

3.客戶端流式處理,服務端普通

指客戶端以流的方式發送消息,客戶端無需發送消息即可開始客戶端流式處理調用 。 客戶端可選擇使用 RequestStream.WriteAsync 發送消息。

客戶端發送完消息后,應調用 RequestStream.CompleteAsync 來通知服務。 服務返回響應消息時,調用完成。

客戶端代碼:

          using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                var client = new Greeter.GreeterClient(channel);
                Random random = new Random();
                //無需發送消息即可開始客戶端流式處理調用
                using var call = client.TestStream2();
                for (var i = 0; i < 6; i++)
                {
                    //開始發送消息
                    await call.RequestStream.WriteAsync(new HelloRequest { UserName = $"ypf{random.Next(1, 10)}" });
                    await Task.Delay(TimeSpan.FromSeconds(1));
                }
                //結束發送,通知服務端
                await call.RequestStream.CompleteAsync();
                var response = await call;
                Console.WriteLine($"Count: {response.ReplyMsg}");

服務端代碼:

        /// <summary>
        /// 客戶端流式,服務端普通
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task<HelloReply> TestStream2(IAsyncStreamReader<HelloRequest> requestStream, ServerCallContext context)
        {
            var counter = 0;
            await foreach (var request in requestStream.ReadAllAsync())
            {
                counter += Convert.ToInt32(request.UserName.Substring(3));
                _logger.LogInformation(request.UserName);
            }
            return new HelloReply { ReplyMsg = $"counter={counter}" };
        }

運行結果:

 

4.雙向流式處理方法

指客戶端和服務端都以流的方式發送消息

客戶端無需發送消息即可開始雙向流式處理調用,客戶端可選擇使用 RequestStream.WriteAsync 發送消息.

客戶端代碼:

          using var channel = GrpcChannel.ForAddress("https://localhost:5001");
                var client = new Greeter.GreeterClient(channel);
                using var call = client.TestStream3();
                //_ = 符號代表放棄,但仍執行
                _ = Task.Run(async () =>
                {
                    await foreach (var message in call.ResponseStream.ReadAllAsync())
                    {
                        Console.WriteLine(message.ReplyMsg);
                    }
                });
                Random random = new Random();
                while (true)
                {
                    await call.RequestStream.WriteAsync(new HelloRequest { UserName = $"ypf{random.Next(1, 10)}" });
                    await Task.Delay(TimeSpan.FromSeconds(2));
                }

服務端代碼:

       /// <summary>
        /// 客戶端和服務端都是流式
        /// </summary>
        /// <param name="requestStream"></param>
        /// <param name="responseStream"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public override async Task TestStream3(IAsyncStreamReader<HelloRequest> requestStream, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
        {
            var counter = 0;
            var lastSendCounter = 0;
            var cts = new CancellationTokenSource();
            _ = Task.Run(async () =>
            {
                while (!cts.IsCancellationRequested)
                {
                    if (counter != lastSendCounter)
                    {
                        await responseStream.WriteAsync(new HelloReply
                        {
                            ReplyMsg = $"counter={counter}"
                        });

                        lastSendCounter = counter;
                    }
                    await Task.Delay(TimeSpan.FromSeconds(1));
                }
            }, cts.Token);
            await foreach (var request in requestStream.ReadAllAsync())
            {
                counter += Convert.ToInt32(request.UserName.Substring(3));
                _logger.LogInformation(request.UserName);
            }
            cts.Cancel();
        }

運行結果:

 

 

四. 常用配置

配置表格詳見:https://docs.microsoft.com/zh-cn/aspnet/core/grpc/configuration?view=aspnetcore-3.1

1. 服務端配置

(1).全局配置

public void ConfigureServices(IServiceCollection services)
{
    services.AddGrpc(options =>
    {
        options.EnableDetailedErrors = true; //開啟異常返回
        options.MaxReceiveMessageSize = 2 * 1024 * 1024; // 2 MB
        options.MaxSendMessageSize = 5 * 1024 * 1024; // 5 MB
    });
}    

PS:

 A.異常消息通常被視為不應泄露給客戶端的敏感數據。 默認情況下,gRPC 不會將 gRPC 服務引發的異常的詳細信息發送到客戶端。 相反,客戶端將收到一條指示出錯的一般消息。 向客戶端發送的異常消息可以通過EnableDetailedErrors重寫(例如,在開發或測試中)。 不應在生產應用程序中向客戶端公開異常消息。

 B.傳入消息到 gRPC 的客戶端和服務將加載到內存中。 消息大小限制是一種有助於防止 gRPC消耗過多資源的機制。gRPC 使用每個消息的大小限制來管理傳入消息和傳出消息。 默認情況下,gRPC 限制傳入消息的大小為 4 MB。 傳出消息沒有限制。

(2).為單個服務配置

public void ConfigureServices(IServiceCollection services)
{
    services.AddGrpc().AddServiceOptions<GreeterService>(options =>
  {   options.MaxReceiveMessageSize = 2 * 1024 * 1024; // 2 MB    options.MaxSendMessageSize = 5 * 1024 * 1024; // 5 MB   }); }

注:單個服務的配置優先級高於全局配置。

附服務端配置表格:

2. 客戶都配置

var channel = GrpcChannel.ForAddress("https://localhost:5001", new GrpcChannelOptions
{
  MaxReceiveMessageSize = 5 * 1024 * 1024, // 5 MB
  MaxSendMessageSize = 2 * 1024 * 1024 // 2 MB
});

附客戶端配置表格: 

 

 

 

 

 

!

  • 作       者 : Yaopengfei(姚鵬飛)
  • 博客地址 : http://www.cnblogs.com/yaopengfei/
  • 聲     明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
  • 聲     明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM