zmq模塊的理解和使用二


1. 問題描述

在之前的文章里(zmq模塊的理解和使用),寫過zmq有三種模式供選擇,我工作中主要采用PUB-SUB模式。使用場景主要包括一個server端和多個client端:

  server端:進行視頻讀取,並對每張圖片進行目標檢測和跟蹤,采用PUB模式,將檢測和跟蹤結果廣播出去

  client端:有多個客戶端,采用SUB模式,接收PUB端的數據,然后對數據進行處理

最近碰到了一個問題, 就是client端有時候會收不到server的數據,而且程序捕捉不到任何異常。網上找了些資料看,由於網絡環境不穩定,導致zmq底層的tcp連接不穩定,無法返回斷開狀態, 導致zmq自動重連機制無法生效。目前的解決方案有兩類:

  1. 采用zmq提供的TCP Keepalive

  2. 自己實現心跳模式,超時進行重連

通過比較后,決定采用心跳模式,靈活性和可靠性更強一點。

2. 解決方案測試

寫了個接單的模擬代碼,打算測試幾天觀察下效果

server:

服務端采用兩個topic,一個topic用來發送心跳數據(每隔1秒),另一個topic發送業務數據(目標檢測)。服務端示例代碼如下:

import zmq
import time
import random

def start_server(topics, url, port):
    ctx = zmq.Context()
    send_scoket = ctx.socket(zmq.PUB)
    responseUrl = "tcp://{}:{}".format(url, port)
    print("bind to: {}".format(responseUrl))
    send_scoket.bind(responseUrl)
    last_heartbeat = time.time()
    i = 1
    while True:
        # 每隔1秒,發送心跳數據
        if time.time()-last_heartbeat > 1:
            send_scoket.send_multipart([topics[1].encode("utf-8"), b'heartbeat'])
            last_heartbeat = time.time()
            print(i, "send heartbeat")

        # 以一定概率發送檢測數據,模擬視頻目標檢測
        if random.random() < 0.2:
            detection_message = "message{} for {}".format(i, topics[0])
            send_scoket.send_multipart([topics[0].encode("utf-8"), detection_message.encode("utf-8")])
            print(i, "send detection_message")

        i += 1
        time.sleep(0.5)


if __name__ =="__main__":
    topics = ['detection', 'heartbeat']
    url = "127.0.0.1"
    port = 4488
    start_server(topics, url, port)

client:

客戶端訂閱服務端的兩個topic,同時接收服務端的心跳數據和業務數據,每次都判斷有多長時間沒有接收到心跳數據,如果心跳數據超時,就進行重新連接。客戶端采用示例代碼如下,

import zmq
import time


def start_client1(topics, url, port):
    ctx = zmq.Context()
    recv_scoket = ctx.socket(zmq.SUB)
    requestUrl = "tcp://{}:{}".format(url, port)
    print("connect to: {}".format(requestUrl))
    recv_scoket.connect(requestUrl)
    for topic in topics:
       recv_scoket.subscribe(topic)
    last_heartbeat = 0
    while True:
        # 30秒收不到server的心跳數據,就進行重新連接
        if last_heartbeat != 0 and time.time() - last_heartbeat > 30:
            recv_scoket.disconnect(requestUrl)
            recv_scoket.connect(requestUrl)
            for topic in topics:
                recv_scoket.subscribe(topic)
            print("Reconnect pub server")
            time.sleep(2)   # 每2秒重試連接,重連太頻繁會導致接收不了數據
        try:
            data = recv_scoket.recv_multipart(flags=1)
            datatopic = data[0].decode()
            if datatopic.startswith("heartbeat"):  # 接收到心跳數據后,更新心跳接收到的時間
                last_heartbeat = time.time()
            print("receive message: ", data[1].decode("utf-8"))
        except zmq.error.Again as e:
            # print(e)
            pass


if __name__ == "__main__":
    topics = ['detection', 'heartbeat']
    url = "192.168.2.139"
    port = 4488
    start_client1(topics, url, port)

 

參考文檔:

  參考一: (PUB/SUB) Sub Silent Disconnect on Unreliable Connection · Issue #1199 · zeromq/libzmq · GitHub

  參考二:https://blog.csdn.net/bubbleyang/article/details/107559224

  參考三: https://blog.csdn.net/sinat_36265222/article/details/107252069


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM