(三)grpc-流式傳輸


1.單向流:服務端向客戶端流傳輸

創建一個protobuf 文件:

hello_world.proto

// 定義一個服務的框架,服務名和服務下的函數名,以及函數下的request 和response,
// 和resquest 和response 對應的參數

//使用哪種protobuf 協議
syntax="proto3";
package test;

//服務名:Mianmian,以及里面定義的rpc函數HelloMian
service Mianmian{
    rpc HelloMian(HelloMianReq) returns(HelloMianReply)
    {}   //添加一些插件
    rpc TestClientRecvStream(TestClientRecvStreamRequest) returns(stream TestClientRecvStreamResponse){}
}

//傳輸方式
// 1.unary 單程
// 2.stream  1. 雙向 客戶端請求服務端(流),服務端返回客戶端(流)
//             2.單向 服務端接收客戶端(流),服務端返回客戶端(非流)
//            3. 單向 服務端接收客戶端(非流),服務器send客戶端(流)
message HelloMianReq{
    string  name = 1;
    int32 age =2;
}

message HelloTestRuest{
    string  name = 1;
    int32 age =2;
    repeated string data = 3; //a=[1,2]
    map<string,int32> number=4; //string ,int32
}

message TestClientRecvStreamRequest{
    string data=1;
}
message  TestClientRecvStreamResponse{
    string result=1;
}
message HelloMianReply{
    string result =1;
}


message HelloTestResponse{
    string result =1;
}

編寫service 代碼:

service.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#@Time  : 2021/6/5 13:37
#@Author: Tana
#@File  : service.py

import grpc
import hello_world_pb2 as pb2
import hello_world_pb2_grpc as pb2_grpc
from concurrent import futures
import time
class Mianmian(pb2_grpc.MianmianServicer):
    def HelloMian(self, request, context):
        name=request.name
        age=request.age

        result={"code":"succeed",
            "data":f'my name is {name},i am {age} years old'
        }
        return pb2.HelloMianReply(result=result)
    def TestClientRecvStream(self, request, context):
        #判斷客戶端是否活躍
        index=0
        while context.is_active():
            data=request.data
            time.sleep(1)
            index += 1
            yield pb2.TestClientRecvStreamResponse(
                result=f'send {index} {data}'
            )


def run():
    #定義grpc線程數量
    grpc_server=grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    #注冊服務到grpc_server
    pb2_grpc.add_MianmianServicer_to_server(Mianmian(),grpc_server)
    #綁定ip和端口號
    grpc_server.add_insecure_port('0.0.0.0:5001')
    print("server will start at 0.0.0.0:5001")
    #這個start 在python里面會啟動一下就停了,所以需要寫一個無限循環
    grpc_server.start()
    try:
        while 1:
            print("1")
            time.sleep(3600)

    except KeyboardInterrupt:
        grpc_server.stop(0)

if __name__=="__main__":
    run()

編寫client端 代碼:

client.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#@Time  : 2021/6/6 15:26
#@Author: Tana
#@File  : client.py
import grpc
import hello_world_pb2_grpc as pb2_grpc
import hello_world_pb2 as pb2

def run():
    #定義一個頻道,綁定服務器端對應的ip 和端口號
    conn = grpc.insecure_channel('0.0.0.0:5000')
    #生成客戶端
    client = pb2_grpc.MianmianStub(channel=conn)
    # response = client.HelloMian(pb2.HelloMianReq(
    #     name="mianmian",
    #     age=18
    # ))
    # print(response.result)
    #
    response=client.TestClientRecvStream(pb2.TestClientRecvStreamRequest(
        data='mianmian'
    ))

    for item in response:
        print(item.result)


if  __name__=='__main__':
    run()

 

2.單向流:客戶端流請求

創建一個protobuf 文件:

hello_world.proto

// 定義一個服務的框架,服務名和服務下的函數名,以及函數下的request 和response,
// 和resquest 和response 對應的參數

//使用哪種protobuf 協議
syntax="proto3";
package test;

//服務名:Mianmian,以及里面定義的rpc函數HelloMian
service Mianmian{
    rpc HelloMian(HelloMianReq) returns(HelloMianReply)
    {}   //添加一些插件
    rpc TestClientRecvStream(TestClientRecvStreamRequest) returns(stream TestClientRecvStreamResponse){}

    rpc TestClientSendStream(stream TestClientSendStreamRequest)
        returns (TestClientSendStreamResponse) {}
}
//傳輸方式
// 1.unary 單程
// 2.stream  1. 雙向 客戶端請求服務端(流),服務端返回客戶端(流)
//             2.單向 服務端接收客戶端(流),服務端返回客戶端(非流)
//            3. 單向 服務端接收客戶端(非流),服務器send客戶端(流)
message HelloMianReq{
    string  name = 1;
    int32 age =2;
}

message HelloTestRuest{
    string  name = 1;
    int32 age =2;
    repeated string data = 3; //a=[1,2]
    map<string,int32> number=4; //string ,int32
}

message TestClientSendStreamRequest{
    string data=1;
}

message TestClientSendStreamResponse{
    string result=1;
}
message TestClientRecvStreamRequest{
    string data=1;
}
message  TestClientRecvStreamResponse{
    string result=1;
}
message HelloMianReply{
    string result =1;
}


message HelloTestResponse{
    string result =1;
}

編寫service 代碼:

service.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#@Time  : 2021/6/5 13:37
#@Author: Tana
#@File  : service.py

import grpc
import hello_world_pb2 as pb2
import hello_world_pb2_grpc as pb2_grpc
from concurrent import futures
import time
class Mianmian(pb2_grpc.MianmianServicer):
    def HelloMian(self, request, context):
        name=request.name
        age=request.age

        result={"code":"succeed",
            "data":f'my name is {name},i am {age} years old'
        }
        return pb2.HelloMianReply(result=result)
    def TestClientRecvStream(self, request, context):
        #判斷客戶端是否活躍
        index=0
        while context.is_active():
            data=request.data
            time.sleep(1)
            index += 1
            yield pb2.TestClientRecvStreamResponse(
                result=f'send {index} {data}'
            )
      
def TestClientSendStream(self, request_iterator, context): for request in request_iterator: print(request.data) return pb2.TestClientSendStreamResponse(reslut="ok") def run(): #定義grpc線程數量 grpc_server=grpc.server(futures.ThreadPoolExecutor(max_workers=4)) #注冊服務到grpc_server pb2_grpc.add_MianmianServicer_to_server(Mianmian(),grpc_server) #綁定ip和端口號 grpc_server.add_insecure_port('0.0.0.0:5001') print("server will start at 0.0.0.0:5001") #這個start 在python里面會啟動一下就停了,所以需要寫一個無限循環 grpc_server.start() try: while 1: print("1") time.sleep(3600) except KeyboardInterrupt: grpc_server.stop(0) if __name__=="__main__": run()

編寫client端 代碼:

client.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#@Time  : 2021/6/6 15:26
#@Author: Tana
#@File  : client.py
import grpc
import hello_world_pb2_grpc as pb2_grpc
import hello_world_pb2 as pb2
import time,random

def test():
    while 1:
        time.sleep(1)
        data=str(random.random())
        yield  pb2.TestClientRecvStreamRequest(data=data)
def run():
    #定義一個頻道,綁定服務器端對應的ip 和端口號
    conn = grpc.insecure_channel('0.0.0.0:5000')
    #生成客戶端
    client = pb2_grpc.MianmianStub(channel=conn)
    # response = client.HelloMian(pb2.HelloMianReq(
    #     name="mianmian",
    #     age=18
    # ))
    # print(response.result)
    #
    #服務端流傳輸
    # response=client.TestClientRecvStream(pb2.TestClientRecvStreamRequest(
    #     data='mianmian'
    # ))
    #
    # for item in response:
    #     print(item.result)

    #客戶端流請求
    response=client.TestClientSendStream(test())
    print(response.result())

if  __name__=='__main__':
    run()

 

2.雙向流:服務端向客戶端雙向流

創建一個protobuf 文件:

hello_world.proto

// 定義一個服務的框架,服務名和服務下的函數名,以及函數下的request 和response,
// 和resquest 和response 對應的參數

//使用哪種protobuf 協議
syntax="proto3";
package test;

//服務名:Mianmian,以及里面定義的rpc函數HelloMian
service Mianmian{
    rpc HelloMian(HelloMianReq) returns(HelloMianReply)
    {}   //添加一些插件
    rpc TestClientRecvStream(TestClientRecvStreamRequest) returns(stream TestClientRecvStreamResponse){}

    rpc TestClientSendStream(stream TestClientSendStreamRequest)
        returns (TestClientSendStreamResponse) {}
    rpc TestTwoWayStream(stream TestTwoWayStreamRequest) returns(stream TestTwoWayStreamResponse) {}
}
//傳輸方式
// 1.unary 單程
// 2.stream  1. 雙向 客戶端請求服務端(流),服務端返回客戶端(流)
//             2.單向 服務端接收客戶端(流),服務端返回客戶端(非流)
//            3. 單向 服務端接收客戶端(非流),服務器send客戶端(流)

message TestTwoWayStreamRequest{
    string data=1;
    }

message TestTwoWayStreamResponse{
    string result=1;
    }
message HelloMianReq{
    string  name = 1;
    int32 age =2;
}

message HelloTestRuest{
    string  name = 1;
    int32 age =2;
    repeated string data = 3; //a=[1,2]
    map<string,int32> number=4; //string ,int32
}

message TestClientSendStreamRequest{
    string data=1;
}

message TestClientSendStreamResponse{
    string result=1;
}
message TestClientRecvStreamRequest{
    string data=1;
}
message  TestClientRecvStreamResponse{
    string result=1;
}
message HelloMianReply{
    string result =1;
}


message HelloTestResponse{
    string result =1;
}

編寫service 代碼:

service.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#@Time  : 2021/6/5 13:37
#@Author: Tana
#@File  : service.py

import grpc
import hello_world_pb2 as pb2
import hello_world_pb2_grpc as pb2_grpc
from concurrent import futures
import time
class Mianmian(pb2_grpc.MianmianServicer):
    def HelloMian(self, request, context):
        name=request.name
        age=request.age

        result={"code":"succeed",
            "data":f'my name is {name},i am {age} years old'
        }
        return pb2.HelloMianReply(result=result)
    def TestClientRecvStream(self, request, context):
        #判斷客戶端是否活躍
        index=0
        while context.is_active():
            data=request.data
            time.sleep(1)
            index += 1
            yield pb2.TestClientRecvStreamResponse(
                result=f'send {index} {data}'
            )
    def TestClientSendStream(self, request_iterator, context):
        for request in request_iterator:
            print(request.data)
        return pb2.TestClientSendStreamResponse(reslut="ok")
    def TestTwoWayStream(self, request_iterator, context):
        for request in request_iterator:
            data = request.data
            yield pb2.TestTwoWayStreamResponse(result="service send %s" % data)



def run():
    #定義grpc線程數量
    grpc_server=grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    #注冊服務到grpc_server
    pb2_grpc.add_MianmianServicer_to_server(Mianmian(),grpc_server)
    #綁定ip和端口號
    grpc_server.add_insecure_port('0.0.0.0:5001')
    print("server will start at 0.0.0.0:5001")
    #這個start 在python里面會啟動一下就停了,所以需要寫一個無限循環
    grpc_server.start()
    try:
        while 1:
            print("1")
            time.sleep(3600)

    except KeyboardInterrupt:
        grpc_server.stop(0)

if __name__=="__main__":
    run()

 

編寫client端 代碼:

client.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#@Time  : 2021/6/6 15:26
#@Author: Tana
#@File  : client.py
import grpc
import hello_world_pb2_grpc as pb2_grpc
import hello_world_pb2 as pb2
import time,random

def test():
    while 1:
        time.sleep(1)
        data=str(random.random())
        yield  pb2.TestClientRecvStreamRequest(data=data)
def run():
    #定義一個頻道,綁定服務器端對應的ip 和端口號
    conn = grpc.insecure_channel('0.0.0.0:5000')
    #生成客戶端
    client = pb2_grpc.MianmianStub(channel=conn)
    # response = client.HelloMian(pb2.HelloMianReq(
    #     name="mianmian",
    #     age=18
    # ))
    # print(response.result)
    #
    #服務端流傳輸
    # response=client.TestClientRecvStream(pb2.TestClientRecvStreamRequest(
    #     data='mianmian'
    # ))
    #
    # for item in response:
    #     print(item.result)

    # #客戶端流請求
    # response=client.TestClientSendStream(test())
    # print(response.result())

    #雙向流
    response = client.TestTwoWayStream(test())
    for res in response:
        print(res.result)

if  __name__=='__main__':
    run()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM