通過 CTWing中國電信物聯網開放平台 實現AEP設備接入(MQTT協議)
Maven依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.54</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<!-- 第三方util -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.1.2</version>
</dependency>
<!--aep-->
<dependency>
<groupId>com.ctg.ag</groupId>
<artifactId>ag-sdk-biz-64993.tar.gz</artifactId>
<version>20210714.173418-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.ctg.ag</groupId>
<artifactId>ctg-ag-sdk-core</artifactId>
<version>2.5.0-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- HikariCP連接池 -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<!--pagehelper 分頁組件 -->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.10</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
</dependencies>
配置文件application.yml
server: port: 110 spring: main: allow-bean-definition-overriding: true basePackage: com.test.device resources: static-locations: classpath:/static, classpath:/templates jackson: #參數意義: #JsonInclude.Include.ALWAYS 默認 #JsonInclude.Include.NON_DEFAULT 屬性為默認值不序列化 #JsonInclude.Include.NON_EMPTY 屬性為 空(””) 或者為 NULL 都不序列化 #JsonInclude.Include.NON_NULL 屬性為NULL 不序列化 default-property-inclusion: ALWAYS time-zone: GMT+8 date-format: yyyy-MM-dd HH:mm:ss datasource: type: com.zaxxer.hikari.HikariDataSource url: jdbc:mysql://127.0.0.1:3306/ctwing_db?characterEncoding=utf8&serverTimezone=Asia/Shanghai username: root password: root # 連接池配置 hikari: # 等待連接池分配連接的最大時長(毫秒),超過這個時長還沒可用的連接則發生SQLException, 缺省:30秒 connection-timeout: 30000 # 一個連接idle狀態的最大時長(毫秒),超時則被釋放(retired),缺省:10分鍾 idle-timeout: 600000 # 一個連接的生命時長(毫秒),超時而且沒被使用則被釋放(retired),缺省:30分鍾,建議設置比數據庫超時時長少30秒,參考MySQL max-lifetime: 1800000 # 連接池中允許的最大連接數。缺省值:10;推薦的公式:((core_count * 2) + effective_spindle_count) maximum-pool-size: 2 minimum-idle: 1 servlet: multipart: max-file-size: 10MB max-request-size: 10MB mybatis: type-aliases-package: ${spring.basePackage} mapper-locations: classpath:mapper/*/*.xml configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl logging: config: classpath:config/logback.xml log: path: C:\logs # 日志存放路徑 enableLogType: 1234 # 日志類型 增改刪查
timing:
http_whether: 0 #1開啟,非1不開啟定時(0)
mqtt_whether: 0 #1開啟,非1不開啟定時(0)
concurrent: 1 #循環並發下發次數
ctwing: mqtt: productId: ****** # 產品Id,平台產品信息中獲取 clientId: ****** #必填。填寫平台添加設備時生成的設備ID。 userName: ****** #必填。建議填寫為CTWing平台用戶名。 passWord: ******* #必填。填寫平台為設備自動分配的特征串。 broker: tcp://mqtt.ctwing.cn:1883 #mq地址 qos: 1 #質量等級 appKey: ******* #aep應用中配置 appSecret: ******* #aep應用中配置 masterKey: ******* #MasterKey為平台上創建產品時自動生成,可在產品概況中查詢。
Aep發布訂閱
import com.alibaba.fastjson.JSON;import org.eclipse.paho.client.mqttv3.*; import java.util.HashMap; import java.util.Map; public class MqttAepPaho { final Map<String,MqttClient> clientMap=new HashMap<>(); static final Map<String,MqttAepPaho> aepPahoMap=new HashMap<>(); private static MqttAepPaho instance; MQTTConfig mqttConfig; TimingConfig timingConfig; private MqttAepPaho (){} public MqttAepPaho(MQTTConfig mqttConfig, TimingConfig timingConfig) { this.mqttConfig = mqttConfig; this.timingConfig = timingConfig; } //運行時加載對象 public static MqttAepPaho getInstance(DeviceInfo deviceInfo,MQTTConfig mqttConfig, TimingConfig timingConfig){ instance = null; instance = aepPahoMap.get(deviceInfo.getDeviceId()); if (instance == null) { //todo 重新連接 System.out.println("重新連接"); new MqttAepPaho(mqttConfig,timingConfig).start(deviceInfo); } return instance; } /** * AEP 發布 * @param deviceInfo 設備ID+設備特征串 * @param topic 服務標識 平台會生成相應Topic,Publish報文的Topic字段填寫相應服務標識 * @param map 服務標識-參數 */ public void topic(DeviceInfo deviceInfo,String topic, Map map){ try { MqttClient client=clientMap.get(deviceInfo.getDeviceId()); if(client==null){ client=connect(deviceInfo); } System.out.println("當前發布獲取實例:"+deviceInfo.getDeviceId()+"---"+client); //在另一個線程中發送消息 publishMsg(topic, JSON.toJSONString(map), mqttConfig.getQos(), client); //斷開服務連接 //client.disconnect(); //System.out.println("斷開服務連接"); System.out.println("已完成"); } catch(MqttException me) { System.out.println("reason "+me.getReasonCode()); System.out.println("msg "+me.getMessage()); System.out.println("loc "+me.getLocalizedMessage()); System.out.println("cause "+me.getCause()); System.out.println("exception "+me); me.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("正常退出"); //System.exit(0); } } //推送 private void publishMsg(String topic, String content, int qos, MqttClient client) throws MqttException { //循環發送10次消息 for (int times =0 ;times<timingConfig.getConcurrent(); times++) { System.out.println(String.format("%d 循環發送消息: %s", times, content)); //創建消息內容 MqttMessage message = new MqttMessage(content.getBytes()); //設置質量級別 message.setQos(qos); //發送消息 client.publish(topic, message); System.out.println("Message published"); } } /** * AEP訂閱 * @param deviceInfo 設備ID+設備特征串 */ public void start(DeviceInfo deviceInfo) { try { MqttClient client=clientMap.get(deviceInfo.getDeviceId()); if(client==null){ client=connect(deviceInfo); } System.out.println("當前訂閱獲取實例:"+deviceInfo.getDeviceId()+"---"+client); //訂閱消息 int[] Qos = CommonConstants.Qos; String[] topic1 = CommonConstants.topic; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); } } //連接 private MqttClient connect(DeviceInfo deviceInfo) throws Exception{ // host為主機名,clientId即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientId的保存形式,默認為以內存保存 MqttClient client = new MqttClient(mqttConfig.getBroker(), deviceInfo.getDeviceId(), null); // MQTT的連接設置 MqttConnectOptions options = new MqttConnectOptions(); // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接 options.setCleanSession(true); // 設置連接的用戶名 options.setUserName(mqttConfig.getUserName()); // 設置連接的密碼 options.setPassword(deviceInfo.getToken().toCharArray()); // 設置超時時間 單位為秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 options.setKeepAliveInterval(90); // 設置回調 client.setCallback(new MqttAepCallback(deviceInfo,this)); client.connect(options); //緩存實例 clientMap.put(deviceInfo.getDeviceId(),client); aepPahoMap.put(deviceInfo.getDeviceId(),this); instance=this; return client; } }
配置類
import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @ConfigurationProperties(prefix = "timing", ignoreInvalidFields = true) @Getter @Setter @Component public class TimingConfig { /** 是否開啟定時上報 */ private int http_whether; private int mqtt_whether; /** 循環並發下發次數*/ private int concurrent; }
import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @ConfigurationProperties(prefix = "ctwing.mqtt", ignoreInvalidFields = true) @Getter @Setter @Component public class MQTTConfig { /** 必填。建議填寫為CTWing平台用戶名。 */ private String userName; /** 必填。填寫平台為設備自動分配的特征串。 */ private String passWord; /* mq地址*/ private String broker; /*質量等級 */ private int qos; /*產品Id,平台產品信息中獲取*/ private int productId; /*必填。填寫平台添加設備時生成的設備ID。*/ private String clientId; /*aep應用中配置*/ private String appSecret; /*aep應用中配置*/ private String appKey; /*MasterKey為平台上創建產品時自動生成,可在產品概況中查詢。*/ private String masterKey; }
訂閱回調
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.nio.charset.StandardCharsets; @Slf4j public class MqttAepCallback implements MqttCallback { private DeviceInfoDao deviceInfoDao = ContextHolder.getBean(DeviceInfoDao.class);private DeviceInfo deviceInfo; private MqttAepPaho mqttAepPaho; public MqttAepCallback() { } public MqttAepCallback(DeviceInfo deviceInfo,MqttAepPaho mqttAepPaho) { this.deviceInfo = deviceInfo; this.mqttAepPaho = mqttAepPaho; } // 連接丟失后,一般在這里面進行重連 @Override public void connectionLost(Throwable arg0) { log.info("Connection Lost:{}",arg0.getMessage()); } // subscribe后得到的消息會執行到這里面 @Override public void messageArrived(String s, MqttMessage mqttMessage) throws MqttException { log.info("當前clientId:{},特征串:{}",deviceInfo.getDeviceId(),deviceInfo.getToken()); log.info("接收消息messageId:{}", mqttMessage.getId()); log.info("接收消息Qos:{}", mqttMessage.getQos()); log.info("接收消息內容:{} from 接收消息主題:{}",mqttMessage,s); String ss = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8); log.info("字節轉Json:{}",ss); ReceiptStrategyContext receiptStrategyContext = new ReceiptStrategyContext(); if(s.equals(TopicEnum.open_cmd.getTopic())){ receiptStrategyContext.setReceiptHandleStrategy(new Mqtt8001ReceiptHandleStrategy(mqttAepPaho,deviceInfo,deviceInfoDao)); }else{ log.info("未知訂閱主題"); return; } receiptStrategyContext.handleReceipt(new Receipt(){{setTopic(s);setMessage(ss);}}); } //消息發送成功后,調用 @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("消息發送成功后,調用"); if(iMqttDeliveryToken.isComplete()){ log.info("Delivery a Msg to Topic:{}",iMqttDeliveryToken.getTopics()[0]); } } }
回調策略
/** * 上下文類,持有策略接口 */ public class ReceiptStrategyContext { private ReceiptHandleStrategy receiptHandleStrategy; //設置策略接口 public void setReceiptHandleStrategy(ReceiptHandleStrategy receiptHandleStrategy) { this.receiptHandleStrategy = receiptHandleStrategy; } public void handleReceipt(Receipt receipt){ if (receiptHandleStrategy != null) { receiptHandleStrategy.handleReceipt(receipt); } } }
public interface ReceiptHandleStrategy { //回執處理策略接口 void handleReceipt(Receipt receipt); }
@Slf4j public class Mqtt8001ReceiptHandleStrategy implements ReceiptHandleStrategy { private MqttAepPaho mqttAepPaho; private DeviceInfo deviceInfo; private DeviceInfoDao deviceInfoDao; public Mqtt8001ReceiptHandleStrategy(MqttAepPaho mqttAepPaho,DeviceInfo deviceInfo,DeviceInfoDao deviceInfoDao) { this.mqttAepPaho = mqttAepPaho; this.deviceInfo = deviceInfo; this.deviceInfoDao = deviceInfoDao; } @Override public void handleReceipt(Receipt receipt) { log.info("解析報文MQTT8001:{},當前實例:{},當前設備信息:{}",receipt.getMessage(),mqttAepPaho,deviceInfo); AepRespVo<JSONObject> aepRespVo= JSON.parseObject(receipt.getMessage(), AepRespVo.class); CmdRespReport report=new CmdRespReport(){{setTaskId(aepRespVo.getTaskId());setResultPayload(new CmdResp());}}; log.info("設備回復AEP指令響應"); mqttAepPaho.topic(deviceInfo, TopicEnum.cmd_resp.getTopic(), MapUtil.beanToMap(report)); OpenCmd openCmd=JSON.toJavaObject(aepRespVo.getPayload(),OpenCmd.class); //@todo 是否判斷開門失敗成功? 上報開門記錄?存儲開門日志? log.info("遠程開門指令開始上報遠程開門結果"); RemoteOpenReport remoteOpenReport=new RemoteOpenReport(); remoteOpenReport.setMsg_id(openCmd.getMsg_id()); remoteOpenReport.setRemoteopen_params(JSON.toJSONString(openCmd)); mqttAepPaho.topic(deviceInfo,TopicEnum.remoteopen_report.getTopic(),MapUtil.beanToMap(remoteOpenReport)); } }
DTO
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; @Data @JsonIgnoreProperties(ignoreUnknown = true) public class AepRespVo <T>{ private int taskId; private T payload; }
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; /** * 訂閱指令信息 */ @Data @JsonIgnoreProperties(ignoreUnknown = true) public class Receipt { //回執信息(json字符串) String message; //回執主題(`open_cmd、、、、、、、、、、`) String topic; }
指令下發
@Scheduled(cron = "*/5 * * * * ?") public void mqttConfigureTasks(){ //@todo 固定上報yml中配置的設備ID,批量上報設備需獲取設備ID+特征串 if(config.getMqtt_whether()==1){ List<DeviceInfo> list=deviceInfoDao.findDeviceByToken(new DeviceInfo()); list.forEach(i->{ MqttReport(i,2); }); }else{ log.info("未開啟MQTT定時器"); } } public void MqttReport(DeviceInfo deviceInfo,int id){ MqttBusinessDto dto=new MqttBusinessDto(); switch (id) {case 2://心跳 dto.setTopic(TopicEnum.heartbeat.getTopic()); dto.setMap(MapUtil.beanToMap(new Heartbeat(){{setIMEI("001");}})); break;default: break; } MqttAepPaho.getInstance(deviceInfo,mqttConfig,config).topic(deviceInfo,dto.getTopic(),dto.getMap()); }
DTO
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; import java.util.Map; @Data @JsonIgnoreProperties(ignoreUnknown = true) public class MqttBusinessDto { //設備Id,平台設備信息管理中獲取。 private String clientId; //設備特征串。平台生成,每個設備唯一 private String passWord; //服務標識 private String topic; //業務數據。僅支持JSON格式。非透傳產品需根據服務定義填寫。 private Map map; }
import java.util.Date; import lombok.Getter; import lombok.Setter; import lombok.NoArgsConstructor; import org.springframework.format.annotation.DateTimeFormat; @Getter @Setter @NoArgsConstructor public class DeviceInfo { private static final long serialVersionUID = 1L; /** 設備id */ private String deviceId = ""; /** 設備編號 */ private String deviceSn = ""; /** 終端名稱 */ private String deviceName = ""; /** 租戶id */ private String tenantId = ""; /** 產品id */ private int productId; /** 版本信息 */ private String firmwareVersion = ""; /** 設備狀態 0:已注冊,1:已激活,2:已注銷 */ private int deviceStatus; /** 激活時間 */ @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date activeTime; /** 注銷時間 */ @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date logoutTime; /** 設備在線狀態 */ private int netStatus; /** 設備最后上線時間 */ @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date onlineAt; /** 設備最后下線時間 */ @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date offlineAt; /** 設備所在產品協議:1.T-LINK協議 2.MQTT協議 3.LWM2M協議 4.TUP協議 5.HTTP協議 6.JT/T808 7.TCP協議 8.私有TCP(網關子設備協議) 9.私有UDP(網關子設備協議) 10.網關產品MQTT(網關產品協議) 11.南向雲 */ private int productProtocol; /** 設備特征串 */ private String token = ""; /** 枚舉值 : 0--正常 1--禁用 */ private int disable_status; }
部分上報接收MQ消息的實體Report和指令下發的實體Cmd根據AEP平台中定義的通訊來組裝。
MQTT簡介
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的“輕量級”通訊協議,該協議構建於TCP/IP協議上,由IBM在1999年發布。MQTT最大優點在於,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。作為一種低開銷、低帶寬占用的即時通訊協議,使其在物聯網、小型設備、移動應用等方面有較廣泛的應用。
MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。
MQTT特性
MQTT協議工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協議,它具有以下主要的幾項特性:
- 使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。這一點很類似於XMPP,但是MQTT的信息冗余遠小於XMPP,,因為XMPP使用XML格式文本來傳遞數據。
- 對負載內容屏蔽的消息傳輸。
- 使用TCP/IP提供網絡連接。主流的MQTT是基於TCP連接進行數據推送的,但是同樣有基於UDP的版本,叫做MQTT-SN。這兩種版本由於基於不同的連接方式,優缺點自然也就各有不同了。
- 有三種消息發布服務質量:“至多一次”,消息發布完全依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能設備在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。“至少一次”,確保消息到達,但消息重復可能會發生。“只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級別。在計費系統中,消息重復或丟失會導致不正確的結果。這種最高質量的消息發布服務還可以用於即時通訊類的APP的推送,確保用戶收到且只會收到一次。
- 小型傳輸,開銷很小(固定長度的頭部是2字節),協議交換最小化,以降低網絡流量。這就是為什么在介紹里說它非常適合“在物聯網領域,傳感器與服務器的通信,信息的收集”,要知道嵌入式設備的運算能力和帶寬都相對薄弱,使用這種協議來傳遞消息再適合不過了。
- 使用Last Will和Testament特性通知有關各方客戶端異常中斷的機制。Last Will:即遺言機制,用於通知同一主題下的其他設備發送遺言的設備已經斷開了連接。Testament:遺囑機制,功能類似於Last Will。
MQTT協議實現方式
實現MQTT協議需要客戶端和服務器端通訊完成,在通訊過程中,MQTT協議中有三種身份:發布者(Publish)、代理(Broker)(服務器)、訂閱者(Subscribe)。其中,消息的發布者和訂閱者都是客戶端,消息代理是服務器,消息發布者可以同時是訂閱者。MQTT傳輸的消息分為:主題(Topic)和負載(payload)兩部分:
- Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會收到該主題的消息內容(payload);
- payload,可以理解為消息的內容,是指訂閱者具體要使用的內容。
MQTT搭建
略
MQTT協議之訂閱及發布(使用paho-mqtt-client或mqttv3實現)
消息接收回調類
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; /** * 發布消息的回調類 * * 必須實現MqttCallback的接口並實現對應的相關接口方法 * CallBack 類將實現 MqttCallBack。每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。在回調中,將它用來標識已經啟動了該回調的哪個實例。 * 必須在回調類中實現三個方法: * * public void messageArrived(MqttTopic topic, MqttMessage message) * 接收已經預訂的發布。 * * public void connectionLost(Throwable cause) * 在斷開連接時調用。 * * public void deliveryComplete(MqttDeliveryToken token)) * 接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。由 MqttClient.connect 激活此回調。 */ public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 連接丟失后,一般在這里面進行重連 System.out.println("連接斷開,可以做重連"); } public void deliveryComplete(MqttDeliveryToken token) { // publish后會執行到這里 System.out.println("deliveryComplete---------"+ token.isComplete()); } public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception { // subscribe后得到的消息會執行到這里面 System.out.println("接收消息主題:"+topic.getName()); System.out.println("接收消息Qos:"+message.getQos()); System.out.println("接收消息內容:"+new String(message.getPayload())); } }
服務端消息發布
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence; public class Server { public static final String HOST = "tcp://127.0.0.1:1883"; public static final String TOPIC = "主題"; private static final String clientid ="server_clientid"; private MqttClient client; private MqttTopic topic; private String userName = "test"; private String passWord = "test"; private MqttMessage message; public Server() throws MqttException { //MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); connect(); } private void connect() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 設置超時時間 options.setConnectionTimeout(10); // 設置會話心跳時間 options.setKeepAliveInterval(20); try { client.setCallback(new PushCallback()); client.connect(options); topic = client.getTopic(TOPIC); } catch (Exception e) { e.printStackTrace(); } } public void publish(MqttMessage message) throws MqttPersistenceException, MqttException{ MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); System.out.println(token.isComplete()+"========"); } public static void main(String[] args) throws MqttException { Server server = new Server(); server.message = new MqttMessage(); server.message.setQos(1); server.message.setRetained(true); server.message.setPayload("eeeeeaaaaaawwwwww---".getBytes()); server.publish(server.message); System.out.println(server.message.isRetained()+"------ratained狀態"); } }
客戶端接收消息
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttSecurityException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence; public class Client { public static final String HOST = "tcp://127.0.0.1:1883";
public static final String TOPIC = "主題"; private static final String clientid = "server_clientid";
private MqttClient client; private MqttConnectOptions options; private String userName = "test"; private String passWord = "test"; private ScheduledExecutorService scheduler; //重新鏈接 public void startReconnect() { scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(new Runnable() { public void run() { if (!client.isConnected()) { try { client.connect(options); } catch (MqttSecurityException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } } }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS); } private void start() { try { // host為主機名,test為clientid即連接MQTT的客戶端ID,一般以客戶端唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連接設置 options = new MqttConnectOptions(); // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接 options.setCleanSession(true); // 設置連接的用戶名 options.setUserName(userName); // 設置連接的密碼 options.setPassword(passWord.toCharArray()); // 設置超時時間 單位為秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設置回調 client.setCallback(new PushCallback()); MqttTopic topic = client.getTopic(TOPIC); //setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息 options.setWill(topic, "close".getBytes(), 0, true); client.connect(options); //訂閱消息 int[] Qos = {1}; String[] topic1 = {TOPIC}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); } } public void disconnect() { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } public static void main(String[] args) throws MqttException { Client client = new Client(); client.start(); } }
Maven依賴
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.1.1</version> </dependency>