MQTT Java Client開發


1、客戶端庫下載

下載地址:https://www.eclipse.org/paho/downloads.php

如下圖所示,有不用編程語言當前支持情況說明。

image

如下圖所示,咱們此處已Java為例,下載正式發布的版本。

image

當前最新版本為Java最新版本為1.2.2。

image

下載到的jar包如下圖所示:

clip_image002

將該jar包導入到我們的項目中,就可以使用了。

2、登陸連接

先創建MqttClinet對象。

private volatile MqttClient mqttClient; 
private volatile MqttMessage mqttMessage;
private MqttServerEntity mqttServerEntity;
// 初始化MQTTClient對象
private void initClient() {
    try {
        mqttClient = new MqttClient(getHostUrl(), getClientId());
    } catch (MqttException e) {
        LogUtils.error(logger, e);
        mqttClient = null;
    }
}

封裝連接參數。

設置回調接口。

准備工作做好后,執行連接即可。

// 連接MQTT服務器
public void startClient() {
    initClient();
    if (mqttClient == null) {
        LogUtils.info(logger, "mqttClient is null");
        return;
    }
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(mqttServerEntity.getUsername());
    options.setPassword(mqttServerEntity.getPassword().toCharArray());
    options.setConnectionTimeout(5); // 設置超時時間
    options.setCleanSession(getCleanSession());
    options.setKeepAliveInterval(getKeepAliveInterval());// 設置會話心跳時間
    options.setAutomaticReconnect(true); // 自動重連
    try {
        mqttClient.setCallback(new BtcMqttCallback());
        mqttClient.connect(options);
        subscribe();
    } catch (Exception e) {
        LogUtils.error(logger, e);
    }
    LogUtils.info(logger, "startClient() isConnected:" + mqttClient.isConnected());
}

3、訂閱主題

訂閱主題發生在服務器連接登陸成功之后,這里主要有兩點,發布消息的服務質量、以及訂閱的主題信息。

// 訂閱主題
private void subscribe() {
    try {
        int[] Qos = {getQos()};
        String[] topic1 = {mqttServerEntity.getSubscribeTopic()};
        mqttClient.subscribe(topic1, Qos);
    } catch (Exception e) {
        LogUtils.error(logger, e);
    }
}

4、發送消息

發送消息時要保證當前客戶端與服務器處於連接成功的狀態。將主題及消息封裝好后,調用發送接口即可。

// 發送消息
public void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic) {
    try {
        if (mqttMessage == null) {
            mqttMessage = new MqttMessage();
            mqttMessage.setQos(getQos());
            mqttMessage.setRetained(true);
        }
        mqttMessage.setPayload(data.getBytes("UTF-8")); 
        mqttClient.publish(topic, mqttMessage);
    } catch (Exception e) {
        LogUtils.error(logger, e);
    }
}

5、消息接收

消息接收是采用回調接口的形式,是建立連接之前設置的,連接成功之后,只有有消息就會回調到下面的方法。

public class BtcMqttCallback implements MqttCallbackExtended {
    public void connectionLost(Throwable cause) {
        LogUtils.info(logger, "connection lost");
    }
    public void deliveryComplete(IMqttDeliveryToken token) {
        LogUtils.info(logger, "delivery Complete:" + token.isComplete());
    }
    public void messageArrived(String topic, MqttMessage message) {
        String msg = new String(message.getPayload(), Charset.forName("UTF-8"));
        LogUtils.info(logger, "messageArrived() topic:" + topic);
        LogUtils.info(logger, msg);
        MessageCache.getInstance().putMessage(msg);
    }
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        LogUtils.info(logger, "connectComplete() reconnect:" + reconnect + " serverURI:" + serverURI);
        subscribe();
    }
}

 

 

【參考資料】

https://www.eclipse.org/paho/clients/java/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM