一、MQTT庫安裝
pip install paho-mqtt
二、代碼
# coding=utf-8 import paho.mqtt.client as mqtt import time,os,requests,json,threading cafile = r"E:\Python\rootCA.PEM" #身份認證文件 certfile = r"E:\Python\ClientCA_11111.PEM" keyfile = r"E:\Python\ClientKey_11111.PEM" host = "xxxxxx" #主機地址 port = 8883 #端口號 data={"state":{"desired":{"WIFI_AP":"on"}}} #發布信息 sub_topic="@askey/dvr/xxxxxx/status" #訂閱主題 pub_topic="@askey/dvr/xxxxxx/status" #發布主題 client = mqtt.Client() #創建一個mqtt對象 client.tls_set(cafile, certfile, keyfile) #加密身份認證 client.connect(host, port, 60) #連接mqtt服務器 client.loop_start() #后台運行一個線程來自動調用loop() def on_connect(client, userdata, flags, rc): if rc == 0: print('連接成功') #0代表連接成功 client.subscribe(sub_topic) #訂閱消息 else: print('Connect Error status {0}'.format(rc)) #連接失敗並顯示錯誤代碼 def on_message(client, userdata, msg): print("主題:"+msg.topic + " 消息:" + str(msg.payload.decode('utf-8'))) #接收消息后處理函數 def mqtt_subscribe(): client.on_connect = client.subscribe(sub_topic) # 設置連接上服務器回調函數 client.on_message = on_message # 設置接收到服務器消息回調函數 #client.loop_forever() # 守護連接狀態 def mqtt_publish(): while True: client.publish(pub_topic, payload=str(data), qos=1) # 發布消息 time.sleep(2) if __name__ == '__main__': p = threading.Thread(target=mqtt_publish, name="Thread_pub" , args=()).start() #發布主題線程 s = threading.Thread(target=mqtt_subscribe, name="Thread_sub", args=()).start() #訂閱主題線程