一、MQTT(消息隊列)簡介
MQTT(MQ Telemetry Transport)是IBM開發的一種網絡應用層的協議,提供輕量級的,支持可發布/可訂閱的的消息推送模式,使設備對設備之間的短消息通信變得簡單,比如現在應用廣泛的低功耗傳感器,手機、嵌入式計算機、微型控制器等移動設備。
常用的有eclipse paho、activeMQ、阿里MQTT和其他的實現,本實例使用paho
1、使用場景:
1、不可靠、網絡帶寬小的網絡
2、運行的設備CPU、內存非常有限
(我個人主要是用在服務端與嵌入式客戶端進行消息和廣告的通信)
2、特點:
1、基於發布/訂閱模型的協議
2、他是二進制協議,二進制的特點就是緊湊、占用空間小。他的協議頭只有2個字節
3、提供了三種消息可能性保障(Qos):0:最多一次 、1:最少一次 、2:只有一次
3、關鍵字
1、HOST:搭載MQTT的服務器地址
2、TOPIC:消息主題,可以被客戶端訂閱,實現對應消息的收發
3、clientId:客戶端ID,用於服務器對不同客戶端的識別
4、subscribe/unsubscribe:客戶端對消息主題的訂閱和取消訂閱
5、Qos:消息的服務質量,當網絡過載或擁塞時,QoS 能確保重要業務量不受延遲或丟棄
6、Callback:當客戶端收到消息后對消息的處理(回調)
7、KeepAliveInterval:客戶端與服務器之間的連接是通過發送心跳包來保持存活
二、JAVA端實例
首先導入所依賴jar包
1、服務端代碼(負責消息的發送)
package com.sc.util.paho; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * Title:Server 這是發送消息的服務端 * Description: 服務器向多個客戶端推送主題,即不同客戶端可向服務器訂閱相同主題 * @author rao */ public class ServerMQTT { //tcp://MQTT安裝的服務器地址:MQTT定義的端口號 public static final String HOST = "tcp://192.168.1.102:1883"; //定義一個主題 public static final String TOPIC = "pos_message_all"; //定義MQTT的ID,可以在MQTT服務配置中指定 private static final String clientid = "server11"; private MqttClient client; private MqttTopic topic11; private String userName = "paho"; //非必須 private String passWord = ""; //非必須 private MqttMessage message; /** * 構造函數 * @throws MqttException */ public ServerMQTT() throws MqttException { // MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(); } /** * 用來連接服務器 */ private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 設置超時時間 options.setConnectionTimeout(10); // 設置會話心跳時間 options.setKeepAliveInterval(20); try { client.setCallback(new PushCallback()); client.connect(options); topic11 = client.getTopic(TOPIC); } catch (Exception e) { e.printStackTrace(); } } /** * * @param topic * @param message * @throws MqttPersistenceException * @throws MqttException */ public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println("message is published completely! " + token.isComplete()); } /** * 啟動入口 * @param args * @throws MqttException */ public static void main(String[] args) throws MqttException { ServerMQTT server = new ServerMQTT(); server.message = new MqttMessage(); server.message.setQos(1); //保證消息能到達一次 server.message.setRetained(true); server.message.setPayload("這是推送消息的內容".getBytes()); server.publish(server.topic11 , server.message); System.out.println(server.message.isRetained() + "------ratained狀態"); } }
2、對消息進行處理的回調函數代碼(必須)
package com.sc.util.paho; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * 發布消息的回調類 * * 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。 * 每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。 * 在回調中,將它用來標識已經啟動了該回調的哪個實例。 * 必須在回調類中實現三個方法: * * public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的發布。 * * public void connectionLost(Throwable cause)在斷開連接時調用。 * * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。 * 由 MqttClient.connect 激活此回調。 * */ public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進行重連 System.out.println("連接斷開,可以做重連"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息會執行到這里面 System.out.println("接收消息主題 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息內容 : " + new String(message.getPayload())); } }
3、客戶端代碼(接收消息)
package com.sc.util.paho; import java.util.concurrent.ScheduledExecutorService; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 模擬一個客戶端接收消息 * @author rao * */ public class ClientMQTT { public static final String HOST = "tcp://192.168.1.102:1883"; public static final String TOPIC1 = "pos_message_all"; private static final String clientid = "client11"; private MqttClient client; private MqttConnectOptions options; private String userName = "admin"; //非必須 private String passWord = "password"; //非必須 @SuppressWarnings("unused") private ScheduledExecutorService scheduler; private void start() { try { // host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連接設置 options = new MqttConnectOptions(); // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,設置為true表示每次連接到服務器都以新的身份連接 options.setCleanSession(false); // 設置連接的用戶名 options.setUserName(userName); // 設置連接的密碼 options.setPassword(passWord.toCharArray()); // 設置超時時間 單位為秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設置回調 client.setCallback(new PushCallback()); MqttTopic topic = client.getTopic(TOPIC1); //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息 //遺囑 options.setWill(topic, "close".getBytes(), 2, true); client.connect(options); //訂閱消息 int[] Qos = {1}; String[] topic1 = {TOPIC1}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws MqttException { ClientMQTT client = new ClientMQTT(); client.start(); } }
4、運行以上代碼需要配置服務器
常用的搭建MQTT服務器的軟件是mosquitto,是一款實現了消息推送協議 MQTT v3.1 的開源消息代理軟件,可以很方便的搭載在linux、windows服務器上,我是搭載在linux服務器上使用。
mosquitto自帶有許多TOPIC(主題)供我們訂閱使用,可以實時的對服務器狀態進行監測和狀態響應以及統計,包括可以監測到當前連接的客戶端數,某時間段內消息收發的總量等。
關於服務器的配置可參考:http://www.cnblogs.com/Free-Thinker/p/5559816.html
mosquito內置主題具體可參考:http://blog.csdn.net/qhdcsj/article/details/44630201
三、將以上代碼調整為工具類供其他項目調用
1、服務端代碼及調用實例
1)服務端代碼(需要引入之前的回調函數)
package com.scpos.util; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * Title:Server 這是發送消息的服務端 * Description: 服務器向多個客戶端推送主題,即不同客戶端可向服務器訂閱相同主題 * @author rao */ public class ServerMQTTUtil { //tcp://MQTT安裝的服務器地址:MQTT定義的端口號 public static final String HOST = "tcp://dev.****.com.cn:1883"; //定義MQTT的ID,可以在MQTT服務配置中指定 private static final String clientid = "server11"; private MqttClient client; private MqttTopic mqttTopic; private String userName = "paho"; private String passWord = ""; /** * 構造函數 * @throws MqttException */ public ServerMQTTUtil(String topic) throws MqttException { // MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(topic); } /** * 用來連接服務器 */ private void connect(String topic) { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 設置超時時間 options.setConnectionTimeout(10); // 設置會話心跳時間 options.setKeepAliveInterval(20); try { // client.setCallback(new PushCallback()); client.connect(options); mqttTopic = client.getTopic(topic); } catch (Exception e) { e.printStackTrace(); } } //發送消息並獲取回執 public void publish(MqttMessage message) throws MqttPersistenceException, MqttException, InterruptedException { MqttDeliveryToken token = mqttTopic.publish(message); token.waitForCompletion(); System.out.println("message is published completely! " + token.isComplete()); System.out.println("messageId:" + token.getMessageId()); token.getResponse(); if (client.isConnected()) client.disconnect(10000); System.out.println("Disconnected: delivery token \"" + token.hashCode() + "\" received: " + token.isComplete()); } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassWord() { return passWord; } public void setPassWord(String passWord) { this.passWord = passWord; } public MqttTopic getMqttTopic() { return mqttTopic; } public void setMqttTopic(MqttTopic mqttTopic) { this.mqttTopic = mqttTopic; } }
2)調用處代碼
//設置推送消息體 String sendTime = TimeUtils.getTime(); String msgType = "html"; String title = message.getTitle(); String content = message.getContent(); String createTime = message.getCreateTime(); Map<String, Object> data = new HashMap<String, Object>(); data.put("messageId", messageId); data.put("msgType", msgType); data.put("title", title); data.put("content", content); data.put("createTime", createTime); data.put("sendTime", sendTime); msgType = "posMessage"; Map<String, Object> messageMap = new HashMap<String, Object>(); messageMap.put("msgType", msgType); messageMap.put("data", data); String MQTTObject = JSON.toJSONString(messageMap); System.out.println(MQTTObject); //將信息寫入消息體 MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(1); mqttMessage.setRetained(true); mqttMessage.setPayload(MQTTObject.getBytes("UTF-8")); //根據deviceIdList內容進行topic設置和發送推送 if (!deviceIdList.equals("all")) { List deviceList = Arrays.asList(deviceIdList.split(",")); for (int i =0; i < deviceList.size(); i++) { String topic = "pos_message_" + deviceList.get(i); ServerMQTTUtil serverMQTTUtil = new ServerMQTTUtil(topic); serverMQTTUtil.publish(mqttMessage); System.out.println(mqttMessage.isRetained() + "------ratained狀態"); } } else { String topic = "pos_message_all"; ServerMQTTUtil serverMQTTUtil = new ServerMQTTUtil(topic); serverMQTTUtil.publish(mqttMessage); System.out.println(mqttMessage.isRetained() + "------ratained狀態"); }
2、客戶端代碼及調用實例
1)客戶端代碼
package com.sichang.util; import java.util.concurrent.ScheduledExecutorService; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 模擬一個客戶端接收消息 * @author rao * */ public class ClientSearch { public static String str=""; public static final String HOST = "tcp://dev.****.com.cn:1883"; //服務器內置主題,用來監測當前服務器上連接的客戶端數量($SYS/broker/clients/connected) public static final String TOPIC1 = "$SYS/broker/clients/connected"; private static final String clientid = "client13"; private MqttClient client; private MqttConnectOptions options; private String userName = "admin"; private String passWord = "password"; @SuppressWarnings("unused") private ScheduledExecutorService scheduler; public void start() throws MqttException { // host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連接設置 options = new MqttConnectOptions(); // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,設置為true表示每次連接到服務器都以新的身份連接 options.setCleanSession(false); // 設置連接的用戶名 options.setUserName(userName); // 設置連接的密碼 options.setPassword(passWord.toCharArray()); // 設置超時時間 單位為秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設置回調 client.setCallback(new MqttCallback(){ public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進行重連 // System.out.println("連接斷開,可以做重連"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { try { str = message.toString(); // System.out.println(" 從服務器收到的消息為:"+message.toString()); } catch (Exception e) { e.printStackTrace(); } } }); // MqttTopic topic = client.getTopic(TOPIC1); // setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息 // options.setWill(topic, "close".getBytes(), 2, true); //遺囑 client.connect(options); //訂閱消息 int[] Qos = {1}; String[] topic1 = {TOPIC1}; client.subscribe(topic1, Qos); } @SuppressWarnings("static-access") //用來給調用處返回消息內容 public String resc() { return this.str; } }
2)調用處代碼
下面線程的作用是每三秒鍾返回服務器上的客戶端連接數
public void runTask() { final long timeInterval = 2000;// 兩秒運行一次 Runnable runnable = new Runnable() { public void run() { while (true) { // ------- code for task to run try { //你要運行的程序 ClientSearch clientSearch = new ClientSearch(); try { clientSearch.start(); } catch (MqttException e) { e.printStackTrace(); } Thread.sleep(1000); //給一秒時間接收服務器消息 Integer num = Integer.valueOf(clientSearch.resc()); System.out.println("當前客戶端連接數:"+num); } catch (InterruptedException e) { e.printStackTrace(); } // ------- ends here try { Thread.sleep(timeInterval); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread thread = new Thread(runnable); thread.start(); }