MQTT介紹與使用


 

正文

  物聯網是新一代信息技術的重要組成部分,也是“信息化”時代的重要發展階段。其英文名稱是:“Internet of things(IoT)”。顧名思義,物聯網就是物物相連的互聯網。這有兩層意思:其一,物聯網的核心和基礎仍然是互聯網,是在互聯網基礎上的延伸和擴展的網絡;其二,其用戶端延伸和擴展到了任何物品與物品之間,進行信息交換和通信,也就是物物相息。物聯網通過智能感知、識別技術與普適計算等通信感知技術,廣泛應用於網絡的融合中,也因此被稱為繼計算機、互聯網之后世界信息產業發展的第三次浪潮。

  而在物聯網的應用上,對於信息傳輸,MQTT是一種再合適不過的協議工具了。

一、MQTT簡介

  MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的輕量級協議,該協議構建於TCP/IP協議之上,MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。

  MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。  

二、特性

  MQTT協議工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協議,它具有以下主要的幾項特性:

  (1)使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。

  (2)對負載內容屏蔽的消息傳輸。

  (3)使用TCP/IP提供網絡連接。

  主流的MQTT是基於TCP連接進行數據推送的,但是同樣有基於UDP的版本,叫做MQTT-SN。這兩種版本由於基於不同的連接方式,優缺點自然也就各有不同了。

  (4)有三種消息發布服務質量:

  “至多一次”,消息發布完全依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能設備在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。

  “至少一次”,確保消息到達,但消息重復可能會發生。

  “只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級別。在計費系統中,消息重復或丟失會導致不正確的結果。這種最高質量的消息發布服務還可以用於即時通訊類的APP的推送,確保用戶收到且只會收到一次。 

  (5)小型傳輸,開銷很小(固定長度的頭部是2字節),協議交換最小化,以降低網絡流量。

  這就是為什么在介紹里說它非常適合“在物聯網領域,傳感器與服務器的通信,信息的收集”,要知道嵌入式設備的運算能力和帶寬都相對薄弱,使用這種協議來傳遞消息再適合不過了。

三、實現方式  

  實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。

  MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:

  (1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);

  (2)payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。

四、MQTT的搭建(ubuntu)

  1、apt-get安裝mqtt相關包

  

  

  2、測試mosquitto是否正確運行

  

  3、本機終端測試mqtt

  打開一個終端,訂閱主題

mosquitto_sub -h 192.168.1.102 -t "mqtt" -v

  【-h】指定要連接的MQTT服務器 
  【-t】訂閱主題,此處為mqtt 
  【-v】打印更多的調試信息

  再打開一個終端,發布主題

mosquitto_pub -h 192.168.1.102 -t "mqtt" -m "Hello Stonegeek"

  【-h】指定要連接的MQTT服務器 
  【-t】向指定主題推送消息 
  【-m】指定消息內容

  結果展示

  

五、MQTT權限配置

  前面我們基於Mosquitto服務器已經搭建成功了,但是默認是允許匿名用戶登錄,對於正式上線的項目則是需要進行用戶認證(當然,用戶一般都會與數據庫映射,不過在這里我們就會直接將用戶寫入配置文件中)

  1、Mosquitto服務器的配置文件為/etc/mosquitto/mosquitto.conf,關於用戶認證的方式和讀取的配置都在這個文件中進行

  配置文件參數說明:

ID allow_anonymous password_file  acl_file result
1 True(默認)     允許匿名方式登錄
2 False password_file   開啟用戶驗證機制
3 False password_file acl_file 開啟用戶驗證機制,但訪問控制不起作用
4 True password_file acl_file 用戶名及密碼不為空,將自動進行用戶驗證且受到訪問控制的限制;用戶名及密碼為空,將不進行用戶驗證且受到訪問控制的限制
5 False     無法啟動服務

  allow_anonymous允許匿名

  password-file密碼文件

  acl_file訪問控制列表

  2、修改配置文件

  命令:sudo vi /etc/mosquitto/mosquitto.conf

  

  3、添加用戶信息

  

  命令解釋: -c 創建一個用戶、/etc/mosquitto/pwfile.example 是將用戶創建到 pwfile.example  文件中、admin 是用戶名。 

        同樣連續會提示連續輸入兩次密碼。注意第二次創建用戶時不用加 -c 如果加 -c 會把第一次創建的用戶覆蓋。

        至此兩個用戶創建成功,此時如果查看 pwfile.example 文件會發現其中多了兩個用戶。

  4、添加Topic和用戶的關系

  

  5、用戶認證測試

  (1)重啟Mosquitto步驟    

  查看mosquitto的進程

  命令:ps -aux|grep mosquitto

  

  (2)殺死進程

  命令:sudo kill -9 pid

  

  (3)啟動

  命令:mosquitto -c /etc/mosquitto/mosquitto.conf 

  (4)訂閱端啟動(不加用戶)

  

  訂閱端啟動(加用戶)

  

  (5)發布端啟動

  

六、MQTT實現(Java語言)

  注意:由於我們在上面配置了MQTT的用戶權限控制,所以下面的用戶只能使用stonegeek登錄,否則項目會運行報錯,而且我們在上面設置的訪問控制列表中只有mtopic主題,所以我們必須使用此主題,否則,訂閱者會收不到已發布的主題內容(已經測試過了)

  

  下面是我們Java語言實現的MQTT服務的發布/訂閱

  1、添加Maven依賴

    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.1.1</version>
    </dependency>

  2、ServerMQTT.class

復制代碼
package com.stonegeek;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Created by StoneGeek on 2018/6/5.
 * 博客地址:http://www.cnblogs.com/sxkgeek
 * 服務器向多個客戶端推送主題,即不同客戶端可向服務端訂閱相同的主題
 */
public class ServerMQTT {
    //tcp://MQTT安裝的服務器地址:MQTT定義的端口號
    public static final String HOST = "tcp://192.168.1.102:1883";
    //定義一個主題
    public static final String TOPIC = "mtopic";
    //定義MQTT的ID,可以在MQTT服務配置中指定
    private static final String clientid = "server11";

    private MqttClient client;
    private MqttTopic topic11;
    private String userName = "stonegeek";
    private String passWord = "123456";

    private MqttMessage message;

    /**
     * 構造函數
     * @throws MqttException
     */
    public ServerMQTT() throws MqttException {
        // MemoryPersistence設置clientid的保存形式,默認為以內存保存
        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        connect();
    }

    /**
     *  用來連接服務器
     */
    private void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // 設置超時時間
        options.setConnectionTimeout(10);
        // 設置會話心跳時間
        options.setKeepAliveInterval(20);
        try {
            client.setCallback(new PushCallback());
            client.connect(options);

            topic11 = client.getTopic(TOPIC);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     *
     * @param topic
     * @param message
     * @throws MqttPersistenceException
     * @throws MqttException
     */
    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
            MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! "
                + token.isComplete());
    }

    /**
     *  啟動入口
     * @param args
     * @throws MqttException
     */
    public static void main(String[] args) throws MqttException {
        ServerMQTT server = new ServerMQTT();

        server.message = new MqttMessage();
        server.message.setQos(1);
        server.message.setRetained(true);
        server.message.setPayload("hello,topic11".getBytes());
        server.publish(server.topic11 , server.message);
        System.out.println(server.message.isRetained() + "------ratained狀態");
    }
}
復制代碼

  3、ClientMQTT.class

復制代碼
package com.stonegeek;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Created by StoneGeek on 2018/6/5.
 * 博客地址:http://www.cnblogs.com/sxkgeek
 */
public class ClientMQTT {
    public static final String HOST = "tcp://192.168.1.102:1883";
    public static final String TOPIC = "mtopic";
    private static final String clientid = "client11";
    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "stonegeek";
    private String passWord = "123456";

    private ScheduledExecutorService scheduler;

    private void start() {
        try {
            // host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的連接設置
            options = new MqttConnectOptions();
            // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
            options.setCleanSession(true);
            // 設置連接的用戶名
            options.setUserName(userName);
            // 設置連接的密碼
            options.setPassword(passWord.toCharArray());
            // 設置超時時間 單位為秒
            options.setConnectionTimeout(10);
            // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
            options.setKeepAliveInterval(20);
            // 設置回調
            client.setCallback(new PushCallback());
            MqttTopic topic = client.getTopic(TOPIC);
            //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
            options.setWill(topic, "close".getBytes(), 2, true);

            client.connect(options);
            //訂閱消息
            int[] Qos  = {1};
            String[] topic1 = {TOPIC};
            client.subscribe(topic1, Qos);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws MqttException {
        ClientMQTT client = new ClientMQTT();
        client.start();
    }
}
復制代碼

  4、PushCallback.class

復制代碼
package com.stonegeek;

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
* Created by StoneGeek on 2018/6/5.
* 博客地址:http://www.cnblogs.com/sxkgeek
* 發布消息的回調類
*
* 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。
* 每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。
* 在回調中,將它用來標識已經啟動了該回調的哪個實例。
* 必須在回調類中實現三個方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的發布。
*
* public void connectionLost(Throwable cause)在斷開連接時調用。
*
* public void deliveryComplete(MqttDeliveryToken token))
* 接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。
* 由 MqttClient.connect 激活此回調。
*/
public class PushCallback implements MqttCallback{
public void connectionLost(Throwable cause) {
// 連接丟失后,一般在這里面進行重連
System.out.println("連接斷開,可以做重連");
}

public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}

public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息會執行到這里面
System.out.println("接收消息主題 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息內容 : " + new String(message.getPayload()));
}
}
復制代碼

  5、結果展示

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM