1.導入maven依賴
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>
2.建立連接
- serverURI:EMQ X的連接網址,例如
tcp://localhost:1883
- clientId:標識客戶端的唯一ID,必須確保該ID在同一EMQ X服務器中是唯一的,否則該服務器在處理會話時會遇到問題
- MqttClientPersistence:當本地消息處理涉及服務器端的忙或不可用狀態時,需要持久存儲本地消息的持久實例,在該狀態中可以傳遞持久類實例eqw
package paho_demo; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class Demo { public static void main(String[] args) { String broker = "tcp://localhost:1883"; String clientId = "JavaSample"; //Use the memory persistence MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient sampleClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); System.out.println("Connecting to broker:" + broker); sampleClient.connect(connOpts); System.out.println("Connected"); } catch (MqttException me) { System.out.println("reason" + me.getReasonCode()); System.out.println("msg" + me.getMessage()); System.out.println("loc" + me.getLocalizedMessage()); System.out.println("cause" + me.getCause()); System.out.println("excep" + me); me.printStackTrace(); } } }
執行此代碼后,如果可以成功連接到服務器,則會在控制台中打印以下內容。如果發生異常,請根據異常信息定位並解決問題。
Connecting to broker: tcp://localhost:1883 Connected
3.訂閱
只有在成功建立連接后才能進行訂閱。MqttClient
提供了多種subscribe
方法,可以使用不同的方式來訂閱主題。主題可以是明確的單個主題或通配符。
MqttCallback
訂閱成功時設置一個回調實例。接收消息時調用調用實例的功能。消息訂閱部分的代碼是:
String topic = "demo/topics"; System.out.println("Subscribe to topic:" + topic); sampleClient.subscribe(topic); sampleClient.setCallback(new MqttCallback() { public void messageArrived(String topic, MqttMessage message) throws Exception { String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic); System.out.println(theMsg); } public void deliveryComplete(IMqttDeliveryToken token) { } public void connectionLost(Throwable throwable) { } });
4.發布
publish
方法MqttClient
用於發布消息
MqttClient
還為用戶提供了一種在發布消息時指定QoS以及是否需要維護消息的方式:
String topic = "demo/topics"; String content = "Message from MqttPublishSample"; int qos = 2; System.out.println("Publishing message:" + content); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); sampleClient.publish(topic, message); System.out.println("Message published");
5.例
package paho_demo; import java.text.MessageFormat; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class Demo { public static void main(String[] args) { String broker = "tcp://localhost:1883"; String clientId = "JavaSample"; //Use the memory persistence MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient sampleClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); System.out.println("Connecting to broker:" + broker); sampleClient.connect(connOpts); System.out.println("Connected"); String topic = "demo/topics"; System.out.println("Subscribe to topic:" + topic); sampleClient.subscribe(topic); sampleClient.setCallback(new MqttCallback() { public void messageArrived(String topic, MqttMessage message) throws Exception { String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic); System.out.println(theMsg); } public void deliveryComplete(IMqttDeliveryToken token) { } public void connectionLost(Throwable throwable) { } }); String content = "Message from MqttPublishSample"; int qos = 2; System.out.println("Publishing message:" + content); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); sampleClient.publish(topic, message); System.out.println("Message published"); } catch (MqttException me) { System.out.println("reason" + me.getReasonCode()); System.out.println("msg" + me.getMessage()); System.out.println("loc" + me.getLocalizedMessage()); System.out.println("cause" + me.getCause()); System.out.println("excep" + me); me.printStackTrace(); } } }
運行結果:
Connecting to broker: tcp://localhost:1883 Connected Subscribe to topic: demo/topics Publishing message: Message from MqttPublishSample Message published Message from MqttPublishSample is arrived for topic demo/topics.