MQTT為了物聯網的消息傳遞而設計,業余時間弄了個報警器,之前用長輪詢的實現感覺略麻煩,測試了一下MQTT的實現。
個人感覺使用比較簡單,對網絡問題的處理也比較完善,但是某些方面的靈活性略微不足,而且中文資料相對較少。
簡單使用
服務端用mosquitto,客戶端用python-paho-mqtt。
服務端
安裝mosquitto,然后systemctl start mosquitto啟動對應的服務。
公網環境下建議將配置文件中的默認端口1883改為其它端口,避免被直接掃描。
一些安全方面的設置也建議加上。
客戶端
subscriber
#!/bin/python
import paho.mqtt.subscribe as subscribe
# 當調用這個函數時,程序會堵塞在這里,直到有一條消息發送到 topics/topic1 主題
msg = subscribe.simple("topics/topic1", hostname="your ip", port=yourport,
retained=False, client_id="youid", clean_session=False, qos=1)
print(f"{msg.topic} {msg.payload}")
hostname和port需要改為正確的參數。
其中,網絡環境好並且不需要離線接收消息時,可以不設置clean_session
、client_id
、qos
三個參數。
publisher
發送一條消息
#!/bin/python
import paho.mqtt.publish as publish
publish.single("topics/topic1", "a message", hostname="your ip", port=yourport, qos=1)
其它
MQTT 幾個基本概念
通常由3部分構成:subscriber訂閱客戶端、publisher發布消息客戶端、Server服務器。
主題topic,類似一連串消息的標識符。
Message,具體的消息,對應於每個topic。
publisher向服務器指定主題發送消息。
subscriber連接服務器並且指定主題,當publisher向訂閱的主題推送消息后,服務器會推送到對應的subscriber。
retained消息
MQTT使用了一個retained消息機制,用於保存主題的狀態。publisher可以向主題發送retained消息,在subscriber獲取retained消息時(獲取參數中retained=True)服務端會返回最后一條retained消息,每一次都會返回而非普通消息的那種只讀取一次。ratained消息更像是一種保存消息的狀態,用在主題狀態的設置,如開門的感應器,ratained消息用於標記門是否打開。
在python的paho庫中,publisher的retained參數默認是False,而subscriber的retained參數默認為True,這個有點小坑。
QoS(Quality of Service)與離線消息
在subscriber和publisher中都可以指定,定義消息的可靠性級別,服務器會取兩個客戶端中較低的級別作為主題消息對應的處理級別。
QoS0,At most once,至多一次; QoS1,At least once,至少一次; QoS2,Exactly once,確保只有一次。
使用默認參數時,如果subscriber掉線,publisher發送的消息會丟失。
要想subscriber在離線后重新連接,還能收到publisher的消息,需要下面的設置:
- publisher與subscriber的QoS級別都設置為0以上;
- subscriber的
clean_session
設置為False; - subscriber設置固定的
client_id
.
存儲消息歷史記錄
這個翻了一下網上的資料,感覺略不靠譜。MQTT的設計中沒有考慮消息在服務端的存儲,通常采用下面的幾個方案:
- 使用另一個subscriber獲取消息並存儲;
- 使用EMQ等支持插件的服務端,通過插件處理消息。
- 換kafka…