SpringBoot2.x集成MQTT實現消息訂閱


1.引入相關的依賴

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>

2. 在配置文件下配置MQTT服務器信息

spring.mqtt.username = username
spring.mqtt.password = password
# 注意:請填寫 broker 的 MQTT TCP 監聽端口;若使用 EMQX,默認為 1883,18083 通常是 Dashboard 的 HTTP 端口,請按實際情況確認
spring.mqtt.url = tcp://xx.xx.xx.xx:18083
spring.mqtt.client.id = clientid
spring.mqtt.default.topic = topic
spring.mqtt.default.completionTimeout = 3000

3.添加MQTT消息訂閱(接收)配置類

/**
 * Spring Integration wiring for receiving MQTT messages.
 *
 * <p>Declares the Paho connection options and client factory, the channel that
 * inbound messages are published to, an inbound channel adapter subscribed to
 * the configured topic, and a handler that logs every message arriving on that
 * channel. All broker settings are injected from {@code spring.mqtt.*}
 * properties.
 *
 * <p>Created 2020/9/01 10:04.
 *
 * @author songyaru
 * @version 1.0
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttReceiveConfig {

    /** Broker user name, from {@code spring.mqtt.username}. */
    @Value("${spring.mqtt.username}")
    private String username;

    /** Broker password, from {@code spring.mqtt.password}. */
    @Value("${spring.mqtt.password}")
    private String password;

    /** Broker URI, from {@code spring.mqtt.url}; used as the single server URI. */
    @Value("${spring.mqtt.url}")
    private String hostUrl;

    /** Base client id, from {@code spring.mqtt.client.id}; the inbound adapter appends a suffix. */
    @Value("${spring.mqtt.client.id}")
    private String clientId;

    /** Topic the inbound adapter subscribes to, from {@code spring.mqtt.default.topic}. */
    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /**
     * Value passed to the inbound adapter's completion timeout, from
     * {@code spring.mqtt.default.completionTimeout}.
     * NOTE(review): presumably milliseconds, per Spring Integration's
     * {@code setCompletionTimeout} — confirm against the library version in use.
     */
    @Value("${spring.mqtt.default.completionTimeout}")
    private int completionTimeout;

    /**
     * Builds the Paho connection options from the injected credentials and broker URI.
     *
     * @return options carrying user name, password, one server URI and a keep-alive interval of 50
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        String[] serverUris = {hostUrl};
        options.setServerURIs(serverUris);
        options.setKeepAliveInterval(50);
        return options;
    }

    /**
     * Creates the client factory used by the inbound adapter.
     *
     * @return a default Paho client factory configured with {@link #getMqttConnectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(getMqttConnectOptions());
        return clientFactory;
    }

    /**
     * Channel that received MQTT messages are sent to.
     *
     * @return a new {@code DirectChannel}, registered as the primary {@code MessageChannel} bean
     */
    @Primary
    @Bean("mqttInputChannel")
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter: connects with client id {@code <clientId>_inbound}, subscribes to
     * the configured topic (the topic the publisher sends to) and forwards every message
     * to the given channel.
     *
     * @param messageChannel the {@code mqttInputChannel} bean that receives inbound messages
     * @return the configured message-driven channel adapter
     */
    @Bean
    public MessageProducer inbound(@Qualifier("mqttInputChannel") MessageChannel messageChannel) {
        String inboundClientId = clientId + "_inbound";
        MqttPahoMessageDrivenChannelAdapter inboundAdapter =
                new MqttPahoMessageDrivenChannelAdapter(inboundClientId, mqttClientFactory(), defaultTopic);
        inboundAdapter.setOutputChannel(messageChannel);
        inboundAdapter.setConverter(new DefaultPahoMessageConverter());
        inboundAdapter.setQos(1);
        inboundAdapter.setCompletionTimeout(completionTimeout);
        return inboundAdapter;
    }

    /**
     * Consumer for {@code mqttInputChannel}: logs the {@code mqtt_receivedTopic} header
     * and the payload of each message at INFO level.
     *
     * @return the logging message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            Object receivedTopic = message.getHeaders().get("mqtt_receivedTopic");
            log.info("主題:{},消息接收到的數據:{}", receivedTopic, message.getPayload());
        };
    }

}

4.啟動服務,使用上一篇博文的消息接口發送消息。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM