轉:http://ywx217.iteye.com/blog/1797463
MQTT是一款針對機對機(M2M)通信的,非常輕量級的的消息訂閱、發布協議。它適用於一些系統資源和網絡帶寬非常有限的情況下的遠程連接。MQTT-Client提供一個ASL 2.0證書下的MQTT接口。在網絡連接失敗時,它能夠自動地重新連接服務器並嘗試恢復會話。應用程序能夠使用阻塞API、基於Future的API和回調API,共三種接口形式。
在Maven中引用MQTT-Client
將下列文本加入到pom.xml文件中。
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<repositories>
<repository>
<id>fusesource.snapshots</id>
<name>FuseSource Snapshot Repository</name>
<url>http://repo.fusesource.com/nexus/content/repositories/snapshots</url>
<snapshots><enabled>true</enabled></snapshots>
<releases><enabled>false</enabled></releases>
</repository>
</repositories>
以其他方式引用MQTT-Client
下載uber jar文件並加入編譯路徑中,該jar文件中包含MQTT-Client的所有依賴。
在Java 1.4環境中使用
作者同時提供了適用於Java 1.4的API。由於Java1.4中沒有SSLEngine類依賴的NIO,因此該API不支持SSL連接。
配置MQTT連接
前面提到的阻塞、Future和回調這3種API,建立連接時使用的代碼時完全相同的。首先新建一個MQTT對象並配置連接參數。在連接前至少要調用setHost方法,來指定所要連接的服務器地址。
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
// or
mqtt.setHost("tcp://localhost:1883");
MQTT設置說明
setClientId:用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID獲得相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成。- setCleanSession:若設為false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認為true。
- setKeepAlive:定義客戶端傳來消息的最大時間間隔秒數,服務器可以據此判斷與客戶端的連接是否已經斷開,從而避免TCP/IP超時的長時間等待。
- setUserName:服務器認證用戶名。
- setPassword:服務器認證密碼。
- setWillTopic:設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息。
- setWillMessage:設置“遺囑”消息的內容,默認是長度為零的消息。
- setWillQos:設置“遺囑”消息的QoS,默認為QoS.ATMOSTONCE。
- setWillRetain:若想要在發布“遺囑”消息時擁有retain選項,則為true。
失敗重連接設置說明
網絡出現故障時,程序能夠自動重新連接並重建會話。利用下列方法能夠配置重新連接的間隔和最大重試次數:
- setConnectAttemptsMax:客戶端首次連接到服務器時,連接的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1。
- setReconnectAttemptsMax:客戶端已經連接到服務器,但因某種原因連接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1。
- setReconnectDelay:首次重連接間隔毫秒數,默認為10ms。
- setReconnectDelayMax:重連接間隔毫秒數,默認為30000ms。
- setReconnectBackOffMultiplier:設置重連接指數回歸。設置為1則停用指數回歸,默認為2。
Socket設置說明
可以利用下列方法調整socket設置:
-
setReceiveBufferSize:設置socket接收緩沖區大小,默認為65536(64k)。
-
setSendBufferSize:設置socket發送緩沖區大小,默認為65536(64k)。
-
setTrafficClass:設置發送數據包頭的流量類型或服務類型字段,默認為8,意為吞吐量最大化傳輸。
帶寬限制設置說明
可通過下述方法設置讀寫速率限制:
-
setMaxReadRate:設置連接的最大接收速率,單位為bytes/s。默認為0,即無限制。 -
setMaxWriteRate:設置連接的最大發送速率,單位為bytes/s。默認為0,即無限制。
使用SSL連接
如果想使用SSL/TLS連接,替代TCP連接,可以使用“ssl://”或“tls://”作為連接URI前綴,實現安全連接。支持的協議包括:
ssl://- 使用JVM默認版本的SSL算法。sslv*://- 使用指定版本的SSL算法,星號為JVM支持的SSL算法版本,例如sslv3。tls://- 使用JVM默認版本的TLS算法。tlsv*://- 使用指定版本的TLS算法,星號為JVM支持的TLS算法版本,例如tlsv1.1。
客戶端使用JVM的SSLContext,基於在JVM的系統配置進行連接。可以調用MQTT的setSslContext方法,換用其他連接方式。
對於內部線程池,SSL連接為阻塞特性。調用setBlockingExecutor方法可以替換所要使用的executor。
選擇消息分發隊列
若沒有調用方法setDispatchQueue,客戶端將為連接新建一個隊列。如果想實現多個連接使用公用的隊列,顯式地指定隊列是一個非常方便的實現方法。
使用阻塞API
MQTT.connectBlocking方法建立並返回一個阻塞API連接。
BlockingConnection connection = mqtt.blockingConnection(); connection.connect();
connection.publish("foo", "Hello".toBytes(), QoS.AT_LEAST_ONCE, false);
Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);
Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// process the message then:
message.ack();
connection.disconnect();
使用基於Future的API
MQTT.connectFuture方法建立並返回一個基於Future類型的API連接。所有連接操作都是非阻塞的,連接結果通過Future對象返回。
FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();
Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();
// We can start future receive..
Future<Message> receive = connection.receive();
// send the message..
Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);
// Then the receive will get the message.
Message message = receive.await();
message.ack();
Future<Void> f4 connection.disconnect();
f4.await();
使用回調API MQTT.connectCallback方法建立並返回一個回調API連接。本方法是三種API中最復雜也是性能最好的API,前面提到的兩種API其實都是對回調API的封裝。對連接的所有操作都是非阻塞的,返回的結果將傳至回調接口函數。示例如下:
final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {
public void onDisconnected() {
}
public void onConnected() {
}
public void onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack) {
// You can now process a received message from a topic.
// Once process execute the ack runnable.
ack.run();
}
public void onFailure(Throwable value) {
connection.close(null); // a connection failure occured.
}
})
connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
result.failure(value); // If we could not connect to the server.
}
// Once we connect..
public void onSuccess(Void v) {
// Subscribe to a topic
Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
// The result of the subcribe request.
}
public void onFailure(Throwable value) {
connection.close(null); // subscribe failed.
}
});
// Send a message to a topic
connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
public void onSuccess(Void v) {
// the pubish operation completed successfully.
}
public void onFailure(Throwable value) {
connection.close(null); // publish failed.
}
});
// To disconnect..
connection.disconnect(new Callback<Void>() {
public void onSuccess(Void v) {
// called once the connection is disconnected.
}
public void onFailure(Throwable value) {
// Disconnects never fail.
}
});
}
});
