rpc是一種server、client間的分發方式。
protobuf是google推出的一種高效的序列化方式,可以用於client、server間的通信,grpc就是用Google提出的用protobuf格式數據作為傳輸在她的一種rpc策略。
一、grpc基礎
建議參看官方基礎教程, 總的來說,有以下關鍵點:
1. .proto文件
message類似於C++中的結構體,語法也類似於C++;
rpc定義的則是一個rpc功能,接受request,回復response
例:new.proto
syntax = "proto3"; // 用第三版的proto語法 service NewAnalyse{ // 服務名 rpc Abnormal(ImgRequest) returns (AbnormalResponse); // 這里定義了兩種功能 rpc Target(TargetRequest) returns (TargetResponse); } message Img { bytes data = 1; // 這里數字 1 應該是protobuf格式用於標記使用的(不用傳輸名稱),對我們來說沒什么用 int32 width = 2; // image width int32 height = 3; // image height int32 channels = 4; // RGB as default: 3 } message ImgRequest { Img image = 1; // message結構可以作為類型使用 } message AbnormalResponse { string resu = 1; float conf = 2; // eg: 0.953 }
2. 生成上面.proto文件的python版本
pip install grpc-tools # 需要先安裝grpc工具
根據.proto文件,生成兩個.py文件。這里生成功能文件new_pb2.py,new_pb2_grpc.py。(這里的2指的用於生成.py文件的API的version是2)
python -m grpc_tools.protoc -I. --python_out=./grpc_out/ --grpc_python_out=./grpc_out/ new.proto # -I指定源文件在的目錄,new.proto就是源文件,后面兩個參數指定生成的.py文件的位置
3. 根據功能文件定義client、server的代碼
兩個功能文件相當於將我們在.proto文件中定義的結構和功能都python化了,接下來我們只需要根據這兩個文件編程即可。 ——內容可以參考博客
new_server.py
import grpc from grpc_out.new_pb2 import * from grpc_out.new_pb2_grpc import NewAnalyseServicer, add_NewAnalyseServicer_to_server # the implementation of interfaces in NewAnalyseServicer class NewAnalyse(NewAnalyseServicer): def __init__(self) -> None: pass def Abnormal(self, request, context): ''' 具體實現該功能,得到xx和yy ''' return AbnormalResponse(resu = xx, conf = yy)
# 定義服務執行的方式 def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_NewAnalyseServicer_to_server(NewAnalyse(), server) # add servicer to server server.add_insecure_port('[::]:50000') # 指定本地端口50000 server.start() server.wait_for_termination() # 將在server終止時終端活動的線程 if __name__ == "__main__": serve()
new_client.py
import grpc from PIL import Image from grpc_out.new_pb2 import * from grpc_out.new_pb2_grpc import NewAnalyseStub def run(): with grpc.insecure_channel('10.10.0.11:50000') as channel: # with這一塊執行結束后,channel也將關閉;這里10.10.0.11是監聽的目標ip,也可也換成localhost,那就是監聽本地的127.0.0.1:50000 stub = NewAnalyseStub(channel) img_path = './ke.jpg' img = np.array(Image.open(img_path)) height, width, channels = img.shape image = Img(data=np.ndarray.tobytes(img), height=height, width=width, channels=channels, img_type=img_path[img_path.rfind('.'):]) response = stub.Abnormal(ImgRequest(image=image))
print(response.resu) # 用點運算符訪問 if __name__ == "__main__": run()
4. 運行
在一個終端內執行python new_server.py,另一個終端內執行python new_client.py。
如果沒有問題,server端將根據收到功能請求和request,調用相應的功能函數,並返回結果,由rpc將結果返回到client端。
二、實踐和問題解析
1. repeated類型
如果message中有一個元素數目不定,可以為它加上關鍵詞repeated,可以理解為變長數組。
message TargetResponse { repeated string target = 1; }
在server代碼中為repeated類型賦值的方法有兩種
res = TargetResponse() # 1. 用add() targ = res.target.add() targ = 'xx' targ2 = res.target.add() targ2 = 'yy' ... # 2. 用append()和extend() targ = 'xx' res.target.append(targ) targ2 = 'yy' res.target.append(targ2)
res.target.extend([targ, targ2])
res.target.remove(targ) # 刪除
在client代碼中讀取repeated類型的方法,和list類型的讀取方法相同
for res in TargetResponse.target: ...
2. repeated和結構體不可直接賦值
1)指定域名和域變量賦值
# repeated類型不能直接賦值 TargetResponse.target = 'xx' # 錯誤 # 結構體不能直接賦值 image = Img() image.height = .. image.width = .. ... Response.image = image # 錯誤
Response.image.height = .. # 正確 Response.image.width = ..
2)使用CopyFrom個message賦值 ——參考stackoverflow
Response.image.CopyFrom(image)
* 執行時出現錯誤 Exception calling application: 1.0 has type float, but expected one of: bytes, unicode
在server代碼中加print找出錯點——proto中的類型和實現代碼中的類型不一致
3. 傳輸的數據超出大小限制 ——參考StackOverflow
默認的數據大小限制為4MB(4194304B),普通的傳輸時是可以通過的,可以通過更改限制進行修改,修改的單位是byte,如下:
# server server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options = [ ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH), ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH) ]) # client channel = grpc.insecure_channel('localhost:50051', options=[ ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH), ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH) ])
* 其中MAX_MESSAGE_LENGTH是是數字,以byte為單位
4. 流式傳輸的使用,以視頻傳輸為例 ——另一篇博客
5. 異常處理 ——更多參考文檔
下面的例子是寫在client中的,捕獲的異常則是來自server,這個異常server自己無法捕獲。
try: ... expect grpc.RpcError as rpc_error: print(rpc_error) if rpc_error.code() == grpc.StatusCode.UNKNOWN: print(rpc_error.details())