在樹莓派上使用 MQTT


樹莓派 由英國樹莓派基金會開發,是一款基於 ARM 的微型計算機主板。該主板提供 USB 接口和以太網接口,可以連接鍵盤、鼠標和網線,該主板具備 PC 的基本功能,同時樹莓派集成了 Wi-Fi,藍牙以及大量 GPIO,被廣泛運用在教學、家庭娛樂、物聯網等。

MQTT 是一種基於發布/訂閱模式的 輕量級物聯網消息傳輸協議 ,可以用極少的代碼和帶寬為聯網設備提供實時可靠的消息服務,它適用於硬件資源有限的設備及帶寬有限的網絡環境。因此,MQTT 協議廣泛應用於物聯網、移動互聯網、智能硬件、車聯網、電力能源等行業。

在此項目中,我們將在樹莓派上使用 Python 編寫簡單的 MQTT 客戶端,並實現該客戶端與 MQTT 服務器的連接、訂閱、取消訂閱、收發消息等功能。

環境搭建

安裝 Python3

本項目使用 Python3 進行開發,一般情況下,樹莓派系統會內置 Python3,如果不確定系統內是否已經安裝,可以使用下面的命令進行確認。

python3 --version 

如果顯示 Python 3.x.x(x 表示數字)則表示已經安裝,否則請使用 apt 命令安裝(或跟隨 Python3 安裝指南 操作 )。

sudo apt install python3 

安裝 MQTT 客戶端庫

為了方便連接到 MQTT 服務器,我們需要安裝 paho-mqtt 庫。可以選擇以下兩種方法之一進行安裝。

使用源碼安裝

git clone https://github.com/eclipse/paho.mqtt.python 
cd paho.mqtt.python 
python3 setup.py install

使用 pip3 安裝

pip3 install paho-mqtt 

MQTT 的使用

連接 MQTT 服務器

本文將使用 EMQ X 提供的 免費公共 MQTT 服務器,該服務基於 EMQ X 的 MQTT 物聯網雲平台 創建。服務器接入信息如下:

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

如果有需要,您也可以使用 docker 在本地快速安裝 EMQ X 服務器。

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 18083:18083 emqx/emqx 

連接示例代碼

# test_connect.py 
import paho.mqtt.client as mqtt 

# 回調函數。當嘗試與 MQTT broker 建立連接時,觸發該函數。
# client 是本次連接的客戶端實例。
# userdata 是用戶的信息,一般為空。但如果有需要,也可以通過 user_data_set 函數設置。
# flags 保存服務器響應標志的字典。
# rc 是響應碼。
# 一般情況下,我們只需要關注 rc 響應碼是否為 0 就可以了。
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected success")
    else:
        print(f"Connected fail with code {rc}")

client = mqtt.Client() 
client.on_connect = on_connect 
client.connect("broker.emqx.io", 1883, 60) 
client.loop_forever() 

將上面的代碼保存為 test_connect.py 文件,並運行

python3 test_connect.py 

我們在 on_connect 函數里對響應碼進行了判斷,為 0 則輸出 Connected success 表示連接成功。如果返回的是其它數字,我們就需要對照下面的響應碼進行判斷。

0: 連接成功
1: 連接失敗-不正確的協議版本
2: 連接失敗-無效的客戶端標識符
3: 連接失敗-服務器不可用
4: 連接失敗-錯誤的用戶名或密碼
5: 連接失敗-未授權
6-255: 未定義
如果是其它問題,可以檢查網絡情況,或者確認是否安裝了 `paho-mqtt`。 

在 MQTT 協議的概念中,消息是通過主題傳遞的,比如設備 A 向主題 T 發送消息,那么只有訂閱了主題 T 的設備才能接收到。所以僅僅接入 MQTT 服務器並沒有太大意議,要完整地使用 MQTT 服務,我們還需要知道如何訂閱和發布消息。

訂閱消息

打開任意編輯器,輸入下面的代碼,並保存為 subscriber.py 文件:

# subscriber.py
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    # 訂閱,需要放在 on_connect 里
    # 如果與 broker 失去連接后重連,仍然會繼續訂閱 raspberry/topic 主題
    client.subscribe("raspberry/topic")

# 回調函數,當收到消息時,觸發該函數
def on_message(client, userdata, msg):
    print(f"{msg.topic} {msg.payload}")
    
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# 設置遺囑消息,當樹莓派斷電,或者網絡出現異常中斷時,發送遺囑消息給其他客戶端
client.will_set('raspberry/status', {"status": "Off"})

# 創建連接,三個參數分別為 broker 地址,broker 端口號,保活時間
client.connect("broker.emqx.io", 1883, 60)

# 設置網絡循環堵塞,在調用 disconnect() 或程序崩潰前,不會主動結束程序
client.loop_forever()

調用 subscribe() 函數,可以讓樹莓派訂閱一個主題。在上面的代碼中,我們使用它訂閱了 raspberry/topic 主題,並監聽消息。

另外,我們還使用 will_set() 設置了遺囑消息。 遺囑消息是 MQTT 的一個特性,當設備在意外斷開網絡連接后,會向某個特定的主題發送消息。通過這個特性,我們可以得知樹莓派是否斷電,或者出現網絡異常。

發布消息

打開任意編輯器,輸入下面的代碼,並保存為 publisher.py 文件:

import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    
client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)

# 每間隔 1 秒鍾向 raspberry/topic 發送一個消息,連續發送 5 次
for i in range(5):
  	# 四個參數分別為:主題,發送內容,QoS, 是否保留消息
    client.publish('raspberry/topic', payload=i, qos=0, retain=False)
    print(f"send {i} to raspberry/topic")
    time.sleep(1)

client.loop_forever()

調用 publish() 函數,可以向一個主題發送消息。在上面的代碼中,我們使用了它向主題 raspberry/topic 發送消息。其中參數 QoS 是另一個 MQTT 特性,如果你想了解更多 QoS 的內容,可以查看 MQTT QoS(服務質量)介紹,這里我們暫且設為 0。

測試

我們使用 MQTT 5.0 客戶端工具 - MQTT X 進行以下測試。

測試訂閱消息

運行 Python 代碼,並主動發送一個消息。

  1. 打開終端,運行 Python 代碼,監聽消息 。

    python3 subscriber.py
    
  2. 使用 MQTT X 客戶端與 MQTT 服務器建立連接,並向主題 raspberry/topic 發送消息 。

  1. 查看樹莓派終端信息,將會看到已成功接收到 MQTT X 發布的消息。

測試發布消息

  1. 在 MQTT X 客戶端中訂閱 raspberry/topic 主題 。

  2. 在終端運行 Python 代碼。

  3. 在 MQTT X 客戶端中,查看樹莓派發送的消息。

測試遺囑消息

接下來測試一下遺囑消息是否設置成功。

  1. 在 MQTT X 客戶端中,訂閱 raspberry/status

  1. 中斷程序,或者斷開樹莓派的網絡。

  2. 在 MQTT X 客戶端中,查看 raspberry/status 主題接收到的消息。

總結

我們完成了在樹莓派上使用 Python MQTT 客戶端庫 paho-mqtt 編寫測試客戶端, 並實現了測試客戶端與 MQTT 服務器的連接、訂閱、取消訂閱、收發消息等功能。

至此,您已經學會了如何簡單的使用 MQTT 服務,雖然這只是 MQTT 服務的一小部分,但也足夠完成很多有意思的事,比如:

1. 使用手機發送 MQTT 消息,遠程控制樹莓派。
2. 定時將樹莓派的設備信息發送到 MQTT 服務器,通過手機接收消息,可以進行全天候監控。
3. 可以通過將樹莓派接入 MQTT 服務器,並配合各類傳感器及 ESP 模塊創建很多有趣的物聯網應用。

接下來我們將會陸續發布更多關於物聯網開發及樹莓派的相關文章,盡情關注。

版權聲明: 本文為 EMQ 原創,轉載請注明出處。

原文鏈接:https://www.emqx.io/cn/blog/use-mqtt-with-raspberry-pi


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM