正文
物聯網是新一代信息技術的重要組成部分,也是“信息化”時代的重要發展階段。其英文名稱是:“Internet of things(IoT)”。顧名思義,物聯網就是物物相連的互聯網。這有兩層意思:其一,物聯網的核心和基礎仍然是互聯網,是在互聯網基礎上的延伸和擴展的網絡;其二,其用戶端延伸和擴展到了任何物品與物品之間,進行信息交換和通信,也就是物物相息。物聯網通過智能感知、識別技術與普適計算等通信感知技術,廣泛應用於網絡的融合中,也因此被稱為繼計算機、互聯網之后世界信息產業發展的第三次浪潮。
而在物聯網的應用上,對於信息傳輸,MQTT是一種再合適不過的協議工具了。
一、MQTT簡介
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的輕量級協議,該協議構建於TCP/IP協議之上,MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。
MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。
二、特性
MQTT協議工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協議,它具有以下主要的幾項特性:
(1)使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。
(2)對負載內容屏蔽的消息傳輸。
(3)使用TCP/IP提供網絡連接。
主流的MQTT是基於TCP連接進行數據推送的,但是同樣有基於UDP的版本,叫做MQTT-SN。這兩種版本由於基於不同的連接方式,優缺點自然也就各有不同了。
(4)有三種消息發布服務質量:
“至多一次”,消息發布完全依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能設備在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。
“至少一次”,確保消息到達,但消息重復可能會發生。
“只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級別。在計費系統中,消息重復或丟失會導致不正確的結果。這種最高質量的消息發布服務還可以用於即時通訊類的APP的推送,確保用戶收到且只會收到一次。
(5)小型傳輸,開銷很小(固定長度的頭部是2字節),協議交換最小化,以降低網絡流量。
這就是為什么在介紹里說它非常適合“在物聯網領域,傳感器與服務器的通信,信息的收集”,要知道嵌入式設備的運算能力和帶寬都相對薄弱,使用這種協議來傳遞消息再適合不過了。
三、實現方式
實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。
MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:
(1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);
(2)payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。
四、MQTT的搭建(ubuntu)
1、apt-get安裝mqtt相關包
2、測試mosquitto是否正確運行
3、本機終端測試mqtt
打開一個終端,訂閱主題
mosquitto_sub -h 192.168.1.102 -t "mqtt" -v
【-h】指定要連接的MQTT服務器
【-t】訂閱主題,此處為mqtt
【-v】打印更多的調試信息
再打開一個終端,發布主題
mosquitto_pub -h 192.168.1.102 -t "mqtt" -m "Hello Stonegeek"
【-h】指定要連接的MQTT服務器
【-t】向指定主題推送消息
【-m】指定消息內容
結果展示
五、MQTT權限配置
前面我們基於Mosquitto服務器已經搭建成功了,但是默認是允許匿名用戶登錄,對於正式上線的項目則是需要進行用戶認證(當然,用戶一般都會與數據庫映射,不過在這里我們就會直接將用戶寫入配置文件中)
1、Mosquitto服務器的配置文件為/etc/mosquitto/mosquitto.conf,關於用戶認證的方式和讀取的配置都在這個文件中進行
配置文件參數說明:
ID | allow_anonymous | password_file | acl_file | result |
1 | True(默認) | 允許匿名方式登錄 | ||
2 | False | password_file | 開啟用戶驗證機制 | |
3 | False | password_file | acl_file | 開啟用戶驗證機制,但訪問控制不起作用 |
4 | True | password_file | acl_file | 用戶名及密碼不為空,將自動進行用戶驗證且受到訪問控制的限制;用戶名及密碼為空,將不進行用戶驗證且受到訪問控制的限制 |
5 | False | 無法啟動服務 |
allow_anonymous允許匿名
password-file密碼文件
acl_file訪問控制列表
2、修改配置文件
命令:sudo vi /etc/mosquitto/mosquitto.conf
3、添加用戶信息
命令解釋: -c 創建一個用戶、/etc/mosquitto/pwfile.example 是將用戶創建到 pwfile.example 文件中、admin 是用戶名。
同樣連續會提示連續輸入兩次密碼。注意第二次創建用戶時不用加 -c 如果加 -c 會把第一次創建的用戶覆蓋。
至此兩個用戶創建成功,此時如果查看 pwfile.example 文件會發現其中多了兩個用戶。
4、添加Topic和用戶的關系
5、用戶認證測試
(1)重啟Mosquitto步驟
查看mosquitto的進程
命令:ps -aux|grep mosquitto
(2)殺死進程
命令:sudo kill -9 pid
(3)啟動
命令:mosquitto -c /etc/mosquitto/mosquitto.conf
(4)訂閱端啟動(不加用戶)
訂閱端啟動(加用戶)
(5)發布端啟動
六、MQTT實現(Java語言)
注意:由於我們在上面配置了MQTT的用戶權限控制,所以下面的用戶只能使用stonegeek登錄,否則項目會運行報錯,而且我們在上面設置的訪問控制列表中只有mtopic主題,所以我們必須使用此主題,否則,訂閱者會收不到已發布的主題內容(已經測試過了)
下面是我們Java語言實現的MQTT服務的發布/訂閱
1、添加Maven依賴
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.1.1</version> </dependency>
2、ServerMQTT.class
package com.stonegeek; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * Created by StoneGeek on 2018/6/5. * 博客地址:http://www.cnblogs.com/sxkgeek * 服務器向多個客戶端推送主題,即不同客戶端可向服務端訂閱相同的主題 */ public class ServerMQTT { //tcp://MQTT安裝的服務器地址:MQTT定義的端口號 public static final String HOST = "tcp://192.168.1.102:1883"; //定義一個主題 public static final String TOPIC = "mtopic"; //定義MQTT的ID,可以在MQTT服務配置中指定 private static final String clientid = "server11"; private MqttClient client; private MqttTopic topic11; private String userName = "stonegeek"; private String passWord = "123456"; private MqttMessage message; /** * 構造函數 * @throws MqttException */ public ServerMQTT() throws MqttException { // MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(); } /** * 用來連接服務器 */ private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 設置超時時間 options.setConnectionTimeout(10); // 設置會話心跳時間 options.setKeepAliveInterval(20); try { client.setCallback(new PushCallback()); client.connect(options); topic11 = client.getTopic(TOPIC); } catch (Exception e) { e.printStackTrace(); } } /** * * @param topic * @param message * @throws MqttPersistenceException * @throws MqttException */ public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println("message is published completely! " + token.isComplete()); } /** * 啟動入口 * @param args * @throws MqttException */ public static void main(String[] args) throws MqttException { ServerMQTT server = new ServerMQTT(); server.message = new MqttMessage(); server.message.setQos(1); server.message.setRetained(true); server.message.setPayload("hello,topic11".getBytes()); server.publish(server.topic11 , server.message); System.out.println(server.message.isRetained() + "------ratained狀態"); } }
3、ClientMQTT.class
package com.stonegeek; import java.util.concurrent.ScheduledExecutorService; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * Created by StoneGeek on 2018/6/5. * 博客地址:http://www.cnblogs.com/sxkgeek */ public class ClientMQTT { public static final String HOST = "tcp://192.168.1.102:1883"; public static final String TOPIC = "mtopic"; private static final String clientid = "client11"; private MqttClient client; private MqttConnectOptions options; private String userName = "stonegeek"; private String passWord = "123456"; private ScheduledExecutorService scheduler; private void start() { try { // host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連接設置 options = new MqttConnectOptions(); // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接 options.setCleanSession(true); // 設置連接的用戶名 options.setUserName(userName); // 設置連接的密碼 options.setPassword(passWord.toCharArray()); // 設置超時時間 單位為秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設置回調 client.setCallback(new PushCallback()); MqttTopic topic = client.getTopic(TOPIC); //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息 options.setWill(topic, "close".getBytes(), 2, true); client.connect(options); //訂閱消息 int[] Qos = {1}; String[] topic1 = {TOPIC}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws MqttException { ClientMQTT client = new ClientMQTT(); client.start(); } }
4、PushCallback.class
package com.stonegeek;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* Created by StoneGeek on 2018/6/5.
* 博客地址:http://www.cnblogs.com/sxkgeek
* 發布消息的回調類
*
* 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。
* 每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。
* 在回調中,將它用來標識已經啟動了該回調的哪個實例。
* 必須在回調類中實現三個方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的發布。
*
* public void connectionLost(Throwable cause)在斷開連接時調用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。
* 由 MqttClient.connect 激活此回調。
*/
public class PushCallback implements MqttCallback{
public void connectionLost(Throwable cause) {
// 連接丟失后,一般在這里面進行重連
System.out.println("連接斷開,可以做重連");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息會執行到這里面
System.out.println("接收消息主題 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息內容 : " + new String(message.getPayload()));
}
}
5、結果展示