MQTT客戶端代碼 -- JAVA fusesource mqtt-client


轉:http://ywx217.iteye.com/blog/1797463

MQTT是一款針對機對機(M2M)通信的,非常輕量級的的消息訂閱、發布協議。它適用於一些系統資源和網絡帶寬非常有限的情況下的遠程連接。MQTT-Client提供一個ASL 2.0證書下的MQTT接口。在網絡連接失敗時,它能夠自動地重新連接服務器並嘗試恢復會話。應用程序能夠使用阻塞API、基於Future的API和回調API,共三種接口形式。

在Maven中引用MQTT-Client

將下列文本加入到pom.xml文件中。

<dependency>
  <groupId>org.fusesource.mqtt-client</groupId>
  <artifactId>mqtt-client</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

<repositories>
  <repository>
    <id>fusesource.snapshots</id>
    <name>FuseSource Snapshot Repository</name>
    <url>http://repo.fusesource.com/nexus/content/repositories/snapshots</url>
    <snapshots><enabled>true</enabled></snapshots>
    <releases><enabled>false</enabled></releases>
  </repository>
</repositories>

  

以其他方式引用MQTT-Client

下載uber jar文件並加入編譯路徑中,該jar文件中包含MQTT-Client的所有依賴。

在Java 1.4環境中使用

作者同時提供了適用於Java 1.4的API。由於Java1.4中沒有SSLEngine類依賴的NIO,因此該API不支持SSL連接。

 

配置MQTT連接

前面提到的阻塞、Future和回調這3種API,建立連接時使用的代碼時完全相同的。首先新建一個MQTT對象並配置連接參數。在連接前至少要調用setHost方法,來指定所要連接的服務器地址。

MQTT mqtt = new MQTT(); 
mqtt.setHost("localhost", 1883); 
// or 
mqtt.setHost("tcp://localhost:1883");

  

MQTT設置說明

  • setClientId:用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID獲得相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成。
  • setCleanSession:若設為false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認為true。
  • setKeepAlive:定義客戶端傳來消息的最大時間間隔秒數,服務器可以據此判斷與客戶端的連接是否已經斷開,從而避免TCP/IP超時的長時間等待。
  • setUserName:服務器認證用戶名。
  • setPassword:服務器認證密碼。
  • setWillTopic:設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息。
  • setWillMessage:設置“遺囑”消息的內容,默認是長度為零的消息。
  • setWillQos:設置“遺囑”消息的QoS,默認為QoS.ATMOSTONCE。
  • setWillRetain:若想要在發布“遺囑”消息時擁有retain選項,則為true。

失敗重連接設置說明

網絡出現故障時,程序能夠自動重新連接並重建會話。利用下列方法能夠配置重新連接的間隔和最大重試次數:

  • setConnectAttemptsMax:客戶端首次連接到服務器時,連接的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1。
  • setReconnectAttemptsMax:客戶端已經連接到服務器,但因某種原因連接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1。
  • setReconnectDelay:首次重連接間隔毫秒數,默認為10ms。
  • setReconnectDelayMax:重連接間隔毫秒數,默認為30000ms。
  • setReconnectBackOffMultiplier:設置重連接指數回歸。設置為1則停用指數回歸,默認為2。

Socket設置說明

可以利用下列方法調整socket設置:

  • setReceiveBufferSize:設置socket接收緩沖區大小,默認為65536(64k)。

  • setSendBufferSize:設置socket發送緩沖區大小,默認為65536(64k)。

  • setTrafficClass:設置發送數據包頭的流量類型或服務類型字段,默認為8,意為吞吐量最大化傳輸。

帶寬限制設置說明

可通過下述方法設置讀寫速率限制:

  • setMaxReadRate:設置連接的最大接收速率,單位為bytes/s。默認為0,即無限制。

  • setMaxWriteRate:設置連接的最大發送速率,單位為bytes/s。默認為0,即無限制。

使用SSL連接

如果想使用SSL/TLS連接,替代TCP連接,可以使用“ssl://”或“tls://”作為連接URI前綴,實現安全連接。支持的協議包括:

  • ssl:// - 使用JVM默認版本的SSL算法。
  • sslv*:// - 使用指定版本的SSL算法,星號為JVM支持的SSL算法版本,例如sslv3
  • tls:// - 使用JVM默認版本的TLS算法。
  • tlsv*:// - 使用指定版本的TLS算法,星號為JVM支持的TLS算法版本,例如tlsv1.1

客戶端使用JVM的SSLContext,基於在JVM的系統配置進行連接。可以調用MQTT的setSslContext方法,換用其他連接方式。
對於內部線程池,SSL連接為阻塞特性。調用setBlockingExecutor方法可以替換所要使用的executor。

選擇消息分發隊列

若沒有調用方法setDispatchQueue,客戶端將為連接新建一個隊列。如果想實現多個連接使用公用的隊列,顯式地指定隊列是一個非常方便的實現方法。
 

使用阻塞API

MQTT.connectBlocking方法建立並返回一個阻塞API連接。

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.publish("foo", "Hello".toBytes(), QoS.AT_LEAST_ONCE, false);

Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);

Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// process the message then:
message.ack();

connection.disconnect();


  

使用基於Future的API

MQTT.connectFuture方法建立並返回一個基於Future類型的API連接。所有連接操作都是非阻塞的,連接結果通過Future對象返回

FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();

Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();

// We can start future receive..
Future<Message> receive = connection.receive();

// send the message..
Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);

// Then the receive will get the message.
Message message = receive.await();
message.ack();

Future<Void> f4 connection.disconnect();
f4.await();

  

使用回調API
MQTT.connectCallback方法建立並返回一個回調API連接。本方法是三種API中最復雜也是性能最好的API,前面提到的兩種API其實都是對回調API的封裝。對連接的所有操作都是非阻塞的,返回的結果將傳至回調接口函數。示例如下:

  

final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {

    public void onDisconnected() {
    }
    public void onConnected() {
    }

    public void onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack) {
        // You can now process a received message from a topic.
        // Once process execute the ack runnable.
        ack.run();
    }
    public void onFailure(Throwable value) {
        connection.close(null); // a connection failure occured.
    }
})
connection.connect(new Callback<Void>() {
    public void onFailure(Throwable value) {
        result.failure(value); // If we could not connect to the server.
    }

    // Once we connect..
    public void onSuccess(Void v) {

        // Subscribe to a topic
        Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
        connection.subscribe(topics, new Callback<byte[]>() {
            public void onSuccess(byte[] qoses) {
                // The result of the subcribe request.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // subscribe failed.
            }
        });

        // Send a message to a topic
        connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
            public void onSuccess(Void v) {
              // the pubish operation completed successfully.
            }
            public void onFailure(Throwable value) {
                connection.close(null); // publish failed.
            }
        });

        // To disconnect..
        connection.disconnect(new Callback<Void>() {
            public void onSuccess(Void v) {
              // called once the connection is disconnected.
            }
            public void onFailure(Throwable value) {
              // Disconnects never fail.
            }
        });
    }
});

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM