1、客戶端庫下載
下載地址:https://www.eclipse.org/paho/downloads.php
如下圖所示,有不用編程語言當前支持情況說明。
如下圖所示,咱們此處已Java為例,下載正式發布的版本。
當前最新版本為Java最新版本為1.2.2。
下載到的jar包如下圖所示:
將該jar包導入到我們的項目中,就可以使用了。
2、登陸連接
先創建MqttClinet對象。
private volatile MqttClient mqttClient; private volatile MqttMessage mqttMessage; private MqttServerEntity mqttServerEntity; // 初始化MQTTClient對象 private void initClient() { try { mqttClient = new MqttClient(getHostUrl(), getClientId()); } catch (MqttException e) { LogUtils.error(logger, e); mqttClient = null; } }
封裝連接參數。
設置回調接口。
准備工作做好后,執行連接即可。
// 連接MQTT服務器 public void startClient() { initClient(); if (mqttClient == null) { LogUtils.info(logger, "mqttClient is null"); return; } MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttServerEntity.getUsername()); options.setPassword(mqttServerEntity.getPassword().toCharArray()); options.setConnectionTimeout(5); // 設置超時時間 options.setCleanSession(getCleanSession()); options.setKeepAliveInterval(getKeepAliveInterval());// 設置會話心跳時間 options.setAutomaticReconnect(true); // 自動重連 try { mqttClient.setCallback(new BtcMqttCallback()); mqttClient.connect(options); subscribe(); } catch (Exception e) { LogUtils.error(logger, e); } LogUtils.info(logger, "startClient() isConnected:" + mqttClient.isConnected()); }
3、訂閱主題
訂閱主題發生在服務器連接登陸成功之后,這里主要有兩點,發布消息的服務質量、以及訂閱的主題信息。
// 訂閱主題 private void subscribe() { try { int[] Qos = {getQos()}; String[] topic1 = {mqttServerEntity.getSubscribeTopic()}; mqttClient.subscribe(topic1, Qos); } catch (Exception e) { LogUtils.error(logger, e); } }
4、發送消息
發送消息時要保證當前客戶端與服務器處於連接成功的狀態。將主題及消息封裝好后,調用發送接口即可。
// 發送消息 public void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic) { try { if (mqttMessage == null) { mqttMessage = new MqttMessage(); mqttMessage.setQos(getQos()); mqttMessage.setRetained(true); }
mqttMessage.setPayload(data.getBytes("UTF-8")); mqttClient.publish(topic, mqttMessage); } catch (Exception e) { LogUtils.error(logger, e); } }
5、消息接收
消息接收是采用回調接口的形式,是建立連接之前設置的,連接成功之后,只有有消息就會回調到下面的方法。
public class BtcMqttCallback implements MqttCallbackExtended { public void connectionLost(Throwable cause) { LogUtils.info(logger, "connection lost"); } public void deliveryComplete(IMqttDeliveryToken token) { LogUtils.info(logger, "delivery Complete:" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) { String msg = new String(message.getPayload(), Charset.forName("UTF-8")); LogUtils.info(logger, "messageArrived() topic:" + topic); LogUtils.info(logger, msg); MessageCache.getInstance().putMessage(msg); } @Override public void connectComplete(boolean reconnect, String serverURI) { LogUtils.info(logger, "connectComplete() reconnect:" + reconnect + " serverURI:" + serverURI); subscribe(); } }
【參考資料】