SpringBoot 集成MQTT配置


1. 前言

公司的IOT平台主要采用MQTT(消息隊列遙測傳輸)對底層的驅動做命令下發和數據采集。也用到了redis、zeroMQ、nats等消息中間件。今天先整理SpringBoot集成MQTT筆記和工作中遇到的問題。

2. MQTT介紹

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.

官網地址:http://mqtt.org/https://www.mqtt.com/

MQTT除了具備大部分消息中間件擁有的功能外,其最大的特點就是小型傳輸。以減少開銷,減低網絡流量的方式去滿足低帶寬、不穩定的網絡遠程傳輸。

MQTT服務器有很多,比如Apache-Apollo和EMQX,ITDragon龍 目前使用的時EMQX作為MQTT的服務器。使用也很簡單,下載解壓后,進入bin目錄執行emqx console 啟動服務。

MQTT調試工具可以用MQTTBox

3. SpringBoot 集成MQTT

3.1 導入mqtt庫

第一步:導入面向企業應用集成庫和對應mqtt集成庫

compile('org.springframework.boot:spring-boot-starter-integration')
compile('org.springframework.integration:spring-integration-mqtt')

這里要注意spring-integration-mqtt的版本。因為會存在org.eclipse.paho.client.mqttv3修復了一些bug,並迭代了新版本。但spring-integration-mqtt並沒有及時更新的情況。修改方法如下

compile("org.springframework.integration:spring-integration-mqtt") {
    exclude group: "org.eclipse.paho" , module: "org.eclipse.paho.client.mqttv3"
}
compile("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.2")

第二步:MQTT連接配置文件

# MQTT Config
mqtt.server=tcp://x.x.x.x:1883
mqtt.username=xxx
mqtt.password=xxx
mqtt.client-id=clientID
mqtt.cache-number=100
mqtt.message.topic=itDragon/tags/cov

3.2 配置MQTT訂閱者

Inbound 入站消息適配器

第一步:配置MQTT客戶端工廠類DefaultMqttPahoClientFactory

第二步:配置MQTT入站消息適配器MqttPahoMessageDrivenChannelAdapter

第三步:定義MQTT入站消息通道MessageChannel

第四步:聲明MQTT入站消息處理器MessageHandler

以下有些配置是沖突或者重復的,主要是體現一些重要配置。

package com.itdragon.server.config

import com.itdragon.server.message.ITDragonMQTTMessageHandler
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.integration.channel.DirectChannel
import org.springframework.integration.core.MessageProducer
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory
import org.springframework.integration.mqtt.core.MqttPahoClientFactory
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.MessageHandler
import java.time.Instant

@Configuration
class MQTTConfig {

    @Value("\${mqtt.server}")
    lateinit var mqttServer: String
    @Value("\${mqtt.user-name}")
    lateinit var mqttUserName: String
    @Value("\${mqtt.password}")
    lateinit var mqttUserPassword: String
    @Value("\${mqtt.client-id}")
    lateinit var clientID: String
    @Value("\${mqtt.cache-number}")
    lateinit var maxMessageInFlight: String
    @Value("\${mqtt.message.topic}")
    lateinit var messageTopic: String

    /**
     * 配置DefaultMqttPahoClientFactory
     * 1. 配置基本的鏈接信息
     * 2. 配置maxInflight,在mqtt消息量比較大的情況下將值設大
     */
    fun mqttClientFactory(): MqttPahoClientFactory {
        val mqttConnectOptions = MqttConnectOptions()
        // 配置mqtt服務端地址,登錄賬號和密碼
        mqttConnectOptions.serverURIs = arrayOf(mqttServer)
        mqttConnectOptions.userName = mqttUserName
        mqttConnectOptions.password = mqttUserPassword.toCharArray()
        // 配置最大不確定接收消息數量,默認值10,qos!=0 時生效
        mqttConnectOptions.maxInflight = maxMessageInFlight.toInt()
        val factory = DefaultMqttPahoClientFactory()
        factory.connectionOptions = mqttConnectOptions
        return factory
    }

    /**
     * 配置Inbound入站,消費者基本連接配置
     * 1. 通過DefaultMqttPahoClientFactory 初始化入站通道適配器
     * 2. 配置超時時長,默認30000毫秒
     * 3. 配置Paho消息轉換器
     * 4. 配置發送數據的服務質量 0~2
     * 5. 配置訂閱通道
     */
    @Bean
    fun itDragonMqttInbound(): MessageProducer {
        // 初始化入站通道適配器,使用的是Eclipse Paho MQTT客戶端庫
        val adapter = MqttPahoMessageDrivenChannelAdapter(clientID + Instant.now().toEpochMilli(), mqttClientFactory(), messageTopic)
        // 設置連接超時時長(默認30000毫秒)
        adapter.setCompletionTimeout(30000)
        // 配置默認Paho消息轉換器(qos=0, retain=false, charset=UTF-8)
        adapter.setConverter(DefaultPahoMessageConverter())
        // 設置服務質量
        // 0 最多一次,數據可能丟失;
        // 1 至少一次,數據可能重復;
        // 2 只有一次,有且只有一次;最耗性能
        adapter.setQos(0)
        // 設置訂閱通道
        adapter.outputChannel = itDragonMqttInputChannel()
        return adapter
    }

    /**
     * 配置Inbound入站,消費者訂閱的消息通道
     */
    @Bean
    fun itDragonMqttInputChannel(): MessageChannel {
        return DirectChannel()
    }

    /**
     * 配置Inbound入站,消費者的消息處理器
     * 1. 使用@ServiceActivator注解,表明所修飾的方法用於消息處理
     * 2. 使用inputChannel值,表明從指定通道中取值
     * 3. 利用函數式編程的思路,解耦MessageHandler的業務邏輯
     */
    @Bean
    @ServiceActivator(inputChannel = "itDragonMqttInputChannel")
    fun commandDataHandler(): MessageHandler {
        /*return MessageHandler { message ->
            println(message.payload)
        }*/
        return ITDragonMQTTMessageHandler()
    }

}

注意:

  • 1)MQTT的客戶端ID要唯一。
  • 2)MQTT在消息量大的情況下會出現消息丟失的情況。
  • 3)MessageHandler注意解耦問題。

3.3 配置MQTT發布者

Outbound 出站消息適配器

第一步:配置Outbound出站,出站通道適配器

第二步:配置Outbound出站,發布者發送的消息通道

第三步:對外提供推送消息的接口

在原有的MQTTConfig配置類的集成上補充以下內容

	/**
     * 配置Outbound出站,出站通道適配器
     * 1. 通過MqttPahoMessageHandler 初始化出站通道適配器
     * 2. 配置異步發送
     * 3. 配置默認的服務質量
     */
    @Bean
    @ServiceActivator(inputChannel = "itDragonMqttOutputChannel")
    fun itDragonMqttOutbound(): MqttPahoMessageHandler {
        // 初始化出站通道適配器,使用的是Eclipse Paho MQTT客戶端庫
        val messageHandler = MqttPahoMessageHandler(clientID + Instant.now().toEpochMilli() + "_set", mqttClientFactory())
        // 設置異步發送,默認是false(發送時阻塞)
        messageHandler.setAsync(true)
        // 設置默認的服務質量
        messageHandler.setDefaultQos(0)
        return messageHandler
    }

    /**
     * 配置Outbound出站,發布者發送的消息通道
     */
    @Bean
    fun itDragonMqttOutputChannel(): MessageChannel {
        return DirectChannel()
    }

    /**
     * 對外提供推送消息的接口
     * 1. 使用@MessagingGateway注解,配置MQTTMessageGateway消息推送接口
     * 2. 使用defaultRequestChannel值,調用時將向其發送消息的默認通道
     * 3. 配置靈活的topic主題
     */
    @MessagingGateway(defaultRequestChannel = "itDragonMqttOutputChannel")
    interface MQTTMessageGateway {
        fun sendToMqtt(data: String, @Header(MqttHeaders.TOPIC) topic: String)
        fun sendToMqtt(data: String, @Header(MqttHeaders.QOS) qos: Int, @Header(MqttHeaders.TOPIC) topic: String)
    }

注意:

  • 1)發布者和訂閱者的客戶端ID不能相同。
  • 2)消息的推送建議采用異步的方式。
  • 3)消息的推送方法可以只傳payload消息體,但需要配置setDefaultTopic。

3.4 MQTT消息處理和發送

3.4.1 消息處理

為了讓消息處理函數和MQTT配置解耦,這里提供MessageHandler 注冊類,將消息處理的業務邏輯以函數式編程的思維注冊到Handler中。

package com.itdragon.server.message

import org.springframework.messaging.Message
import org.springframework.messaging.MessageHandler

class ITDragonMQTTMessageHandler : MessageHandler {

    private var handler: ((String) -> Unit)? = null

    fun registerHandler(handler: (String) -> Unit) {
        this.handler = handler
    }

    override fun handleMessage(message: Message<*>) {
        handler?.run { this.invoke(message.payload.toString()) }
    }
}

注冊MessageHandler

package com.itdragon.server.message

import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct

@Service
class ITDragonMessageDispatcher {

    private val logger = LoggerFactory.getLogger(ITDragonMessageDispatcher::class.java)

    @Autowired
    lateinit var itDragonMQTTMessageHandler: ITDragonMQTTMessageHandler

    @PostConstruct
    fun init() {
        itDragonMQTTMessageHandler.registerHandler { itDragonMsgHandler(it) }
    }

    fun itDragonMsgHandler(message: String) {
        logger.info("itdragon mqtt receive message: $message")
        try {
            // todo
        }catch (ex: Exception) {
            ex.printStackTrace()
        }
    }

}

3.4.1 消息發送

注入MQTT的MessageGateway,然后推送消息。

@Autowired
lateinit var mqttGateway: MQTTConfig.MQTTMessageGateway

@Scheduled(fixedDelay = 10*1000)
fun sendMessage() {
    mqttGateway.sendToMqtt("Hello ITDragon ${Instant.now()}", "itDragon/tags/cov/set")
}

4. 開發常見問題

4.1 MQTT每次重連失敗都會增長線程數

項目上線一段時間后,客戶的服務器嚴重卡頓。原因是客戶服務斷網后,MQTT在每次嘗試重連的過程中一直在創建新的線程,導致一個Java服務創建了上萬個線程。解決方案是更新了org.eclipse.paho.client.mqttv3的版本,也是 "3.1 導入mqtt庫" 中提到的。后續就沒有出現這個問題了。

4.2 MQTT消息量大存在消息丟失的情況

MQTT的消息量大的情況下,既要保障數據的完整,又要保障性能的穩定。光從MQTT本身上來說,很難做到魚和熊掌不可兼得。ITDragon龍 先要理清需求:

  • 1)數據的完整性,主要用於能耗的統計、報警的分析
  • 2)性能的穩定性,服務器不掛🤣🤣🤣🤣

在消息量大的情況下,ITDragon龍 可以將服務質量設置成0(最多一次)以減少消息確認的開銷,用來保證系統的穩定性。

將消息的服務質量設置成0后,會讓消息的丟失可能性變得更大,如何保證數據的完整性?其實ITDragon龍 可以在往MQTT通道推送消息之前,先將底層驅動采集的數據先異步保存到Inflxudb數據庫中。

還有就是每次發送消息量不能太大,太大也會導致消息丟失。最直接的就是后端報錯,比如:java.io.EOFExceptiontoo large message: xxx bytes 。但是有的場景后端沒有報錯,前端訂閱的mqtt也沒收到消息。最麻煩的是mqttbox工具因為數據量太大直接卡死。一時間真不知道把鍋甩給誰。其實我們 可以將消息拆包一批批發送。可以緩解這個問題🤣🤣🤣🤣。

其實采集的數據消息,若在這一批推送過程中丟失。也會在下一批推送過程中補上。命令下發也是一樣,如果下發失敗,再重寫下發一次。畢竟消息的丟失並不是必現的情況。也是小概率事件,系統的穩定性才是最重要的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM