使用Python發送、訂閱消息
使用插件 paho-mqtt
官方文檔:http://shaocheng.li/post/blog/2017-05-23
Paho 是一個開源的 MQTT 客戶端項目,提供多種語言的 MQTT 客戶端實現,包括 C、C++、C#、Java、Python、JavaScript 等,完全支持 MQTT v3.1 和 v3.1.1 。Paho Python Client 是它的 Python 語言版本,支持 Python 2.7 和 3.x 。更多特性可以查看 http://www.eclipse.org/paho/clients/python/ ,源碼和文檔在 https://github.com/eclipse/paho.mqtt.python 。
安裝
在 Python 環境中用 pip install paho-mqtt
命令安裝,或者下載源碼:
git clone https://github.com/eclipse/paho.mqtt.python.git
cd org.eclipse.paho.mqtt.python.git
python setup.py install
下面是一個簡單的例子,連接一個 borker ,訂閱系統默認話題,獲取 broker 的版本號:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("$SYS/broker/version")
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("iot.eclipse.org", 1883, 60)
client.loop_forever()
保存到 paho-mqtt.py 文件,執行:
$ python paho-mqtt.py
Connected with result code 0
$SYS/broker/version mosquitto version 1.4.10
編程
paho.mqtt 包提供了三個類,Client、Publish 和 Subscribe。Publish 和 Subscribe 提供了簡單的方法,一次性的發送或者接收消息,不會保持連接。Client 包含了新建客戶端、連接、訂閱、發送、回調函數等方法。通常的編程步驟是新建一個 Client 的實例,然后調用它提供的連接、發布和訂閱等方法與 broker 通訊:
- 新建一個 Client 實例
- 用一個 connect*() 函數連接 broker
- 用一個 loop*() 函數,維持與 broker 的連接
- 用 subscribe() 函數訂閱一個話題,接收消息
- 用 publish() 函數發布消息
- 用 disconnect() 函數斷開連接
下面主要介紹 Client 提供的方法,使用前先導入:
import paho.mqtt.client as mqtt
初始化
新建一個 Client 實例:
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
這是 Client 類的構造函數,參數的含義:
- client_id ,設置客戶端的 ID ,應該是一個字符串,連接時向 broker 提交。如果為空,會隨機生成一個 id ,此時,clean_session 必須設為 True 。
- clean_session ,布爾型,如果為 True ,斷開連接時,broker 會清除關於這個 client 的所有信息。如果為 False ,斷開連接時,broker 會保留這個客戶端的訂閱信息和消息隊列。
- userdata ,用戶自定義的數據,可以是任何類型,傳遞給回調函數。可以用 user_data_set() 函數更新。
- protocol ,設置 MQTT 協議的版本,MQTTv31 或者 MQTTv311 。
- transport , 傳輸協議,默認還是 tcp ,可以設為 websockets 。
構造實例:
import paho.mqtt.client as mqtt
mqttc = mqtt.Client()
可以調用 reinitialise() 重新初始化 Client :
reinitialise(client_id="", clean_session=True, userdata=None)
配置
這些函數用來設置 Client 的一些特性,通常在連接 broker 之前調用。
max_inflight_messages_set(self, inflight)
這個函數可以設置當 QoS>0 時,最多可以存在幾條動態消息(已經發送,還沒有確認成功的消息)。默認是 20 ,增加這個值會占用更多的內存,但是可以提升吞吐量。
max_queued_messages_set(self, queue_size)
這個函數可以設置當 QoS>0 時,發送消息隊列的最大值,默認是 0 ,表示無限制。當隊列滿時,舊消息會丟棄。
message_retry_set(retry)
當 Qos>0 時,如果發送消息后超過一定時間還沒有收到確認報文,就要重發消息,這個函數用於設置超時時間,單位是秒。默認是 5 秒,通常不用修改。
tls_set(ca_certs, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
配置 SSL 證書驗證的函數,必須在 connect*() 函數之前調動。幾個參數的含義:
- ca_certs ,指定 CA 根證書的路徑。
- certfile,keyfile ,指定客戶端私鑰和證書的路徑。
- cert_reqs ,設置客戶端對 broker 證書的需求,默認是 ssl.CERT_REQUIRED ,表示 broker 必須提供一個證書。
- tls_version ,設置 SSL/TLS 協議的版本,默認是 TLS v1 。
- ciphers ,設置本次連接的加密密碼,默認是 None 。
設置用戶名和密碼:
username_pw_set(username, password=None)
設置遺囑:
will_set(topic, payload=None, qos=0, retain=False)
當這個 client 斷開連接時,broker 會發布這個遺囑消息。參數的含義:
- topic ,遺囑消息的話題
- payload ,遺囑消息的內容,字符串類型,如果設為 None ,會發送一條長度為 0 消息。如果設置了 int 或者 float 類型的值,會當做字符串發送,如果你想發送真正的 int 或者 float 值,需要用 struct.pack() 生成消息。
- qos ,遺囑消息的安全等級
- retain ,如果設為 True ,遺囑消息會被設為保留消息
如果參數設置錯誤,函數會拋出 ValueError 異常。
連接
最基本的連接方法是 connect() :
connect(host, port=1883, keepalive=60, bind_address="")
連接到 broker ,這是一個阻塞函數,參數的含義:
- host ,broker 的 hostname 或者 IP
- port ,broker 的開放端口,默認是 1883 ,如果使能了 SSL/TLS ,端口可能是 8883
- keepalive ,心跳間隔,單位是秒,如果 broker 和 client 在這段時間內沒有任何通訊,client 會給 broker 發送一個 ping 消息
- bind_address ,如果 client 的本地計算機有多個網絡接口,可以用這個參數綁定其中的一個
client 調用該函數發起連接后,如果收到 broker 發來的 CONNACK 消息,就會執行 on_connect() 回調函數。除此之外,還有 connect_async() 和 connect_srv() 兩種函數可以連接 broker 。connect_async() 需要配合 loop_start() 函數以非阻塞的方式連接 broker。connect_srv() 是從 SRV DNS 獲取 broker 的地址,然后再連接。
調用過 connect*() 函數之后,可以調用 reconnect() 用現有的參數重新連接。調用 disconnect() 函數可以從 broker 斷開連接,斷開連接后,會執行 on_disconnect() 回調函數。
網絡循環
網絡循環的函數有四種,它們運行在后台,處理收發的消息。最基本的是 loop() :
loop(timeout=1.0, max_packets=1)
這個函數會通過 select() 函數阻塞,直到有消息需要收發,阻塞的時間用 timeout 參數設置,不能超過心跳時間 keepalive ,否則你的 client 會定時從 broker 斷開。max_packets 參數已經過時,無需設置。
另一個循環函數是 loop_forever() ,它會一直阻塞,直到 client 調用了 disconnect() ,並且,它會自動重連:
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)
timeout 和 max_packets 參數已經過時,無需設置。
發布
publish(topic, payload=None, qos=0, retain=False)
向指定話題發送一條消息,參數的含義:
- topic ,這條消息所屬的話題
- payload ,消息內容,字符串類型,如果設為 None ,會發送一條長度為 0 消息。如果設置了 int 或者 3. float 類型的值,會當做字符串發送,如果你想發送真正的 int 或者 float 值,需要用 struct.pack() 生成消息。
- qos ,消息的安全等級
- retain ,如果設為 Ture ,這條消息會被設為保留消息
如果參數設置錯誤,會拋出 ValueError 異常。消息發送成功后,會執行 on_publish() 回調函數。
訂閱
subscribe(topic, qos=0)
向 broker 訂閱話題,參數 topic 設置話題名稱,qos 設置安全等級。如果只訂閱一個話題,直接設置兩個參數即可,例如:
subscribe(("my/topic", 1))
如果要訂閱多個話題,可以將每個話題放在一個元組中,多個話題組成一個列表:
subscribe([("my/topic", 0), ("another/topic", 2)])
當 broker 確認訂閱有效后,client 會執行 on_subscribe() 回調函數。如果要取消訂閱某個話題,可以調用 unsubscribe(topic) ,參數是字符串型,如果是取消多個話題,參數應該是一個字符串列表。取消成功的話,會執行 on_unsubscribe() 回調函數。
回調函數
當 broker 對 client 的連接請求做出回應時,會調用 on_connect() 回調函數,可以在該函數中判斷連接是否成功:
on_connect(client, userdata, flags, rc)
參數 client 是當前 client 的實例,userdata 是 Client() 或 userdata_set() 設置的用戶數據。flags 是 broker 發送的回應 flags ,字典類型。rc 表示連接結果,整數型,0 表示連接成功,連接失敗可能的值有:
- 錯誤的協議版本
- 無效的 client ID
- 服務器不可用
- 錯誤的用戶名或密碼
- 無法驗證
使用實例:
def on_connect(client, userdata, flags, rc):
print("Connection returned result: "+connack_string(rc))
mqttc.on_connect = on_connect
...
對應的,與 broker 斷開連接后,會執行 on_disconnect() 回調函數:
on_disconnect(client, userdata, rc)
rc 表示斷開連接的狀態,如果是 0 ,表示是調用了 disconnect() 引起的斷開連接,其他結果表示意外斷開,比如網絡中斷。使用實例:
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
mqttc.on_disconnect = on_disconnect
...
當 client 接收到已訂閱的話題的消息時,會調用 on_message() 回調函數,在該函數中判斷是哪個話題的消息,並處理消息內容:
on_message(client, userdata, message)
參數 message 是 MQTTMessage 類的實例,這個類包含的成員有 topic ,payload ,qos ,retain 。使用實例:
def on_message(client, userdata, message):
print("Received message '" + str(message.payload) + "' on topic '"
+ message.topic + "' with QoS " + str(message.qos))
mqttc.on_message = on_message
...
如果要用通配符同時處理多個話題的消息,例如用 sensors/# 匹配 sensors/temperature 和 sensors/humidity 話題,可以用 message_callback_add() 設置回調函數:
message_callback_add(sub, callback)
參數 sub 是一個使用通配符的話題過濾器,字符串型,用 callback 參數指定回掉函數,與 on_message() 相同的類型。
如果同時設置了 on_message() 和 message_callback_add() 回調函數,會首先尋找合適的 message_callback_add() 定義的話題過濾器,如果沒有匹配,才會調用 on_message() 。
實例
假設 broker 要求提供用戶名、密碼、證書和密鑰,下面是一個簡單的 client 例子:
$ cat path-mqtt.py
#!/usr/bin/python
import paho.mqtt.client as mqtt
cafile = "/etc/mosquitto/ca/ca.crt"
certfile = "/home/ubuntu/CA/client.crt"
keyfile = "/home/ubuntu/CA/client.key"
user = "guest"
passwd = "12345678"
server = "localhost"
port = 8883
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("$SYS/broker/version")
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.tls_set(cafile,certfile,keyfile)
client.username_pw_set(user,passwd)
client.on_connect = on_connect
client.on_message = on_message
client.connect(server, port, 60)
client.loop_forever()
執行:
$ ./path-mqtt.py
Connected with result code 0
$SYS/broker/version mosquitto version 1.4.11
簡單實例
# pip install paho-mqtt
import paho.mqtt.client as mqtt
import time
server = "192.168.1.168" # mqtt地址
port = 1883 # mqtt端口
def on_connect(client,userdata,flags,rc):
"""
mqtt連接成功的回調
"""
print("MQTT服務器連接返回碼:" + str(rc))
client.subscribe([("/wjw/znqs/123456789/1/down",0),("/wjw/znqs/123456789/2/down",0)])
def on_message(client,userdata,msg):
"""
mqtt收到訂閱消息回調
"""
print(msg.topic +" --> "+(msg.payload).decode())
client = mqtt.Client()
client.on_connect = on_connect # 綁定mqtt連接回調
client.on_message = on_message # 綁定mqtt訂閱消息回調
client.connect(server,port,60) # 連接mqtt服務
client.publish("/wjw/znqs/123456789/1/up", payload="1 2 3 4 5 6 7", qos=0, retain=False)
print(time.time() , " /wjw/znqs/123456789/1/up ---> 1 2 3 4 5 6 7")
client.loop_forever() # 阻塞函數
"""
while 1:
client.publish("/wjw/znqs/123456789/1/up", payload="1 2 3 4 5 6 7", qos=0, retain=False)
print(time.time() , " /wjw/znqs/123456789/1/up ---> 1 2 3 4 5 6 7")
time.sleep(5)
"""