【Python】多終端發送、訂閱mqtt消息


背景:曾參與過一個mqtt項目,大致項目背景為,設備終端作為客戶端通過mqtt平台發布某個主題的消息。研發想測試在現有的終端連接數下mqtt平台的穩定性,並且測試平台的連接上限。需要模擬多終端連接mqtt平台並發布、訂閱主題。

大致思路如下:

  1. 將已經注冊過的終端號填寫進excel中,讀取excel中的終端號
  2. 使用多線程實現多終端連接mqtt
  3. 多終端發布主題“devicePublish”
  4. 發送成功后,訂閱主題“設備終端號”

需要用到的包為paho-mqtt、threading

paho-mqtt 是目前 Python 中使用較多的 MQTT 客戶端庫,它在 Python 2.7 或 3.x 上為客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支持。它還提供了一些幫助程序功能,使將消息發布到 MQTT 服務器變得非常簡單。

client.py

# -*- coding: utf-8 -*-
import json
import paho.mqtt.client as mqtt
from threading import Thread
import time
import xlrd

class MqttManager(Thread):
    def __init__(self, thread_name, HOST, PORT, terminal_id):
        Thread.__init__(self)
        self.thread_name = thread_name
        self.HOST = HOST
        self.PORT = PORT
        self.terminal_id = terminal_id

    def on_connect(self, client, userdata, flags, rc):
        print(self.terminal_id + ":" + "Connected with result code " + str(rc))
        client.subscribe(self.terminal_id)  # 訂閱消息    def on_message(self, client, userdata, msg):
        print(self.terminal_id + ":" + "主題:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8')))

    def on_subscribe(self, client, userdata, mid, granted_qos):
        print(self.terminal_id + ":" + "On Subscribed: qos = %d" % granted_qos)

    def on_disconnect(self, client, userdata, rc):
        if rc != 0:
            print(self.terminal_id + ":" + "Unexpected disconnection %s" % rc)

    def publish_data(self):
        # 發布的消息
        data = {
                  "deviceCode": self.terminal_id,
                  "method": "online",
                  "timestamp": str(round(time.time() * 1000))
              }

        param = json.dumps(data)
        return param

    def run(self):
        self.client = mqtt.Client(self.terminal_id)
        self.client.connect(HOST, PORT, 60)  # 60為keepalive的時間間隔
        self.client.username_pw_set("username", "passward")
        param = self.publish_data()
        self.client.publish("devicePublish", payload=param, qos=0)  # 發布消息
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message  # 訂閱消息
        self.client.subscribe(self.terminal_id)
        self.client.on_disconnect = self.on_disconnect
        self.client.loop_forever()

if __name__ == "__main__":
    HOST = "127.0.0.1"
    PORT = 50009    
    excel_ter = 'C:/終端設備編號.xls' # 讀取終端設備編號表
    excel = xlrd.open_workbook(excel_ter)
    table = excel.sheet_by_index(0)
    nrows = table.nrows
    # 創建線程
    for i in range(nrows - 1):
        terminal_id = str(table.cell(i + 1, 1).value)
        thread = MqttManager("Thread-%s" % (i + 1), HOST, PORT, terminal_id)
        thread.start()
        print('thread %s is running...' % (i + 1) + str(terminal_id))
        time.sleep(2)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM