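A recent project of mine used protobuf. It is not hard to use, but a few details are worth writing down.
1. Introduction to protobuf
Protobuf (Google Protocol Buffers) is a library developed by Google for encoding and decoding structured data, used for data storage and network communication. Like XML and JSON, it stores data in a defined format. Unlike XML and JSON, it is a binary format, which generally makes it more efficient to transmit, pack, and unpack. Protobuf data can be parsed from C++, Java, and Python, so it can be used to exchange data between programs written in different languages.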
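To make the size argument concrete, here is a rough sketch that compares a JSON text encoding of one bounding box with a fixed-width binary packing of the same four integers. It uses Python's `struct` module as a stand-in for a binary format; this is not protobuf's actual wire format, and the field names are only illustrative.

```python
import json
import struct

# One bounding box, with illustrative field names.
rect = {"x1": 150, "y1": 50, "x2": 960, "y2": 893}

# Text encoding: field names and digits are spelled out as characters.
json_bytes = json.dumps(rect).encode("utf-8")

# Binary encoding: four big-endian 32-bit signed integers, 4 bytes each.
binary_bytes = struct.pack(">4i", rect["x1"], rect["y1"], rect["x2"], rect["y2"])

print("JSON bytes:  ", len(json_bytes))
print("binary bytes:", len(binary_bytes))
```

The binary form is 16 bytes regardless of the values, while the JSON form grows with the length of the field names and the number of digits.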
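2. Using protobuf from Python
2.1 Downloading and installing protobuf
Download: https://github.com/protocolbuffers/protobuf/releases
Download the release that matches your platform from the link above, unpack it, and add its bin directory to the PATH environment variable. Then run the following command to print the protoc version and confirm that the installation worked:
    protoc --version  # print the protoc version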
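2.2 Writing a .proto file
Official documentation: https://developers.google.com/protocol-buffers/docs/overview
Following protobuf's syntax rules, write a .proto file that defines the protocol, that is, the format and types of the data. For example, in an object detection task where the image contains two targets (a deer and a cat), the format of the returned detection results can be defined in a .proto file named TargetDetection.proto, as follows: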
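    syntax = "proto3";

    /* option optimize_for = LITE_RUNTIME; */
    package TargetDetection.proto;

    /* Rectangle */
    message Rect {
        int32 x1 = 1;  // X coordinate of the top-left corner
        int32 y1 = 2;  // Y coordinate of the top-left corner
        int32 x2 = 3;  // X coordinate of the bottom-right corner
        int32 y2 = 4;  // Y coordinate of the bottom-right corner
    }

    /* Information about one target */
    message TargetInfo {
        int32 targetId = 1;    // target ID
        Rect box = 2;          // position of the target in the image
        float boxScore = 3;    // detection score of the target
        string labelType = 4;  // class label of the target
        bytes imageData = 5;   // cropped image of the target, stored as encoded image bytes
        string imageType = 6;  // image type: jpg, png...
        string otherData = 9;  // other remarks
    }

    /* Target detection result */
    message TargetDetection {
        string ImageName = 1;                // image name
        int64 timestamp = 2;                 // timestamp
        int32 width = 3;                     // image width
        int32 height = 4;                    // image height
        repeated TargetInfo TargetList = 5;  // list of targets
    }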
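The field numbers in the .proto file (`= 1`, `= 2`, ...) are what actually ends up in the serialized bytes, not the field names. As a minimal sketch of protobuf's wire format for the `Rect` message, the pure-Python encoder below writes each `int32` field as a key (field number shifted left by 3, combined with wire type 0) followed by a base-128 varint. The function names are hypothetical and the sketch only handles non-negative values; in practice the generated `TargetDetection_pb2` module does this work.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative values")
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # set the continuation bit: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_int_field(field_number: int, value: int) -> bytes:
    """Encode one varint field: the key (field number, wire type 0), then the value."""
    if value == 0:
        return b""  # proto3 omits scalar fields that hold the default value
    key = (field_number << 3) | 0
    return encode_varint(key) + encode_varint(value)


def encode_rect(x1: int, y1: int, x2: int, y2: int) -> bytes:
    """Hand-encode a Rect message with fields x1..y2 numbered 1..4."""
    return b"".join(
        encode_int_field(number, value)
        for number, value in enumerate((x1, y1, x2, y2), start=1)
    )


print(encode_rect(150, 50, 960, 893).hex())  # 089601103218c00720fd06
```

The deer's bounding box takes 11 bytes here, because small values need only one or two varint bytes each.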

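2.3 Compiling the .proto file into a Python module
Once the TargetDetection.proto protocol file is written, it can be compiled into a file that Python can use. Run the following command, which reads TargetDetection.proto and generates TargetDetection_pb2.py in the current directory. That generated file is what you use to serialize data.
    protoc ./TargetDetection.proto --python_out=./  # --python_out sets the directory where TargetDetection_pb2.py is written; run protoc -h to list the other options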
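If the compile step needs to run from a build script rather than by hand, the command can be assembled in Python. The helper below is a hypothetical sketch: `build_protoc_command` is not part of protobuf, it only builds the argument list using protoc's `--proto_path` and `--python_out` options and assumes `protoc` is on PATH.

```python
import shlex
from pathlib import Path


def build_protoc_command(proto_file: str, out_dir: str = ".") -> list:
    """Build the protoc argument list for generating a *_pb2.py module."""
    proto_path = Path(proto_file)
    return [
        "protoc",
        f"--proto_path={proto_path.parent}",  # directory that contains the .proto file
        f"--python_out={out_dir}",            # where the generated module is written
        str(proto_path),
    ]


def generated_module_name(proto_file: str) -> str:
    """Name of the file protoc generates, e.g. TargetDetection_pb2.py."""
    return Path(proto_file).stem + "_pb2.py"


cmd = build_protoc_command("./TargetDetection.proto", "./")
print(shlex.join(cmd))
print(generated_module_name("./TargetDetection.proto"))
```

To actually run it, pass the list to `subprocess.run(cmd, check=True)`.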
2.4 Serializing and deserializing in Python
To use protobuf from Python you also need to install the protobuf Python package (otherwise the import fails with: No module named 'google'):
    pip install protobuf==3.12.0
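A quick way to check for the missing-package error before importing the generated module is to ask the import system whether `google.protobuf` can be found. This is a small sketch using only the standard library; the function name is hypothetical.

```python
import importlib.util


def has_protobuf_runtime() -> bool:
    """Return True if the google.protobuf package can be imported."""
    try:
        return importlib.util.find_spec("google.protobuf") is not None
    except ModuleNotFoundError:
        # Raised when the parent package "google" itself is missing.
        return False


if not has_protobuf_runtime():
    print("protobuf runtime not found; install it with: pip install protobuf")
```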
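With TargetDetection_pb2.py in place, it can be used directly: once the model's detection results are available, they can be serialized and sent out.
The following serializes the model's detection results: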
    import os

    import cv2

    import TargetDetection_pb2


    def serialize(detection_data, img_dir="./"):
        # Create a detection event message
        detection_event = TargetDetection_pb2.TargetDetection()
        detection_event.ImageName = detection_data["img_name"]
        detection_event.timestamp = int(detection_data["timestamp"])  # int64 in the .proto
        detection_event.width = detection_data["width"]
        detection_event.height = detection_data["height"]

        # Read the source image once; cv2.imread returns None if the file cannot be read
        img_path = os.path.join(img_dir, detection_data["img_name"])
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(img_path)

        for target in detection_data["targetList"]:
            target_event = detection_event.TargetList.add()  # append a new TargetInfo to the list
            target_event.targetId = target["id"]
            x1, y1, x2, y2 = target["rect"]
            target_event.box.x1 = x1  # fields of a nested message are assigned one by one
            target_event.box.y1 = y1
            target_event.box.x2 = x2
            target_event.box.y2 = y2
            target_event.boxScore = target["score"]
            target_event.labelType = target["type"]
            # Crop the target out of the image and encode the crop as JPEG bytes
            target_event.imageData = cv2.imencode(".jpg", img[y1:y2, x1:x2, :])[1].tobytes()
            target_event.imageType = "jpg"
            target_event.otherData = ""

        # Finally, serialize the whole event to bytes
        return detection_event.SerializeToString()


    if __name__ == "__main__":
        detection_data = {
            "img_name": "animal.jpg",
            "timestamp": "1615882332331",
            "width": 1920,
            "height": 1080,
            "targetList": [
                {"id": 1, "rect": [150, 50, 960, 893], "score": 0.93, "type": "deer"},
                {"id": 2, "rect": [945, 40, 1820, 931], "score": 0.85, "type": "cat"},
            ],
        }
        bytesdata = serialize(detection_data)
The following example parses the serialized data:
    import os

    import cv2

    import TargetDetection_pb2


    def serialize(detection_data, img_dir="./"):
        # Create a detection event message
        detection_event = TargetDetection_pb2.TargetDetection()
        detection_event.ImageName = detection_data["img_name"]
        detection_event.timestamp = int(detection_data["timestamp"])  # int64 in the .proto
        detection_event.width = detection_data["width"]
        detection_event.height = detection_data["height"]

        # Read the source image once; cv2.imread returns None if the file cannot be read
        img_path = os.path.join(img_dir, detection_data["img_name"])
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(img_path)

        for target in detection_data["targetList"]:
            target_event = detection_event.TargetList.add()  # append a new TargetInfo to the list
            target_event.targetId = target["id"]
            x1, y1, x2, y2 = target["rect"]
            target_event.box.x1 = x1  # fields of a nested message are assigned one by one
            target_event.box.y1 = y1
            target_event.box.x2 = x2
            target_event.box.y2 = y2
            target_event.boxScore = target["score"]
            target_event.labelType = target["type"]
            # Crop the target out of the image and encode the crop as JPEG bytes
            target_event.imageData = cv2.imencode(".jpg", img[y1:y2, x1:x2, :])[1].tobytes()
            target_event.imageType = "jpg"
            target_event.otherData = ""

        # Serialize the whole event to bytes (without this return, deserialize() would receive None)
        return detection_event.SerializeToString()


    def deserialize(bytesdata):
        # Create an empty detection event and fill it from the serialized bytes
        detection_event = TargetDetection_pb2.TargetDetection()
        detection_event.ParseFromString(bytesdata)
        print(detection_event.ImageName)
        print(detection_event.timestamp)
        print(detection_event.width)
        print(detection_event.height)
        for target_event in detection_event.TargetList:
            print(target_event.targetId)
            print(target_event.box)
            print(target_event.boxScore)
            print(target_event.labelType)


    if __name__ == "__main__":
        detection_data = {
            "img_name": "animal.jpg",
            "timestamp": "1615882332331",
            "width": 1920,
            "height": 1080,
            "targetList": [
                {"id": 1, "rect": [150, 50, 960, 893], "score": 0.93, "type": "deer"},
                {"id": 2, "rect": [945, 40, 1820, 931], "score": 0.85, "type": "cat"},
            ],
        }
        bytesdata = serialize(detection_data)
        deserialize(bytesdata)
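To give a feel for what `ParseFromString` has to do, here is a minimal pure-Python sketch that decodes a message made only of varint fields, which is what a `Rect` looks like on the wire. The input bytes are a hand-computed encoding of `Rect(x1=150, y1=50, x2=960, y2=893)` and the function names are hypothetical; a real message also contains other wire types (strings, bytes, nested messages) that this sketch rejects.

```python
def decode_varint(data: bytes, pos: int):
    """Decode one base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # continuation bit clear: this was the last byte
            return result, pos
        shift += 7


def decode_varint_fields(data: bytes) -> dict:
    """Decode a message that contains only varint (wire type 0) fields."""
    fields = {}
    pos = 0
    while pos < len(data):
        key, pos = decode_varint(data, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type != 0:
            raise ValueError(f"unsupported wire type {wire_type} in this sketch")
        value, pos = decode_varint(data, pos)
        fields[field_number] = value
    return fields


rect_bytes = bytes.fromhex("089601103218c00720fd06")
print(decode_varint_fields(rect_bytes))  # {1: 150, 2: 50, 3: 960, 4: 893}
```

The result is keyed by field number; mapping 1..4 back to `x1`, `y1`, `x2`, `y2` requires the .proto definition, which is why both sides need the same schema.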
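2.5 Practical use
In a project, once the data has been serialized with protobuf, it is usually sent out through a messaging tool such as zmq, or written to local disk.
Sending data over zmq
For how to use zmq, see my earlier article: https://www.cnblogs.com/silence-cho/p/12657234.html
The following example sends the protobuf-serialized data: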
    import os
    import time

    import cv2
    import zmq

    import TargetDetection_pb2


    def set_zmq(topic, url, requestPort, responsePort):
        ctx = zmq.Context.instance()

        # SUB socket: receives messages published under the given topic
        recvsocket = ctx.socket(zmq.SUB)
        recvsocket.subscribe(topic)
        requestUrl = "tcp://{}:{}".format(url, requestPort)
        recvsocket.connect(requestUrl)
        print("recvsocket connected to", requestUrl)

        # PUB socket: publishes messages
        sendsocket = ctx.socket(zmq.PUB)
        responseUrl = "tcp://{}:{}".format(url, responsePort)
        sendsocket.connect(responseUrl)
        print("sendsocket connected to", responseUrl)
        return sendsocket, recvsocket


    def serialize(detection_data, img_dir="./"):
        # Create a detection event message
        detection_event = TargetDetection_pb2.TargetDetection()
        detection_event.ImageName = detection_data["img_name"]
        detection_event.timestamp = int(detection_data["timestamp"])  # int64 in the .proto
        detection_event.width = detection_data["width"]
        detection_event.height = detection_data["height"]

        # Read the source image once; cv2.imread returns None if the file cannot be read
        img_path = os.path.join(img_dir, detection_data["img_name"])
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(img_path)

        for target in detection_data["targetList"]:
            target_event = detection_event.TargetList.add()  # append a new TargetInfo to the list
            target_event.targetId = target["id"]
            x1, y1, x2, y2 = target["rect"]
            target_event.box.x1 = x1  # fields of a nested message are assigned one by one
            target_event.box.y1 = y1
            target_event.box.x2 = x2
            target_event.box.y2 = y2
            target_event.boxScore = target["score"]
            target_event.labelType = target["type"]
            # Crop the target out of the image and encode the crop as JPEG bytes
            target_event.imageData = cv2.imencode(".jpg", img[y1:y2, x1:x2, :])[1].tobytes()
            target_event.imageType = "jpg"
            target_event.otherData = ""

        # Finally, serialize the whole event to bytes
        return detection_event.SerializeToString()


    if __name__ == "__main__":
        topic = "animal.detection"
        url = "127.0.0.1"
        requestPort = 4601
        responsePort = 4600
        sendsocket, recvsocket = set_zmq(topic, url, requestPort, responsePort)

        detection_data = {
            "img_name": "animal.jpg",
            "timestamp": "1615882332331",
            "width": 1920,
            "height": 1080,
            "targetList": [
                {"id": 1, "rect": [150, 50, 960, 893], "score": 0.93, "type": "deer"},
                {"id": 2, "rect": [945, 40, 1820, 931], "score": 0.85, "type": "cat"},
            ],
        }
        bytesdata = serialize(detection_data)
        timestamp = int(time.time() * 1000)
        # Three frames: topic, send timestamp in milliseconds, protobuf payload
        data = [topic.encode("utf-8"), str(timestamp).encode("utf-8"), bytesdata]

        # Give the PUB socket a moment to finish connecting; messages published before that are dropped
        time.sleep(0.5)
        # Send the data over zmq
        sendsocket.send_multipart(data)
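On the receiving side, the three frames sent above (topic, millisecond timestamp as text, protobuf payload) have to be taken apart again. The helper below is a hypothetical sketch of that step in plain Python; `DetectionFrames` and `split_frames` are illustrative names, and the frame order is assumed to match the `data` list in the sending example.

```python
from typing import NamedTuple, Sequence


class DetectionFrames(NamedTuple):
    topic: str
    timestamp_ms: int
    payload: bytes  # protobuf-serialized TargetDetection message


def split_frames(frames: Sequence[bytes]) -> DetectionFrames:
    """Unpack a [topic, timestamp, payload] multipart message."""
    if len(frames) != 3:
        raise ValueError(f"expected 3 frames, got {len(frames)}")
    topic_raw, timestamp_raw, payload = frames
    return DetectionFrames(
        topic=topic_raw.decode("utf-8"),
        timestamp_ms=int(timestamp_raw.decode("utf-8")),
        payload=payload,
    )


# Stand-in frames; in a real subscriber they would come from recvsocket.recv_multipart().
example = [b"animal.detection", b"1615885149114", b"\x08\x01"]
message = split_frames(example)
print(message.topic, message.timestamp_ms, len(message.payload))
```

The `payload` field is what would then be passed to `ParseFromString` on a `TargetDetection_pb2.TargetDetection()` instance.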
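Writing to local disk
In a project, the zmq data that is sent is usually also written to local disk as part of the logs. A zmq message can contain multiple frames, so a simple record format is usually defined for the file, similar in spirit to a TCP segment but much simpler. For a three-frame message, the format used here is: a 4-byte big-endian frame count, followed by, for each frame, a 4-byte big-endian length and then that frame's bytes.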
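As a minimal in-memory sketch of such a record format, the functions below assume the same layout that the article's `save_event` and `read_event` use: a 4-byte big-endian frame count, then each frame prefixed by its 4-byte big-endian length. The names `pack_frames` and `unpack_frames` are hypothetical, and the bounds check is an addition for illustration.

```python
import struct


def pack_frames(frames) -> bytes:
    """Pack a list of byte frames into one length-prefixed record."""
    out = bytearray(struct.pack(">I", len(frames)))  # frame count
    for frame in frames:
        out += struct.pack(">I", len(frame))  # length of this frame
        out += frame
    return bytes(out)


def unpack_frames(blob: bytes) -> list:
    """Split a record produced by pack_frames back into its frames."""
    (count,) = struct.unpack_from(">I", blob, 0)
    pos = 4
    frames = []
    for _ in range(count):
        (length,) = struct.unpack_from(">I", blob, pos)
        pos += 4
        if pos + length > len(blob):
            raise ValueError("record is truncated")
        frames.append(blob[pos:pos + length])
        pos += length
    return frames


frames = [b"animal.detection", b"1615885149114", b"\x08\x01"]
record = pack_frames(frames)
print(len(record), unpack_frames(record) == frames)
```

Because every frame carries its own length, the payload can contain arbitrary bytes (such as JPEG data) without any escaping.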

Below is the complete example code:
    import os
    import time

    import cv2
    import zmq

    import TargetDetection_pb2


    def set_zmq(topic, url, requestPort, responsePort):
        ctx = zmq.Context.instance()

        # SUB socket: receives messages published under the given topic
        recvsocket = ctx.socket(zmq.SUB)
        recvsocket.subscribe(topic)
        requestUrl = "tcp://{}:{}".format(url, requestPort)
        recvsocket.connect(requestUrl)
        print("recvsocket connected to", requestUrl)

        # PUB socket: publishes messages
        sendsocket = ctx.socket(zmq.PUB)
        responseUrl = "tcp://{}:{}".format(url, responsePort)
        sendsocket.connect(responseUrl)
        print("sendsocket connected to", responseUrl)
        return sendsocket, recvsocket


    def serialize(detection_data, img_dir="./"):
        # Create a detection event message
        detection_event = TargetDetection_pb2.TargetDetection()
        detection_event.ImageName = detection_data["img_name"]
        detection_event.timestamp = int(detection_data["timestamp"])  # int64 in the .proto
        detection_event.width = detection_data["width"]
        detection_event.height = detection_data["height"]

        # Read the source image once; cv2.imread returns None if the file cannot be read
        img_path = os.path.join(img_dir, detection_data["img_name"])
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(img_path)

        for target in detection_data["targetList"]:
            target_event = detection_event.TargetList.add()  # append a new TargetInfo to the list
            target_event.targetId = target["id"]
            x1, y1, x2, y2 = target["rect"]
            target_event.box.x1 = x1  # fields of a nested message are assigned one by one
            target_event.box.y1 = y1
            target_event.box.x2 = x2
            target_event.box.y2 = y2
            target_event.boxScore = target["score"]
            target_event.labelType = target["type"]
            # Crop the target out of the image and encode the crop as JPEG bytes
            target_event.imageData = cv2.imencode(".jpg", img[y1:y2, x1:x2, :])[1].tobytes()
            target_event.imageType = "jpg"
            target_event.otherData = ""

        # Finally, serialize the whole event to bytes
        return detection_event.SerializeToString()


    def save_event(new_data, name, save_dir="./"):
        # Header: number of frames, as a 4-byte big-endian integer
        save_bytes = len(new_data).to_bytes(4, byteorder="big")
        for frame in new_data:
            # Each frame: 4-byte big-endian length, then the frame bytes
            save_bytes += len(frame).to_bytes(4, byteorder="big")
            save_bytes += frame
        with open(os.path.join(save_dir, name), "wb") as f:
            f.write(save_bytes)


    def read_event(event_path):
        result = []
        with open(event_path, "rb") as f:
            data = f.read()
        # The first four bytes give the number of frames
        frames = int.from_bytes(data[:4], byteorder="big")
        start_pos = 4
        for _ in range(frames):
            end_pos = start_pos + 4
            # The next four bytes give the length of this frame
            data_length = int.from_bytes(data[start_pos:end_pos], byteorder="big")
            result.append(data[end_pos:end_pos + data_length])
            start_pos = end_pos + data_length
        return result


    def deserialize(bytesdata):
        # Create an empty detection event and fill it from the serialized bytes
        detection_event = TargetDetection_pb2.TargetDetection()
        detection_event.ParseFromString(bytesdata)
        print(detection_event.ImageName)
        print(detection_event.timestamp)
        print(detection_event.width)
        print(detection_event.height)
        for target_event in detection_event.TargetList:
            print(target_event.targetId)
            print(target_event.box)
            print(target_event.boxScore)
            print(target_event.labelType)


    if __name__ == "__main__":
        topic = "animal.detection"
        url = "127.0.0.1"
        requestPort = 4601
        responsePort = 4600
        sendsocket, recvsocket = set_zmq(topic, url, requestPort, responsePort)

        detection_data = {
            "img_name": "animal.jpg",
            "timestamp": "1615882332331",
            "width": 1920,
            "height": 1080,
            "targetList": [
                {"id": 1, "rect": [150, 50, 960, 893], "score": 0.93, "type": "deer"},
                {"id": 2, "rect": [945, 40, 1820, 931], "score": 0.85, "type": "cat"},
            ],
        }
        bytesdata = serialize(detection_data)
        timestamp = int(time.time() * 1000)
        # Three frames: topic, send timestamp in milliseconds, protobuf payload
        data = [topic.encode("utf-8"), str(timestamp).encode("utf-8"), bytesdata]

        # Send the data over zmq
        # sendsocket.send_multipart(data)

        # Save the data locally
        save_dir = "./"  # or a log directory such as r"F:\event\detection_event"
        name = topic + "_" + str(timestamp)
        save_event(data, name, save_dir)

        # Read the file that was just written and deserialize the last frame (the protobuf payload)
        event_path = os.path.join(save_dir, name)
        results_list = read_event(event_path)
        deserialize(results_list[-1])
References: https://blog.csdn.net/u013210620/article/details/81317731
https://www.cnblogs.com/silence-cho/p/12657234.html
