grpc與protobuf使用


 

rpc是一種server、client間的分發方式。

protobuf是google推出的一種高效的序列化方式,可以用於client、server間的通信,grpc就是用Google提出的用protobuf格式數據作為傳輸在她的一種rpc策略。

 

一、grpc基礎

  建議參看官方基礎教程, 總的來說,有以下關鍵點:

1. .proto文件

message類似於C++中的結構體,語法也類似於C++;

rpc定義的則是一個rpc功能,接受request,回復response

例:new.proto

syntax = "proto3";         // 用第三版的proto語法

service NewAnalyse{        // 服務名
    rpc Abnormal(ImgRequest) returns (AbnormalResponse);  // 這里定義了兩種功能
    rpc Target(TargetRequest) returns (TargetResponse);
}

message Img {
    bytes data = 1;             // 這里數字 1 應該是protobuf格式用於標記使用的(不用傳輸名稱),對我們來說沒什么用
    int32 width = 2;            // image width
    int32 height = 3;           // image height
    int32 channels = 4;         // RGB as default: 3
}

message ImgRequest {
    Img image = 1;        // message結構可以作為類型使用
}

message AbnormalResponse {
    string resu = 1; 
    float conf = 2;            // eg: 0.953
}

 

2. 生成上面.proto文件的python版本

pip install grpc-tools    # 需要先安裝grpc工具

  根據.proto文件,生成兩個.py文件。這里生成功能文件new_pb2.py,new_pb2_grpc.py。(這里的2指的用於生成.py文件的API的version是2)

python -m grpc_tools.protoc -I. --python_out=./grpc_out/ --grpc_python_out=./grpc_out/ new.proto    # -I指定源文件在的目錄,new.proto就是源文件,后面兩個參數指定生成的.py文件的位置

3. 根據功能文件定義client、server的代碼

  兩個功能文件相當於將我們在.proto文件中定義的結構和功能都python化了,接下來我們只需要根據這兩個文件編程即可。  ——內容可以參考博客

new_server.py

import grpc
from grpc_out.new_pb2 import *
from grpc_out.new_pb2_grpc import NewAnalyseServicer, add_NewAnalyseServicer_to_server

# the implementation of interfaces in NewAnalyseServicer
class NewAnalyse(NewAnalyseServicer):
    def __init__(self) -> None:
        pass

    def Abnormal(self, request, context):
     '''
      具體實現該功能,得到xx和yy
     '''        
        return AbnormalResponse(resu = xx, conf = yy)

# 定義服務執行的方式
def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_NewAnalyseServicer_to_server(NewAnalyse(), server) # add servicer to server server.add_insecure_port('[::]:50000')            # 指定本地端口50000 server.start() server.wait_for_termination()            # 將在server終止時終端活動的線程 if __name__ == "__main__": serve()

 

new_client.py

import grpc
from PIL import Image
from grpc_out.new_pb2 import *
from grpc_out.new_pb2_grpc import NewAnalyseStub

def run():
    with grpc.insecure_channel('10.10.0.11:50000') as channel:   # with這一塊執行結束后,channel也將關閉;這里10.10.0.11是監聽的目標ip,也可也換成localhost,那就是監聽本地的127.0.0.1:50000
        stub = NewAnalyseStub(channel)
        img_path = './ke.jpg'
        img = np.array(Image.open(img_path))
        height, width, channels = img.shape
        image = Img(data=np.ndarray.tobytes(img), height=height, width=width, 
                    channels=channels, img_type=img_path[img_path.rfind('.'):])
        response = stub.Abnormal(ImgRequest(image=image))
     print(response.resu)                      # 用點運算符訪問
if __name__ == "__main__": run()

 

4. 運行

  在一個終端內執行python new_server.py,另一個終端內執行python new_client.py。

如果沒有問題,server端將根據收到功能請求和request,調用相應的功能函數,並返回結果,由rpc將結果返回到client端。

 

二、實踐和問題解析

1. repeated類型

   如果message中有一個元素數目不定,可以為它加上關鍵詞repeated,可以理解為變長數組。

message TargetResponse {
    repeated string target = 1;
}

  在server代碼中為repeated類型賦值的方法有兩種

res = TargetResponse()

# 1. 用add()
targ = res.target.add()
targ = 'xx'
targ2 = res.target.add()
targ2 = 'yy'
...

# 2. 用append()和extend()
targ = 'xx'
res.target.append(targ)
targ2 = 'yy'
res.target.append(targ2)
res.target.extend([targ, targ2])
res.target.remove(targ)    # 刪除

 

  在client代碼中讀取repeated類型的方法,和list類型的讀取方法相同

for res in TargetResponse.target:
    ...

 

2. repeated和結構體不可直接賦值

1)指定域名和域變量賦值

# repeated類型不能直接賦值
TargetResponse.target = 'xx'    # 錯誤

# 結構體不能直接賦值
image = Img()
image.height = ..
image.width = ..
...
Response.image = image        # 錯誤
Response.image.height = .. # 正確 Response.image.width = ..

2)使用CopyFrom個message賦值  ——參考stackoverflow

Response.image.CopyFrom(image)

 

* 執行時出現錯誤 Exception calling application: 1.0 has type float, but expected one of: bytes, unicode

  在server代碼中加print找出錯點——proto中的類型和實現代碼中的類型不一致

3. 傳輸的數據超出大小限制  ——參考StackOverflow

  默認的數據大小限制為4MB(4194304B),普通的傳輸時是可以通過的,可以通過更改限制進行修改,修改的單位是byte,如下:

# server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options = [
        ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
        ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH) ])

# client
channel = grpc.insecure_channel('localhost:50051', options=[
        ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
        ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH) ])

* 其中MAX_MESSAGE_LENGTH是是數字,以byte為單位

 

4. 流式傳輸的使用,以視頻傳輸為例  ——另一篇博客

5. 異常處理  ——更多參考文檔

  下面的例子是寫在client中的,捕獲的異常則是來自server,這個異常server自己無法捕獲。

try:
    ...
expect grpc.RpcError as rpc_error:
    print(rpc_error)
    if rpc_error.code() == grpc.StatusCode.UNKNOWN:
        print(rpc_error.details())

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM