Android MQTT的訂閱和發布消息
MQTT協議簡述
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是基於發布/訂閱(Publish/Subscribe)模式的"輕量級"通訊協議,該協議構建於TCP/IP協議上,有IBM在1999年發布.MQTT最大的優點在於:可以以極少的代碼和有限的寬帶,為連接遠程設備提供可靠的消息服務.。作為一種開銷、低寬帶占用的即時通訊協議,在使其物聯網、小型設備、移動應用等方面較廣泛的應用。
MQTTMQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。
特點
1、實現簡單
2、提供數據傳輸的Qos
3、輕量、占用寬帶低
4、有三種消息發布服務質量
- "至多一次",消息發布完成依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級可用於如下情況,壞境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送
- "至少一次",確保消息到達,但消息重復可能會發生。
- "只有一次",確保消息到達一次,這一級別可用於如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。
5、可傳輸任意類型的數據
6、可保持的會話(session)
MQTT協議原理
1 MQTT協議實現方式
實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。
MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:
- (1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);
- (2)payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。
2 網絡傳輸與應用消息
MQTT會構建底層網絡傳輸:它將建立客戶端到服務器的連接,提供兩者之間的一個有序的、無損的、基於字節流的雙向傳輸。
當應用數據通過MQTT網絡發送時,MQTT會把與之相關的服務質量(QoS)和主題名(Topic)相關連。
3 MQTT客戶端
一個使用MQTT協議的應用程序或者設備,它總是建立到服務器的網絡連接。客戶端可以:
- (1)發布其他客戶端可能會訂閱的信息;
- (2)訂閱其它客戶端發布的消息;
- (3)退訂或刪除應用程序的消息;
- (4)斷開與服務器連接。
4 MQTT服務器
MQTT服務器以稱為"消息代理"(Broker),可以是一個應用程序或一台設備。它是位於消息發布者和訂閱者之間,它可以:
- (1)接受來自客戶的網絡連接;
- (2)接受客戶發布的應用信息;
- (3)處理來自客戶端的訂閱和退訂請求;
- (4)向訂閱的客戶轉發應用程序消息。
5 MQTT協議中的訂閱、主題、會話
一、訂閱(Subscription)
訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。
二、會話(Session)
每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態交互。會話存在於一個網絡之間,也可能在客戶端和服務器之間跨越多個連續的網絡連接。
三、主題名(Topic Name)
連接到一個應用程序消息的標簽,該標簽與服務器的訂閱相匹配。服務器會將消息發送給訂閱所匹配標簽的每個客戶端。
四、主題篩選器(Topic Filter)
一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。
五、負載(Payload)
消息訂閱者所具體接收的內容。
6 MQTT協議中的方法
MQTT協議中定義了一些方法(也被稱為動作),來於表示對確定資源所進行操作。這個資源可以代表預先存在的數據或動態生成數據,這取決於服務器的實現。通常來說,資源指服務器上的文件或輸出。主要方法有:
- (1)Connect。等待與服務器建立連接。
- (2)Disconnect。等待MQTT客戶端完成所做的工作,並與服務器斷開TCP/IP會話。
- (3)Subscribe。等待完成訂閱。
- (4)UnSubscribe。等待服務器取消客戶端的一個或多個topics訂閱。
- (5)Publish。MQTT客戶端發送消息請求,發送完成后返回應用程序線程。
Android 下如何使用MQTT協議
- 導入mqtt包
- 配置MqttConnectOptions
- 調用connect並將配置好的參數寫入
- 通過指定的消息進行消息訂閱
- 向訂閱topic對中發布消息
- 通過mqttCallBack的回調對接收到的消息進行處理
導入mqtt包
1 implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' 2 implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
代碼
1 public class MQTTService extends Service { 2 3 public static final String TAG = MQTTService.class.getSimpleName(); 4 5 private static MqttAndroidClient client; 6 private MqttConnectOptions conOpt; 7 8 private String host = "tcp://xxx"; //服務器地址 9 private String userName = "xxx"; //賬號 10 private String passWord = " xxx"; //密碼 11 private static String myTopic = "topic"; //頻道名 12 private String clientId = "mqtt_client"; //客戶端ID 13 14 @Override 15 public int onStartCommand(Intent intent, int flags, int startId) { 16 init(); 17 return super.onStartCommand(intent, flags, startId); 18 } 19 //發布消息 msg 20 public static void publish(String msg){ 21 String topic = myTopic; 22 Integer qos = 2; 23 Boolean retained = false; 24 try { 25 client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue()); 26 } catch (MqttException e) { 27 e.printStackTrace(); 28 } 29 } 30 31 private void init() { 32 // 服務器地址(協議+地址+端口號) 33 String uri = host; 34 client = new MqttAndroidClient(this, uri, clientId); 35 // 設置MQTT監聽並且接受消息 36 client.setCallback(mqttCallback); 37 38 conOpt = new MqttConnectOptions(); 39 // 清除緩存 40 conOpt.setCleanSession(true); 41 // 設置超時時間,單位:秒 42 conOpt.setConnectionTimeout(10); 43 // 心跳包發送間隔,單位:秒 44 conOpt.setKeepAliveInterval(20); 45 // 用戶名 46 conOpt.setUserName(userName); 47 // 密碼 48 conOpt.setPassword(passWord.toCharArray()); 49 50 // last will message 51 boolean doConnect = true; 52 String message = "{\"terminal_uid\":\"" + clientId + "\"}"; 53 String topic = myTopic; 54 Integer qos = 2; 55 Boolean retained = false; 56 if ((!message.equals("")) || (!topic.equals(""))) { 57 // 最后 58 try { 59 conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue()); 60 } catch (Exception e) { 61 Log.i(TAG, "Exception Occured", e); 62 doConnect = false; 63 iMqttActionListener.onFailure(null, e); 64 } 65 } 66 67 if (doConnect) { 68 doClientConnection(); 69 } 70 71 } 72 73 @Override 74 public void onDestroy() { 75 try { 76 client.disconnect(); //服務銷毀,斷開連接 77 } catch (MqttException e) { 78 e.printStackTrace(); 79 } 80 super.onDestroy(); 81 } 82 83 /** 連接MQTT服務器 */ 84 private void doClientConnection() { 85 if (!client.isConnected() && isConnectIsNomarl()) { 86 try { 87 client.connect(conOpt, null, iMqttActionListener); 88 } catch (MqttException e) { 89 e.printStackTrace(); 90 } 91 } 92 93 } 94 95 // MQTT是否連接成功 96 private IMqttActionListener iMqttActionListener = new IMqttActionListener() { 97 98 @Override 99 public void onSuccess(IMqttToken arg0) { 100 Log.i(TAG, "連接成功 "); 101 try { 102 // 訂閱myTopic話題,當訂閱多條頻道,需要遍歷逐條訂閱,否則有可能訂閱失敗 103 client.subscribe(myTopic,1); 104 } catch (MqttException e) { 105 e.printStackTrace(); 106 } 107 } 108 109 @Override 110 public void onFailure(IMqttToken arg0, Throwable arg1) { 111 arg1.printStackTrace(); 112 // 連接失敗,重連 113 } 114 }; 115 116 // MQTT監聽並且接受消息 117 private MqttCallback mqttCallback = new MqttCallback() { 118 119 @Override 120 public void messageArrived(String topic, MqttMessage message) throws Exception { 121 122 String str1 = new String(message.getPayload()); 123 MQTTMessage msg = new MQTTMessage(); //自定義接口 124 msg.setMessage(str1); 125 //訂閱信息,接收的信息message 126 String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained(); 127 Log.i(TAG, "messageArrived:" + str1); 128 Log.i(TAG, str2); 129 } 130 131 @Override 132 public void deliveryComplete(IMqttDeliveryToken arg0) { 133 134 } 135 136 @Override 137 public void connectionLost(Throwable arg0) { 138 // 失去連接,重連 139 } 140 }; 141 142 /** 判斷網絡是否連接 */ 143 private boolean isConnectIsNomarl() { 144 ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE); 145 NetworkInfo info = connectivityManager.getActiveNetworkInfo(); 146 if (info != null && info.isAvailable()) { 147 String name = info.getTypeName(); 148 Log.i(TAG, "MQTT當前網絡名稱:" + name); 149 return true; 150 } else { 151 Log.i(TAG, "MQTT 沒有可用網絡"); 152 return false; 153 } 154 } 155 156 @Nullable 157 @Override 158 public IBinder onBind(Intent intent) { 159 return null; 160 } 161 }
連接成功后,就可以實現和服務端消息的發送和接收。
最后在AndroidManifest.xml文件
注冊Services
1 <service android:name="org.eclipse.paho.android.service.MqttService" /> <!--導入包,Android自帶Mqtt的服務,需要注冊--> 2 <service android:name=".mqtttest.MQService"/> <!--自己開啟的服務-->
權限
1 <uses-permission android:name="android.permission.WAKE_LOCK" /> 2 <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /> 3 <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> 4 <uses-permission android:name="android.permission.READ_PHONE_STATE" /> 5 <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" /> 6 <uses-permission android:name="android.permission.INTERNET" />
完結!