Integrating MQTT with Spring Boot


Note: the code in this article has some minor issues. For the fix, see: Fixing frequent "Lost connection" errors in spring-integration-mqtt

1. Add the configuration

spring:
  mqtt:
    client:
      username: your-username
      password: your-password
      serverURIs: tcp://ip:port # broker address(es); separate multiple entries with commas
      clientId: client0001 # ${random.value}
      keepAliveInterval: 30
      connectionTimeout: 30
    producer:
      defaultQos: 1
      defaultRetained: true
      defaultTopic: defaultTopicName
    consumer:
      defaultQos: 1
      completionTimeout: 30000
      consumerTopics: topic1,topic2 # topics to subscribe to; separate multiple entries with commas
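
Both serverURIs and consumerTopics hold comma-separated lists, while the client code further down passes them on as arrays. Spring can normally do this conversion itself when such a property is injected into a String[] field, so the following is only a minimal sketch of the equivalent split in plain Java, for cases where the raw string has to be handled by hand. The class name, method name, and sample addresses are hypothetical.

```java
import java.util.Arrays;

public class CommaListSketch {

    // Splits a comma-separated property value into trimmed, non-empty entries.
    public static String[] splitList(String raw) {
        if (raw == null || raw.isBlank()) {
            return new String[0];
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // Hypothetical values in the same shape as the configuration above.
        String[] uris = splitList("tcp://10.0.0.1:1883, tcp://10.0.0.2:1883");
        String[] topics = splitList("topic1,topic2");
        System.out.println(Arrays.toString(uris));
        System.out.println(Arrays.toString(topics));
    }
}
```

Trimming each entry means a value such as "topic1, topic2" does not produce a topic name with a leading space.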

2. Client configuration

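    /* Client. The fields used below (username, password, serverURIs, ...) are assumed
       to be bound from the spring.mqtt.client.* properties above. */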
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(serverURIs);
        mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
        mqttConnectOptions.setConnectionTimeout(connectionTimeout);

        return mqttConnectOptions;
    }

    @Bean
    public MqttPahoClientFactory getMqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());

        return factory;
    }

3. Publishing messages

3.1 Configuration

    @Bean
    public MessageChannel outboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler getMqttProducer() {
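        // Use a client ID distinct from the consumer's: a broker keeps only one connection per client ID.
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", getMqttClientFactory());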
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultRetained(defaultRetained);
        messageHandler.setDefaultQos(defaultProducerQos);

        return messageHandler;
    }

3.2 Message-sending gateway interface

@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttSender {

    void sendToMqtt(String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
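
The three overloads differ only in which MQTT headers they set. When a header is absent, the outbound handler falls back to the defaults configured in 3.1. The following is a minimal plain-Java model of that fallback, not Spring Integration's actual code: the header names match MqttHeaders.TOPIC and MqttHeaders.QOS, while the class, the methods, and the default values (taken from the sample configuration) are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

public class OutboundDefaultsSketch {

    // String values of MqttHeaders.TOPIC and MqttHeaders.QOS.
    static final String TOPIC_HEADER = "mqtt_topic";
    static final String QOS_HEADER = "mqtt_qos";

    // Assumed defaults, mirroring the producer section of the sample configuration.
    static final String DEFAULT_TOPIC = "defaultTopicName";
    static final int DEFAULT_QOS = 1;

    // Returns the topic header if present, otherwise the default topic.
    public static String resolveTopic(Map<String, Object> headers) {
        Object topic = headers.get(TOPIC_HEADER);
        return topic != null ? topic.toString() : DEFAULT_TOPIC;
    }

    // Returns the QoS header if it is an integer, otherwise the default QoS.
    public static int resolveQos(Map<String, Object> headers) {
        Object qos = headers.get(QOS_HEADER);
        return qos instanceof Integer ? (Integer) qos : DEFAULT_QOS;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // Like sendToMqtt(data): no headers, so both defaults apply.
        System.out.println(resolveTopic(headers) + " @ QoS " + resolveQos(headers));

        // Like sendToMqtt(topic, qos, payload): both headers override the defaults.
        headers.put(TOPIC_HEADER, "topic1");
        headers.put(QOS_HEADER, 2);
        System.out.println(resolveTopic(headers) + " @ QoS " + resolveQos(headers));
    }
}
```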

3.3 Testing

@RestController
public class TestController {

    @Autowired
    private MqttSender mqttSender;

    @RequestMapping("/send")
    public void send(String data) {
        mqttSender.sendToMqtt(data);
    }
}
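
To exercise the endpoint, a request can be sent from a browser, from curl, or from plain Java. The sketch below assumes the application runs locally on port 8080 (Spring Boot's default) and that the unannotated data parameter is bound from a query parameter of the same name. The class SendEndpointClient and its methods are hypothetical.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SendEndpointClient {

    // Assumption: the application runs locally on Spring Boot's default port.
    static final String BASE_URL = "http://localhost:8080";

    // Builds the /send URL with the payload URL-encoded as the "data" query parameter.
    public static URI buildSendUri(String data) {
        String encoded = URLEncoder.encode(data, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "/send?data=" + encoded);
    }

    // Sends the request and returns the HTTP status code.
    public static int send(String data) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildSendUri(data)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        if (args.length > 0) {
            // Contacts the running application only when a payload is passed explicitly.
            System.out.println("HTTP status: " + send(args[0]));
        } else {
            System.out.println("Would request: " + buildSendUri("hello mqtt"));
        }
    }
}
```

If the request succeeds, the payload should appear on the default topic, which any MQTT client subscribed to that topic can confirm.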

4. Subscribing to messages

4.1 Configuration

    @Bean
    public MessageChannel inboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer getMqttConsumer() {
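        // Use a client ID distinct from the producer's: a broker keeps only one connection per client ID.
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", getMqttClientFactory(), consumerTopics);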
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(defaultConsumerQos);
        adapter.setOutputChannel(inboundChannel());

        return adapter;
    }

4.2 Testing

@Component
public class MqttConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttConsumer.class);

    @Bean
    @ServiceActivator(inputChannel = MqttConfig.INBOUND_CHANNEL)
    public MessageHandler handler() {
        return message -> {
            String topic = message.getHeaders().get(MqttConfig.RECEIVED_TOPIC_KEY).toString();
            LOGGER.info("Received message on topic [{}]: {}", topic, message.getPayload().toString());
        };
    }
}
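
The snippets above refer to MqttConfig.OUTBOUND_CHANNEL, MqttConfig.INBOUND_CHANNEL and MqttConfig.RECEIVED_TOPIC_KEY without showing their definitions. A minimal sketch of what they might look like follows. The values are assumptions: each channel constant equals the name of its @Bean method, and the topic key equals the value of Spring Integration's MqttHeaders.RECEIVED_TOPIC. The class name and the null-safe topicOf helper are hypothetical additions.

```java
import java.util.Map;

public class MqttConstantsSketch {

    // Assumed values: each channel name equals the name of its @Bean method.
    public static final String OUTBOUND_CHANNEL = "outboundChannel";
    public static final String INBOUND_CHANNEL = "inboundChannel";

    // Same string value as Spring Integration's MqttHeaders.RECEIVED_TOPIC.
    public static final String RECEIVED_TOPIC_KEY = "mqtt_receivedTopic";

    // Null-safe alternative to headers.get(RECEIVED_TOPIC_KEY).toString().
    public static String topicOf(Map<String, Object> headers) {
        Object topic = headers.get(RECEIVED_TOPIC_KEY);
        return topic != null ? topic.toString() : "<unknown>";
    }

    public static void main(String[] args) {
        // A header map shaped like the one an inbound MQTT message would carry.
        Map<String, Object> headers = Map.of(RECEIVED_TOPIC_KEY, "topic1");
        System.out.println(topicOf(headers));
        // A message without the header no longer triggers a NullPointerException.
        System.out.println(topicOf(Map.of()));
    }
}
```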

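Notes

  1. The channel name bound in @ServiceActivator and @MessagingGateway must match the name of the @Bean method that returns the MessageChannel, because Spring uses the method name as the bean name by default:

    For example, if the publisher is bound to a channel named outboundChannel, a matching method is required: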

     @Bean
     public MessageChannel outboundChannel() {
         return new DirectChannel();
     }
    
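  2. The publisher and the subscriber must not use the same channel name.
  3. The connection timeout and the subscription timeout use different units: connectionTimeout is in seconds, whereas completionTimeout is in milliseconds.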
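
The unit mismatch in note 3 is easy to get wrong when both values come from one configuration file. One way to guard against it is to hold a single Duration and convert it at each call site. This is a minimal sketch under that assumption; the class and method names are hypothetical.

```java
import java.time.Duration;

public class TimeoutUnitsSketch {

    // Paho's MqttConnectOptions.setConnectionTimeout(int) expects seconds.
    public static int toConnectionTimeoutSeconds(Duration timeout) {
        return Math.toIntExact(timeout.getSeconds());
    }

    // The inbound adapter's setCompletionTimeout(long) expects milliseconds.
    public static long toCompletionTimeoutMillis(Duration timeout) {
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(30);
        // The same 30-second timeout, expressed in the unit each setter expects.
        System.out.println("connectionTimeout = " + toConnectionTimeoutSeconds(timeout));
        System.out.println("completionTimeout = " + toCompletionTimeoutMillis(timeout));
    }
}
```

With a 30-second Duration the two conversions yield 30 and 30000, the same pair of values used in the sample configuration.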


Full code: GitHub

