Java實現MQTT通信


視頻說明:https://www.bilibili.com/video/BV1qf4y1n7js/?p=3

關於MQTT

做一個無人船項目,使用MQTT通信。

MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量簡單開放易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。

發布訂閱

文檔總結

🟣 MQTT 發布訂閱

🟣 MQTT 協議

🟣 Java實現MQTT通信

🟣 基於 WebSocket 的 MQTT

服務端

服務端使用 mosquitto(版本2.0.14)

下載頁面:https://mosquitto.org/download/

客戶端

MQTTX

下載頁面:https://mqttx.app/zh#download

MQTT.fx

下載鏈接:http://www.jensd.de/apps/mqttfx/1.7.1/mqttfx-1.7.1-windows-x64.exe

paho

https://github.com/eclipse/paho.mqtt.java

paho是eclipse提供MQTT客戶端開源庫,Java代碼集成這個客戶端用來收發消息。

springboot 集成 MQTT

代碼:https://gitee.com/ioufev/mqtt-springboot-demo

依賴

pom.xml

<!-- MQTT -->
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-mqtt</artifactId>
</dependency>

類MqttConfig

spring中集成框架,有消息入站通道(用來接收消息)和出站通道(用來發送消息)

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttConfig {

    // 消費消息

    /**
     * 創建MqttPahoClientFactory,設置MQTT Broker連接屬性,如果使用SSL驗證,也在這里設置。
     * @return factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();

        // 設置代理端的URL地址,可以是多個
        options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});

        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * 入站通道
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * 入站
     */
    @Bean
    public MessageProducer inbound() {
        // Paho客戶端消息驅動通道適配器,主要用來訂閱主題
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-paho",
                mqttClientFactory(), "boat", "collector", "battery", "+/sensor");
        adapter.setCompletionTimeout(5000);

        // Paho消息轉換器
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
        // 按字節接收消息
//        defaultPahoMessageConverter.setPayloadAsBytes(true);
        adapter.setConverter(defaultPahoMessageConverter);
        adapter.setQos(1); // 設置QoS
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    // ServiceActivator注解表明:當前方法用於處理MQTT消息,inputChannel參數指定了用於消費消息的channel。
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            String payload = message.getPayload().toString();

            // byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字節格式
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();

            // 根據主題分別進行消息處理。
            if (topic.matches(".+/sensor")) { // 匹配:1/sensor
                String sensorSn = topic.split("/")[0];
                System.out.println("傳感器" + sensorSn + ": 的消息: " + payload);
            } else if (topic.equals("collector")) {
                System.out.println("采集器的消息:" + payload);
            } else {
                System.out.println("丟棄消息:主題[" + topic  + "],負載:" + payload);
            }

        };
    }

    // 發送消息

    /**
     * 出站通道
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * 出站
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler outbound() {

        // 發送消息和消費消息Channel可以使用相同MqttPahoClientFactory
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory());
        messageHandler.setAsync(true); // 如果設置成true,即異步,發送消息時將不會阻塞。
        messageHandler.setDefaultTopic("command");
        messageHandler.setDefaultQos(1); // 設置默認QoS

        // Paho消息轉換器
        DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();

        // defaultPahoMessageConverter.setPayloadAsBytes(true); // 發送默認按字節類型發送消息
        messageHandler.setConverter(defaultPahoMessageConverter);
        return messageHandler;
    }

}

接口MqttGateway

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    // 定義重載方法,用於消息發送
    void sendToMqtt(String payload);
    // 指定topic進行消息發送
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

測試

測試方式:使用接口工具,給接口發送消息,從而調用MQTT客戶端發布消息

類MqttController

import com.ioufev.mqtt.domain.MyMessage;
import com.ioufev.mqtt.mqtt.MqttGateway;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;


@RestController
public class MqttController {

    @Resource
    private MqttGateway mqttGateway;

    @PostMapping("/send")
    public String send(@RequestBody MyMessage myMessage) {
        // 發送消息到指定主題
        mqttGateway.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());
        return "send topic: " + myMessage.getTopic()+ ", message : " + myMessage.getContent();
    }

}

類MyMessage

public class MyMessage {
    private String topic;
    private String content;

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM