gRPC流式傳輸,以視頻傳輸為例


 

  流式傳輸需要關鍵字stream,stream 修飾的結構在使用時應該是可以迭代的,即iterable。下面是client -> server 的視頻流式傳輸。

video.proto

syntax = "proto3";

// Service exposing the video-frame streaming RPC.
service New{
    // Client-streaming call: the client sends a stream of FrameRequest messages
    // and receives one FrameResponse for the whole stream, so individual frames
    // are not acknowledged.
    rpc Video_transport(stream FrameRequest) returns (FrameResponse);
}
// One element of the request stream sent to Video_transport.
message FrameRequest {
    bytes f_data = 1;           // frame data
    int32 goon = 2;             // 0 marks the end-of-stream message; > 0 means this is not the final frame
}
// Single reply returned by Video_transport.
message FrameResponse{
    string state = 1;           // status string filled in by the server
}

 

  如client代碼所示,含有yield的函數會返回一個生成器:每讀取一幀,生成器就把該幀的信息產出給stub,再由stub發送給server。

client.py  使用yield——參考自GitHub、官方教程

from grpc_out.video_pb2 import *
from grpc_out.video_pb2_grpc import NewStub

from PIL import Image

import cv2, grpc
import numpy as np

# Frame shape and frame rate of the video being streamed. transport_video()
# updates them so the values needed to rebuild the video are available.
fheight, fwidth, fchannels, fps = 0, 0, 0, 0


def transport_video(video_path):
    """Yield the frames of a video file as ``FrameRequest`` messages.

    Every frame is converted from BGR to RGB, resized to 256x256 so a single
    message does not get too large, and sent as raw bytes with ``goon=1``.
    After the last frame one extra message with ``goon=0`` and no data is
    yielded as the end-of-stream marker.

    Side effects: updates the module globals ``fheight``, ``fwidth``,
    ``fchannels`` and ``fps``.

    Args:
        video_path: Path of the video file to read.

    Yields:
        FrameRequest: one message per frame, then the end marker.

    Note:
        If the video cannot be opened the generator finishes without yielding
        anything, so no end marker is sent either.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print('the video does not exist.')
        # Inside a generator this only ends the iteration; the value is not
        # returned to the caller of transport_video().
        return False

    global fheight, fwidth, fchannels, fps
    width, height = 256, 256
    try:
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        ret, frame = cap.read()
        while ret:
            img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # cv2.resize expects the target size as (width, height).
            img = cv2.resize(img, (width, height))
            fheight, fwidth, fchannels = img.shape
            yield FrameRequest(f_data=img.tobytes(), goon=1)
            ret, frame = cap.read()
    finally:
        # Also runs when the consumer stops iterating early (e.g. the RPC is
        # cancelled and the generator is closed), so the capture is released.
        cap.release()
    print('video read ended.')
    yield FrameRequest(f_data=b'', goon=0)          # end-of-stream marker


def run(target='10.xx.xx.xx:50000', video_path='./examples/C_30080000.mp4'):
    """Stream one video file to the server and print the returned state.

    Args:
        target: ``host:port`` address of the gRPC server.
        video_path: Path of the video file to send.
    """
    # The channel is closed automatically when the 'with' block exits.
    with grpc.insecure_channel(target) as channel:
        stub = NewStub(channel)
        # The generator is consumed by the stub, which sends one message per
        # yielded item and returns once the server has replied.
        response = stub.Video_transport(transport_video(video_path))
        print(response.state)


if __name__ == "__main__":
    run()

 ** 另一種不常見的、不使用 yield 的方法(參看GitHub)似乎不可行(沒跑通,不可迭代)。

server.py

from concurrent import futures
from grpc_out.video_pb2 import *
from grpc_out.video_pb2_grpc import NewServicer, add_NewServicer_to_server

import os, subprocess, cv2, grpc
import numpy as np
from PIL import Image

# the implementation of interfaces in NewServicer
class New(NewServicer):
    """Servicer that collects streamed frames and writes them to a video file.

    Args:
        video_path: Where the reconstructed video is written.
        fps: Frame rate of the output video. The default is only a
            placeholder; pass the frame rate of the source video.
        height: Frame height in pixels. Defaults to the 256 used by the
            client's resize.
        width: Frame width in pixels. Defaults to the 256 used by the
            client's resize.
        channels: Number of colour channels per pixel (3 for RGB).
    """

    def __init__(self, video_path='./received.mp4', fps=25, height=256,
                 width=256, channels=3) -> None:
        self.frames = []                  # raw bytes of the frames received so far
        self.video_path = video_path
        self.fps = fps
        self.height = height
        self.width = width
        self.channels = channels

    def save_video(self, video_path):
        """Build a video from ``self.frames``, save it, then empty the list.

        Args:
            video_path: Destination file of the encoded video.
        """
        # cv2.VideoWriter expects the frame size as (width, height).
        video = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'mp4v'),
                                self.fps, (self.width, self.height))
        try:
            for img in self.frames:
                img_arr = np.frombuffer(img, dtype=np.uint8)         # bytes -> ndarray
                img_arr = np.reshape(img_arr, (self.height, self.width, self.channels))
                img = cv2.cvtColor(np.array(Image.fromarray(img_arr)), cv2.COLOR_RGB2BGR)
                video.write(img)
        finally:
            # Release the writer even if a frame cannot be decoded.
            video.release()
        self.frames.clear()

    def Video_transport(self, request_iterator, context):
        """Receive a stream of frames and save them as a video.

        Args:
            request_iterator: Iterator of ``FrameRequest`` messages; a message
                with ``goon == 0`` marks the end of the video.
            context: gRPC servicer context (unused).

        Returns:
            FrameResponse: ``state='ok'`` once the end marker arrived and the
            video was saved, ``state='incomplete'`` if the stream ended
            without an end marker (the buffered frames are discarded).
        """
        print(' *** Reciving video frames...')
        if self.frames:
            print('Clear frames cache...')
            self.frames.clear()
        for f_info in request_iterator:
            if not f_info.goon:                        # end marker, carries no frame
                # Count first: save_video() empties self.frames.
                received = len(self.frames)
                self.save_video(self.video_path)
                print('---> ' + str(received) + ' frames (all) received.')
                return FrameResponse(state='ok')
            self.frames.append(f_info.f_data)
        # The stream ended without an end marker (e.g. the client could not
        # open its video). Still answer the call instead of returning None.
        self.frames.clear()
        return FrameResponse(state='incomplete')
        
def serve(port=50000, max_workers=10):
    """Start the gRPC server and block until it terminates.

    Args:
        port: TCP port to listen on (all interfaces). The default matches the
            port the client connects to.
        max_workers: Size of the thread pool that handles requests.
    """
    # The thread pool lets the server handle several requests at once.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    add_NewServicer_to_server(New(), server)                    # add servicer to server
    server.add_insecure_port(f'[::]:{port}')
    server.start()
    server.wait_for_termination()           # block the calling thread until the server terminates


if __name__ == "__main__":
    serve()

 

  上面雙方啟動後,會產生一個client->server的流,流中每次只傳輸一幀,只有當一個視頻中的所有幀都傳輸完畢後,Video_transport 這個服務才會結束並給出response。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM