關於MQTT的應用場景對接


 

通過 CTWing中國電信物聯網開放平台 實現AEP設備接入(MQTT協議)

AEP設備接入文檔  

 

 

Maven依賴

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.54</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<!-- 第三方util -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.1.2</version>
</dependency>
<!--aep-->
<dependency>
<groupId>com.ctg.ag</groupId>
<artifactId>ag-sdk-biz-64993.tar.gz</artifactId>
<version>20210714.173418-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.ctg.ag</groupId>
<artifactId>ctg-ag-sdk-core</artifactId>
<version>2.5.0-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- HikariCP連接池 -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<!--pagehelper 分頁組件 -->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.10</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
</dependencies>

配置文件application.yml

server:
  port: 110
spring:
  main:
    allow-bean-definition-overriding: true
  basePackage: com.test.device
  resources:
    static-locations: classpath:/static, classpath:/templates
  jackson:
    #參數意義:
    #JsonInclude.Include.ALWAYS       默認
    #JsonInclude.Include.NON_DEFAULT   屬性為默認值不序列化
    #JsonInclude.Include.NON_EMPTY     屬性為 空(””) 或者為 NULL 都不序列化
    #JsonInclude.Include.NON_NULL      屬性為NULL  不序列化
    default-property-inclusion: ALWAYS
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    url: jdbc:mysql://127.0.0.1:3306/ctwing_db?characterEncoding=utf8&serverTimezone=Asia/Shanghai
    username: root
    password: root
    # 連接池配置
    hikari:
      # 等待連接池分配連接的最大時長(毫秒),超過這個時長還沒可用的連接則發生SQLException, 缺省:30秒
      connection-timeout: 30000
      # 一個連接idle狀態的最大時長(毫秒),超時則被釋放(retired),缺省:10分鍾
      idle-timeout: 600000
      # 一個連接的生命時長(毫秒),超時而且沒被使用則被釋放(retired),缺省:30分鍾,建議設置比數據庫超時時長少30秒,參考MySQL
      max-lifetime: 1800000
      # 連接池中允許的最大連接數。缺省值:10;推薦的公式:((core_count * 2) + effective_spindle_count)
      maximum-pool-size: 2
      minimum-idle: 1
  servlet:
    multipart:
      max-file-size: 10MB
      max-request-size: 10MB
mybatis:
  type-aliases-package: ${spring.basePackage}
  mapper-locations: classpath:mapper/*/*.xml
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
logging:
  config: classpath:config/logback.xml
log:
  path: C:\logs # 日志存放路徑
  enableLogType: 1234   # 日志類型  增改刪查
timing:
http_whether: 0 #1開啟,非1不開啟定時(0)
mqtt_whether: 0 #1開啟,非1不開啟定時(0)
concurrent: 1 #循環並發下發次數
ctwing:
  mqtt:
    productId: ******  # 產品Id,平台產品信息中獲取
    clientId: ******   #必填。填寫平台添加設備時生成的設備ID。
    userName: ****** #必填。建議填寫為CTWing平台用戶名。
    passWord: ******* #必填。填寫平台為設備自動分配的特征串。
    broker: tcp://mqtt.ctwing.cn:1883 #mq地址
    qos: 1 #質量等級
    appKey: *******    #aep應用中配置
    appSecret: ******* #aep應用中配置
    masterKey: *******   #MasterKey為平台上創建產品時自動生成,可在產品概況中查詢。

Aep發布訂閱

import com.alibaba.fastjson.JSON;import org.eclipse.paho.client.mqttv3.*;
import java.util.HashMap;
import java.util.Map;

public class MqttAepPaho {

    final Map<String,MqttClient> clientMap=new HashMap<>();
    static final Map<String,MqttAepPaho> aepPahoMap=new HashMap<>();
    private static MqttAepPaho instance;
    MQTTConfig mqttConfig;
    TimingConfig timingConfig;

    private MqttAepPaho (){}

    public MqttAepPaho(MQTTConfig mqttConfig, TimingConfig timingConfig) {
        this.mqttConfig = mqttConfig;
        this.timingConfig = timingConfig;
    }

    //運行時加載對象
    public static MqttAepPaho getInstance(DeviceInfo deviceInfo,MQTTConfig mqttConfig, TimingConfig timingConfig){
        instance = null;
        instance = aepPahoMap.get(deviceInfo.getDeviceId());
        if (instance == null) {
            //todo 重新連接
            System.out.println("重新連接");
            new MqttAepPaho(mqttConfig,timingConfig).start(deviceInfo);
        }
        return instance;
    }

    /**
     * AEP 發布
     * @param deviceInfo 設備ID+設備特征串
     * @param topic 服務標識 平台會生成相應Topic,Publish報文的Topic字段填寫相應服務標識
     * @param map 服務標識-參數
     */
    public void topic(DeviceInfo deviceInfo,String topic, Map map){
        try {
            MqttClient client=clientMap.get(deviceInfo.getDeviceId());
            if(client==null){
                client=connect(deviceInfo);
            }
            System.out.println("當前發布獲取實例:"+deviceInfo.getDeviceId()+"---"+client);
            //在另一個線程中發送消息
            publishMsg(topic, JSON.toJSONString(map), mqttConfig.getQos(), client);
            //斷開服務連接
            //client.disconnect();
            //System.out.println("斷開服務連接");
            System.out.println("已完成");
        } catch(MqttException me) {
            System.out.println("reason "+me.getReasonCode());
            System.out.println("msg "+me.getMessage());
            System.out.println("loc "+me.getLocalizedMessage());
            System.out.println("cause "+me.getCause());
            System.out.println("exception "+me);
            me.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("正常退出");
            //System.exit(0);
        }
    }

    //推送
    private void publishMsg(String topic, String content, int qos, MqttClient client) throws MqttException {
        //循環發送10次消息
        for (int times =0 ;times<timingConfig.getConcurrent(); times++) {
            System.out.println(String.format("%d 循環發送消息: %s", times, content));
            //創建消息內容
            MqttMessage message = new MqttMessage(content.getBytes());
            //設置質量級別
            message.setQos(qos);
            //發送消息
            client.publish(topic, message);
            System.out.println("Message published");
        }
    }

    /**
     * AEP訂閱
     * @param deviceInfo 設備ID+設備特征串
     */
    public void start(DeviceInfo deviceInfo) {
        try {
            MqttClient client=clientMap.get(deviceInfo.getDeviceId());
            if(client==null){
                client=connect(deviceInfo);
            }
            System.out.println("當前訂閱獲取實例:"+deviceInfo.getDeviceId()+"---"+client);
            //訂閱消息
            int[] Qos  = CommonConstants.Qos;
            String[] topic1 = CommonConstants.topic;
            client.subscribe(topic1, Qos);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //連接
    private MqttClient connect(DeviceInfo deviceInfo) throws Exception{
        // host為主機名,clientId即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientId的保存形式,默認為以內存保存
        MqttClient client = new MqttClient(mqttConfig.getBroker(), deviceInfo.getDeviceId(), null);
        // MQTT的連接設置
        MqttConnectOptions options = new MqttConnectOptions();
        // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
        options.setCleanSession(true);
        // 設置連接的用戶名
        options.setUserName(mqttConfig.getUserName());
        // 設置連接的密碼
        options.setPassword(deviceInfo.getToken().toCharArray());
        // 設置超時時間 單位為秒
        options.setConnectionTimeout(10);
        // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
        options.setKeepAliveInterval(90);
        // 設置回調
        client.setCallback(new MqttAepCallback(deviceInfo,this));
        client.connect(options);
        //緩存實例
        clientMap.put(deviceInfo.getDeviceId(),client);
        aepPahoMap.put(deviceInfo.getDeviceId(),this);
        instance=this;
        return client;
    }

}

配置類

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties(prefix = "timing", ignoreInvalidFields = true)
@Getter
@Setter
@Component
public class TimingConfig {
    /** 是否開啟定時上報 */
    private int http_whether;
    private int mqtt_whether;
    /** 循環並發下發次數*/
    private int concurrent;
}
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties(prefix = "ctwing.mqtt", ignoreInvalidFields = true)
@Getter
@Setter
@Component
public class MQTTConfig {
    /** 必填。建議填寫為CTWing平台用戶名。 */
    private String userName;
    /** 必填。填寫平台為設備自動分配的特征串。 */
    private String passWord;
    /* mq地址*/
    private String broker;
    /*質量等級 */
    private int qos;
    /*產品Id,平台產品信息中獲取*/
    private int productId;
    /*必填。填寫平台添加設備時生成的設備ID。*/
    private String clientId;
    /*aep應用中配置*/
    private String appSecret;
    /*aep應用中配置*/
    private String appKey;
    /*MasterKey為平台上創建產品時自動生成,可在產品概況中查詢。*/
    private String masterKey;
}

訂閱回調

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.nio.charset.StandardCharsets;

@Slf4j
public class MqttAepCallback implements MqttCallback {

    private DeviceInfoDao deviceInfoDao = ContextHolder.getBean(DeviceInfoDao.class);private DeviceInfo deviceInfo;
    private MqttAepPaho mqttAepPaho;

    public MqttAepCallback() {
    }

    public MqttAepCallback(DeviceInfo deviceInfo,MqttAepPaho mqttAepPaho) {
        this.deviceInfo = deviceInfo;
        this.mqttAepPaho = mqttAepPaho;
    }

    // 連接丟失后,一般在這里面進行重連
    @Override
    public void connectionLost(Throwable arg0) {
        log.info("Connection Lost:{}",arg0.getMessage());
    }

    // subscribe后得到的消息會執行到這里面
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws MqttException {
        log.info("當前clientId:{},特征串:{}",deviceInfo.getDeviceId(),deviceInfo.getToken());
        log.info("接收消息messageId:{}", mqttMessage.getId());
        log.info("接收消息Qos:{}", mqttMessage.getQos());
        log.info("接收消息內容:{} from 接收消息主題:{}",mqttMessage,s);
        String ss = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        log.info("字節轉Json:{}",ss);
        ReceiptStrategyContext receiptStrategyContext = new ReceiptStrategyContext();
        if(s.equals(TopicEnum.open_cmd.getTopic())){
            receiptStrategyContext.setReceiptHandleStrategy(new Mqtt8001ReceiptHandleStrategy(mqttAepPaho,deviceInfo,deviceInfoDao));
        }else{
            log.info("未知訂閱主題");
            return;
        }
        receiptStrategyContext.handleReceipt(new Receipt(){{setTopic(s);setMessage(ss);}});
    }

    //消息發送成功后,調用
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息發送成功后,調用");
        if(iMqttDeliveryToken.isComplete()){
            log.info("Delivery a Msg to Topic:{}",iMqttDeliveryToken.getTopics()[0]);
        }
    }
}

回調策略

/**
 * 上下文類,持有策略接口
 */
public class ReceiptStrategyContext {

    private ReceiptHandleStrategy receiptHandleStrategy;

    //設置策略接口
    public void setReceiptHandleStrategy(ReceiptHandleStrategy receiptHandleStrategy) {
        this.receiptHandleStrategy = receiptHandleStrategy;
    }
    public void handleReceipt(Receipt receipt){
        if (receiptHandleStrategy != null) {
            receiptHandleStrategy.handleReceipt(receipt);
        }
    }
}
public interface ReceiptHandleStrategy {

    //回執處理策略接口
    void handleReceipt(Receipt receipt);
}
@Slf4j
public class Mqtt8001ReceiptHandleStrategy implements ReceiptHandleStrategy {

    private MqttAepPaho mqttAepPaho;
    private DeviceInfo deviceInfo;
    private DeviceInfoDao deviceInfoDao;

    public Mqtt8001ReceiptHandleStrategy(MqttAepPaho mqttAepPaho,DeviceInfo deviceInfo,DeviceInfoDao deviceInfoDao) {
        this.mqttAepPaho = mqttAepPaho;
        this.deviceInfo = deviceInfo;
        this.deviceInfoDao = deviceInfoDao;
    }

    @Override
    public void handleReceipt(Receipt receipt) {
        log.info("解析報文MQTT8001:{},當前實例:{},當前設備信息:{}",receipt.getMessage(),mqttAepPaho,deviceInfo);
        AepRespVo<JSONObject> aepRespVo= JSON.parseObject(receipt.getMessage(), AepRespVo.class);
        CmdRespReport report=new CmdRespReport(){{setTaskId(aepRespVo.getTaskId());setResultPayload(new CmdResp());}};
        log.info("設備回復AEP指令響應");
        mqttAepPaho.topic(deviceInfo, TopicEnum.cmd_resp.getTopic(), MapUtil.beanToMap(report));
        OpenCmd openCmd=JSON.toJavaObject(aepRespVo.getPayload(),OpenCmd.class);
        //@todo 是否判斷開門失敗成功? 上報開門記錄?存儲開門日志?
        log.info("遠程開門指令開始上報遠程開門結果");
        RemoteOpenReport remoteOpenReport=new RemoteOpenReport();
        remoteOpenReport.setMsg_id(openCmd.getMsg_id());
        remoteOpenReport.setRemoteopen_params(JSON.toJSONString(openCmd));
        mqttAepPaho.topic(deviceInfo,TopicEnum.remoteopen_report.getTopic(),MapUtil.beanToMap(remoteOpenReport));
    }
}

DTO

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class AepRespVo <T>{

    private int taskId;
    private T payload;
}
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;

/**
 * 訂閱指令信息
 */
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Receipt {

    //回執信息(json字符串)
    String message;
    //回執主題(`open_cmd、、、、、、、、、、`)
    String topic;
}

指令下發

    @Scheduled(cron = "*/5 * * * * ?")
    public void mqttConfigureTasks(){
        //@todo 固定上報yml中配置的設備ID,批量上報設備需獲取設備ID+特征串
        if(config.getMqtt_whether()==1){
            List<DeviceInfo> list=deviceInfoDao.findDeviceByToken(new DeviceInfo());
            list.forEach(i->{
                MqttReport(i,2);
            });
        }else{
            log.info("未開啟MQTT定時器");
        }
    }

    public void MqttReport(DeviceInfo deviceInfo,int id){
        MqttBusinessDto dto=new MqttBusinessDto();
        switch (id) {case 2://心跳
                dto.setTopic(TopicEnum.heartbeat.getTopic());
                dto.setMap(MapUtil.beanToMap(new Heartbeat(){{setIMEI("001");}}));
                break;default:
                break;
        }
        MqttAepPaho.getInstance(deviceInfo,mqttConfig,config).topic(deviceInfo,dto.getTopic(),dto.getMap());
    }

DTO

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import java.util.Map;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class MqttBusinessDto {

    //設備Id,平台設備信息管理中獲取。
    private String clientId;
    //設備特征串。平台生成,每個設備唯一
    private String passWord;
    //服務標識
    private String topic;
    //業務數據。僅支持JSON格式。非透傳產品需根據服務定義填寫。
    private Map map;
}
import java.util.Date;
import lombok.Getter;
import lombok.Setter;
import lombok.NoArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;

@Getter
@Setter
@NoArgsConstructor
public class DeviceInfo {

    private static final long serialVersionUID = 1L;

    /** 設備id */
    private String deviceId = "";
    /** 設備編號 */
    private String deviceSn = "";
    /** 終端名稱 */
    private String deviceName = "";
    /** 租戶id */
    private String tenantId = "";
    /** 產品id */
    private int productId;
    /** 版本信息 */
    private String firmwareVersion = "";
    /** 設備狀態 0:已注冊,1:已激活,2:已注銷 */
    private int deviceStatus;
    /** 激活時間 */
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date activeTime;
    /** 注銷時間 */
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date logoutTime;
    /** 設備在線狀態 */
    private int netStatus;
    /** 設備最后上線時間 */
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date onlineAt;
    /** 設備最后下線時間 */
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date offlineAt;
    /** 設備所在產品協議:1.T-LINK協議  2.MQTT協議  3.LWM2M協議  4.TUP協議  5.HTTP協議  6.JT/T808  7.TCP協議  8.私有TCP(網關子設備協議)  9.私有UDP(網關子設備協議)  10.網關產品MQTT(網關產品協議)  11.南向雲 */
    private int productProtocol;
    /** 設備特征串 */
    private String token = "";
    /** 枚舉值 : 0--正常 1--禁用 */
    private int disable_status;
}

部分上報接收MQ消息的實體Report和指令下發的實體Cmd根據AEP平台中定義的通訊來組裝。


 MQTT簡介

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的“輕量級”通訊協議,該協議構建於TCP/IP協議上,由IBM在1999年發布。MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。

MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。

MQTT特性

MQTT協議工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協議,它具有以下主要的幾項特性:

  1. 使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。這一點很類似於XMPP,但是MQTT的信息冗余遠小於XMPP,,因為XMPP使用XML格式文本來傳遞數據。
  2. 對負載內容屏蔽的消息傳輸。
  3. 使用TCP/IP提供網絡連接。主流的MQTT是基於TCP連接進行數據推送的,但是同樣有基於UDP的版本,叫做MQTT-SN。這兩種版本由於基於不同的連接方式,優缺點自然也就各有不同了。
  4. 有三種消息發布服務質量:“至多一次”,消息發布完全依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能設備在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。“至少一次”,確保消息到達,但消息重復可能會發生。“只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級別。在計費系統中,消息重復或丟失會導致不正確的結果。這種最高質量的消息發布服務還可以用於即時通訊類的APP的推送,確保用戶收到且只會收到一次。
  5. 小型傳輸,開銷很小(固定長度的頭部是2字節),協議交換最小化,以降低網絡流量。這就是為什么在介紹里說它非常適合“在物聯網領域,傳感器與服務器的通信,信息的收集”,要知道嵌入式設備的運算能力和帶寬都相對薄弱,使用這種協議來傳遞消息再適合不過了。
  6. 使用Last Will和Testament特性通知有關各方客戶端異常中斷的機制。Last Will:即遺言機制,用於通知同一主題下的其他設備發送遺言的設備已經斷開了連接。Testament:遺囑機制,功能類似於Last Will。

MQTT協議實現方式

實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:

  1. Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);
  2. payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。

更多簡介傳送門


 MQTT搭建


 MQTT協議之訂閱及發布(使用paho-mqtt-client或mqttv3實現)

消息接收回調類

import org.eclipse.paho.client.mqttv3.MqttCallback;  
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; /** * 發布消息的回調類 * * 必須實現MqttCallback的接口並實現對應的相關接口方法 * CallBack 類將實現 MqttCallBack。每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。在回調中,將它用來標識已經啟動了該回調的哪個實例。 * 必須在回調類中實現三個方法: * * public void messageArrived(MqttTopic topic, MqttMessage message) * 接收已經預訂的發布。 * * public void connectionLost(Throwable cause) * 在斷開連接時調用。 * * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。由 MqttClient.connect 激活此回調。 */ public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進行重連 System.out.println("連接斷開,可以做重連"); } public void deliveryComplete(MqttDeliveryToken token) { // publish后會執行到這里 System.out.println("deliveryComplete---------"+ token.isComplete()); } public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception { // subscribe后得到的消息會執行到這里面 System.out.println("接收消息主題:"+topic.getName()); System.out.println("接收消息Qos:"+message.getQos()); System.out.println("接收消息內容:"+new String(message.getPayload())); } } 

服務端消息發布

import org.eclipse.paho.client.mqttv3.MqttClient;  
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence; public class Server { public static final String HOST = "tcp://127.0.0.1:1883"; public static final String TOPIC = "主題"; private static final String clientid ="server_clientid"; private MqttClient client; private MqttTopic topic; private String userName = "test"; private String passWord = "test"; private MqttMessage message; public Server() throws MqttException { //MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(); } private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 設置超時時間 options.setConnectionTimeout(10); // 設置會話心跳時間 options.setKeepAliveInterval(20); try { client.setCallback(new PushCallback()); client.connect(options); topic = client.getTopic(TOPIC); } catch (Exception e) { e.printStackTrace(); } } public void publish(MqttMessage message) throws MqttPersistenceException, MqttException{ MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println(token.isComplete()+"========"); } public static void main(String[] args) throws MqttException { Server server = new Server(); server.message = new MqttMessage(); server.message.setQos(1); server.message.setRetained(true); server.message.setPayload("eeeeeaaaaaawwwwww---".getBytes()); server.publish(server.message); System.out.println(server.message.isRetained()+"------ratained狀態"); } } 

客戶端接收消息

import java.util.concurrent.Executors;  
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttSecurityException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence; public class Client { public static final String HOST = "tcp://127.0.0.1:1883"; 
public static final String TOPIC = "主題"; private static final String clientid = "server_clientid";
private MqttClient client; private MqttConnectOptions options; private String userName = "test"; private String passWord = "test"; private ScheduledExecutorService scheduler; //重新鏈接 public void startReconnect() { scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(new Runnable() { public void run() { if (!client.isConnected()) { try { client.connect(options); } catch (MqttSecurityException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } } }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS); } private void start() { try { // host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連接設置 options = new MqttConnectOptions(); // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接 options.setCleanSession(true); // 設置連接的用戶名 options.setUserName(userName); // 設置連接的密碼 options.setPassword(passWord.toCharArray()); // 設置超時時間 單位為秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設置回調 client.setCallback(new PushCallback()); MqttTopic topic = client.getTopic(TOPIC); //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息 options.setWill(topic, "close".getBytes(), 0, true); client.connect(options); //訂閱消息 int[] Qos = {1}; String[] topic1 = {TOPIC}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); } } public void disconnect() { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } public static void main(String[] args) throws MqttException { Client client = new Client(); client.start(); } }

Maven依賴

<dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.1.1</version>
</dependency>

原文鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM