1. Problem description
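In a previous article (Understanding and using the zmq module), I wrote that zmq offers three messaging patterns. At work I mainly use PUB-SUB, and the typical setup has one server and several clients:
Server: reads the video, runs object detection and tracking on every frame, and broadcasts the detection and tracking results through a PUB socket.
Clients: there are several of them. Each uses a SUB socket to receive the data from the PUB side and then processes it.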
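Recently I ran into a problem: a client would sometimes stop receiving data from the server, and the program caught no exception at all. According to what I found online, an unstable network can break the TCP connection underneath zmq without the disconnect ever being reported, so zmq's automatic reconnection never kicks in. There are currently two kinds of solutions:
1. Use the TCP keepalive support provided by zmq
2. Implement a heartbeat yourself and reconnect when it times out
After comparing the two, I chose the heartbeat approach, which is somewhat more flexible and reliable.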
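For comparison, option 1 needs no extra protocol, only socket options. The following is a minimal sketch assuming pyzmq: it turns on libzmq's TCP keepalive options for a SUB socket. The helper name `make_keepalive_sub` and the values (60 s idle, 10 s between probes, 3 probes) are illustrative assumptions, not recommendations, and the idle/interval/count settings only take effect on platforms whose TCP stack supports them.

```python
import zmq


def make_keepalive_sub(ctx, idle_s=60, interval_s=10, count=3):
    """Create a SUB socket with TCP keepalive enabled (hypothetical helper, illustrative values)."""
    sock = ctx.socket(zmq.SUB)
    # 1 = enable SO_KEEPALIVE on the underlying TCP connection (-1 would mean "OS default")
    sock.setsockopt(zmq.TCP_KEEPALIVE, 1)
    # Seconds of idleness before the first keepalive probe is sent
    sock.setsockopt(zmq.TCP_KEEPALIVE_IDLE, idle_s)
    # Seconds between unanswered probes
    sock.setsockopt(zmq.TCP_KEEPALIVE_INTVL, interval_s)
    # Number of unanswered probes after which the OS declares the connection dead
    sock.setsockopt(zmq.TCP_KEEPALIVE_CNT, count)
    return sock


if __name__ == "__main__":
    ctx = zmq.Context()
    sub = make_keepalive_sub(ctx)
    # The options must be set before connect(); here they are only read back
    print(sub.getsockopt(zmq.TCP_KEEPALIVE_IDLE))
    sub.close(linger=0)
    ctx.term()
```

With these values a dead peer is detected only after roughly idle + interval × count = 60 + 10 × 3 = 90 seconds, and the timing is controlled by the operating system rather than by the application. That is one reason an application-level heartbeat can feel more flexible: the timeout and the reaction to it stay in your own code.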
2. Testing the solution
I wrote a simple simulation and plan to let it run for a few days to see how it behaves.
server:
The server uses two topics: one for heartbeat data (sent about once per second) and one for the business data (object detection results). Sample server code:
import random
import time

import zmq


def start_server(topics, url, port):
    ctx = zmq.Context()
    send_socket = ctx.socket(zmq.PUB)
    response_url = "tcp://{}:{}".format(url, port)
    print("bind to: {}".format(response_url))
    send_socket.bind(response_url)

    last_heartbeat = time.time()
    i = 1
    while True:
        # Send a heartbeat once more than 1 s has passed since the last one
        # (the check runs once per 0.5 s loop iteration)
        if time.time() - last_heartbeat > 1:
            send_socket.send_multipart([topics[1].encode("utf-8"), b"heartbeat"])
            last_heartbeat = time.time()
            print(i, "send heartbeat")

        # Send detection data with some probability to simulate video object detection
        if random.random() < 0.2:
            detection_message = "message{} for {}".format(i, topics[0])
            send_socket.send_multipart([topics[0].encode("utf-8"), detection_message.encode("utf-8")])
            print(i, "send detection_message")

        i += 1
        time.sleep(0.5)


if __name__ == "__main__":
    topics = ["detection", "heartbeat"]
    # "*" binds on all interfaces so clients on other machines (e.g. the one
    # connecting to 192.168.2.139) can reach the server; 127.0.0.1 accepts local connections only
    url = "*"
    port = 4488
    start_server(topics, url, port)
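Heartbeats and detection results travel over the same PUB socket. The SUB side can tell them apart because ZeroMQ matches each subscription as a byte prefix of a message's first frame, which is why the server sends `[topic, payload]` with `send_multipart`. The self-contained sketch below assumes pyzmq and uses the in-process `inproc://` transport so no network is needed; the function name `receive_filtered` and the endpoint `inproc://topic-demo` are hypothetical. It subscribes to a single topic and returns the topics that actually arrive.

```python
import time

import zmq


def receive_filtered(subscribe_to, attempts=50):
    """Publish both topics over inproc and return the topics a SUB socket receives."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind("inproc://topic-demo")  # bind before connect for inproc
    sub = ctx.socket(zmq.SUB)
    sub.connect("inproc://topic-demo")
    sub.subscribe(subscribe_to)

    received = []
    # Subscriptions reach the PUB socket asynchronously and earlier messages
    # are dropped, so keep publishing until something arrives
    for _ in range(attempts):
        pub.send_multipart([b"detection", b"message for detection"])
        pub.send_multipart([b"heartbeat", b"heartbeat"])
        if sub.poll(timeout=100):
            break
    # Drain everything delivered so far and record the topic frame of each message
    while sub.poll(timeout=100):
        topic, _payload = sub.recv_multipart()
        received.append(topic.decode("utf-8"))

    sub.close(linger=0)
    pub.close(linger=0)
    ctx.term()
    return received


if __name__ == "__main__":
    print(receive_filtered("heartbeat"))
```

The retry loop is there because of ZeroMQ's "slow joiner" behavior: a PUB socket silently drops messages for subscriptions it has not seen yet. The same effect applies right after the real client reconnects, so a few messages around a reconnect can be lost.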
client:
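The client subscribes to both of the server's topics, so it receives heartbeats as well as business data. On every loop iteration it checks how long it has been since the last heartbeat, and if the heartbeat has timed out it reconnects. Sample client code: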
import time

import zmq


def start_client1(topics, url, port):
    ctx = zmq.Context()
    recv_socket = ctx.socket(zmq.SUB)
    request_url = "tcp://{}:{}".format(url, port)
    print("connect to: {}".format(request_url))
    recv_socket.connect(request_url)
    for topic in topics:
        recv_socket.subscribe(topic)

    # Start the clock at connect time so that a client that never receives
    # a single heartbeat also reconnects
    last_heartbeat = time.time()
    while True:
        # No heartbeat from the server for 30 s: reconnect
        if time.time() - last_heartbeat > 30:
            recv_socket.disconnect(request_url)
            recv_socket.connect(request_url)
            # Subscriptions belong to the SUB socket and are sent to the server
            # again on the new connection, so there is no need to re-subscribe
            print("Reconnect pub server")
            # Wait 2 s after each reconnect attempt; reconnecting too often leaves no time to receive data
            time.sleep(2)

        # Wait up to 1 s for a message instead of busy-looping on a non-blocking recv
        if recv_socket.poll(timeout=1000):
            data = recv_socket.recv_multipart()
            data_topic = data[0].decode("utf-8")
            if data_topic.startswith("heartbeat"):
                # Heartbeat received: record when it arrived
                last_heartbeat = time.time()
            print("receive message: ", data[1].decode("utf-8"))


if __name__ == "__main__":
    topics = ["detection", "heartbeat"]
    url = "192.168.2.139"  # IP address of the machine running the server
    port = 4488
    start_client1(topics, url, port)
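The reconnect decision in the client is just bookkeeping on a timestamp, so it can be pulled out and tested without a network. The class below is a hypothetical refactoring, not part of the original code. It assumes an injectable clock that defaults to `time.monotonic` (unlike `time.time()`, it does not jump when the system clock is adjusted) and applies a similar policy: reconnect after 30 s without a heartbeat, then retry no more often than every 2 s until a heartbeat arrives.

```python
import time


class HeartbeatMonitor:
    """Decides when a SUB client should reconnect (hypothetical helper)."""

    def __init__(self, timeout=30.0, retry_interval=2.0, clock=time.monotonic):
        self.timeout = timeout                # seconds without a heartbeat before reconnecting
        self.retry_interval = retry_interval  # minimum spacing between reconnect attempts
        self.clock = clock
        self.last_heartbeat = clock()         # start the timer when the client connects
        self.last_reconnect = None

    def on_heartbeat(self):
        """Call whenever a message on the heartbeat topic arrives."""
        self.last_heartbeat = self.clock()

    def should_reconnect(self):
        """True if the heartbeat timed out and the previous attempt is old enough."""
        now = self.clock()
        if now - self.last_heartbeat <= self.timeout:
            return False
        if self.last_reconnect is not None and now - self.last_reconnect < self.retry_interval:
            return False
        self.last_reconnect = now
        return True


if __name__ == "__main__":
    now = [0.0]  # fake clock so the example runs instantly
    monitor = HeartbeatMonitor(clock=lambda: now[0])
    now[0] = 10.0
    print(monitor.should_reconnect())  # False: last heartbeat 10 s ago
    now[0] = 31.0
    print(monitor.should_reconnect())  # True: 31 s without a heartbeat
    now[0] = 32.0
    print(monitor.should_reconnect())  # False: previous attempt only 1 s ago
    now[0] = 33.5
    print(monitor.should_reconnect())  # True: retry interval has passed
    monitor.on_heartbeat()
    print(monitor.should_reconnect())  # False: heartbeat just arrived
```

In the client loop, `monitor.should_reconnect()` would replace the `time.time() - last_heartbeat > 30` check and `monitor.on_heartbeat()` would replace the assignment to `last_heartbeat`. Because the retry spacing is enforced by timestamps rather than `time.sleep(2)`, the loop keeps draining messages between reconnect attempts.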
References:
Reference 1: (PUB/SUB) Sub Silent Disconnect on Unreliable Connection, Issue #1199, zeromq/libzmq, GitHub
Reference 2: https://blog.csdn.net/bubbleyang/article/details/107559224
Reference 3: https://blog.csdn.net/sinat_36265222/article/details/107252069