在EMQ官網上拿張圖哈^_^;;
本來就是在做物聯網項目嘛,MQTT協議肯定是必須要的嘛,但之前不是我來負責這一塊的,就沒有對MQTT以及EMQ有更多的理解,只是會用能用罷了,要是讓我說個1 2 3 ,肯定是不行的呀,最近有一個項目剛好我來對接開發,而且是MQTT協議的,由於受測試環境的限制,只能在本地筆記本上window上搭建一套EMQ環境來試試啦;
一、EMQX安裝;
1、下載EMQ X Broker壓縮包;這里放了官網最新版的https://www.emqx.io/downloads/broker/v4.1.0/emqx-windows-v4.1.0.zip;下面文章中的使用的是 “emqx-windows10-v3.2.0.zip”;
2、在筆記本本地目錄直接解壓就行啦;
3、進入解壓后的目錄;C:\OldData_Win7\emqx-windows10-v3.2.0\emqx\bin;
4、進入EMQX啟動命令目錄;按下shift鍵+鼠標右鍵,選擇 ‘open PowerShell window here’;
5、啟動EMQX服務;輸入./emqx console;會彈出erlang的后台界面;
輸入./emqx stop;即可停止服務啦;
輸入./emqx start;即可輕松啟動服務啦;(推薦)
6、啟動成功后,EMQX自帶的dashboard既可以訪問啦;http://localhost:18083/#/login 默認u/p:admin/public
7、到這里我們的EMQ X Broker就已經可以使用了;
二、java連接EMQX,並發起訂閱發布的操作;
1、因為客戶端和服務端是都可以發布和訂閱的;所以我們就以發布和訂閱來區分吧;
訂閱端sub:(啟動main方法就可以連接到我們本地的EMQX了,哦,我這里使用了共享訂閱的,random策略)
package com.daopin.project.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SubMsg0 {
private static final Logger logger = LoggerFactory.getLogger(SubMsg0.class);
//private static String topic = "$share/group/test1";
//private static String topic = "$queue/test1";
//private static String topic = "test1";
private static int qos = 0;
private static String broker = "tcp://127.0.0.1:1883";
private static String userName = "COAP";
private static String passWord = "coap";
private static String clientId = "nokia-mqtt-cluster-0";
/**
* 有三種消息發布服務質量:
* <p>
* “至多一次”,消息發布完全依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能設備在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。
* <p>
* “至少一次”,確保消息到達,但消息重復可能會發生。
* <p>
* “只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級別。在計費系統中,消息重復或丟失會導致不正確的結果。這種最高質量的消息發布服務還可以用於即時通訊類的APP的推送,確保用戶收到且只會收到一次。
*/
private static MqttClient connect(String clientId) throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions connOpts = new MqttConnectOptions();
//String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.206:1883"};
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,
// 這里設置為true表示每次連接到服務器都以新的身份連接(客戶端再次上線時,將不再關心之前所有的訂閱關系以及離線消息)
// 這里設置為false表示客戶端再次上線時,還需要處理之前的離線消息,而之前的訂閱關系也會持續生效
connOpts.setCleanSession(true);
connOpts.setUserName(userName);
connOpts.setPassword(passWord.toCharArray());
connOpts.setConnectionTimeout(10);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
connOpts.setKeepAliveInterval(20);
connOpts.setAutomaticReconnect(true);
connOpts.setMaxInflight(10);
//connOpts.setServerURIs(uris);
//setWill方法(遺囑),如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
//connOpts.setWill(topic, "close".getBytes(), 2, true);
// MemoryPersistence設置clientid的保存形式,默認為以內存保存
MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectionLost(Throwable throwable) {
//在斷開連接時調用 連接丟失后,一般在這里面進行重連
logger.warn("connectionLost ... ; We will do something ...");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
//接收已經預訂的發布
logger.info("topic - > " + topic + ", mqttMessage - > " + mqttMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用
logger.warn("deliveryComplete ... ; We will do something ...");
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/*subscribe();*/ //連接成功,需要上傳客戶端所有的訂閱關系
logger.warn("connectComplete ... ; We will do something ...");
}
});
mqttClient.connect(connOpts);
return mqttClient;
}
public static void sub(MqttClient mqttClient, String topic) throws MqttException {
int[] Qos = {qos};
String[] topics = {topic};
mqttClient.subscribe(topics, Qos);
logger.info("sub >> " + topic);
}
private static void runsub(String clientId, String topic) throws MqttException {
MqttClient mqttClient = connect(clientId);
if (mqttClient != null) {
sub(mqttClient, topic);
}
}
public static void main(String[] args) throws MqttException {
runsub(clientId, "$queue/qdq02mzl6kvs/coap-server/uplinkMsg");
}
}
2、發布端pub;
package com.daopin.project.mqtt;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.*;
public class PubMsg {
private static final Logger logger = LoggerFactory.getLogger(PubMsg.class);
private static int qos = 0;
private static String broker = "tcp://127.0.0.1:1883";
private static String userName = "COAP";
private static String passWord = "coap";
public static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
public static ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
private static MqttClient connect(String clientId, String userName,
String password) throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
connOpts.setConnectionTimeout(10);
connOpts.setKeepAliveInterval(20);
//String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"};
//connOpts.setServerURIs(uris); //起到負載均衡和高可用的作用
MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
mqttClient.setCallback(new PushCallback("test"));
mqttClient.connect(connOpts);
return mqttClient;
}
private static void pub(MqttClient sampleClient, String msg, String topic)
throws Exception {
while (true){
MqttMessage message = new MqttMessage((msg+" "+new Date()).getBytes());
message.setQos(qos);
message.setRetained(false);
sampleClient.publish(topic, message);
logger.info("pub-->" + message);
Thread.sleep(3000L);
}
}
private static void publish(String str, String clientId, String topic) {
try {
MqttClient mqttClient = connect(clientId, userName, passWord);
if (mqttClient != null) {
pub(mqttClient, str, topic);
}
if (mqttClient != null) {
mqttClient.disconnect();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws MqttException {
//singleThreadPool.execute( ()-> publish("message content", "868333030030008", "sl6o3lbk94xg/868333030030008/uplinkMsg/0/data"));
publish("AAAA0000", "nokia-mqtt-server", "qdq02mzl6kvs/coap-server/uplinkMsg");
}
}
class PushCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);
private String threadId;
public PushCallback(String threadId) {
this.threadId = threadId;
}
@Override
public void connectionLost(Throwable cause) {
//在斷開連接時調用 連接丟失后,一般在這里面進行重連
logger.warn("connectionLost ... ; We will do something ...");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//System.out.println("deliveryComplete---------" + token.isComplete());
//接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用
logger.warn("deliveryComplete ... ; We will do something ...");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
System.out.println(threadId + " " + msg);
}
}
我們的訂閱端也已經收到這條pub的數據了。。。
3、我們也可以在 dashboard來觀察到這兩個會話;
三、總結;
1、共享訂閱
包含一個主題過濾器和訂閱選項,唯一的區別在於共享訂閱的主題過濾器格式必須是 $share/{ShareName}/{filter} 這種形式。這幾個的字段的含義分別是:
$share 前綴表明這將是一個共享訂閱
{ShareName} 是一個不包含 “/”, “+” 以及 “#” 的字符串。訂閱會話通過使用相同的 {ShareName} 表示共享同一個訂閱,匹配該訂閱的消息每次只會發布給其中一個會話
{filter} 即非共享訂閱中的主題過濾器
共享訂閱使得訂閱端能夠負載均衡地消費消息,但 MQTT 協議並沒有規定 Server 應當使用什么負載均衡策略。作為參考,EMQ X 提供了 random, round_robin, sticky, hash 四種策略供用戶自行選擇。
2、connetcLost方法
我們最好要在這里做一些事情,拋出異常或者打印日志,這樣你才能知道到底什么時候連接丟失了的
3、 消息發布服務質量
如何選擇QoS:
QoS 級別越高,流程越復雜,系統資源消耗越大。應用程序可以根據自己的網絡場景和業務需求,選擇合適的 QoS 級別,比如在同一個子網內部的服務間的消息交互往往選用 QoS 0;而通過互聯網的實時消息通信往往選用 QoS 1;QoS 2 使用的場景相對少一些,適合一些支付請求之類的要求較高的場景。
4、CleanSession參數;若是共享訂閱模式下,需要將此字段配置為true,保證在訂閱端有一個掉線的情況下,可以清除掉session信息,這樣就不會收到訂閱的pub信息了,避免了數據丟失的問題。