MQTT客戶端編程


1.導入maven依賴

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

2.建立連接

  • serverURI:EMQ X的連接網址,例如 tcp://localhost:1883
  • clientId:標識客戶端的唯一ID,必須確保該ID在同一EMQ X服務器中是唯一的,否則該服務器在處理會話時會遇到問題
  • MqttClientPersistence:當本地消息處理涉及服務器端的忙或不可用狀態時,需要持久存儲本地消息的持久實例,在該狀態中可以傳遞持久類實例eqw
package paho_demo;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Demo {
    public static void main(String[] args) {
        String broker = "tcp://localhost:1883";
        String clientId = "JavaSample";
         //Use the memory persistence
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker:" + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
        } catch (MqttException me) {
            System.out.println("reason" + me.getReasonCode());
            System.out.println("msg" + me.getMessage());
            System.out.println("loc" + me.getLocalizedMessage());
            System.out.println("cause" + me.getCause());
            System.out.println("excep" + me);
            me.printStackTrace();
        }
    }
}

執行此代碼后,如果可以成功連接到服務器,則會在控制台中打印以下內容。如果發生異常,請根據異常信息定位並解決問題。

Connecting to broker: tcp://localhost:1883
Connected

3.訂閱

只有在成功建立連接后才能進行訂閱。MqttClient提供了多種subscribe方法,可以使用不同的方式來訂閱主題。主題可以是明確的單個主題或通配符。

MqttCallback訂閱成功時設置一個回調實例。接收消息時調用調用實例的功能。消息訂閱部分的代碼是:

String topic = "demo/topics";
System.out.println("Subscribe to topic:" + topic);
sampleClient.subscribe(topic);

sampleClient.setCallback(new MqttCallback() {
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);
        System.out.println(theMsg);
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    public void connectionLost(Throwable throwable) {
    }
});

4.發布

publish方法MqttClient用於發布消息

MqttClient 還為用戶提供了一種在發布消息時指定QoS以及是否需要維護消息的方式:

String topic = "demo/topics";
String content = "Message from MqttPublishSample";
int qos = 2;
System.out.println("Publishing message:" + content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");

5.例

package paho_demo;

import java.text.MessageFormat;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Demo {
    public static void main(String[] args) {
        String broker = "tcp://localhost:1883";
        String clientId = "JavaSample";
        //Use the memory persistence
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker:" + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");

            String topic = "demo/topics";
            System.out.println("Subscribe to topic:" + topic);
            sampleClient.subscribe(topic);
            sampleClient.setCallback(new MqttCallback() {
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);
                    System.out.println(theMsg);
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                }

                public void connectionLost(Throwable throwable) {
                }
            });


            String content = "Message from MqttPublishSample";
            int qos = 2;
            System.out.println("Publishing message:" + content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");

        } catch (MqttException me) {
            System.out.println("reason" + me.getReasonCode());
            System.out.println("msg" + me.getMessage());
            System.out.println("loc" + me.getLocalizedMessage());
            System.out.println("cause" + me.getCause());
            System.out.println("excep" + me);
            me.printStackTrace();
        }
    }
}

運行結果:

Connecting to broker: tcp://localhost:1883
Connected
Subscribe to topic: demo/topics
Publishing message: Message from MqttPublishSample
Message published
Message from MqttPublishSample is arrived for topic demo/topics.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM