三、Mosquitto Java 客戶端實現


  本文的實現是在 << 一、Mosquitto 介紹&安裝>> << 二、 Mosquitto 的使用說明 >> 兩篇文章搭建好 Mosquitto 服務基礎上實現的。如果你還沒有搭建 Mosquitto 服務 請參考我上述兩篇文章進行 Mosquitto 服務的搭建。 

  Java 實現 Mosquitto 的客戶端主要使用 Eclipse Paho Java Client 提供的 SDK 來實現的。有興趣的可以直接去 Eclipse Paha 官網下載對應的sdk 和使用說明。

  一、 准備工作

    本本講解項目是Maven項目、如果還有對 Maven 不了解或者不熟悉的同學可以網上去學習下、本文不在這講解 Maven 的使用。

    添加依賴    

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.0.2</version>
</dependency>

  二、本文實現 Mosquitto 消息發送主要分為三個類

  1> ClientMQTT  客戶端類

  2> PushCallback 消息回調類

  3> ServerMQTT 服務端類

  四、 下面將直接上對應的 code

  1>  客戶端  

 1 import java.util.concurrent.ScheduledExecutorService;
 2 
 3 import org.eclipse.paho.client.mqttv3.MqttClient;
 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 5 import org.eclipse.paho.client.mqttv3.MqttException;
 6 import org.eclipse.paho.client.mqttv3.MqttTopic;
 7 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 8 
 9 public class ClientMQTT {
10 
11     public static final String HOST = "tcp://172.16.192.102:1883";
12     public static final String TOPIC = "root/topic/123";
13     private static final String clientid = "client11";
14     private MqttClient client;
15     private MqttConnectOptions options;
16     private String userName = "admin";
17     private String passWord = "admin";
18 
19     private ScheduledExecutorService scheduler;
20 
21     private void start() {
22         try {
23             // host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
24             client = new MqttClient(HOST, clientid, new MemoryPersistence());
25             // MQTT的連接設置
26             options = new MqttConnectOptions();
27             // 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
28             options.setCleanSession(true);
29             // 設置連接的用戶名
30             options.setUserName(userName);
31             // 設置連接的密碼
32             options.setPassword(passWord.toCharArray());
33             // 設置超時時間 單位為秒
34             options.setConnectionTimeout(10);
35             // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
36             options.setKeepAliveInterval(20);
37             // 設置回調
38             client.setCallback(new PushCallback());
39             MqttTopic topic = client.getTopic(TOPIC);
40             // setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
41             options.setWill(topic, "close".getBytes(), 2, true);
42 
43             client.connect(options);
44             // 訂閱消息
45             int[] Qos = { 1 };
46             String[] topic1 = { TOPIC };
47             client.subscribe(topic1, Qos);
48 
49         } catch (Exception e) {
50             e.printStackTrace();
51         }
52     }
53 
54     public static void main(String[] args) throws MqttException {
55         ClientMQTT client = new ClientMQTT();
56         client.start();
57     }
58 }
View Code

  2> 消息回調

 1 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 2 import org.eclipse.paho.client.mqttv3.MqttCallback;
 3 import org.eclipse.paho.client.mqttv3.MqttMessage;
 4 
 5 /**
 6  * 發布消息的回調類
 7  * 
 8  * 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。
 9  * 每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。 在回調中,將它用來標識已經啟動了該回調的哪個實例。
10  * 必須在回調類中實現三個方法:
11  * 
12  * public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的發布。
13  * 
14  * public void connectionLost(Throwable cause)在斷開連接時調用。
15  * 
16  * public void deliveryComplete(MqttDeliveryToken token)) 接收到已經發布的 QoS 1 或 QoS 2
17  * 消息的傳遞令牌時調用。 由 MqttClient.connect 激活此回調。
18  * 
19  */
20 public class PushCallback implements MqttCallback {
21 
22     public void connectionLost(Throwable cause) {
23         // 連接丟失后,一般在這里面進行重連
24         System.out.println("連接斷開,可以做重連");
25     }
26 
27     public void deliveryComplete(IMqttDeliveryToken token) {
28         System.out.println("deliveryComplete---------" + token.isComplete());
29     }
30 
31     public void messageArrived(String topic, MqttMessage message) throws Exception {
32         // subscribe后得到的消息會執行到這里面
33         System.out.println("接收消息主題 : " + topic);
34         System.out.println("接收消息Qos : " + message.getQos());
35         System.out.println("接收消息內容 : " + new String(message.getPayload()));
36     }
37 }
View Code

  3> 服務端

 1 import org.eclipse.paho.client.mqttv3.MqttClient;
 2 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 3 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
 4 import org.eclipse.paho.client.mqttv3.MqttException;
 5 import org.eclipse.paho.client.mqttv3.MqttMessage;
 6 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
 7 import org.eclipse.paho.client.mqttv3.MqttTopic;
 8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 9 
10 /**
11  * 
12  * Title:Server Description: 服務器向多個客戶端推送主題,即不同客戶端可向服務器訂閱相同主題
13  * 
14  * @author yueli 2017年9月1日下午17:41:10
15  */
16 public class ServerMQTT {
17 
18     // tcp://MQTT安裝的服務器地址:MQTT定義的端口號
19     public static final String HOST = "tcp://172.16.192.102:1883";
20     // 定義一個主題
21     public static final String TOPIC = "root/topic/123";
22     // 定義MQTT的ID,可以在MQTT服務配置中指定
23     private static final String clientid = "server11";
24 
25     private MqttClient client;
26     private MqttTopic topic11;
27     private String userName = "mosquitto";
28     private String passWord = "mosquitto";
29 
30     private MqttMessage message;
31 
32     /**
33      * 構造函數
34      * 
35      * @throws MqttException
36      */
37     public ServerMQTT() throws MqttException {
38         // MemoryPersistence設置clientid的保存形式,默認為以內存保存
39         client = new MqttClient(HOST, clientid, new MemoryPersistence());
40         connect();
41     }
42 
43     /**
44      * 用來連接服務器
45      */
46     private void connect() {
47         MqttConnectOptions options = new MqttConnectOptions();
48         options.setCleanSession(false);
49         options.setUserName(userName);
50         options.setPassword(passWord.toCharArray());
51         // 設置超時時間
52         options.setConnectionTimeout(10);
53         // 設置會話心跳時間
54         options.setKeepAliveInterval(20);
55         try {
56             client.setCallback(new PushCallback());
57             client.connect(options);
58 
59             topic11 = client.getTopic(TOPIC);
60         } catch (Exception e) {
61             e.printStackTrace();
62         }
63     }
64 
65     /**
66      * 
67      * @param topic
68      * @param message
69      * @throws MqttPersistenceException
70      * @throws MqttException
71      */
72     public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException {
73         MqttDeliveryToken token = topic.publish(message);
74         token.waitForCompletion();
75         System.out.println("message is published completely! " + token.isComplete());
76     }
77 
78     /**
79      * 啟動入口
80      * 
81      * @param args
82      * @throws MqttException
83      */
84     public static void main(String[] args) throws MqttException {
85         ServerMQTT server = new ServerMQTT();
86 
87         server.message = new MqttMessage();
88         server.message.setQos(1);
89         server.message.setRetained(true);
90         server.message.setPayload("hello,topic14".getBytes());
91         server.publish(server.topic11, server.message);
92         System.out.println(server.message.isRetained() + "------ratained狀態");
93     }
94 }
View Code

好了、到這 Java 實現 Mosquiito 客戶端基本已經完成、本實列只是一個 demo 如果正式使用還得根據自己的業務做很多開發。謝謝 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM