MQTT:java客戶端庫Paho


簡介

Eclipse Paho Java Client (opens new window)是用 Java 編寫的 MQTT 客戶端庫(MQTT Java Client),可用於 JVM 或其他 Java 兼容平台(例如Android)。

Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 異步和同步 API。

emqx官方文檔中有相關介紹:https://docs.emqx.cn/enterprise/v4.3/development/java.html#通過-maven-安裝-paho-java

paho客戶端對象初始化

先導入paho依賴:

<dependency>
  <groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>

配置MqttProperties.java:

@ConfigurationProperties("mqtt")
@Component
@Data
public class MqttProperties {

    private String brokerUrl;

    private String clientId;

    private String username;

    private String password;
}

application.yml

mqtt:
  broker-url: tcp://192.168.40.128:1883
  client-id: emq-client
  username: admin
  password: public

EmqClient.java

@Component
public class EmqClient {

    private static final Logger logger = LoggerFactory.getLogger(EmqClient.class);

    @Autowired
    private MqttProperties mqttProperties;

    private IMqttClient mqttClient;

    @PostConstruct
    public void init(){

        MemoryPersistence memoryPersistence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(mqttProperties.getBrokerUrl(),
                    mqttProperties.getClientId(),
                    memoryPersistence);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 創建失敗");
        }
    }
}

編寫客戶端一些方法

創建Qos枚舉類:

public enum QosEnum {
    Qos0(0),Qos1(1),Qos2(2);

    private final int value;

    QosEnum(int value) {
        this.value = value;
    }

    public int value(){
        return this.value;
    }
}
    @Autowired
    private MqttCallback mqttCallback;

    public void connect(String username, String password){
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);//自動重連
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);//臨時會話

        mqttClient.setCallback(mqttCallback);//設置方法回調
        try {
            mqttClient.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 連接失敗");
        }
    }

    @PreDestroy
    public void disConnect(){
        try {
            mqttClient.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 斷開連接失敗");
        }
    }

    public void reConnect(){
        try {
            mqttClient.reconnect();
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("mqttClient 重新連接失敗");
        }
    }

    /**
     * 發布消息
     * @author wen.jie
     * @date 2021/7/27 16:11
     */
    public void publish(String topic, String msg, QosEnum qosEnum, boolean retained){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes());
        mqttMessage.setQos(qosEnum.value());
        mqttMessage.setRetained(retained);

        try {
            mqttClient.publish(topic, mqttMessage);
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("發布消息失敗");
        }
    }

    /**
     * 添加訂閱
     * @author wen.jie
     * @date 2021/7/27 16:18
     */
    public void subscribe(String topicFilters, QosEnum qosEnum){
        try {
            mqttClient.subscribe(topicFilters, qosEnum.value());
        } catch (MqttException e) {
            e.printStackTrace();
            logger.error("添加訂閱失敗");
        }
    }

編寫MqttCallback回調類

MessageCallback.java

@Component
public class MessageCallback implements MqttCallback {

    private static final Logger logger = LoggerFactory.getLogger(MessageCallback.class);

    /**
     * 丟失了對服務端連接后的回調
     * @author wen.jie
     * @date 2021/7/27 16:27
     */
    @Override
    public void connectionLost(Throwable cause) {
        //丟失對服務端的連接后觸發該方法回調,此處可以做一些特殊處理,比如重連
        logger.info("丟失了對服務端連接");
    }

    /**
     * 訂閱到消息后的回調
     * 該方法由mqtt客戶端同步調用,在此方法未正確返回之前,不會發送ack確認消息到broker
     * 一旦該方法向外拋出了異常客戶端將異常關閉,當再次連接時;所有QoS1,QoS2且客戶端未進行ack確認的消息都將由broker服務器再次發送到客戶端
     * @author wen.jie
     * @date 2021/7/27 16:28
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        logger.info("訂閱到了消息;topic={},messageid={},qos={},msg={}",
                topic,
                message.getId(),
                message.getQos(),
                new String(message.getPayload()));
    }

    /**
     * 消息發布完成且收到ack確認后的回調
     * QoS0:消息被網絡發出后觸發一次
     * QoS1:當收到broker的PUBACK消息后觸發
     * QoS2:當收到broker的PUBCOMP消息后觸發
     * @author wen.jie
     * @date 2021/7/27 16:34
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        logger.info("消息發送完成,messageId={},topics={}",messageId,topics);
    }
}

主啟動類

@SpringBootApplication
public class EmqDemoApplication {

    @Autowired
    private EmqClient emqClient;

    @Autowired
    private MqttProperties mqttProperties;

    @PostConstruct
    public void init(){
        emqClient.connect(mqttProperties.getUsername(), mqttProperties.getPassword());
        emqClient.subscribe("topictest/#", QosEnum.Qos2);
        new Thread(()->{
            while (true){
                emqClient.publish("topictest/123", "hello world", QosEnum.Qos2, true);
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }).start();
    }

    public static void main(String[] args) {
        SpringApplication.run(EmqDemoApplication.class, args);
    }

}

測試

啟動主啟動類,觀察效果

image-20210727164223278

每五秒鍾收發一次消息,測試成功


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM