1.單向流:服務端向客戶端流傳輸
創建一個protobuf 文件:
hello_world.proto
// 定義一個服務的框架,服務名和服務下的函數名,以及函數下的request 和response, // 和resquest 和response 對應的參數 //使用哪種protobuf 協議 syntax="proto3"; package test; //服務名:Mianmian,以及里面定義的rpc函數HelloMian service Mianmian{ rpc HelloMian(HelloMianReq) returns(HelloMianReply) {} //添加一些插件 rpc TestClientRecvStream(TestClientRecvStreamRequest) returns(stream TestClientRecvStreamResponse){} } //傳輸方式 // 1.unary 單程 // 2.stream 1. 雙向 客戶端請求服務端(流),服務端返回客戶端(流) // 2.單向 服務端接收客戶端(流),服務端返回客戶端(非流) // 3. 單向 服務端接收客戶端(非流),服務器send客戶端(流) message HelloMianReq{ string name = 1; int32 age =2; } message HelloTestRuest{ string name = 1; int32 age =2; repeated string data = 3; //a=[1,2] map<string,int32> number=4; //string ,int32 } message TestClientRecvStreamRequest{ string data=1; } message TestClientRecvStreamResponse{ string result=1; } message HelloMianReply{ string result =1; } message HelloTestResponse{ string result =1; }
編寫service 代碼:
service.py
#!/usr/bin/env python # -*- coding:utf-8 -*- #@Time : 2021/6/5 13:37 #@Author: Tana #@File : service.py import grpc import hello_world_pb2 as pb2 import hello_world_pb2_grpc as pb2_grpc from concurrent import futures import time class Mianmian(pb2_grpc.MianmianServicer): def HelloMian(self, request, context): name=request.name age=request.age result={"code":"succeed", "data":f'my name is {name},i am {age} years old' } return pb2.HelloMianReply(result=result) def TestClientRecvStream(self, request, context): #判斷客戶端是否活躍 index=0 while context.is_active(): data=request.data time.sleep(1) index += 1 yield pb2.TestClientRecvStreamResponse( result=f'send {index} {data}' ) def run(): #定義grpc線程數量 grpc_server=grpc.server(futures.ThreadPoolExecutor(max_workers=4)) #注冊服務到grpc_server pb2_grpc.add_MianmianServicer_to_server(Mianmian(),grpc_server) #綁定ip和端口號 grpc_server.add_insecure_port('0.0.0.0:5001') print("server will start at 0.0.0.0:5001") #這個start 在python里面會啟動一下就停了,所以需要寫一個無限循環 grpc_server.start() try: while 1: print("1") time.sleep(3600) except KeyboardInterrupt: grpc_server.stop(0) if __name__=="__main__": run()
編寫client端 代碼:
client.py
#!/usr/bin/env python # -*- coding:utf-8 -*- #@Time : 2021/6/6 15:26 #@Author: Tana #@File : client.py import grpc import hello_world_pb2_grpc as pb2_grpc import hello_world_pb2 as pb2 def run(): #定義一個頻道,綁定服務器端對應的ip 和端口號 conn = grpc.insecure_channel('0.0.0.0:5000') #生成客戶端 client = pb2_grpc.MianmianStub(channel=conn) # response = client.HelloMian(pb2.HelloMianReq( # name="mianmian", # age=18 # )) # print(response.result) # response=client.TestClientRecvStream(pb2.TestClientRecvStreamRequest( data='mianmian' )) for item in response: print(item.result) if __name__=='__main__': run()
2.單向流:客戶端流請求
創建一個protobuf 文件:
hello_world.proto
// 定義一個服務的框架,服務名和服務下的函數名,以及函數下的request 和response, // 和resquest 和response 對應的參數 //使用哪種protobuf 協議 syntax="proto3"; package test; //服務名:Mianmian,以及里面定義的rpc函數HelloMian service Mianmian{ rpc HelloMian(HelloMianReq) returns(HelloMianReply) {} //添加一些插件 rpc TestClientRecvStream(TestClientRecvStreamRequest) returns(stream TestClientRecvStreamResponse){} rpc TestClientSendStream(stream TestClientSendStreamRequest) returns (TestClientSendStreamResponse) {} } //傳輸方式 // 1.unary 單程 // 2.stream 1. 雙向 客戶端請求服務端(流),服務端返回客戶端(流) // 2.單向 服務端接收客戶端(流),服務端返回客戶端(非流) // 3. 單向 服務端接收客戶端(非流),服務器send客戶端(流) message HelloMianReq{ string name = 1; int32 age =2; } message HelloTestRuest{ string name = 1; int32 age =2; repeated string data = 3; //a=[1,2] map<string,int32> number=4; //string ,int32 } message TestClientSendStreamRequest{ string data=1; } message TestClientSendStreamResponse{ string result=1; } message TestClientRecvStreamRequest{ string data=1; } message TestClientRecvStreamResponse{ string result=1; } message HelloMianReply{ string result =1; } message HelloTestResponse{ string result =1; }
編寫service 代碼:
service.py
#!/usr/bin/env python # -*- coding:utf-8 -*- #@Time : 2021/6/5 13:37 #@Author: Tana #@File : service.py import grpc import hello_world_pb2 as pb2 import hello_world_pb2_grpc as pb2_grpc from concurrent import futures import time class Mianmian(pb2_grpc.MianmianServicer): def HelloMian(self, request, context): name=request.name age=request.age result={"code":"succeed", "data":f'my name is {name},i am {age} years old' } return pb2.HelloMianReply(result=result) def TestClientRecvStream(self, request, context): #判斷客戶端是否活躍 index=0 while context.is_active(): data=request.data time.sleep(1) index += 1 yield pb2.TestClientRecvStreamResponse( result=f'send {index} {data}' )
def TestClientSendStream(self, request_iterator, context): for request in request_iterator: print(request.data) return pb2.TestClientSendStreamResponse(reslut="ok") def run(): #定義grpc線程數量 grpc_server=grpc.server(futures.ThreadPoolExecutor(max_workers=4)) #注冊服務到grpc_server pb2_grpc.add_MianmianServicer_to_server(Mianmian(),grpc_server) #綁定ip和端口號 grpc_server.add_insecure_port('0.0.0.0:5001') print("server will start at 0.0.0.0:5001") #這個start 在python里面會啟動一下就停了,所以需要寫一個無限循環 grpc_server.start() try: while 1: print("1") time.sleep(3600) except KeyboardInterrupt: grpc_server.stop(0) if __name__=="__main__": run()
編寫client端 代碼:
client.py
#!/usr/bin/env python # -*- coding:utf-8 -*- #@Time : 2021/6/6 15:26 #@Author: Tana #@File : client.py import grpc import hello_world_pb2_grpc as pb2_grpc import hello_world_pb2 as pb2 import time,random def test(): while 1: time.sleep(1) data=str(random.random()) yield pb2.TestClientRecvStreamRequest(data=data) def run(): #定義一個頻道,綁定服務器端對應的ip 和端口號 conn = grpc.insecure_channel('0.0.0.0:5000') #生成客戶端 client = pb2_grpc.MianmianStub(channel=conn) # response = client.HelloMian(pb2.HelloMianReq( # name="mianmian", # age=18 # )) # print(response.result) # #服務端流傳輸 # response=client.TestClientRecvStream(pb2.TestClientRecvStreamRequest( # data='mianmian' # )) # # for item in response: # print(item.result) #客戶端流請求 response=client.TestClientSendStream(test()) print(response.result()) if __name__=='__main__': run()
2.雙向流:服務端向客戶端雙向流
創建一個protobuf 文件:
hello_world.proto
// 定義一個服務的框架,服務名和服務下的函數名,以及函數下的request 和response, // 和resquest 和response 對應的參數 //使用哪種protobuf 協議 syntax="proto3"; package test; //服務名:Mianmian,以及里面定義的rpc函數HelloMian service Mianmian{ rpc HelloMian(HelloMianReq) returns(HelloMianReply) {} //添加一些插件 rpc TestClientRecvStream(TestClientRecvStreamRequest) returns(stream TestClientRecvStreamResponse){} rpc TestClientSendStream(stream TestClientSendStreamRequest) returns (TestClientSendStreamResponse) {} rpc TestTwoWayStream(stream TestTwoWayStreamRequest) returns(stream TestTwoWayStreamResponse) {} } //傳輸方式 // 1.unary 單程 // 2.stream 1. 雙向 客戶端請求服務端(流),服務端返回客戶端(流) // 2.單向 服務端接收客戶端(流),服務端返回客戶端(非流) // 3. 單向 服務端接收客戶端(非流),服務器send客戶端(流) message TestTwoWayStreamRequest{ string data=1; } message TestTwoWayStreamResponse{ string result=1; } message HelloMianReq{ string name = 1; int32 age =2; } message HelloTestRuest{ string name = 1; int32 age =2; repeated string data = 3; //a=[1,2] map<string,int32> number=4; //string ,int32 } message TestClientSendStreamRequest{ string data=1; } message TestClientSendStreamResponse{ string result=1; } message TestClientRecvStreamRequest{ string data=1; } message TestClientRecvStreamResponse{ string result=1; } message HelloMianReply{ string result =1; } message HelloTestResponse{ string result =1; }
編寫service 代碼:
service.py
#!/usr/bin/env python # -*- coding:utf-8 -*- #@Time : 2021/6/5 13:37 #@Author: Tana #@File : service.py import grpc import hello_world_pb2 as pb2 import hello_world_pb2_grpc as pb2_grpc from concurrent import futures import time class Mianmian(pb2_grpc.MianmianServicer): def HelloMian(self, request, context): name=request.name age=request.age result={"code":"succeed", "data":f'my name is {name},i am {age} years old' } return pb2.HelloMianReply(result=result) def TestClientRecvStream(self, request, context): #判斷客戶端是否活躍 index=0 while context.is_active(): data=request.data time.sleep(1) index += 1 yield pb2.TestClientRecvStreamResponse( result=f'send {index} {data}' ) def TestClientSendStream(self, request_iterator, context): for request in request_iterator: print(request.data) return pb2.TestClientSendStreamResponse(reslut="ok") def TestTwoWayStream(self, request_iterator, context): for request in request_iterator: data = request.data yield pb2.TestTwoWayStreamResponse(result="service send %s" % data) def run(): #定義grpc線程數量 grpc_server=grpc.server(futures.ThreadPoolExecutor(max_workers=4)) #注冊服務到grpc_server pb2_grpc.add_MianmianServicer_to_server(Mianmian(),grpc_server) #綁定ip和端口號 grpc_server.add_insecure_port('0.0.0.0:5001') print("server will start at 0.0.0.0:5001") #這個start 在python里面會啟動一下就停了,所以需要寫一個無限循環 grpc_server.start() try: while 1: print("1") time.sleep(3600) except KeyboardInterrupt: grpc_server.stop(0) if __name__=="__main__": run()
編寫client端 代碼:
client.py
#!/usr/bin/env python # -*- coding:utf-8 -*- #@Time : 2021/6/6 15:26 #@Author: Tana #@File : client.py import grpc import hello_world_pb2_grpc as pb2_grpc import hello_world_pb2 as pb2 import time,random def test(): while 1: time.sleep(1) data=str(random.random()) yield pb2.TestClientRecvStreamRequest(data=data) def run(): #定義一個頻道,綁定服務器端對應的ip 和端口號 conn = grpc.insecure_channel('0.0.0.0:5000') #生成客戶端 client = pb2_grpc.MianmianStub(channel=conn) # response = client.HelloMian(pb2.HelloMianReq( # name="mianmian", # age=18 # )) # print(response.result) # #服務端流傳輸 # response=client.TestClientRecvStream(pb2.TestClientRecvStreamRequest( # data='mianmian' # )) # # for item in response: # print(item.result) # #客戶端流請求 # response=client.TestClientSendStream(test()) # print(response.result()) #雙向流 response = client.TestTwoWayStream(test()) for res in response: print(res.result) if __name__=='__main__': run()