代碼:
package cc.gongchang.mqtt;
import java.net.URISyntaxException;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import com.alibaba.fastjson.JSONObject;
/**
* Hello world!
*
*/
public class App {
public static void main(String[] args) {
MQTT mqtt = new MQTT();
// MQTT設置說明
// 設置主機號
try {
mqtt.setHost("tcp://sgdzpic.3322.org:1883");
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID獲得相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成
mqtt.setClientId("876543210");
// 若設為false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認為true
mqtt.setCleanSession(false);
// 定義客戶端傳來消息的最大時間間隔秒數,服務器可以據此判斷與客戶端的連接是否已經斷開,從而避免TCP/IP超時的長時間等待
mqtt.setKeepAlive((short) 60);
// 服務器認證用戶名
mqtt.setUserName("admin");
// 服務器認證密碼
mqtt.setPassword("123456");
// 設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息
mqtt.setWillTopic("willTopic");
// 設置“遺囑”消息的內容,默認是長度為零的消息
mqtt.setWillMessage("willMessage");
// 設置“遺囑”消息的QoS,默認為QoS.ATMOSTONCE
mqtt.setWillQos(QoS.AT_LEAST_ONCE);
// 若想要在發布“遺囑”消息時擁有retain選項,則為true
mqtt.setWillRetain(true);
// 設置版本
mqtt.setVersion("3.1.1");
// 失敗重連接設置說明
// 客戶端首次連接到服務器時,連接的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1
mqtt.setConnectAttemptsMax(10L);
// 客戶端已經連接到服務器,但因某種原因連接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1
mqtt.setReconnectAttemptsMax(3L);
// 首次重連接間隔毫秒數,默認為10ms
mqtt.setReconnectDelay(10L);
// 重連接間隔毫秒數,默認為30000ms
mqtt.setReconnectDelayMax(30000L);
// 設置重連接指數回歸。設置為1則停用指數回歸,默認為2
mqtt.setReconnectBackOffMultiplier(2);
// Socket設置說明
// 設置socket接收緩沖區大小,默認為65536(64k)
mqtt.setReceiveBufferSize(65536);
// 設置socket發送緩沖區大小,默認為65536(64k)
mqtt.setSendBufferSize(65536);
// 設置發送數據包頭的流量類型或服務類型字段,默認為8,意為吞吐量最大化傳輸
mqtt.setTrafficClass(8);
// 帶寬限制設置說明
// 設置連接的最大接收速率,單位為bytes/s。默認為0,即無限制
mqtt.setMaxReadRate(0);
// 設置連接的最大發送速率,單位為bytes/s。默認為0,即無限制
mqtt.setMaxWriteRate(0);
Boolean contition = true;
while (contition) {
// 選擇消息分發隊列
// 若沒有調用方法setDispatchQueue,客戶端將為連接新建一個隊列。如果想實現多個連接使用公用的隊列,顯式地指定隊列是一個非常方便的實現方法
mqtt.setDispatchQueue(Dispatch.createQueue("foo"));
FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
try {
f1.await();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// 訂閱消息
Future<byte[]> f2 = connection.subscribe(new Topic[] {
new Topic("person/blacklist/#", QoS.AT_LEAST_ONCE) });
//
try {
byte[] qoses = f2.await();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// 發送身份驗證消息.
// Future<Void> f3 = connection.publish("foo", "Hello".getBytes(),
// QoS.AT_LEAST_ONCE, false);
// 接收訂閱消息..
Future<Message> receive = connection.receive();
// 打印消息.
Message message = null;
try {
message = receive.await();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String subscribeInfo = String.valueOf(message.getPayloadBuffer());
if(subscribeInfo.startsWith("ascii")) {
JSONObject subscribeObject = JSONObject.parseObject(subscribeInfo.substring(7));
System.out.println(subscribeInfo);
System.out.println(subscribeObject.get("name"));//姓名,對應name
System.out.println(subscribeObject.get("certifiedNo"));//身份證,對應identityId
System.out.println(subscribeObject.get("url"));//圖片地址,對應imageUrl
//還需要傳送tarLibSerial
//入庫黑名單
}
// 回應
message.ack();
//
Future<Void> f4 = connection.disconnect();
try {
f4.await();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
