SpringBoot整合MQTT( EMQ)


1.下載EMQ安裝包,配置EMQ環境

下載地址:https://www.emqx.cn/downloads#broker

下載壓縮包解壓,cmd進入bin文件夾

 

 

輸入  emqx start 啟動服務,打開瀏覽器輸入本地ip:18083  進入登錄頁面   默認用戶名密碼 admin/public

 

 

2.配置application.yml文件(下面的配置為YAML格式,若使用application.properties需改寫為mqtt.host=...的形式),設置EMQ參數,添加pom引入jar包

#MQTT Config
  mqtt:
  # MQTT broker address.
  # NOTE(review): the Java code in this article passes this value unchanged to a single
  # MqttClient constructor, so a comma-separated list of addresses is presumably not
  # supported as-is - confirm before configuring more than one broker.
    host: tcp://127.0.0.1:11883
    # Default client ID used when connecting to the broker
    clientid: mqtt_id
    # MQTT user name
    username: admin
    # MQTT password
    password: admin
    # Default topic for publishing; the actual topic can be given when calling the publish API
    topic: test
    # Connection timeout, passed to MqttConnectOptions.setConnectionTimeout
    # (presumably seconds - confirm against the Paho documentation)
    timeout: 1000
    # Keep-alive (heartbeat) interval, passed to MqttConnectOptions.setKeepAliveInterval
    keepalive: 100

 

 

<!-- mqtt -->
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>

 

 

3.創建工具類

1.配置文件

package com.st.modules.pump.mqtt;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * Holds the {@code mqtt.*} settings and connects the shared {@link MqttPushClient} to the broker.
 *
 * <p>The class is registered as a plain {@code @Component} on purpose. On a
 * {@code @Configuration} class Spring proxies {@code @Bean} methods, so a later call to
 * {@link #getMqttPushClient()} would return the cached singleton instead of running the
 * method body. As a component the method runs in "lite" mode: Spring still calls it once
 * at startup to create the bean, and any later direct call connects and subscribes again.
 */
@Component
@ConfigurationProperties(MqttConfiguration.PREFIX)
public class MqttConfiguration {

    @Autowired
    private MqttPushClient mqttPushClient;

    /** Prefix of the configuration properties bound to this class. */
    public static final String PREFIX = "mqtt";

    /** Broker address handed to the client when connecting. */
    private String host;
    /** Client ID used when connecting. */
    private String clientid;
    /** User name used when connecting. */
    private String username;
    /** Password used when connecting. */
    private String password;
    /** Configured default topic; bound from the properties but not read by this class. */
    private String topic;
    /** Connection timeout handed to the client when connecting. */
    private int timeout;
    /** Keep-alive interval handed to the client when connecting. */
    private int keepalive;

    /**
     * Connects the injected {@link MqttPushClient} with the bound settings and subscribes it
     * to {@code test/#} at QoS 0.
     *
     * <p>Every direct invocation performs the connect and subscribe again, which is what the
     * connection-lost handling relies on.
     *
     * @return the injected push client
     */
    @Bean
    public MqttPushClient getMqttPushClient() {
        mqttPushClient.connect(host, clientid, username, password, timeout, keepalive);
        // The trailing "/#" is the MQTT multi-level wildcard: subscribe to the topics under "test".
        mqttPushClient.subscribe("test/#", 0);
        return mqttPushClient;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getClientid() {
        return clientid;
    }

    public void setClientid(String clientid) {
        this.clientid = clientid;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getKeepalive() {
        return keepalive;
    }

    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }
}

 

2.發布者

package com.st.modules.pump.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MqttPushClient {

    @Autowired
    private PushCallback pushCallback;


    private static MqttClient client;

    public  static MqttClient getClient(){
        return  client;
    }

    public static void setClient(MqttClient client){
        MqttPushClient.client=client;
    }

    /**
     * 客戶端連接
     *
     * @param host      ip+端口
     * @param clientID  客戶端Id
     * @param username  用戶名
     * @param password  密碼
     * @param timeout   超時時間
     * @param keeplive 保留數
     */
    public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){
        MqttClient client;



        try {

            client=new MqttClient(host,clientID,new MemoryPersistence());
            MqttConnectOptions options=new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keeplive);
            MqttPushClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            }catch (Exception e){
                e.printStackTrace();
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    /**
     * 發布,默認qos為0,非持久化
     * @param topic
     * @param pushMessage
     */

    public void pushlish(String topic,String pushMessage){
        pushlish(0,false,topic,pushMessage);
    }

    /**
     * 發布
     *
     * @param qos         連接方式
     * @param retained    是否保留
     * @param topic       主題
     * @param pushMessage 消息體
     */

    public void pushlish(int qos,boolean retained,String topic,String pushMessage){
        MqttMessage message=new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mqttTopic=MqttPushClient.getClient().getTopic(topic);
        if(null== mqttTopic){
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token=mqttTopic.publish(message);
            token.waitForCompletion();
        }catch (MqttPersistenceException e){
            e.printStackTrace();
        }catch (MqttException e){
            e.printStackTrace();
        }

    }

    /**
     * 訂閱某個主題,qos默認為0
     * @param topic
     */
    public void subscribe(String topic){
        log.error("開始訂閱主題" + topic);
        subscribe(topic,0);
    }


    public void subscribe(String topic,int qos){
        try {
            MqttPushClient.getClient().subscribe(topic,qos);
        }catch (MqttException e){
            e.printStackTrace();
        }
    }
}

 

3.消費監聽類

package com.st.modules.pump.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Paho callback that prints inbound messages and delivery completions and asks the MQTT
 * configuration to re-establish the connection when it is lost.
 */
@Component
public class PushCallback implements MqttCallback {
    @Autowired
    private MqttConfiguration mqttConfiguration;

    /**
     * Called when the connection to the broker is lost; requests a reconnect.
     *
     * <p>NOTE(review): the reconnect only happens if
     * {@code MqttConfiguration.getMqttPushClient()} really re-runs its body on this call.
     * If the configuration bean is a proxied Spring {@code @Configuration} class, the call
     * presumably returns the cached bean instead - verify how that class is registered.
     *
     * @param throwable cause of the connection loss (not used)
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("連接斷開,正在重連....");
        mqttConfiguration.getMqttPushClient();
    }

    /**
     * Prints the topic, QoS and payload of a received message.
     *
     * @param topic   topic the message arrived on
     * @param message received message; its payload is decoded as UTF-8
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode explicitly so the text does not depend on the platform default charset.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("接收消息主題 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息內容 : " + payload);
    }

    /**
     * Prints whether the delivery tracked by the token has completed.
     *
     * @param token delivery token of the published message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

 

~~~~~~~~~~~~~Over~~~~~~~~~~~~~~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM