在以前發布的博客“菜鳥是如何打造智能家居系統的”文章最后我提到了使用MQTT協議作為雲平台和設備之間的通信協議以達到消息傳遞的實時性,手機的消息推送也大多基於這種平台,首先搬來一段簡介。
MQTT(MQ Telemetry Transport),消息隊列遙測傳輸協議,輕量級的發布/訂閱協議, 適用於一些條件比較苛刻的環境,進行低帶寬、不可靠或間歇性的通信。目前已經是物聯網消息通信事實上的標准協議了。值得一提的是mqtt提供三種不同質量的消息服務:
- “至多一次”:消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。
- “至少一次”:確保消息到達,但消息重復可能會發生。
- “只有一次”:確保消息到達一次。這一級別可用於如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。
一直沒時間搭建這個平台,前段時間到MQTT官網發現了一些很好的資源。
software · mqtt/mqtt.github.io Wiki
資源里既有基於MQTT(但不僅限於)開源消息代理中間件(Brokers/servers),又有測試客戶端,看了幾個代理中間件,也百度了一下,應用比較多的有ActiveMQ、Apollo、Mosquitto等。先選擇一個沒那么復雜的Mosquitto來嘗嘗鮮。
Mosquitto是一款實現了消息推送協議 MQTT v3.1 的開源消息代理軟件,提供輕量級的,支持可發布/可訂閱的的消息推送模式,使設備對設備之間的短消息通信變得簡單,比如現在應用廣泛的低功耗傳感器,手機、嵌入式計算機、微型控制器等移動設備。
安裝:(參考官網 http://mosquitto.org/download/)
服務器操作系統為CentOS7.0,使用最簡單的yum安裝
1.先加入yum源:
在/etc/yum.repos.d/目錄中新建一個mosquitto.repo文件,里面寫入:
[home_oojah_mqtt]
- name=mqtt (CentOS_CentOS-7)
type=rpm-md
baseurl=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/
gpgcheck=1
gpgkey=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7//repodata/repomd.xml.key
enabled=1
熟悉命令的可以直接下載到服務器中重命名http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo
不熟悉命令操作的(比如說我)就直接新建文件ftp上傳吧。
2.開始安裝
yum search all mosquitto
yum install mosquitto mosquitto-clients
第一步先查找一下所有關於mosquitto的模塊。
顯現的模塊后面功能簡介,這里我先安裝了mosquitto mosquitto-clients兩個模塊用於后面的測試,以后要用上什么模塊我再安裝。
3.配置
安裝完成之后,所有配置文件會被放置於/etc/mosquitto/目錄下,
其中最重要的就是Mosquitto的配置文件,即mosquitto.conf
# Place your local configuration in /etc/mosquitto/conf.d/
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/
#log_dest file /var/log/mosquitto/mosquitto.log
include_dir /etc/mosquitto/conf.d
自定義的配置文件是放在/etc/mosquitto/conf.d/文件夾中,文件以.conf為擴展名。詳細的配置參數參考mosquitto.conf.example
# =================================================================
# General configuration
# =================================================================
# 客戶端心跳的間隔時間
#retry_interval 20
# 系統狀態的刷新時間
#sys_interval 10
# 系統資源的回收時間,0表示盡快處理
#store_clean_interval 10
# 服務進程的PID
#pid_file /var/run/mosquitto.pid
# 服務進程的系統用戶
#user mosquitto
# 客戶端心跳消息的最大並發數
#max_inflight_messages 10
# 客戶端心跳消息緩存隊列
#max_queued_messages 100
# 用於設置客戶端長連接的過期時間,默認永不過期
#persistent_client_expiration
# =================================================================
# Default listener
# =================================================================
# 服務綁定的IP地址
#bind_address
# 服務綁定的端口號
#port 1883
# 允許的最大連接數,-1表示沒有限制
#max_connections -1
# cafile:CA證書文件
# capath:CA證書目錄
# certfile:PEM證書文件
# keyfile:PEM密鑰文件
#cafile
#capath
#certfile
#keyfile
# 必須提供證書以保證數據安全性
#require_certificate false
# 若require_certificate值為true,use_identity_as_username也必須為true
#use_identity_as_username false
# 啟用PSK(Pre-shared-key)支持
#psk_hint
# SSL/TSL加密算法,可以使用“openssl ciphers”命令獲取
# as the output of that command.
#ciphers
# =================================================================
# Persistence
# =================================================================
# 消息自動保存的間隔時間
#autosave_interval 1800
# 消息自動保存功能的開關
#autosave_on_changes false
# 持久化功能的開關
persistence true
# 持久化DB文件
#persistence_file mosquitto.db
# 持久化DB文件目錄
#persistence_location /var/lib/mosquitto/
# =================================================================
# Logging
# =================================================================
# 4種日志模式:stdout、stderr、syslog、topic
# none 則表示不記日志,此配置可以提升些許性能
log_dest none
# 選擇日志的級別(可設置多項)
#log_type error
#log_type warning
#log_type notice
#log_type information
# 是否記錄客戶端連接信息
#connection_messages true
# 是否記錄日志時間
#log_timestamp true
# =================================================================
# Security
# =================================================================
# 客戶端ID的前綴限制,可用於保證安全性
#clientid_prefixes
# 允許匿名用戶
#allow_anonymous true
# 用戶/密碼文件,默認格式:username:password
#password_file
# PSK格式密碼文件,默認格式:identity:key
#psk_file
# pattern write sensor/%u/data
# ACL權限配置,常用語法如下:
# 用戶限制:user <username>
# 話題限制:topic [read|write] <topic>
# 正則限制:pattern write sensor/%u/data
#acl_file
# =================================================================
# Bridges
# =================================================================
# 允許服務之間使用“橋接”模式(可用於分布式部署)
#connection <name>
#address <host>[:<port>]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
# 設置橋接的客戶端ID
#clientid
# 橋接斷開時,是否清除遠程服務器中的消息
#cleansession false
# 是否發布橋接的狀態信息
#notifications true
# 設置橋接模式下,消息將會發布到的話題地址
# $SYS/broker/connection/<clientid>/state
#notification_topic
# 設置橋接的keepalive數值
#keepalive_interval 60
# 橋接模式,目前有三種:automatic、lazy、once
#start_type automatic
# 橋接模式automatic的超時時間
#restart_timeout 30
# 橋接模式lazy的超時時間
#idle_timeout 60
# 橋接客戶端的用戶名
#username
# 橋接客戶端的密碼
#password
# bridge_cafile:橋接客戶端的CA證書文件
# bridge_capath:橋接客戶端的CA證書目錄
# bridge_certfile:橋接客戶端的PEM證書文件
# bridge_keyfile:橋接客戶端的PEM密鑰文件
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile
# 自己的配置可以放到以下目錄中
include_dir /etc/mosquitto/conf.d
mosquitto -c /etc/mosquitto/mosquitto.conf -d
sudo /etc/init.d/mosquitto start
演示部分:
前面已經開啟了服務,如果沒有請參考前面步驟。在本例中,發布者、代理和訂閱者均為localhsot,但是在實際的情況下三種並不是同一個設備,在mosquitto中可通過-h(--host)設置主機名稱(hostname)。為了實現這個簡單的測試案例,需要在linux中打開三個控制台,分別代表代理服務器、發布者和訂閱者。
一、開啟另一個終端窗口,運行訂閱程序mosquitto_sub:
注意:
消息推送的發布和訂閱要有主題,選項[-t] 主題,即:mosquitto -t 主題
如需指定用戶名稱則加選項[-i] 用戶名,即:mosquitto_sub -t 主題 -i 訂閱端
mosquitto_sub -t mqtt
二、開啟另一個終端窗口,運行發布程序mosquitto_pub:
指定消息推送的主題,發布端用戶名和消息:
mosquitto_pub -t 主題 -i 發布端 -h 主機 -m 你好
*注意:如果消息中間有空格則消息要用引號括起來。
mosquitto_pub -h localhost -t mqtt -m "hello world."
這時候前面那個訂閱窗口就可以收到”hello world”的消息了。
Python的安裝環境就不講了。最新的paho-mqtt1.1使用下面的命令安裝
pip install paho-mqtt
import paho.mqtt.client as mqtt
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("mqtt")
#訂閱,第一個參數是訂閱的主題
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("XXXXXXXXXX", 1883, 60)
#第一個參數為主機名,及Mosquitto所在服務器,第二個參數是端口
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

接下來我在服務器控制台發布一個消息
回頭看訂閱方
已經收到了推送的消息。
參考:mqtt消息中間件mosquitto的安裝和配置 - 斜風細雨