問題描述
在之前的博客介紹了如何在 Spring Boot 集成 MQTT,后面使用中沒有發現問題,最近發現一直報錯:
Lost connection: Connection lost; retrying...
Lost connection: 已斷開連接; retrying...
解決過程
網上說是因為 client ID 重復,最開始是不相信的,因為我測試只啟動了一個客戶端。但是卻怎么都定位不到異常原因,用重新回到 client ID 重復的這個思路上來:
因為程序里同時作為訂閱者和發布者,就懷疑訂閱和發布服務是不是單獨建立的連接,抱着試試看的想法試了一下,結果果然是這個原因,原代碼:
/* 發布者 */
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
public MessageHandler getMqttProducer() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, getMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
messageHandler.setDefaultRetained(defaultRetained);
messageHandler.setDefaultQos(defaultProducerQos);
return messageHandler;
}
/* 訂閱者 */
@Bean
public MessageProducer getMqttConsumer() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId, getMqttClientFactory(), consumerTopics);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(defaultConsumerQos);
adapter.setOutputChannel(inboundChannel());
return adapter;
}
訂閱者和發布者使用的是相同的 client ID,修改后代碼:
/* 發布者 */
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
public MessageHandler getMqttProducer() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", getMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
messageHandler.setDefaultRetained(defaultRetained);
messageHandler.setDefaultQos(defaultProducerQos);
return messageHandler;
}
/* 訂閱者 */
@Bean
public MessageProducer getMqttConsumer() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", getMqttClientFactory(), consumerTopics);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(defaultConsumerQos);
adapter.setOutputChannel(inboundChannel());
return adapter;
}
總結
雖然目前解決了這個問題,但是為什么會單獨建立兩個連接的原因還未找到;另外,一個程序兩個連接還是感覺怪怪的,不知道還有沒有更優的處理方案