轉MQTT--Python進行發布、訂閱測試


前言

 使用python編寫程序進行測試MQTT的發布和訂閱功能。首先要安裝:pip install paho-mqtt

測試發布(pub)

 我的MQTT部署在阿里雲的服務器上面,所以我在本機上編寫了python程序進行測試。

然后在shell里面重新打開一個終端,訂閱一個主題為“chat” mosquitto_sub -t chat

 在本機上測試遠程的MQTT的發布功能就是把自己作為一個發送信息的人,當自己發送信息的時候,所有訂閱過該主題(topic)的對象都將收到自己發送的信息。 
mqtt_client.py

# encoding: utf-8

import paho.mqtt.client as mqtt

HOST = "101.200.46.138"
PORT = 1883

def test():
    client = mqtt.Client()
    client.connect(HOST, PORT, 60)
    client.publish("chat","hello liefyuan",2) # 發布一個主題為'chat',內容為‘hello liefyuan’的信息
    client.loop_forever()

if __name__ == '__main__':
    test()

  

 

注解函數:

client.connect(self, host, port, keepalive, bind_address)

client.publish(self, topic, payload, qos, retain) ---保留(retain)

client.subscribe(self, topic, qos)

 

注: MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:

(1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);

(2)payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。

(3)當應用數據通過MQTT網絡發送時,MQTT會把與之相關的服務質量(QoS)和主題名(Topic)相關連。

測試訂閱(sub)

 在本機上編寫程序測試訂閱功能,就是讓自己的程序作為一個接收者,同一個主題沒有發布(pub)信息的時候,就自己一直等候。

# encoding: utf-8


import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("chat")


def on_message(client, userdata, msg):
    print(msg.topic+" " + ":" + str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("www.liefyuan.top", 1883, 60)
client.loop_forever()

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM