1. 前言
公司的IOT平台主要采用MQTT(消息隊列遙測傳輸)對底層的驅動做命令下發和數據采集。也用到了redis、zeroMQ、nats等消息中間件。今天先整理SpringBoot集成MQTT筆記和工作中遇到的問題。
2. MQTT介紹
MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
官網地址:http://mqtt.org/ 、 https://www.mqtt.com/
MQTT除了具備大部分消息中間件擁有的功能外,其最大的特點就是小型傳輸。以減少開銷,減低網絡流量的方式去滿足低帶寬、不穩定的網絡遠程傳輸。
MQTT服務器有很多,比如Apache-Apollo和EMQX,ITDragon龍 目前使用的時EMQX作為MQTT的服務器。使用也很簡單,下載解壓后,進入bin目錄執行emqx console 啟動服務。
MQTT調試工具可以用MQTTBox
3. SpringBoot 集成MQTT
3.1 導入mqtt庫
第一步:導入面向企業應用集成庫和對應mqtt集成庫
compile('org.springframework.boot:spring-boot-starter-integration')
compile('org.springframework.integration:spring-integration-mqtt')
這里要注意spring-integration-mqtt的版本。因為會存在org.eclipse.paho.client.mqttv3修復了一些bug,並迭代了新版本。但spring-integration-mqtt並沒有及時更新的情況。修改方法如下
compile("org.springframework.integration:spring-integration-mqtt") {
exclude group: "org.eclipse.paho" , module: "org.eclipse.paho.client.mqttv3"
}
compile("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2")
第二步:MQTT連接配置文件
# MQTT Config
mqtt.server=tcp://x.x.x.x:1883
mqtt.username=xxx
mqtt.password=xxx
mqtt.client-id=clientID
mqtt.cache-number=100
mqtt.message.topic=itDragon/tags/cov
3.2 配置MQTT訂閱者
第一步:配置MQTT客戶端工廠類DefaultMqttPahoClientFactory
第二步:配置MQTT入站消息適配器MqttPahoMessageDrivenChannelAdapter
第三步:定義MQTT入站消息通道MessageChannel
第四步:聲明MQTT入站消息處理器MessageHandler
以下有些配置是沖突或者重復的,主要是體現一些重要配置。
package com.itdragon.server.config
import com.itdragon.server.message.ITDragonMQTTMessageHandler
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.core.MessageProducer
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory
import org.springframework.integration.mqtt.core.MqttPahoClientFactory
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.MessageHandler
import java.time.Instant
@Configuration
class MQTTConfig {
@Value("\${mqtt.server}")
lateinit var mqttServer: String
@Value("\${mqtt.user-name}")
lateinit var mqttUserName: String
@Value("\${mqtt.password}")
lateinit var mqttUserPassword: String
@Value("\${mqtt.client-id}")
lateinit var clientID: String
@Value("\${mqtt.cache-number}")
lateinit var maxMessageInFlight: String
@Value("\${mqtt.message.topic}")
lateinit var messageTopic: String
/**
* 配置DefaultMqttPahoClientFactory
* 1. 配置基本的鏈接信息
* 2. 配置maxInflight,在mqtt消息量比較大的情況下將值設大
*/
fun mqttClientFactory(): MqttPahoClientFactory {
val mqttConnectOptions = MqttConnectOptions()
// 配置mqtt服務端地址,登錄賬號和密碼
mqttConnectOptions.serverURIs = arrayOf(mqttServer)
mqttConnectOptions.userName = mqttUserName
mqttConnectOptions.password = mqttUserPassword.toCharArray()
// 配置最大不確定接收消息數量,默認值10,qos!=0 時生效
mqttConnectOptions.maxInflight = maxMessageInFlight.toInt()
val factory = DefaultMqttPahoClientFactory()
factory.connectionOptions = mqttConnectOptions
return factory
}
/**
* 配置Inbound入站,消費者基本連接配置
* 1. 通過DefaultMqttPahoClientFactory 初始化入站通道適配器
* 2. 配置超時時長,默認30000毫秒
* 3. 配置Paho消息轉換器
* 4. 配置發送數據的服務質量 0~2
* 5. 配置訂閱通道
*/
@Bean
fun itDragonMqttInbound(): MessageProducer {
// 初始化入站通道適配器,使用的是Eclipse Paho MQTT客戶端庫
val adapter = MqttPahoMessageDrivenChannelAdapter(clientID + Instant.now().toEpochMilli(), mqttClientFactory(), messageTopic)
// 設置連接超時時長(默認30000毫秒)
adapter.setCompletionTimeout(30000)
// 配置默認Paho消息轉換器(qos=0, retain=false, charset=UTF-8)
adapter.setConverter(DefaultPahoMessageConverter())
// 設置服務質量
// 0 最多一次,數據可能丟失;
// 1 至少一次,數據可能重復;
// 2 只有一次,有且只有一次;最耗性能
adapter.setQos(0)
// 設置訂閱通道
adapter.outputChannel = itDragonMqttInputChannel()
return adapter
}
/**
* 配置Inbound入站,消費者訂閱的消息通道
*/
@Bean
fun itDragonMqttInputChannel(): MessageChannel {
return DirectChannel()
}
/**
* 配置Inbound入站,消費者的消息處理器
* 1. 使用@ServiceActivator注解,表明所修飾的方法用於消息處理
* 2. 使用inputChannel值,表明從指定通道中取值
* 3. 利用函數式編程的思路,解耦MessageHandler的業務邏輯
*/
@Bean
@ServiceActivator(inputChannel = "itDragonMqttInputChannel")
fun commandDataHandler(): MessageHandler {
/*return MessageHandler { message ->
println(message.payload)
}*/
return ITDragonMQTTMessageHandler()
}
}
注意:
- 1)MQTT的客戶端ID要唯一。
- 2)MQTT在消息量大的情況下會出現消息丟失的情況。
- 3)MessageHandler注意解耦問題。
3.3 配置MQTT發布者
第一步:配置Outbound出站,出站通道適配器
第二步:配置Outbound出站,發布者發送的消息通道
第三步:對外提供推送消息的接口
在原有的MQTTConfig配置類的集成上補充以下內容
/**
* 配置Outbound出站,出站通道適配器
* 1. 通過MqttPahoMessageHandler 初始化出站通道適配器
* 2. 配置異步發送
* 3. 配置默認的服務質量
*/
@Bean
@ServiceActivator(inputChannel = "itDragonMqttOutputChannel")
fun itDragonMqttOutbound(): MqttPahoMessageHandler {
// 初始化出站通道適配器,使用的是Eclipse Paho MQTT客戶端庫
val messageHandler = MqttPahoMessageHandler(clientID + Instant.now().toEpochMilli() + "_set", mqttClientFactory())
// 設置異步發送,默認是false(發送時阻塞)
messageHandler.setAsync(true)
// 設置默認的服務質量
messageHandler.setDefaultQos(0)
return messageHandler
}
/**
* 配置Outbound出站,發布者發送的消息通道
*/
@Bean
fun itDragonMqttOutputChannel(): MessageChannel {
return DirectChannel()
}
/**
* 對外提供推送消息的接口
* 1. 使用@MessagingGateway注解,配置MQTTMessageGateway消息推送接口
* 2. 使用defaultRequestChannel值,調用時將向其發送消息的默認通道
* 3. 配置靈活的topic主題
*/
@MessagingGateway(defaultRequestChannel = "itDragonMqttOutputChannel")
interface MQTTMessageGateway {
fun sendToMqtt(data: String, @Header(MqttHeaders.TOPIC) topic: String)
fun sendToMqtt(data: String, @Header(MqttHeaders.QOS) qos: Int, @Header(MqttHeaders.TOPIC) topic: String)
}
注意:
- 1)發布者和訂閱者的客戶端ID不能相同。
- 2)消息的推送建議采用異步的方式。
- 3)消息的推送方法可以只傳payload消息體,但需要配置setDefaultTopic。
3.4 MQTT消息處理和發送
3.4.1 消息處理
為了讓消息處理函數和MQTT配置解耦,這里提供MessageHandler 注冊類,將消息處理的業務邏輯以函數式編程的思維注冊到Handler中。
package com.itdragon.server.message
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHandler
class ITDragonMQTTMessageHandler : MessageHandler {
private var handler: ((String) -> Unit)? = null
fun registerHandler(handler: (String) -> Unit) {
this.handler = handler
}
override fun handleMessage(message: Message<*>) {
handler?.run { this.invoke(message.payload.toString()) }
}
}
注冊MessageHandler
package com.itdragon.server.message
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
@Service
class ITDragonMessageDispatcher {
private val logger = LoggerFactory.getLogger(ITDragonMessageDispatcher::class.java)
@Autowired
lateinit var itDragonMQTTMessageHandler: ITDragonMQTTMessageHandler
@PostConstruct
fun init() {
itDragonMQTTMessageHandler.registerHandler { itDragonMsgHandler(it) }
}
fun itDragonMsgHandler(message: String) {
logger.info("itdragon mqtt receive message: $message")
try {
// todo
}catch (ex: Exception) {
ex.printStackTrace()
}
}
}
3.4.1 消息發送
注入MQTT的MessageGateway,然后推送消息。
@Autowired
lateinit var mqttGateway: MQTTConfig.MQTTMessageGateway
@Scheduled(fixedDelay = 10*1000)
fun sendMessage() {
mqttGateway.sendToMqtt("Hello ITDragon ${Instant.now()}", "itDragon/tags/cov/set")
}
4. 開發常見問題
4.1 MQTT每次重連失敗都會增長線程數
項目上線一段時間后,客戶的服務器嚴重卡頓。原因是客戶服務斷網后,MQTT在每次嘗試重連的過程中一直在創建新的線程,導致一個Java服務創建了上萬個線程。解決方案是更新了org.eclipse.paho.client.mqttv3的版本,也是 "3.1 導入mqtt庫" 中提到的。后續就沒有出現這個問題了。
4.2 MQTT消息量大存在消息丟失的情況
MQTT的消息量大的情況下,既要保障數據的完整,又要保障性能的穩定。光從MQTT本身上來說,很難做到魚和熊掌不可兼得。ITDragon龍 先要理清需求:
- 1)數據的完整性,主要用於能耗的統計、報警的分析
- 2)性能的穩定性,服務器不掛🤣🤣🤣🤣
在消息量大的情況下,ITDragon龍 可以將服務質量設置成0(最多一次)以減少消息確認的開銷,用來保證系統的穩定性。
將消息的服務質量設置成0后,會讓消息的丟失可能性變得更大,如何保證數據的完整性?其實ITDragon龍 可以在往MQTT通道推送消息之前,先將底層驅動采集的數據先異步保存到Inflxudb數據庫中。
還有就是每次發送消息量不能太大,太大也會導致消息丟失。最直接的就是后端報錯,比如:java.io.EOFException
和 too large message: xxx bytes
。但是有的場景后端沒有報錯,前端訂閱的mqtt也沒收到消息。最麻煩的是mqttbox工具因為數據量太大直接卡死。一時間真不知道把鍋甩給誰。其實我們 可以將消息拆包一批批發送。可以緩解這個問題🤣🤣🤣🤣。
其實采集的數據消息,若在這一批推送過程中丟失。也會在下一批推送過程中補上。命令下發也是一樣,如果下發失敗,再重寫下發一次。畢竟消息的丟失並不是必現的情況。也是小概率事件,系統的穩定性才是最重要的。