初識gRPC還是一位做JAVA的同事在項目中用到了它,為了C#的客戶端程序和java的服務器程序進行通信和數據交換,當時還是對方編譯成C#,我直接調用。
后來,自己下來做了C#版本gRPC編寫,搜了很多資料,但許多都是從入門開始?調用說“Say Hi!”這種官方標准的入門示例,然后遇到各種問題……
關於gRPC和Protobuf介紹,就不介紹了,網絡上一搜一大把,隨便一抓都是標准的官方,所以直接從使用說起。
gPRC源代碼:https://github.com/grpc/grpc;
protobuf的代碼倉庫:github倉庫地址:https://github.com/google/protobuf ;Google下載protobuff下載地址:https://developers.google.com/protocol-buffers/docs/downloads 。
1、新建解決方案
分別在VS中新建解決方案:GrpcTest;再在解決方案中新建三個項目:GrpcClient、GrpcServer、GrpcService,對應的分別是客戶端(wpf窗體程序)、服務端(控制台程序)、gRPC服務者(控制台程序)。在GrpcClient和GrpcServer項目中添加對GrpcService的引用。
在VS中對3個項目添加工具包引用:右鍵點擊“解決方案gRPCDemo”,點擊“管理解決方案的NuGet程序包”,在瀏覽中分別搜索"Grpc"、"Grpc.Tools"、"Google.Protobuf",然后點擊右面項目,全選,再點擊安裝(也可以用視圖 -> 窗口 -> 程序包管理器控制台 中的"Install-Package Grpc"進行這一步,這里不提供這種方法,有興趣自己百度)。
2、proto文件的語法
對於使用gRPC的通信框架,需要使用到對應的通信文件。在gRPC中,使用到的是proto格式的文件,對應的自然有其相應的語法。本文不詳細闡述該文件的語法,感興趣可以去官網看標准的語法,這兒有一個鏈接,中文翻譯比較全的https://www.codercto.com/a/45372.html。需要對其文章內的1.3進行補充下:
- required:一個格式良好的消息一定要含有1個這種字段。表示該值是必須要設置的。
- optional:消息格式中該字段可以有0個或1個值(不超過1個)。
- repeated:在一個格式良好的消息中,這種字段可以重復任意多次(包括0次)。重復的值的順序會被保留。表示該值可以重復,相當於C#中的List。
本示例項目實現文件傳輸,因此在項目GrpcService中添加一個FileTransfer.proto文件,文件內容如下:
syntax = "proto3"; package GrpcService; service FileTransfer{ rpc FileDownload (FileRequest) returns (stream FileReply); rpc FileUpload (stream FileReply) returns(stream FileReturn); } //請求下載文件時,所需下載文件的文件名稱集合 message FileRequest{ repeated string FileNames=1;//文件名集合 //repeated重復字段 類似鏈表;optional可有可無的字段;required必要設置字段 string Mark = 2;//攜帶的包 } //下載和上傳文件時的應答數據 message FileReply{ string FileName=1;//文件名 int32 Block = 2;//標記---第幾個數據 bytes Content = 3;//數據 string Mark = 4;//攜帶的包 } //數據上傳時的返回值 message FileReturn{ string FileName=1;//文件名 string Mark = 2;//攜帶的包 }
3、編譯proto文件為C#代碼
proto文件僅僅只是定義了相關的數據,如果需要在代碼中使用該格式,就需要將它編譯成C#代碼文件。
PS:網上可以找到的編譯,需要下載相關的代碼,見博文。其他的也較為繁瑣,所以按照自己理解的來寫了。注意,我的項目是放在D盤根目錄下的。
首先打開cmd窗口,然后在窗口中輸入:D:\GrpcTest\packages\Grpc.Tools.2.32.0\tools\windows_x86\protoc.exe -ID:\GrpcTest\GrpcService --csharp_out D:\GrpcTest\GrpcService D:\GrpcTest\GrpcService\FileTransfer.proto --grpc_out D:\GrpcTest\GrpcService --plugin=protoc-gen-grpc=D:\GrpcTest\packages\Grpc.Tools.2.32.0\tools\windows_x86\grpc_csharp_plugin.exe
輸入上文后,按enter鍵,回車編譯。
命令解讀:
- D:\GrpcTest\packages\Grpc.Tools.2.32.0\tools\windows_x86\protoc.exe :調用的編譯程序路徑,注意版本不同路徑稍有不一樣。
- -ID:\GrpcTest\GrpcService :-I 指定一個或者多個目錄,用來搜索.proto文件的。所以上面那行的D:\GrpcTest\GrpcService\FileTransfer.proto 已經可以換成FileTransfer.proto了,因為-I已經指定了。注意:如果不指定,那就是當前目錄。
- --csharp_out D:\GrpcTest\GrpcService D:\GrpcTest\GrpcService\FileTransfer.proto :(--csharp_out)生成C#代碼、存放路徑、文件。當然還能cpp_out、java_out、javanano_out、js_out、objc_out、php_out、python_out、ruby_out 這時候你就應該知道,可以支持多語言的,才用的,生成一些文件,然后給各個語言平台調用。參數1(D:\GrpcTest\GrpcService)是輸出路徑,參數2(D:\GrpcTest\GrpcService\FileTransfer.proto)是proto的文件名或者路徑。
- --grpc_out D:\GrpcTest\GrpcService :grpc_out是跟服務相關,創建,調用,綁定,實現相關。生成的玩意叫xxxGrpc.cs。與前面的區別是csharp_out是輸出類似於咱們平時寫的實體類,接口,定義之類的。生成的文件叫xxx.cs
- --plugin=protoc-gen-grpc=D:\GrpcTest\packages\Grpc.Tools.2.32.0\tools\windows_x86\grpc_csharp_plugin.exe :這個就是csharp的插件,python有python的,java有java的。
編譯后,會在新增兩個文件(文件位置與你的輸出位置有關),並將兩個文件加入到GrpcService項目中去:

4、編寫服務端的文件傳輸服務
在GrpcServer項目中,新建一個FileImpl並繼承自GrpcService.FileTransfer.FileTransferBase,然后復寫其方法FileDownload和FileUpload方法,以供客戶端進行調用。
/// <summary> /// 文件傳輸類 /// </summary> class FileImpl:GrpcService.FileTransfer.FileTransferBase { /// <summary> /// 文件下載 /// </summary> /// <param name="request">下載請求</param> /// <param name="responseStream">文件寫入流</param> /// <param name="context">站點上下文</param> /// <returns></returns> public override async Task FileDownload(FileRequest request, global::Grpc.Core.IServerStreamWriter<FileReply> responseStream, global::Grpc.Core.ServerCallContext context) { List<string> lstSuccFiles = new List<string>();//傳輸成功的文件 DateTime startTime = DateTime.Now;//傳輸文件的起始時間 int chunkSize = 1024 * 1024;//每次讀取的數據 var buffer = new byte[chunkSize];//數據緩沖區 FileStream fs = null;//文件流 try { //reply.Block數字的含義是服務器和客戶端約定的 for (int i = 0; i < request.FileNames.Count; i++) { string fileName = request.FileNames[i];//文件名 string filePath = Path.GetFullPath($".//Files\\{fileName}");//文件路徑 FileReply reply = new FileReply { FileName = fileName, Mark = request.Mark };//應答數據 Console.WriteLine($"{request.Mark},下載文件:{filePath}");//寫入日志,下載文件 if (File.Exists(filePath)) { fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, useAsync: true); //fs.Length 可以告訴客戶端所傳文件大小 int readTimes = 0;//讀取次數 while (true) { int readSise = fs.Read(buffer, 0, buffer.Length);//讀取數據 if (readSise > 0)//讀取到了數據,有數據需要發送 { reply.Block = ++readTimes; reply.Content = Google.Protobuf.ByteString.CopyFrom(buffer, 0, readSise); await responseStream.WriteAsync(reply); } else//沒有數據了,就告訴對方,讀取完了 { reply.Block = 0; reply.Content = Google.Protobuf.ByteString.Empty; await responseStream.WriteAsync(reply); lstSuccFiles.Add(fileName); Console.WriteLine($"{request.Mark},完成發送文件:{filePath}");//日志,記錄發送成功 break;//跳出去 } } fs?.Close(); } else { Console.WriteLine($"文件【{filePath}】不存在。");//寫入日志,文件不存在 reply.Block = -1;//-1的標記為文件不存在 await responseStream.WriteAsync(reply);//告訴客戶端,文件狀態 } } //告訴客戶端,文件傳輸完成 await responseStream.WriteAsync(new FileReply { FileName = string.Empty, Block = -2,//告訴客戶端,文件已經傳輸完成 Content = Google.Protobuf.ByteString.Empty, Mark = request.Mark }); } catch(Exception ex) { Console.WriteLine($"{request.Mark},發生異常({ex.GetType()}):{ex.Message}"); } finally { fs?.Dispose(); } Console.WriteLine($"{request.Mark},文件傳輸完成。共計【{lstSuccFiles.Count / request.FileNames.Count}】,耗時:{DateTime.Now - startTime}"); } /// <summary> /// 上傳文件 /// </summary> /// <param name="requestStream">請求流</param> /// <param name="responseStream">響應流</param> /// <param name="context">站點上下文</param> /// <returns></returns> public override async Task FileUpload(global::Grpc.Core.IAsyncStreamReader<FileReply> requestStream, global::Grpc.Core.IServerStreamWriter<FileReturn> responseStream, global::Grpc.Core.ServerCallContext context) { List<string> lstFilesName = new List<string>();//文件名 List<FileReply> lstContents = new List<FileReply>();//數據集合 FileStream fs = null; DateTime startTime = DateTime.Now;//開始時間 string mark = string.Empty; string savePath = string.Empty; try { //reply.Block數字的含義是服務器和客戶端約定的 while (await requestStream.MoveNext())//讀取數據 { var reply = requestStream.Current; mark = reply.Mark; if (reply.Block == -2)//傳輸完成 { Console.WriteLine($"{mark},完成上傳文件。共計【{lstFilesName.Count}】個,耗時:{DateTime.Now-startTime}"); break; } else if (reply.Block == -1)//取消了傳輸 { Console.WriteLine($"文件【{reply.FileName}】取消傳輸!");//寫入日志 lstContents.Clear(); fs?.Close();//釋放文件流 if (!string.IsNullOrEmpty(savePath) && File.Exists(savePath))//如果傳輸不成功,刪除該文件 { File.Delete(savePath); } savePath = string.Empty; break; } else if(reply.Block==0)//文件傳輸完成 { if (lstContents.Any())//如果還有數據,就寫入文件 { lstContents.OrderBy(c => c.Block).ToList().ForEach(c => c.Content.WriteTo(fs)); lstContents.Clear(); } lstFilesName.Add(savePath);//傳輸成功的文件 fs?.Close();//釋放文件流 savePath = string.Empty; //告知客戶端,已經完成傳輸 await responseStream.WriteAsync(new FileReturn { FileName= reply.FileName, Mark=mark }); } else { if(string.IsNullOrEmpty(savePath))//有新文件來了 { savePath = Path.GetFullPath($".//Files\\{reply.FileName}");//文件路徑 fs = new FileStream(savePath, FileMode.Create, FileAccess.ReadWrite); Console.WriteLine($"{mark},上傳文件:{savePath},{DateTime.UtcNow.ToString("HH:mm:ss:ffff")}"); } lstContents.Add(reply);//加入鏈表 if (lstContents.Count() >= 20)//每個包1M,20M為一個集合,一起寫入數據。 { lstContents.OrderBy(c => c.Block).ToList().ForEach(c => c.Content.WriteTo(fs)); lstContents.Clear(); } } } } catch(Exception ex) { Console.WriteLine($"{mark},發生異常({ex.GetType()}):{ex.Message}"); } finally { fs?.Dispose(); } } }
在main函數中添加服務:
class Program { static void Main(string[] args) { //提供服務 Server server = new Server() { Services = {GrpcService.FileTransfer.BindService(new FileImpl())}, Ports = {new ServerPort("127.0.0.1",50000,ServerCredentials.Insecure)} }; //服務開始 server.Start(); while(Console.ReadLine().Trim().ToLower()!="exit") { } //結束服務 server.ShutdownAsync(); } }
5、編寫客戶端的文件傳輸功能
首先定義一個文件傳輸結果類TransferResult<T>,用於存放文件的傳輸結果。
/// <summary> /// 傳輸結果 /// </summary> /// <typeparam name="T"></typeparam> class TransferResult<T> { /// <summary> /// 傳輸是否成功 /// </summary> public bool IsSuccessful { get; set; } /// <summary> /// 消息 /// </summary> public string Message { get; set; } /// <summary> /// 標記類型 /// </summary> public T Tag { get; set; } = default; }
然后在GrpcClinet項目中添加一個FileTransfer的類,並實現相關方法:
class FileTransfer { /// <summary> /// 獲取通信客戶端 /// </summary> /// <returns>通信頻道、客戶端</returns> static (Channel, GrpcService.FileTransfer.FileTransferClient) GetClient() { //偵聽IP和端口要和服務器一致 Channel channel = new Channel("127.0.0.1", 50000, ChannelCredentials.Insecure); var client = new GrpcService.FileTransfer.FileTransferClient(channel); return (channel, client); } /// <summary> /// 下載文件 /// </summary> /// <param name="fileNames">需要下載的文件集合</param> /// <param name="mark">標記</param> /// <param name="saveDirectoryPath">保存路徑</param> /// <param name="cancellationToken">異步取消命令</param> /// <returns>下載任務(是否成功、原因、失敗文件名)</returns> public static async Task<TransferResult<List<string>>> FileDownload(List<string> fileNames, string mark, string saveDirectoryPath, System.Threading.CancellationToken cancellationToken = new System.Threading.CancellationToken()) { var result = new TransferResult<List<string>>() { Message = $"文件保存路徑不正確:{saveDirectoryPath}" }; if (!System.IO.Directory.Exists(saveDirectoryPath)) { return await Task.Run(() => result);//文件路徑不存在 } if (fileNames.Count == 0) { result.Message = "未包含任何文件"; return await Task.Run(() => result);//文件路徑不存在 } result.Message = "未能連接到服務器"; FileRequest request = new FileRequest() { Mark = mark };//請求數據 request.FileNames.AddRange(fileNames);//將需要下載的文件名賦值 var lstSuccFiles = new List<string>();//傳輸成功的文件 string savePath = string.Empty;//保存路徑 System.IO.FileStream fs = null; Channel channel = null;//申明通信頻道 GrpcService.FileTransfer.FileTransferClient client = null; DateTime startTime = DateTime.Now; try { (channel, client) = GetClient(); using (var call = client.FileDownload(request)) { List<FileReply> lstContents = new List<FileReply>();//存放接收的數據 var reaponseStream = call.ResponseStream; //reaponseStream.Current.Block數字的含義是服務器和客戶端約定的 while (await reaponseStream.MoveNext(cancellationToken))//開始接收數據 { if (cancellationToken.IsCancellationRequested) { break; } if (reaponseStream.Current.Block == -2)//說明文件已經傳輸完成了 { result.Message = $"完成下載任務【{lstSuccFiles.Count}/{fileNames.Count}】,耗時:{DateTime.Now - startTime}"; result.IsSuccessful = true; break; } else if (reaponseStream.Current.Block == -1)//當前文件傳輸錯誤 { Console.WriteLine($"文件【{reaponseStream.Current.FileName}】傳輸失敗!");//寫入日志 lstContents.Clear(); fs?.Close();//釋放文件流 if (!string.IsNullOrEmpty(savePath) && File.Exists(savePath))//如果傳輸不成功,刪除該文件 { File.Delete(savePath); } savePath = string.Empty; } else if (reaponseStream.Current.Block == 0)//當前文件傳輸完成 { if (lstContents.Any())//如果還有數據,就寫入文件 { lstContents.OrderBy(c => c.Block).ToList().ForEach(c => c.Content.WriteTo(fs)); lstContents.Clear(); } lstSuccFiles.Add(reaponseStream.Current.FileName);//傳輸成功的文件 fs?.Close();//釋放文件流 savePath = string.Empty; } else//有文件數據過來 { if (string.IsNullOrEmpty(savePath))//如果字節流為空,則說明時新的文件數據來了 { savePath = Path.Combine(saveDirectoryPath, reaponseStream.Current.FileName); fs = new FileStream(savePath, FileMode.Create, FileAccess.ReadWrite); } lstContents.Add(reaponseStream.Current);//加入鏈表 if (lstContents.Count() >= 20)//每個包1M,20M為一個集合,一起寫入數據。 { lstContents.OrderBy(c => c.Block).ToList().ForEach(c => c.Content.WriteTo(fs)); lstContents.Clear(); } } } if (cancellationToken.IsCancellationRequested) { fs?.Close();//釋放文件流 result.IsSuccessful = false; result.Message = $"用戶取消下載。已完成下載【{lstSuccFiles.Count}/{fileNames.Count}】,耗時:{DateTime.Now - startTime}"; } } fs?.Close();//釋放文件流 if (!result.IsSuccessful && !string.IsNullOrEmpty(savePath) && File.Exists(savePath))//如果傳輸不成功,那么久刪除該文件 { File.Delete(savePath); } } catch (Exception ex) { if (!cancellationToken.IsCancellationRequested) { result.Message = $"文件傳輸發生異常:{ex.Message}"; } } finally { fs?.Dispose(); } result.Tag = fileNames.Except(lstSuccFiles).ToList();//獲取失敗文件集合 //關閉通信、並返回結果 return await channel?.ShutdownAsync().ContinueWith(t => result); } /// <summary> /// 文件上傳 /// </summary> /// <param name="filesPath">文件路徑</param> /// <param name="mark">標記</param> /// <param name="cancellationToken">異步取消命令</param> /// <returns>下載任務(是否成功、原因、成功的文件名)</returns> public static async Task<TransferResult<List<string>>> FileUpload(List<string> filesPath, string mark, System.Threading.CancellationToken cancellationToken = new System.Threading.CancellationToken()) { var result = new TransferResult<List<string>> { Message = "沒有文件需要下載" }; if (filesPath.Count == 0) { return await Task.Run(() => result);//沒有文件需要下載 } result.Message = "未能連接到服務器。"; var lstSuccFiles = new List<string>();//傳輸成功的文件 int chunkSize = 1024 * 1024; byte[] buffer = new byte[chunkSize];//每次發送的大小 FileStream fs = null;//文件流 Channel channel = null;//申明通信頻道 GrpcService.FileTransfer.FileTransferClient client = null; DateTime startTime = DateTime.Now; try { (channel, client) = GetClient(); using (var stream = client.FileUpload())//連接上傳文件的客戶端 { //reply.Block數字的含義是服務器和客戶端約定的 foreach (var filePath in filesPath)//遍歷集合 { if (cancellationToken.IsCancellationRequested) break;//取消了傳輸 FileReply reply = new FileReply() { FileName = Path.GetFileName(filePath), Mark = mark }; if (!File.Exists(filePath))//文件不存在,繼續下一輪的發送 { Console.WriteLine($"文件不存在:{filePath}");//寫入日志 continue; } fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, useAsync: true); int readTimes = 0; while (true) { if (cancellationToken.IsCancellationRequested) { reply.Block = -1;//取消了傳輸 reply.Content = Google.Protobuf.ByteString.Empty; await stream.RequestStream.WriteAsync(reply);//發送取消傳輸的命令 break;//取消了傳輸 } int readSize = fs.Read(buffer, 0, buffer.Length);//讀取數據 if (readSize > 0) { reply.Block = ++readTimes;//更新標記,發送數據 reply.Content = Google.Protobuf.ByteString.CopyFrom(buffer, 0, readSize); await stream.RequestStream.WriteAsync(reply); } else { Console.WriteLine($"完成文件【{filePath}】的上傳。"); reply.Block = 0;//傳送本次文件發送結束的標記 reply.Content = Google.Protobuf.ByteString.Empty; await stream.RequestStream.WriteAsync(reply);//發送結束標記 //等待服務器回傳 await stream.ResponseStream.MoveNext(cancellationToken); if (stream.ResponseStream.Current != null && stream.ResponseStream.Current.Mark == mark) { lstSuccFiles.Add(filePath);//記錄成功的文件 } break;//發送下一個文件 } } fs?.Close(); } if (cancellationToken.IsCancellationRequested) { fs?.Close();//釋放文件流 result.IsSuccessful = false; result.Message = $"用戶取消了上傳文件。已完成【{lstSuccFiles.Count}/{filesPath.Count}】,耗時:{DateTime.Now - startTime}"; } else { result.IsSuccessful = true; result.Message = $"完成文件上傳。共計【{lstSuccFiles.Count}/{filesPath.Count}】,耗時:{DateTime.Now - startTime}"; await stream.RequestStream.WriteAsync(new FileReply { Block = -2,//傳輸結束 Mark = mark });//發送結束標記 } } } catch (Exception ex) { if (!cancellationToken.IsCancellationRequested) { result.Message = $"文件上傳發生異常({ex.GetType()}):{ex.Message}"; } } finally { fs?.Dispose(); } Console.WriteLine(result.Message); result.Tag = lstSuccFiles; //關閉通信、並返回結果 return await channel?.ShutdownAsync().ContinueWith(t => result); } }
現在可以在客戶端窗體內進行調用了:
private string GetFilePath() { // Create OpenFileDialog Microsoft.Win32.OpenFileDialog dlg = new Microsoft.Win32.OpenFileDialog(); // Set filter for file extension and default file extension dlg.Title = "選擇文件"; dlg.Filter = "所有文件(*.*)|*.*"; dlg.FileName = "選擇文件夾."; dlg.FilterIndex = 1; dlg.ValidateNames = false; dlg.CheckFileExists = false; dlg.CheckPathExists = true; dlg.Multiselect = false;//允許同時選擇多個文件 // Display OpenFileDialog by calling ShowDialog method Nullable<bool> result = dlg.ShowDialog(); // Get the selected file name and display in a TextBox if (result == true) { // Open document return dlg.FileName; } return string.Empty; } // 打開文件 private void btnOpenUpload_Click(object sender, RoutedEventArgs e) { lblUploadPath.Content = GetFilePath(); } CancellationTokenSource uploadTokenSource; //上傳 private async void btnUpload_Click(object sender, RoutedEventArgs e) { lblMessage.Content = string.Empty; uploadTokenSource = new CancellationTokenSource(); List<string> fileNames = new List<string>(); fileNames.Add(lblUploadPath.Content.ToString()); var result = await ServerNet.FileTransfer.FileUpload(fileNames, "123", uploadTokenSource.Token); lblMessage.Content = result.Message; uploadTokenSource = null; } //取消上傳 private void btnCancelUpload_Click(object sender, RoutedEventArgs e) { uploadTokenSource?.Cancel(); } //打開需要下載的文件 private void btnOpenDownload_Click(object sender, RoutedEventArgs e) { txtDownloadPath.Text = GetFilePath(); } //下載文件 private async void btnDownload_Click(object sender, RoutedEventArgs e) { lblMessage.Content = string.Empty; downloadTokenSource = new CancellationTokenSource(); List<string> fileNames = new List<string>(); fileNames.Add(System.IO.Path.GetFileName(txtDownloadPath.Text)); var result= await ServerNet.FileTransfer.FileDownload(fileNames, "123", Environment.CurrentDirectory, downloadTokenSource.Token); lblMessage.Content = result.Message; downloadTokenSource = null; } CancellationTokenSource downloadTokenSource; //下載取消 private void btnCancelDownload_Click(object sender, RoutedEventArgs e) { downloadTokenSource?.Cancel(); }
6、源代碼
https://files.cnblogs.com/files/pilgrim/GrpcTest.rar
