背景:曾參與過一個mqtt項目,大致項目背景為,設備終端作為客戶端通過mqtt平台發布某個主題的消息。研發想測試在現有的終端連接數下mqtt平台的穩定性,並且測試平台的連接上限。需要模擬多終端連接mqtt平台並發布、訂閱主題。
大致思路如下:
- 將已經注冊過的終端號填寫進excel中,讀取excel中的終端號
- 使用多線程實現多終端連接mqtt
- 多終端發布主題“devicePublish”
- 發送成功后,訂閱主題“設備終端號”
需要用到的包為paho-mqtt、threading
paho-mqtt 是目前 Python 中使用較多的 MQTT 客戶端庫,它在 Python 2.7 或 3.x 上為客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支持。它還提供了一些幫助程序功能,使將消息發布到 MQTT 服務器變得非常簡單。
client.py
# -*- coding: utf-8 -*-
import json
import paho.mqtt.client as mqtt
from threading import Thread
import time
import xlrd
class MqttManager(Thread):
def __init__(self, thread_name, HOST, PORT, terminal_id):
Thread.__init__(self)
self.thread_name = thread_name
self.HOST = HOST
self.PORT = PORT
self.terminal_id = terminal_id
def on_connect(self, client, userdata, flags, rc):
print(self.terminal_id + ":" + "Connected with result code " + str(rc))
client.subscribe(self.terminal_id) # 訂閱消息 def on_message(self, client, userdata, msg):
print(self.terminal_id + ":" + "主題:" + msg.topic + " 消息:" + str(msg.payload.decode('utf-8')))
def on_subscribe(self, client, userdata, mid, granted_qos):
print(self.terminal_id + ":" + "On Subscribed: qos = %d" % granted_qos)
def on_disconnect(self, client, userdata, rc):
if rc != 0:
print(self.terminal_id + ":" + "Unexpected disconnection %s" % rc)
def publish_data(self):
# 發布的消息
data = {
"deviceCode": self.terminal_id,
"method": "online",
"timestamp": str(round(time.time() * 1000))
}
param = json.dumps(data)
return param
def run(self):
self.client = mqtt.Client(self.terminal_id)
self.client.connect(HOST, PORT, 60) # 60為keepalive的時間間隔
self.client.username_pw_set("username", "passward")
param = self.publish_data()
self.client.publish("devicePublish", payload=param, qos=0) # 發布消息
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message # 訂閱消息
self.client.subscribe(self.terminal_id)
self.client.on_disconnect = self.on_disconnect
self.client.loop_forever()
if __name__ == "__main__":
HOST = "127.0.0.1"
PORT = 50009
excel_ter = 'C:/終端設備編號.xls' # 讀取終端設備編號表
excel = xlrd.open_workbook(excel_ter)
table = excel.sheet_by_index(0)
nrows = table.nrows
# 創建線程
for i in range(nrows - 1):
terminal_id = str(table.cell(i + 1, 1).value)
thread = MqttManager("Thread-%s" % (i + 1), HOST, PORT, terminal_id)
thread.start()
print('thread %s is running...' % (i + 1) + str(terminal_id))
time.sleep(2)