.NET Core + gRPC 實現數據串流 (Streaming)


引入

gRPC 是谷歌推出的一個高性能優秀的 RPC 框架,基於 HTTP/2 實現。並且該框架對 .NET Core 有着優秀的支持。
最近在做一個項目正好用到了 gRPC,遇到了需要串流傳輸的問題。

項目創建

首先還是需要安裝 .net core sdk,可以去 http://dot.net 下載。這里我使用的是 2.2.103 版本的 sdk。
mkdir RpcStreaming
cd RpcStreaming
dotnet new console
dotnet add package Grpc // 添加 gRPC 包
dotnet add package Grpc.Tools // 添加 gRPC 工具包
dotnet add package Google.Protobuf // 添加 Protobuf 支持

然后為了支持 protobuf 語言,我們需要修改項目配置文件,在項目中引入 .proto 文件以便生成對應的代碼。

在 RpcStreaming.csproj 中,加入<Protobuf Include="**/*.proto" />,除此之外還需要啟用最新語言支持(C# 7.3),方便我們將 Main 函數直接寫為 async 函數,直接設置為最新版本的語言即可,如下所示:
<Project Sdk="Microsoft.NET.Sdk">
  ...
  <PropertyGroup>
    ...
    <LangVersion>latest</LangVersion>
    ...
  </PropertyGroup>
  
  <ItemGroup>
    ...
    <Protobuf Include="**/*.proto" />
    ...
  </ItemGroup>
  ...
</Project>

這里我們使用了 wildcard 語法匹配了項目內的全部 proto 文件用於生成對應的代碼。

到這里,項目的創建就完成了。

編寫 Proto 文件

我們在項目目錄下建立一個 .proto 文件,用於描述 rpc 調用和消息類型。比如:RpcStreaming.proto
內容如下:
 1 synatx = "proto3";
 2 service RpcStreamingService {
 3   rpc GetStreamContent (StreamRequest) returns (stream StreamContent) {}
 4 }
 5 message StreamRequest {
 6   string fileName = 1;
 7 }
 8 message StreamContent {
 9   bytes content = 1;
10 }

做 RPC 請求時,我們向 RPC 服務器發送一個 StreamRequest 的 message,其中包含了文件路徑;為了讓服務器以流式傳輸數據,我們在 returns 內加一個 “stream”。

保存后,我們執行一次 dotnet build,這樣就會在 ./obj/Debug/netcoreapp2.2下自動生成 RPC 調用和消息類型的代碼。

編寫 Server 端代碼

為了編寫 RPC 調用服務端代碼,我們需要重寫自動生成的 C# 虛函數。
首先我們進入 ./obj/Debug/netcoreapp2.2 看看自動生成了什么代碼。
RpcStreaming.cs 中包含消息類型的定義,RpcStreamingGrpc.cs 中包含了對應 rpc 調用的函數原型。
我們查找一下我們剛剛在 proto 文件中聲明的 GetStreamContent。
可以在里面找到一個上方文檔注釋為 “Base class for server-side implementations RpcStreamingServiceBase” 的抽象類 RpcStreamingServiceBase,里面包含了我們要找的東西。
可以找到我們的 GetStreamContent 的默認實現:
public virtual global::System.Threading.Tasks.Task GetStreamContent(global::StreamRequest request, grpc::IServerStreamWriter<global::StreamContent> responseStream, grpc::ServerCallContext context)
{
    throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}

這樣就簡單了,我們新建一個類 RpcServiceImpl,繼承 RpcStreamingService.RpcStreamingServiceBase,然后實現對應的方法即可。

為了串流,我們需要將數據流不斷寫入 response,這里給一個簡單的示例。
 1 using System;
 2 using System.IO;
 3 using System.Threading.Tasks;
 4 using Google.Protobuf;
 5 using Grpc.Core;
 6 namespace RpcStreaming
 7 {
 8     public class RpcStreamingServiceImpl : RpcStreamingService.RpcStreamingServiceBase
 9     {
10         public override Task GetStreamContent(StreamRequest request, IServerStreamWriter<StreamContent> response, ServerCallContext context)
11         {
12             return Task.Run(async () =>
13             {
14                 using (var fs = File.Open(request.FileName, FileMode.Open)) // 從 request 中讀取文件名並打開文件流
15           {
16                     var remainingLength = fs.Length; // 剩余長度
17               var buff = new byte[1048576]; // 緩沖區,這里我們設置為 1 Mb
18               while (remainingLength > 0) // 若未讀完則繼續讀取
19               {
20                         var len = await fs.ReadAsync(buff); // 異步從文件中讀取數據到緩沖區中
21                   remainingLength -= len; // 剩余長度減去剛才實際讀取的長度
22 
23                   // 向流中寫入我們剛剛讀取的數據
24                   await response.WriteAsync(new StreamContent
25                         {
26                             Content = ByteString.CopyFrom(buff, 0, len)
27                         });
28                     }
29                 }
30             });
31         }
32     }
33 }

 

啟動 RPC Server

首先需要:
1 using Google.Protobuf;
2 using Grpc.Core;

然后我們在 Main 函數中構建並啟動 RPC Server,監聽 localhost:23333

1 new Server
2 {
3     Services = { RpcStreamingService.BindService(new RpcStreamingServiceImpl()) }, // 綁定我們的實現
4     Ports = { new ServerPort("localhost", 23333, ServerCredentials.Insecure) }
5 }.Start();
6 Console.ReadKey();

這樣服務端就構建完成了。

編寫客戶端調用 RPC API

方便起見,我們先將 Main 函數改寫為 async 函數。
1 // 原來的 Main 函數
2 static void Main(string[] args) { ... }
3 // 改寫后的 Main 函數
4 static async Task Main(string[] args) { ... }

另外,還需要:

1 using System;
2 using System.IO;
3 using System.Threading.Tasks;
4 using Google.Protobuf;
5 using Grpc.Core;

然后我們在 Main 函數中添加調用代碼:

 1 var channel = new Channel("localhost:23333", ChannelCredentials.Insecure); // 建立到 localhost:23333 的 channel
 2 var client = new RpcStreamingService.RpcStreamingServiceClient(channel); // 建立 client
 3 // 調用 RPC API
 4 var result = client.GetStreamContent(new StreamRequest { FileName = "你想獲取的文件路徑" });
 5 var iter = result.ResponseStream; // 拿到響應流
 6 using (var fs = new FileStream("寫獲取的數據的文件路徑", FileMode.Create)) // 新建一個文件流用於存放我們獲取到數據
 7 {
 8     while (await iter.MoveNext()) // 迭代
 9     {
10         iter.Current.Content.WriteTo(fs); // 將數據寫入到文件流中
11     }
12 }

測試

dotnet run

會發現,我們想要獲取的文件的數據被不斷地寫到我們指定的文件中,每次 1 Mb。在我的電腦上測試,內網的環境下傳輸速度大概 80~90 Mb/s,幾乎跑滿了我的千兆網卡,速度非常理想。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM