轉:http://ywx217.iteye.com/blog/1797463
MQTT是一款針對機對機(M2M)通信的,非常輕量級的的消息訂閱、發布協議。它適用於一些系統資源和網絡帶寬非常有限的情況下的遠程連接。MQTT-Client提供一個ASL 2.0證書下的MQTT接口。在網絡連接失敗時,它能夠自動地重新連接服務器並嘗試恢復會話。應用程序能夠使用阻塞API、基於Future的API和回調API,共三種接口形式。
在Maven中引用MQTT-Client
將下列文本加入到pom.xml
文件中。
<dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <repositories> <repository> <id>fusesource.snapshots</id> <name>FuseSource Snapshot Repository</name> <url>http://repo.fusesource.com/nexus/content/repositories/snapshots</url> <snapshots><enabled>true</enabled></snapshots> <releases><enabled>false</enabled></releases> </repository> </repositories>
以其他方式引用MQTT-Client
下載uber jar文件並加入編譯路徑中,該jar文件中包含MQTT-Client的所有依賴。
在Java 1.4環境中使用
作者同時提供了適用於Java 1.4的API。由於Java1.4中沒有SSLEngine類依賴的NIO,因此該API不支持SSL連接。
配置MQTT連接
前面提到的阻塞、Future和回調這3種API,建立連接時使用的代碼時完全相同的。首先新建一個MQTT對象並配置連接參數。在連接前至少要調用setHost方法,來指定所要連接的服務器地址。
MQTT mqtt = new MQTT(); mqtt.setHost("localhost", 1883); // or mqtt.setHost("tcp://localhost:1883");
MQTT設置說明
setClientId:用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID獲得相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成。
- setCleanSession:若設為false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認為true。
- setKeepAlive:定義客戶端傳來消息的最大時間間隔秒數,服務器可以據此判斷與客戶端的連接是否已經斷開,從而避免TCP/IP超時的長時間等待。
- setUserName:服務器認證用戶名。
- setPassword:服務器認證密碼。
- setWillTopic:設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息。
- setWillMessage:設置“遺囑”消息的內容,默認是長度為零的消息。
- setWillQos:設置“遺囑”消息的QoS,默認為QoS.ATMOSTONCE。
- setWillRetain:若想要在發布“遺囑”消息時擁有retain選項,則為true。
失敗重連接設置說明
網絡出現故障時,程序能夠自動重新連接並重建會話。利用下列方法能夠配置重新連接的間隔和最大重試次數:
- setConnectAttemptsMax:客戶端首次連接到服務器時,連接的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1。
- setReconnectAttemptsMax:客戶端已經連接到服務器,但因某種原因連接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1。
- setReconnectDelay:首次重連接間隔毫秒數,默認為10ms。
- setReconnectDelayMax:重連接間隔毫秒數,默認為30000ms。
- setReconnectBackOffMultiplier:設置重連接指數回歸。設置為1則停用指數回歸,默認為2。
Socket設置說明
可以利用下列方法調整socket設置:
-
setReceiveBufferSize:設置socket接收緩沖區大小,默認為65536(64k)。
-
setSendBufferSize:設置socket發送緩沖區大小,默認為65536(64k)。
-
setTrafficClass:設置發送數據包頭的流量類型或服務類型字段,默認為8,意為吞吐量最大化傳輸。
帶寬限制設置說明
可通過下述方法設置讀寫速率限制:
-
setMaxReadRate
:設置連接的最大接收速率,單位為bytes/s。默認為0,即無限制。 -
setMaxWriteRate:
設置連接的最大發送速率,單位為bytes/s。默認為0,即無限制。
使用SSL連接
如果想使用SSL/TLS連接,替代TCP連接,可以使用“ssl://”或“tls://”作為連接URI前綴,實現安全連接。支持的協議包括:
ssl://
- 使用JVM默認版本的SSL算法。sslv*://
- 使用指定版本的SSL算法,星號為JVM支持的SSL算法版本,例如sslv3。
tls://
- 使用JVM默認版本的TLS算法。tlsv*://
- 使用指定版本的TLS算法,星號為JVM支持的TLS算法版本,例如tlsv1.1。
客戶端使用JVM的SSLContext,基於在JVM的系統配置進行連接。可以調用MQTT的setSslContext
方法,換用其他連接方式。
對於內部線程池,SSL連接為阻塞特性。調用setBlockingExecutor
方法可以替換所要使用的executor。
選擇消息分發隊列
若沒有調用方法setDispatchQueue,客戶端將為連接新建一個隊列。如果想實現多個連接使用公用的隊列,顯式地指定隊列是一個非常方便的實現方法。
使用阻塞API
MQTT.connectBlocking
方法建立並返回一個阻塞API連接。
BlockingConnection connection = mqtt.blockingConnection(); connection.connect();
connection.publish("foo", "Hello".toBytes(), QoS.AT_LEAST_ONCE, false);
Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);
Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// process the message then:
message.ack();
connection.disconnect();
使用基於Future的API
MQTT.connectFuture
方法建立並返回一個基於Future類型的API連接。所有連接操作都是非阻塞的,連接結果通過Future對象返回。
FutureConnection connection = mqtt.futureConnection(); Future<Void> f1 = connection.connect(); f1.await(); Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)}); byte[] qoses = f2.await(); // We can start future receive.. Future<Message> receive = connection.receive(); // send the message.. Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false); // Then the receive will get the message. Message message = receive.await(); message.ack(); Future<Void> f4 connection.disconnect(); f4.await();
使用回調API MQTT.connectCallback方法建立並返回一個回調API連接。本方法是三種API中最復雜也是性能最好的API,前面提到的兩種API其實都是對回調API的封裝。對連接的所有操作都是非阻塞的,返回的結果將傳至回調接口函數。示例如下:
final CallbackConnection connection = mqtt.callbackConnection(); connection.listener(new Listener() { public void onDisconnected() { } public void onConnected() { } public void onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack) { // You can now process a received message from a topic. // Once process execute the ack runnable. ack.run(); } public void onFailure(Throwable value) { connection.close(null); // a connection failure occured. } }) connection.connect(new Callback<Void>() { public void onFailure(Throwable value) { result.failure(value); // If we could not connect to the server. } // Once we connect.. public void onSuccess(Void v) { // Subscribe to a topic Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)}; connection.subscribe(topics, new Callback<byte[]>() { public void onSuccess(byte[] qoses) { // The result of the subcribe request. } public void onFailure(Throwable value) { connection.close(null); // subscribe failed. } }); // Send a message to a topic connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() { public void onSuccess(Void v) { // the pubish operation completed successfully. } public void onFailure(Throwable value) { connection.close(null); // publish failed. } }); // To disconnect.. connection.disconnect(new Callback<Void>() { public void onSuccess(Void v) { // called once the connection is disconnected. } public void onFailure(Throwable value) { // Disconnects never fail. } }); } });