SpringBoot2.x集成MQTT實現消息推送


1.引入相關的依賴

 

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2.在配置文件中配置MQTT服務器信息

 

spring.mqtt.username = username
spring.mqtt.password = password
spring.mqtt.url
= tcp://xx.xx.xx.xx:18083
spring.mqtt.client.id = clientid
spring.mqtt.
default.topic = topic
spring.mqtt.
default.completionTimeout = 3000

 3.配置MQTT消息推送配置

 

/**
 * @Author: songyaru
 * @Date: 2020/9/1 13:42
 * @Version 1.0
 */
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
    @Value("${spring.mqtt.username}")
    private String username;
    @Value("${spring.mqtt.password}")
    private String password;
    @Value("${spring.mqtt.url}")
    private String hostUrl;
    @Value("${spring.mqtt.client.id}")
    private String clientId;
    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;
    @Value("${spring.mqtt.default.completionTimeout}")
    private int completionTimeout;

    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        mqttConnectOptions.setKeepAliveInterval(2);
        return mqttConnectOptions;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

 

4.MQTT消息推送接口

/**
 * @Author: songyaru
 * @Date: 2020/9/1 13:51
 * @Version 1.0
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}

5.MQTT消息推送API

/**
 * @Author: songyaru
 * @Date: 2020/9/1 13:52
 * @Version 1.0
 */
@RestController
public class MessageController {
    @Autowired
    MqttGateway mqttGateway;

    /***
     * 發布消息,用於其他客戶端消息接收測試
     */

    @RequestMapping("/sendMqttMessage")
    public String sendMqttMessage(String message, String topic) {
        mqttGateway.sendToMqtt(message, topic);
        return "ok";
    }
}

6、測試

 在POSTMAN中進行測試了,輸入消息內容和主題,就可以在相應的頻道發送消息了。使用其它的消息客戶端進行測試,可以接受到消息。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM