1.引入相關的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2. 在配置文件下配置MQTT服務器信息
spring.mqtt.username = username
spring.mqtt.password = password
spring.mqtt.url = tcp://xx.xx.xx.xx:18083
spring.mqtt.client.id = clientid
spring.mqtt.default.topic = topic
spring.mqtt.default.completionTimeout = 3000
3.配置MQTT消息推送配置
/** * MQTT配置 * @Author: songyaru * @Date: 2020/9/01 10:04 * @Version 1.0 */ @Slf4j @Configuration @IntegrationComponentScan public class MqttReceiveConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; @Value("${spring.mqtt.default.completionTimeout}") private int completionTimeout;//連接超時 //初始化連接 @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(50); return mqttConnectOptions; } //初始化mqtt工廠 @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } //接收通道 @Primary @Bean("mqttInputChannel") public MessageChannel mqttInputChannel() { return new DirectChannel(); } //配置client,監聽的topic @Bean public MessageProducer inbound(@Qualifier("mqttInputChannel") MessageChannel messageChannel) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory(),defaultTopic); //這里的defaultTopic是發布者的主題 adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(messageChannel); return adapter; } //訂閱消費數據,通過通道獲取數據 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("主題:{},消息接收到的數據:{}", 
message.getHeaders().get("mqtt_receivedTopic"), message.getPayload()); } }; } }
4.啟動服務,使用上一篇博文的消息接口發送消息。