Android MQTT的訂閱和發布消息


Android MQTT的訂閱和發布消息

MQTT協議簡述

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是基於發布/訂閱(Publish/Subscribe)模式的"輕量級"通訊協議,該協議構建於TCP/IP協議上,有IBM在1999年發布.MQTT最大的優點在於:可以以極少的代碼和有限的寬帶,為連接遠程設備提供可靠的消息服務.。作為一種開銷、低寬帶占用的即時通訊協議,在使其物聯網、小型設備、移動應用等方面較廣泛的應用。

MQTTMQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。

 

特點

 

1、實現簡單

2、提供數據傳輸的Qos

3、輕量、占用寬帶低

4、有三種消息發布服務質量  

  • "至多一次",消息發布完成依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級可用於如下情況,壞境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送
  • "至少一次",確保消息到達,但消息重復可能會發生。
  • "只有一次",確保消息到達一次,這一級別可用於如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。

5、可傳輸任意類型的數據

6、可保持的會話(session)

 

MQTT協議原理

1 MQTT協議實現方式

實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。

MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:

  • (1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);
  • (2)payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。

2 網絡傳輸與應用消息

MQTT會構建底層網絡傳輸:它將建立客戶端到服務器的連接,提供兩者之間的一個有序的、無損的、基於字節流的雙向傳輸。

當應用數據通過MQTT網絡發送時,MQTT會把與之相關的服務質量(QoS)和主題名(Topic)相關連。

3 MQTT客戶端

一個使用MQTT協議的應用程序或者設備,它總是建立到服務器的網絡連接。客戶端可以:

  • (1)發布其他客戶端可能會訂閱的信息;
  • (2)訂閱其它客戶端發布的消息;
  • (3)退訂或刪除應用程序的消息;
  • (4)斷開與服務器連接。

4 MQTT服務器

MQTT服務器以稱為"消息代理"(Broker),可以是一個應用程序或一台設備。它是位於消息發布者和訂閱者之間,它可以:

  • (1)接受來自客戶的網絡連接;
  • (2)接受客戶發布的應用信息;
  • (3)處理來自客戶端的訂閱和退訂請求;
  • (4)向訂閱的客戶轉發應用程序消息。

5 MQTT協議中的訂閱、主題、會話

一、訂閱(Subscription)

訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。

二、會話(Session)

每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態交互。會話存在於一個網絡之間,也可能在客戶端和服務器之間跨越多個連續的網絡連接。

三、主題名(Topic Name)

連接到一個應用程序消息的標簽,該標簽與服務器的訂閱相匹配。服務器會將消息發送給訂閱所匹配標簽的每個客戶端。

四、主題篩選器(Topic Filter)

一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。

五、負載(Payload)

消息訂閱者所具體接收的內容。

6 MQTT協議中的方法

MQTT協議中定義了一些方法(也被稱為動作),來於表示對確定資源所進行操作。這個資源可以代表預先存在的數據或動態生成數據,這取決於服務器的實現。通常來說,資源指服務器上的文件或輸出。主要方法有:

  • (1)Connect。等待與服務器建立連接。
  • (2)Disconnect。等待MQTT客戶端完成所做的工作,並與服務器斷開TCP/IP會話。
  • (3)Subscribe。等待完成訂閱。
  • (4)UnSubscribe。等待服務器取消客戶端的一個或多個topics訂閱。
  • (5)Publish。MQTT客戶端發送消息請求,發送完成后返回應用程序線程。

 

 

Android 下如何使用MQTT協議

  • 導入mqtt包
  • 配置MqttConnectOptions
  • 調用connect並將配置好的參數寫入
  • 通過指定的消息進行消息訂閱
  • 向訂閱topic對中發布消息
  • 通過mqttCallBack的回調對接收到的消息進行處理

導入mqtt包

1 implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' 
2 implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

代碼

  1 public class MQTTService extends Service {
  2 
  3     public static final String TAG = MQTTService.class.getSimpleName();
  4 
  5     private static MqttAndroidClient client;
  6     private MqttConnectOptions conOpt;
  7 
  8     private String host = "tcp://xxx";   //服務器地址 
  9     private String userName = "xxx";   //賬號
 10     private String passWord = " xxx";   //密碼
 11     private static String myTopic = "topic";   //頻道名
 12     private String clientId = "mqtt_client";   //客戶端ID
 13 
 14     @Override
 15     public int onStartCommand(Intent intent, int flags, int startId) {
 16         init();
 17         return super.onStartCommand(intent, flags, startId);
 18     }
 19   //發布消息 msg
 20     public static void publish(String msg){
 21         String topic = myTopic;
 22         Integer qos = 2;
 23         Boolean retained = false;
 24         try {
 25             client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
 26         } catch (MqttException e) {
 27             e.printStackTrace();
 28         }
 29     }
 30 
 31     private void init() {
 32         // 服務器地址(協議+地址+端口號)
 33         String uri = host;
 34         client = new MqttAndroidClient(this, uri, clientId);
 35         // 設置MQTT監聽並且接受消息
 36         client.setCallback(mqttCallback);
 37 
 38         conOpt = new MqttConnectOptions();
 39         // 清除緩存
 40         conOpt.setCleanSession(true);
 41         // 設置超時時間,單位:秒
 42         conOpt.setConnectionTimeout(10);
 43         // 心跳包發送間隔,單位:秒
 44         conOpt.setKeepAliveInterval(20);
 45         // 用戶名
 46         conOpt.setUserName(userName);
 47         // 密碼
 48         conOpt.setPassword(passWord.toCharArray());
 49 
 50         // last will message
 51         boolean doConnect = true;
 52         String message = "{\"terminal_uid\":\"" + clientId + "\"}";
 53         String topic = myTopic;
 54         Integer qos = 2;
 55         Boolean retained = false;
 56         if ((!message.equals("")) || (!topic.equals(""))) {
 57             // 最后
 58             try {
 59                 conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
 60             } catch (Exception e) {
 61                 Log.i(TAG, "Exception Occured", e);
 62                 doConnect = false;
 63                 iMqttActionListener.onFailure(null, e);
 64             }
 65         }
 66 
 67         if (doConnect) {
 68             doClientConnection();
 69         }
 70 
 71     }
 72 
 73     @Override
 74     public void onDestroy() {
 75         try {
 76             client.disconnect();  //服務銷毀,斷開連接
 77         } catch (MqttException e) {
 78             e.printStackTrace();
 79         }
 80         super.onDestroy();
 81     }
 82 
 83     /** 連接MQTT服務器 */
 84     private void doClientConnection() {
 85         if (!client.isConnected() && isConnectIsNomarl()) {
 86             try {
 87                 client.connect(conOpt, null, iMqttActionListener);
 88             } catch (MqttException e) {
 89                 e.printStackTrace();
 90             }
 91         }
 92 
 93     }
 94 
 95     // MQTT是否連接成功
 96     private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
 97 
 98         @Override
 99         public void onSuccess(IMqttToken arg0) {
100             Log.i(TAG, "連接成功 ");
101             try {
102                 // 訂閱myTopic話題,當訂閱多條頻道,需要遍歷逐條訂閱,否則有可能訂閱失敗
103                 client.subscribe(myTopic,1);  
104             } catch (MqttException e) {
105                 e.printStackTrace();
106             }
107         }
108 
109         @Override
110         public void onFailure(IMqttToken arg0, Throwable arg1) {
111             arg1.printStackTrace();
112             // 連接失敗,重連
113         }
114     };
115 
116     // MQTT監聽並且接受消息
117     private MqttCallback mqttCallback = new MqttCallback() {
118 
119         @Override
120         public void messageArrived(String topic, MqttMessage message) throws Exception {
121 
122             String str1 = new String(message.getPayload());
123             MQTTMessage msg = new MQTTMessage(); //自定義接口
124             msg.setMessage(str1);
125           //訂閱信息,接收的信息message
126             String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
127             Log.i(TAG, "messageArrived:" + str1);
128             Log.i(TAG, str2);
129         }
130 
131         @Override
132         public void deliveryComplete(IMqttDeliveryToken arg0) {
133 
134         }
135 
136         @Override
137         public void connectionLost(Throwable arg0) {
138             // 失去連接,重連
139         }
140     };
141 
142     /** 判斷網絡是否連接 */
143     private boolean isConnectIsNomarl() {
144         ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
145         NetworkInfo info = connectivityManager.getActiveNetworkInfo();
146         if (info != null && info.isAvailable()) {
147             String name = info.getTypeName();
148             Log.i(TAG, "MQTT當前網絡名稱:" + name);
149             return true;
150         } else {
151             Log.i(TAG, "MQTT 沒有可用網絡");
152             return false;
153         }
154     }
155 
156     @Nullable
157     @Override
158     public IBinder onBind(Intent intent) {
159         return null;
160     }
161 }

 

 連接成功后,就可以實現和服務端消息的發送和接收。

最后在AndroidManifest.xml文件

注冊Services

1 <service android:name="org.eclipse.paho.android.service.MqttService" />   <!--導入包,Android自帶Mqtt的服務,需要注冊-->
2 <service android:name=".mqtttest.MQService"/>         <!--自己開啟的服務-->

權限

1     <uses-permission android:name="android.permission.WAKE_LOCK" />
2     <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
3     <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
4     <uses-permission android:name="android.permission.READ_PHONE_STATE" />
5     <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />
6     <uses-permission android:name="android.permission.INTERNET" />

 

完結!

 

最后:請留下您的贊!阿里嘎多.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM