本文代碼有些許問題,處理方案見:解決 spring-integration-mqtt 頻繁報 Lost connection 錯誤
一、添加配置
spring:
mqtt:
client:
username: 用戶名
password: 密碼
serverURIs: tcp://ip:port # 客戶端地址,多個使用逗號隔開
clientId: client0001 # ${random.value}
keepAliveInterval: 30
connectionTimeout: 30
producer:
defaultQos: 1
defaultRetained: true
defaultTopic: defaultTopicName
consumer:
defaultQos: 1
completionTimeout: 30000
consumerTopics: topic1,topic2 # 監聽的 topic,多個使用逗號隔開
二、客戶端配置
/* 客戶端 */
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(serverURIs);
mqttConnectOptions.setKeepAliveInterval(keepAliveInterval);
mqttConnectOptions.setConnectionTimeout(connectionTimeout);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory getMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
三、發布消息
3.1 配置
@Bean
public MessageChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
public MessageHandler getMqttProducer() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, getMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
messageHandler.setDefaultRetained(defaultRetained);
messageHandler.setDefaultQos(defaultProducerQos);
return messageHandler;
}
3.2 消息推送接口類
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
public interface MqttSender {
void sendToMqtt(String data);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
3.3 測試
@RestController
public class TestController {
@Autowired
private MqttSender mqttSender;
@RequestMapping("/send")
private void send(String data){
mqttSender.sendToMqtt(data);
}
}
四、訂閱消息
4.1 配置
@Bean
public MessageChannel inboundChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer getMqttConsumer() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId, getMqttClientFactory(), consumerTopics);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(defaultConsumerQos);
adapter.setOutputChannel(inboundChannel());
return adapter;
}
4.2 測試
@Component
public class MqttConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(MqttConsumer.class);
@Bean
@ServiceActivator(inputChannel = MqttConfig.INBOUND_CHANNEL)
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get(MqttConfig.RECEIVED_TOPIC_KEY).toString();
LOGGER.info("[{}]主題接收到消息:{}", topic, message.getPayload().toString());
};
}
}
注意事項
-
@ServiceActivator 和 @MessagingGateway 中綁定的 Channel 名,需與返回 MessageChannel 的 Bean 的方法名一樣:
如發布者綁定的 Channel 名為 outboundChannel,則需要有對應的方法,如下:
@Bean public MessageChannel outboundChannel() { return new DirectChannel(); }
- 發布者與訂閱者的 Channel 名不能相同
- 連接服務器的超時時間和訂閱的超時時間單位不一樣
參考
完整代碼:GitHub