rpc是一种server、client间的分发方式。
protobuf是google推出的一种高效的序列化方式,可以用于client、server间的通信,grpc就是用Google提出的用protobuf格式数据作为传输在她的一种rpc策略。
一、grpc基础
建议参看官方基础教程, 总的来说,有以下关键点:
1. .proto文件
message类似于C++中的结构体,语法也类似于C++;
rpc定义的则是一个rpc功能,接受request,回复response
例:new.proto
syntax = "proto3"; // 用第三版的proto语法 service NewAnalyse{ // 服务名 rpc Abnormal(ImgRequest) returns (AbnormalResponse); // 这里定义了两种功能 rpc Target(TargetRequest) returns (TargetResponse); } message Img { bytes data = 1; // 这里数字 1 应该是protobuf格式用于标记使用的(不用传输名称),对我们来说没什么用 int32 width = 2; // image width int32 height = 3; // image height int32 channels = 4; // RGB as default: 3 } message ImgRequest { Img image = 1; // message结构可以作为类型使用 } message AbnormalResponse { string resu = 1; float conf = 2; // eg: 0.953 }
2. 生成上面.proto文件的python版本
pip install grpc-tools # 需要先安装grpc工具
根据.proto文件,生成两个.py文件。这里生成功能文件new_pb2.py,new_pb2_grpc.py。(这里的2指的用于生成.py文件的API的version是2)
python -m grpc_tools.protoc -I. --python_out=./grpc_out/ --grpc_python_out=./grpc_out/ new.proto # -I指定源文件在的目录,new.proto就是源文件,后面两个参数指定生成的.py文件的位置
3. 根据功能文件定义client、server的代码
两个功能文件相当于将我们在.proto文件中定义的结构和功能都python化了,接下来我们只需要根据这两个文件编程即可。 ——内容可以参考博客
new_server.py
import grpc from grpc_out.new_pb2 import * from grpc_out.new_pb2_grpc import NewAnalyseServicer, add_NewAnalyseServicer_to_server # the implementation of interfaces in NewAnalyseServicer class NewAnalyse(NewAnalyseServicer): def __init__(self) -> None: pass def Abnormal(self, request, context): ''' 具体实现该功能,得到xx和yy ''' return AbnormalResponse(resu = xx, conf = yy)
# 定义服务执行的方式 def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_NewAnalyseServicer_to_server(NewAnalyse(), server) # add servicer to server server.add_insecure_port('[::]:50000') # 指定本地端口50000 server.start() server.wait_for_termination() # 将在server终止时终端活动的线程 if __name__ == "__main__": serve()
new_client.py
import grpc from PIL import Image from grpc_out.new_pb2 import * from grpc_out.new_pb2_grpc import NewAnalyseStub def run(): with grpc.insecure_channel('10.10.0.11:50000') as channel: # with这一块执行结束后,channel也将关闭;这里10.10.0.11是监听的目标ip,也可也换成localhost,那就是监听本地的127.0.0.1:50000 stub = NewAnalyseStub(channel) img_path = './ke.jpg' img = np.array(Image.open(img_path)) height, width, channels = img.shape image = Img(data=np.ndarray.tobytes(img), height=height, width=width, channels=channels, img_type=img_path[img_path.rfind('.'):]) response = stub.Abnormal(ImgRequest(image=image))
print(response.resu) # 用点运算符访问 if __name__ == "__main__": run()
4. 运行
在一个终端内执行python new_server.py,另一个终端内执行python new_client.py。
如果没有问题,server端将根据收到功能请求和request,调用相应的功能函数,并返回结果,由rpc将结果返回到client端。
二、实践和问题解析
1. repeated类型
如果message中有一个元素数目不定,可以为它加上关键词repeated,可以理解为变长数组。
message TargetResponse { repeated string target = 1; }
在server代码中为repeated类型赋值的方法有两种
res = TargetResponse() # 1. 用add() targ = res.target.add() targ = 'xx' targ2 = res.target.add() targ2 = 'yy' ... # 2. 用append()和extend() targ = 'xx' res.target.append(targ) targ2 = 'yy' res.target.append(targ2)
res.target.extend([targ, targ2])
res.target.remove(targ) # 删除
在client代码中读取repeated类型的方法,和list类型的读取方法相同
for res in TargetResponse.target: ...
2. repeated和结构体不可直接赋值
1)指定域名和域变量赋值
# repeated类型不能直接赋值 TargetResponse.target = 'xx' # 错误 # 结构体不能直接赋值 image = Img() image.height = .. image.width = .. ... Response.image = image # 错误
Response.image.height = .. # 正确 Response.image.width = ..
2)使用CopyFrom个message赋值 ——参考stackoverflow
Response.image.CopyFrom(image)
* 执行时出现错误 Exception calling application: 1.0 has type float, but expected one of: bytes, unicode
在server代码中加print找出错点——proto中的类型和实现代码中的类型不一致
3. 传输的数据超出大小限制 ——参考StackOverflow
默认的数据大小限制为4MB(4194304B),普通的传输时是可以通过的,可以通过更改限制进行修改,修改的单位是byte,如下:
# server server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options = [ ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH), ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH) ]) # client channel = grpc.insecure_channel('localhost:50051', options=[ ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH), ('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH) ])
* 其中MAX_MESSAGE_LENGTH是是数字,以byte为单位
4. 流式传输的使用,以视频传输为例 ——另一篇博客
5. 异常处理 ——更多参考文档
下面的例子是写在client中的,捕获的异常则是来自server,这个异常server自己无法捕获。
try: ... expect grpc.RpcError as rpc_error: print(rpc_error) if rpc_error.code() == grpc.StatusCode.UNKNOWN: print(rpc_error.details())