因為公司業務需求,需要接入 阿里Mqtt,自己基於Spring寫了一個小demo,記錄下來,已備以后需要。
第一步
創建一個實體bean用來裝載 MqttClient
private MqttClient mqttClient;
@Autowired
private MqttConnectOptions mqttConnectOptions;
@Autowired
private MqttConfig mqttConfig;
@Autowired
private MqttCallback mqttCallback;
private void start() throws MqttException {
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客戶端使用的協議和端口必須匹配,具體參考文檔 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
* 如果是 SSL 加密則設置ssl://endpoint:8883
*/
this.mqttClient= new MqttClient("tcp://" + mqttConfig.getConnectEndpoint() + ":1883",
mqttConfig.getGroupId() + "@@@" + mqttConfig.getClientId(), memoryPersistence);
mqttClient.setTimeToWait(mqttConfig.getTimeToWait());
mqttClient.setCallback(mqttCallback);
mqttClient.connect(mqttConnectOptions);
}
private void shutdown() throws MqttException {
this.mqttClient.disconnect();
}
public MqttClient getMqttClient(){
return this.mqttClient;
}
第二步
對MqClient 進行加載
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttConnectOptions getMqttConnectOptions() throws NoSuchAlgorithmException, InvalidKeyException {
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
//組裝用戶名密碼
mqttConnectOptions.setUserName("Signature|" + mqttConfig.getAccessKey() + "|" + mqttConfig.getInstanceId());
//密碼簽名
mqttConnectOptions.setPassword(info.feibiao.live.config.mqtt.Tools.macSignature(mqttConfig.getGroupId()+"@@@"+mqttConfig.getClientId(), mqttConfig.getSecretKey()).toCharArray());
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
//連接超時時間
mqttConnectOptions.setConnectionTimeout(5000);
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
@Bean(initMethod = "start", destroyMethod = "shutdown")
public MqttClientBean getClient() {
return new MqttClientBean();
}
第三步
創建接收消息,連接成功,連接丟失 回調類
連接成功后需要訂閱相關主題
@Autowired
MqttClientBean mqttClientBean;
@Autowired
MqttConfig mqttConfig;
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 客戶端連接成功后就需要盡快訂閱需要的 topic
*/
System.out.println("connect success");
ExecutorService mqttExecutorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttExecutorService.submit(() -> {
try {
//訂閱主題,主主題后面可以跟子主題 過濾規則 +:過濾一級 ,#:過濾所有
final String[] topicFilter = {mqttConfig.getTopicId() + "/" + "testMq4Iot"};
int qosLevel=0;
final int[] qos = {qosLevel};
MqttClient mqttClient = mqttClientBean.getMqttClient();
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 這個地方消費
* 消費消息的回調接口,需要確保該接口不拋異常,該接口運行返回即代表消息消費成功。
* 消費消息需要保證在規定時間內完成,如果消費耗時超過服務端約定的超時時間,對於可靠傳輸的模式,服務端可能會重試推送,業務需要做好冪等去重處理。超時時間約定參考限制
* https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
*/
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
第四步
添加配置實體類,從yml配置文件中讀取配置數據
/**
* 可在阿里雲控制台找到(實例id)
*/
private String instanceId;
/**
* accessKey
*/
private String accessKey;
/**
* 密鑰
*/
private String secretKey;
/**
* TCP 協議接入點
*/
private String connectEndpoint;
/**
* 話題id
*/
private String topicId;
/**
* 群組id
*/
private String groupId;
/**
* 消息模式(廣播訂閱, 集群訂閱)
*/
private String messageModel;
/**
* 超時時間
*/
private String sendMsgTimeoutMillis;
/**
* 順序消息消費失敗進行重試前的等待時間 單位(毫秒)
*/
private String suspendTimeMillis;
/**
* 消息消費失敗時的最大重試次數
*/
private String maxReconsumeTimes;
/**
* 公網token服務器
*/
private String mqttClientTokenServer;
/**
* 過期時間(默認1個月)
*/
private Long mqttClientTokenExpireTime;
/**
* 分發給客戶端的token的操作權限
*/
private String mqttAction;
/**
* 客戶端標識
*/
private String clientId;
/**
* QoS參數代表傳輸質量,可選0,1,2,根據實際需求合理設置,具體參考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
*/
private int qosLevel = 0;
/**
* 客戶端超時時間
*/
private int timeToWait;
配置文件:
spring.application.name: mqtt-server-demo
server.port: 18005
# mqtt消息
mqtt.msg:
instanceId: post-cn-0pp13c3gn0u #實例Id
accessKey: LTAIPZjAd2naVfA0 #appId
secretKey: 38ZLMHoP5r4p0a4gUEGUhzL46EdzQx #密鑰 阿里雲控制台查看
connectEndpoint: post-cn-0pp13c3gn0u.mqtt.aliyuncs.com #端點
topicId: TID_liveChat #父級主題
groupId: GID_liveChat #分組
messageModel: BROADCASTING #廣播訂閱方式, 默認是 CLUSTERING 集群訂閱
sendMsgTimeoutMillis: 20000 # 發消息超時時間30s
suspendTimeMillis: 500 #順序消息消費失敗進行重試前的等待時間 單位(毫秒)
maxReconsumeTimes: 3 #消息消費失敗時的最大重試次數
mqttClientTokenServer: mqauth.aliyuncs.com # 公網token服務器
mqttClientTokenExpireTime: 2592000000 # token 過期時間1個月
mqttAction: R,W # 讀寫操作
clientId: FEI_JAVA #客戶端名稱
qosLevel: 0 #QoS參數代表傳輸質量,可選0,1,2
timeToWait: 5000 #客戶端超時時間
spring.main.allow-bean-definition-overriding: true
最后貼上簽名方法:
/**
* 計算簽名,參數分別是參數對以及密鑰
*
* @param requestParams 參數對,即參與計算簽名的參數
* @param secretKey 密鑰
* @return 簽名字符串
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public static String doHttpSignature(Map<String, String> requestParams,
String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
List<String> paramList = new ArrayList<String>();
for (Map.Entry<String, String> entry : requestParams.entrySet()) {
paramList.add(entry.getKey() + "=" + entry.getValue());
}
Collections.sort(paramList);
StringBuffer sb = new StringBuffer();
for (int i = 0; i < paramList.size(); i++) {
if (i > 0) {
sb.append('&');
}
sb.append(paramList.get(i));
}
return macSignature(sb.toString(), secretKey);
}
/**
* @param text 要簽名的文本
* @param secretKey 阿里雲MQ secretKey
* @return 加密后的字符串
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
*/
public static String macSignature(String text,
String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
Charset charset = Charset.forName("UTF-8");
String algorithm = "HmacSHA1";
Mac mac = Mac.getInstance(algorithm);
mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
byte[] bytes = mac.doFinal(text.getBytes(charset));
return new String(Base64.encodeBase64(bytes), charset);
}
阿里雲mqtt支持Token 模式:
獲取token以及銷毀token:
private static final String applyTokenUrl = "/token/apply";
private static final String revokeTokenUrl = "/token/revoke";
/**
* 申請 Token 接口,具體參數參考鏈接
* https://help.aliyun.com/document_detail/54276.html?spm=a2c4g.11186623.6.562.f12033f5ay6nu5
*
* @param apiUrl token 服務器地址,參考文檔設置正確的地址
* @param accessKey 賬號 AccessKey,由控制台獲取
* @param secretKey 賬號 SecretKey,由控制台獲取
* @param topics 申請的 topic 列表
* @param action Token類型
* @param expireTime Token 過期的時間戳
* @param instanceId MQ4IoT 實例 Id
* @return 如果申請成功則返回 token 內容
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
* @throws IOException
* @throws KeyStoreException
* @throws UnrecoverableKeyException
* @throws KeyManagementException
*/
public String applyToken(String apiUrl, String accessKey, String secretKey, List<String> topics,
String action,
long expireTime,
String instanceId) throws InvalidKeyException, NoSuchAlgorithmException, IOException, KeyStoreException, UnrecoverableKeyException, KeyManagementException {
Map<String, String> paramMap = new HashMap<>();
Collections.sort(topics);
StringBuilder builder = new StringBuilder();
for (String topic : topics) {
builder.append(topic).append(",");
}
if (builder.length() > 0) {
builder.setLength(builder.length() - 1);
}
paramMap.put("resources", builder.toString());
paramMap.put("actions", action);
paramMap.put("serviceName", "mq");
paramMap.put("expireTime", String.valueOf(System.currentTimeMillis() + expireTime));
paramMap.put("instanceId", instanceId);
String signature = Tools.doHttpSignature(paramMap, secretKey);
paramMap.put("proxyType", "MQTT");
paramMap.put("accessKey", accessKey);
paramMap.put("signature", signature);
JSONObject object = Tools.httpsPost("http://"+apiUrl + applyTokenUrl, paramMap);
if (object != null) {
return (String) object.get("tokenData");
}
return null;
}
/**
* 提前注銷 token,一般在 token 泄露出現安全問題時,提前禁用特定的客戶端
*
* @param apiUrl token 服務器地址,參考文檔設置正確的地址
* @param accessKey 賬號 AccessKey,由控制台獲取
* @param secretKey 賬號 SecretKey,由控制台獲取
* @param token 禁用的 token 內容
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
* @throws IOException
* @throws UnrecoverableKeyException
* @throws KeyStoreException
* @throws KeyManagementException
*/
public void revokeToken(String apiUrl, String accessKey, String secretKey,
String token) throws InvalidKeyException, NoSuchAlgorithmException, IOException, UnrecoverableKeyException, KeyStoreException, KeyManagementException {
Map<String, String> paramMap = new HashMap<String, String>();
paramMap.put("token", token);
String signature = Tools.doHttpSignature(paramMap, secretKey);
paramMap.put("signature", signature);
paramMap.put("accessKey", accessKey);
JSONObject object = Tools.httpsPost("http://"+apiUrl + revokeTokenUrl, paramMap);
}
Token模式客戶端使用方式:
在構建ConnectionOptionWrapper的時候使用簽發的token:
String token="LzMT+XLFl5u**********************************KhCznZx";
Map<String, String> tokenData = new HashMap<String, String>();
tokenData.put("RW", token);
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, clientId, tokenData);