1.下載EMQ安裝包,配置EMQ環境
下載地址:https://www.emqx.cn/downloads#broker
下載壓縮包解壓,cmd進入bin文件夾

輸入 emqx start 啟動服務,打卡瀏覽器輸入本地ip:18083 進入登錄頁面 默認用戶名密碼 admin/public

2.配置application.properties文件,設置EMQ參數,添加pom引入jar包
#MQTT Config
mqtt:
#MQTT-服務器連接地址,如果有多個,用逗號隔開,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
host: tcp://127.0.0.1:11883
#MQTT-連接服務器默認客戶端ID
clientid: mqtt_id
#MQTT-用戶名
username: admin
#MQTT-密碼
password: admin
#MQTT-默認的消息推送主題,實際可在調用接口時指定
topic: test
#連接超時
timeout: 1000
#設置會話心跳時間
keepalive: 100
<!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
3.創建工具類
1.配置文件
package com.st.modules.pump.mqtt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; @Component @Configuration @ConfigurationProperties(MqttConfiguration.PREFIX) public class MqttConfiguration { @Autowired private MqttPushClient mqttPushClient; public static final String PREFIX="mqtt"; private String host; private String clientid; private String username; private String password; private String topic; private int timeout; private int keepalive; @Bean public MqttPushClient getMqttPushClient() { mqttPushClient.connect(host, clientid, username, password, timeout,keepalive); // 以/#結尾表示訂閱所有以test開頭的主題 mqttPushClient.subscribe("test/#", 0); return mqttPushClient; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public String getClientid() { return clientid; } public void setClientid(String clientid) { this.clientid = clientid; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getTimeout() { return timeout; } public void setTimeout(int timeout) { this.timeout = timeout; } public int getKeepalive() { return keepalive; } public void setKeepalive(int keepalive) { this.keepalive = keepalive; } }
2.發布者
package com.st.modules.pump.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttPushClient { @Autowired private PushCallback pushCallback; private static MqttClient client; public static MqttClient getClient(){ return client; } public static void setClient(MqttClient client){ MqttPushClient.client=client; } /** * 客戶端連接 * * @param host ip+端口 * @param clientID 客戶端Id * @param username 用戶名 * @param password 密碼 * @param timeout 超時時間 * @param keeplive 保留數 */ public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){ MqttClient client; try { client=new MqttClient(host,clientID,new MemoryPersistence()); MqttConnectOptions options=new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keeplive); MqttPushClient.setClient(client); try { client.setCallback(pushCallback); client.connect(options); }catch (Exception e){ e.printStackTrace(); } }catch (Exception e){ e.printStackTrace(); } } /** * 發布,默認qos為0,非持久化 * @param topic * @param pushMessage */ public void pushlish(String topic,String pushMessage){ pushlish(0,false,topic,pushMessage); } /** * 發布 * * @param qos 連接方式 * @param retained 是否保留 * @param topic 主題 * @param pushMessage 消息體 */ public void pushlish(int qos,boolean retained,String topic,String pushMessage){ MqttMessage message=new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mqttTopic=MqttPushClient.getClient().getTopic(topic); if(null== mqttTopic){ log.error("topic not exist"); } MqttDeliveryToken token; try { token=mqttTopic.publish(message); token.waitForCompletion(); }catch (MqttPersistenceException e){ e.printStackTrace(); }catch (MqttException e){ e.printStackTrace(); } } /** * 訂閱某個主題,qos默認為0 * @param topic */ public void subscribe(String topic){ log.error("開始訂閱主題" + topic); subscribe(topic,0); } public void subscribe(String topic,int qos){ try { MqttPushClient.getClient().subscribe(topic,qos); }catch (MqttException e){ e.printStackTrace(); } } }
3.消費監聽類
package com.st.modules.pump.mqtt; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @Classname PushCallback * @Description 消費監聽類 */ @Component public class PushCallback implements MqttCallback { @Autowired private MqttConfiguration mqttConfiguration; private static MqttClient client; @Override public void connectionLost(Throwable throwable) {if (client == null || !client.isConnected()) { System.out.println("連接斷開,正在重連...."); mqttConfiguration.getMqttPushClient(); } } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("接收消息主題 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息內容 : " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
~~~~~~~~~~~~~Over~~~~~~~~~~~~~~
