protobuf的使用(python)


  最近項目用到了protobuf,使用起來不難,有些細節地方簡單記錄下

1. protobuf介紹  

  Protobuf(Google Protocol Buffers)是google開發的的一套用於數據存儲,網絡通信時用於協議編解碼的工具庫.它和XML和Json數據差不多,把數據已某種形式保存起來.Protobuf相對與XML和Json的不同之處,它是一種二進制的數據格式,具有更高的傳輸,打包和解包效率。另外c++,java和python都可以解析Protobuf的數據,工作中可以用來在不同語言間進行數據交互。
  

2. python使用protobuf

2.1 下載和安裝protubuf

  下載地址:https://github.com/protocolbuffers/protobuf/releases

  從上面鏈接中下載對應的版本並解壓,將bin目錄添加到環境變量。隨后命令行輸入如下命令,查看protoc版本,驗證是否安裝成功

protoc --version    #查看protoc的版本

 

2.2 編寫.proto格式文件

  官方文檔:https://developers.google.com/protocol-buffers/docs/overview

  根據protobuf的語法規則,編寫一個proto文件,制定協議和規則,規定數據的格式和類型。例如在做目標檢測時,下面圖片中有兩個目標(鹿和貓),對於檢測返回的數據格式,可以制定一個proto文件,命名為TargetDetection.proto,其格式如下:

syntax = "proto3";
/* option optimize_for = LITE_RUNTIME; */
package TargetDetection.proto;

/* 矩形 */
message Rect {
    int32 x1 = 1; //矩形左上角的X坐標
    int32 y1 = 2; //矩形左上角的Y坐標
    int32 x2 = 3; //矩形右下角的X坐標
    int32 y2 = 4; //矩形右下角的Y坐標
}



/*目標的信息*/
message TargetInfo{
    int32 targetId = 1;    //目標編號
    Rect box = 2;           //目標在圖片中的位置
    float boxScore = 3;     //目標檢測的分數
    string labelType = 4;   //目標的分類
    bytes imageData = 5;    //將目標裁剪后保存成圖片數據
    string imageType = 6;   //圖片類型: jpg, png...
    string otherData= 9;    //其他備注信息
}

/* 目標檢測 */
message TargetDetection{
    string ImageName = 1;        //圖片名稱
    int64 timestamp = 2;        //時間戳
    int32 width = 3;        //圖片寬度
    int32 height = 4;        //圖片高度
    repeated TargetInfo TargetList = 5; //目標列表
}
TargetDetction.proto

2.3 編譯.proto輸出py文件

   寫好TargetDetection.proto協議文件后,就可以導出成python可以使用的文件。在命令行輸入如下命令,讀取TargetDetection.proto文件,在當前路徑下會生成一個TargetDetection_pb2.py,利用這個文件就可以進行數據序列化了

protoc ./TargetDetection.proto  --python_out=./  #--python_out表示生成TargetDetection_pb2.py文件的存放路徑,通過-h可以查看相關參數

 

2.4 python進行序列化和反序列化

  在python中使用protobuf,還需要安裝python對應的protobuf包(否則會報錯:No module named goofgle):

pip install protobuf==3.12.0

  有了TargetDetection_pb2.py文件就可以愉快的使用了,當得到模型檢測數據后,可以進行序列化並傳輸出去

  下面是對模型檢測數據的序列化:

import TargetDetection_pb2
import time
import cv2
import os
import zmq

def serialize(detection_data, img_dir=r"./"):
    detection_event = TargetDetection_pb2.TargetDetection()  #創建一個detection檢測事件
    detection_event.ImageName = detection_data["img_name"]
    detection_event.timestamp = int(detection_data["timestamp"])  #協議定義的int64
    detection_event.width = detection_data["width"]
    detection_event.height = detection_data["height"]

    for target in detection_data["targetLitst"]:
        target_event = detection_event.TargetList.add()  #列表添加一個target事件
        target_event.targetId = target['id']
        target_event.box.x1 = target['rect'][0]       #復合類型的賦值
        target_event.box.y1 = target['rect'][1]
        target_event.box.x2 = target['rect'][2]
        target_event.box.y2 = target['rect'][3]
        target_event.boxScore = target['score']
        target_event.labelType = target['type']
        img = cv2.imread(os.path.join(img_dir,detection_data["img_name"]))
        x1, y1, x2, y2 = target['rect']
        imgbytes = cv2.imencode(".jpg", img[y1:y2, x1:x2, :])[1].tobytes()   #切割目標小圖並轉化為字節數據
        target_event.imageData = imgbytes
        target_event.imageType = "jpg"
        target_event.otherData = ""

    bytesdata = detection_event.SerializeToString()   #最后將整個事件序列化為字節
    return bytesdata


if __name__ == "__main__":

    detection_data = {"img_name": "animal.jpg", "timestamp": "1615882332331", "width": 1920, "height": 1080,
                      "targetLitst": [{"id": 1, "rect": [150, 50, 960, 893], "score": 0.93, "type": "deer"},
                                      {"id": 2, "rect": [945, 40, 1820, 931], "score": 0.85, "type": "cat"}]}

    bytesdata = serialize(detection_data) 

下面是對序列化數據的解析示例:

import TargetDetection_pb2
import time
import cv2
import os
import zmq

def serialize(detection_data, img_dir=r"./"):
    detection_event = TargetDetection_pb2.TargetDetection()  #創建一個detection檢測事件
    detection_event.ImageName = detection_data["img_name"]
    detection_event.timestamp = int(detection_data["timestamp"])  #協議定義的int64
    detection_event.width = detection_data["width"]
    detection_event.height = detection_data["height"]

    for target in detection_data["targetLitst"]:
        target_event = detection_event.TargetList.add()  #列表添加一個target事件
        target_event.targetId = target['id']
        target_event.box.x1 = target['rect'][0]       #復合類型的賦值
        target_event.box.y1 = target['rect'][1]
        target_event.box.x2 = target['rect'][2]
        target_event.box.y2 = target['rect'][3]
        target_event.boxScore = target['score']
        target_event.labelType = target['type']
        img = cv2.imread(os.path.join(img_dir,detection_data["img_name"]))
        x1, y1, x2, y2 = target['rect']
        imgbytes = cv2.imencode(".jpg", img[y1:y2, x1:x2, :])[1].tobytes()   #切割目標小圖並轉化為字節數據
        target_event.imageData = imgbytes
        target_event.imageType = "jpg"
        target_event.otherData = ""

def deserialize(bytesdata):
    detection_event = TargetDetection_pb2.TargetDetection()  # 創建一個detection檢測事件
    detection_event.ParseFromString(bytesdata)
    print(detection_event.ImageName)
    print(detection_event.timestamp)
    print(detection_event.width)
    print(detection_event.height)
    for target_event in detection_event.TargetList:
        print(target_event.targetId)
        print(target_event.box)
        print(target_event.boxScore)
        print(target_event.labelType)
if __name__ == "__main__": detection_data = {"img_name": "animal.jpg", "timestamp": "1615882332331", "width": 1920, "height": 1080, "targetLitst": [{"id": 1, "rect": [150, 50, 960, 893], "score": 0.93, "type": "deer"}, {"id": 2, "rect": [945, 40, 1820, 931], "score": 0.85, "type": "cat"}]} bytesdata = serialize(detection_data) deserialize(bytesdata)

2.5 實際應用

  在項目中得到protobuf序列化的數據后,一般會通過zmq等通訊工具將數據發送出去,或者寫入到本地。

zmq發送數據

  關於zmq的使用,參見之前的文章https://www.cnblogs.com/silence-cho/p/12657234.html

  下面是將protobuf序列化的數據發送出去的示例: 

import TargetDetection_pb2
import time
import cv2
import os
import zmq

def set_zmq(topic, url, requestPort, responsePort):
    ctx = zmq.Context().instance()
    recvsocket = ctx.socket(zmq.SUB)
    recvsocket.subscribe(topic)
    requestUrl = "tcp://{}:{}".format(url, requestPort)
    recvsocket.connect(requestUrl)
    print('recvsocket bind to', requestUrl)

    sendsocket = ctx.socket(zmq.PUB)
    responseUrl = "tcp://{}:{}".format(url, responsePort)
    sendsocket.connect(responseUrl)
    print('sendsocket bind to', responseUrl)

    return sendsocket, recvsocket


def serialize(detection_data, img_dir=r"./"):
    detection_event = TargetDetection_pb2.TargetDetection()  #創建一個detection檢測事件
    detection_event.ImageName = detection_data["img_name"]
    detection_event.timestamp = int(detection_data["timestamp"])  #協議定義的int64
    detection_event.width = detection_data["width"]
    detection_event.height = detection_data["height"]

    for target in detection_data["targetLitst"]:
        target_event = detection_event.TargetList.add()  #列表添加一個target事件
        target_event.targetId = target['id']
        target_event.box.x1 = target['rect'][0]       #復合類型的賦值
        target_event.box.y1 = target['rect'][1]
        target_event.box.x2 = target['rect'][2]
        target_event.box.y2 = target['rect'][3]
        target_event.boxScore = target['score']
        target_event.labelType = target['type']
        img = cv2.imread(os.path.join(img_dir,detection_data["img_name"]))
        x1, y1, x2, y2 = target['rect']
        imgbytes = cv2.imencode(".jpg", img[y1:y2, x1:x2, :])[1].tobytes()   #切割目標小圖並轉化為字節數據
        target_event.imageData = imgbytes
        target_event.imageType = "jpg"
        target_event.otherData = ""

    bytesdata = detection_event.SerializeToString()   #最后將整個事件序列化為字節
    return bytesdata


def save_event(new_data, name, save_dir="./"):
    frames = 3
    save_bytes = frames.to_bytes(4, byteorder='big')
    for i in new_data:
        # print(len(i))
        temp = len(i)
        save_bytes += temp.to_bytes(4, byteorder='big')
        save_bytes += i
    with open(os.path.join(save_dir,name), "wb") as f:
        f.write(save_bytes)

def read_event(event_path):
    result = []
    with open(event_path, "rb") as f:
        data = f.read()
        frames = int.from_bytes(data[:4], byteorder='big')  #讀取前四個字節,得到共有幾幀數據
        start_pos = 4
        for i in range(frames):
            end_pos = start_pos + 4
            data_length = int.from_bytes(data[start_pos:end_pos], byteorder='big')  #讀取前4字節,獲取該幀數據的長度
            # data_str = data[end_pos:end_pos+data_length].decode("utf-8")
            data_str = data[end_pos:end_pos+data_length]
            result.append(data_str)
            start_pos = end_pos + data_length
    print(result)
    return result


def deserialize(bytesdata):
    detection_event = TargetDetection_pb2.TargetDetection()  # 創建一個detection檢測事件
    detection_event.ParseFromString(bytesdata)
    print(detection_event.ImageName)
    print(detection_event.timestamp)
    print(detection_event.width)
    print(detection_event.height)
    for target_event in detection_event.TargetList:
        print(target_event.targetId)
        print(target_event.box)
        print(target_event.boxScore)
        print(target_event.labelType)



if __name__ == "__main__":
    topic = "animal.detection"
    url = "127.0.0.1"
    requestPort = 4601
    responsePort = 4600
    sendsocket, recvsocket = set_zmq(topic, url, requestPort, responsePort)

    detection_data = {"img_name": "animal.jpg", "timestamp": "1615882332331", "width": 1920, "height": 1080,
                      "targetLitst": [{"id": 1, "rect": [150, 50, 960, 893], "score": 0.93, "type": "deer"},
                                      {"id": 2, "rect": [945, 40, 1820, 931], "score": 0.85, "type": "cat"}]}

    bytesdata = serialize(detection_data)
    timestamp = int(time.time() * 1000)
    data = [topic.encode("utf-8"), str(timestamp).encode("utf-8"), bytesdata]

    #通過zmq將數據發送出去
    sendsocket.send_multipart(data)
zmq發送序列化的數據

寫入本地

  在項目中一般會將發送的zmq數據寫入本地作為日志一部分,zmq數據會有多幀,所以寫入數據時,一般會定義一個數據報文格式,類似tcp報文那種,但比較簡單,如下面是一個三幀數據的報文格式

  下面是完整示例代碼:

import TargetDetection_pb2
import time
import cv2
import os
import zmq

def set_zmq(topic, url, requestPort, responsePort):
    ctx = zmq.Context().instance()
    recvsocket = ctx.socket(zmq.SUB)
    recvsocket.subscribe(topic)
    requestUrl = "tcp://{}:{}".format(url, requestPort)
    recvsocket.connect(requestUrl)
    print('recvsocket bind to', requestUrl)

    sendsocket = ctx.socket(zmq.PUB)
    responseUrl = "tcp://{}:{}".format(url, responsePort)
    sendsocket.connect(responseUrl)
    print('sendsocket bind to', responseUrl)

    return sendsocket, recvsocket


def serialize(detection_data, img_dir=r"./"):
    detection_event = TargetDetection_pb2.TargetDetection()  #創建一個detection檢測事件
    detection_event.ImageName = detection_data["img_name"]
    detection_event.timestamp = int(detection_data["timestamp"])  #協議定義的int64
    detection_event.width = detection_data["width"]
    detection_event.height = detection_data["height"]

    for target in detection_data["targetLitst"]:
        target_event = detection_event.TargetList.add()  #列表添加一個target事件
        target_event.targetId = target['id']
        target_event.box.x1 = target['rect'][0]       #復合類型的賦值
        target_event.box.y1 = target['rect'][1]
        target_event.box.x2 = target['rect'][2]
        target_event.box.y2 = target['rect'][3]
        target_event.boxScore = target['score']
        target_event.labelType = target['type']
        img = cv2.imread(os.path.join(img_dir,detection_data["img_name"]))
        x1, y1, x2, y2 = target['rect']
        imgbytes = cv2.imencode(".jpg", img[y1:y2, x1:x2, :])[1].tobytes()   #切割目標小圖並轉化為字節數據
        target_event.imageData = imgbytes
        target_event.imageType = "jpg"
        target_event.otherData = ""

    bytesdata = detection_event.SerializeToString()   #最后將整個事件序列化為字節
    return bytesdata


def save_event(new_data, name, save_dir="./"):
    frames = 3
    save_bytes = frames.to_bytes(4, byteorder='big')
    for i in new_data:
        # print(len(i))
        temp = len(i)
        save_bytes += temp.to_bytes(4, byteorder='big')
        save_bytes += i
    with open(os.path.join(save_dir,name), "wb") as f:
        f.write(save_bytes)

def read_event(event_path):
    result = []
    with open(event_path, "rb") as f:
        data = f.read()
        frames = int.from_bytes(data[:4], byteorder='big')  #讀取前四個字節,得到共有幾幀數據
        start_pos = 4
        for i in range(frames):
            end_pos = start_pos + 4
            data_length = int.from_bytes(data[start_pos:end_pos], byteorder='big')  #讀取前4字節,獲取該幀數據的長度
            # data_str = data[end_pos:end_pos+data_length].decode("utf-8")
            data_str = data[end_pos:end_pos+data_length]
            result.append(data_str)
            start_pos = end_pos + data_length
    print(result)
    return result


def deserialize(bytesdata):
    detection_event = TargetDetection_pb2.TargetDetection()  # 創建一個detection檢測事件
    detection_event.ParseFromString(bytesdata)
    print(detection_event.ImageName)
    print(detection_event.timestamp)
    print(detection_event.width)
    print(detection_event.height)
    for target_event in detection_event.TargetList:
        print(target_event.targetId)
        print(target_event.box)
        print(target_event.boxScore)
        print(target_event.labelType)



if __name__ == "__main__":
    topic = "animal.detection"
    url = "127.0.0.1"
    requestPort = 4601
    responsePort = 4600
    sendsocket, recvsocket = set_zmq(topic, url, requestPort, responsePort)

    detection_data = {"img_name": "animal.jpg", "timestamp": "1615882332331", "width": 1920, "height": 1080,
                      "targetLitst": [{"id": 1, "rect": [150, 50, 960, 893], "score": 0.93, "type": "deer"},
                                      {"id": 2, "rect": [945, 40, 1820, 931], "score": 0.85, "type": "cat"}]}

    bytesdata = serialize(detection_data)
    timestamp = int(time.time() * 1000)
    data = [topic.encode("utf-8"), str(timestamp).encode("utf-8"), bytesdata]

    #通過zmq將數據發送出去
    # sendsocket.send_multipart(data)

    #將數據保存到本地
    save_dir = r"F:\event\detection_event"
    name = topic + "_" + str(timestamp)
    # save_event(data, name, save_dir)
    save_event(data, name)
    
    #讀取數據並反序列化
    event_path = r"./animal.detection_1615885149114"
    results_list = read_event(event_path)
    deserialize(results_list[-1])
讀寫zmq數據

 

 參考:https://blog.csdn.net/u013210620/article/details/81317731

   https://www.cnblogs.com/silence-cho/p/12657234.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM