Spring boot 集成 阿里 Mqtt


因為公司業務需求,需要接入 阿里Mqtt,自己基於Spring寫了一個小demo,記錄下來,已備以后需要。

第一步

創建一個實體bean用來裝載 MqttClient

private MqttClient mqttClient;

    @Autowired
    private MqttConnectOptions mqttConnectOptions;

    @Autowired
    private   MqttConfig mqttConfig;

    @Autowired
    private MqttCallback mqttCallback;


    private void start() throws MqttException {
       final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客戶端使用的協議和端口必須匹配,具體參考文檔 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
         * 如果是 SSL 加密則設置ssl://endpoint:8883
         */
        this.mqttClient= new MqttClient("tcp://" + mqttConfig.getConnectEndpoint() + ":1883",
                mqttConfig.getGroupId() + "@@@" + mqttConfig.getClientId(), memoryPersistence);
        mqttClient.setTimeToWait(mqttConfig.getTimeToWait());
        mqttClient.setCallback(mqttCallback);
        mqttClient.connect(mqttConnectOptions);
    }

    private void shutdown() throws MqttException {
        this.mqttClient.disconnect();
    }
    public MqttClient getMqttClient(){
        return this.mqttClient;
    }

第二步

對MqClient 進行加載


@Autowired
    private MqttConfig mqttConfig;
@Bean
public MqttConnectOptions getMqttConnectOptions() throws NoSuchAlgorithmException, InvalidKeyException {
    MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
    //組裝用戶名密碼
    mqttConnectOptions.setUserName("Signature|" + mqttConfig.getAccessKey() + "|" + mqttConfig.getInstanceId());
    //密碼簽名
    mqttConnectOptions.setPassword(info.feibiao.live.config.mqtt.Tools.macSignature(mqttConfig.getGroupId()+"@@@"+mqttConfig.getClientId(), mqttConfig.getSecretKey()).toCharArray());
    mqttConnectOptions.setCleanSession(true);
    mqttConnectOptions.setKeepAliveInterval(90);
    mqttConnectOptions.setAutomaticReconnect(true);
    mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
    //連接超時時間
    mqttConnectOptions.setConnectionTimeout(5000);
    mqttConnectOptions.setKeepAliveInterval(2);
    return mqttConnectOptions;
}
@Bean(initMethod = "start", destroyMethod = "shutdown")
public MqttClientBean getClient() {
    return new MqttClientBean();
}

第三步

創建接收消息,連接成功,連接丟失 回調類
連接成功后需要訂閱相關主題

@Autowired
    MqttClientBean mqttClientBean;
    @Autowired
    MqttConfig mqttConfig;
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        /**
         * 客戶端連接成功后就需要盡快訂閱需要的 topic
         */
        System.out.println("connect success");

        ExecutorService mqttExecutorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        mqttExecutorService.submit(() -> {
            try {
                //訂閱主題,主主題后面可以跟子主題 過濾規則 +:過濾一級 ,#:過濾所有
                final String[] topicFilter = {mqttConfig.getTopicId() + "/" + "testMq4Iot"};
                int qosLevel=0;
                final int[] qos = {qosLevel};
                MqttClient mqttClient = mqttClientBean.getMqttClient();
                mqttClient.subscribe(topicFilter, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        });
    }
    
    @Override
    public void connectionLost(Throwable throwable) {
        throwable.printStackTrace();
    }
    
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        /**
         * 這個地方消費
         * 消費消息的回調接口,需要確保該接口不拋異常,該接口運行返回即代表消息消費成功。
         * 消費消息需要保證在規定時間內完成,如果消費耗時超過服務端約定的超時時間,對於可靠傳輸的模式,服務端可能會重試推送,業務需要做好冪等去重處理。超時時間約定參考限制
         * https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
         */
        System.out.println(
                "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
    }
    
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
    }

第四步

添加配置實體類,從yml配置文件中讀取配置數據

/**
     * 可在阿里雲控制台找到(實例id)
     */
    private String instanceId;
    /**
     * accessKey
     */
    private String accessKey;
    /**
     * 密鑰
     */
    private String secretKey;
    /**
     * TCP 協議接入點
     */
    private String connectEndpoint;
    /**
     * 話題id
     */
    private String topicId;
    /**
     * 群組id
     */
    private String groupId;
    /**
     * 消息模式(廣播訂閱, 集群訂閱)
     */
    private String messageModel;

    /**
     * 超時時間
     */
    private String sendMsgTimeoutMillis;
    
    /**
     * 順序消息消費失敗進行重試前的等待時間 單位(毫秒)
     */
    private String suspendTimeMillis;
    
    /**
     * 消息消費失敗時的最大重試次數
     */
    private String maxReconsumeTimes;
    
    /**
     * 公網token服務器
     */
    private String mqttClientTokenServer;
    /**
     * 過期時間(默認1個月)
     */
    private Long mqttClientTokenExpireTime;
    /**
     * 分發給客戶端的token的操作權限
     */
    private String mqttAction;
    /**
     * 客戶端標識
     */
    private String clientId;
    
    /**
     * QoS參數代表傳輸質量,可選0,1,2,根據實際需求合理設置,具體參考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
     */
    private int qosLevel = 0;
    /**
     * 客戶端超時時間
     */
    private  int timeToWait;

配置文件:

spring.application.name: mqtt-server-demo
server.port: 18005
# mqtt消息
mqtt.msg:
  instanceId: post-cn-0pp13c3gn0u #實例Id
  accessKey: LTAIPZjAd2naVfA0	#appId
  secretKey: 38ZLMHoP5r4p0a4gUEGUhzL46EdzQx #密鑰 阿里雲控制台查看
  connectEndpoint: post-cn-0pp13c3gn0u.mqtt.aliyuncs.com #端點
  topicId: TID_liveChat #父級主題
  groupId: GID_liveChat #分組
  messageModel: BROADCASTING #廣播訂閱方式, 默認是 CLUSTERING 集群訂閱
  sendMsgTimeoutMillis: 20000 # 發消息超時時間30s
  suspendTimeMillis: 500 #順序消息消費失敗進行重試前的等待時間 單位(毫秒)
  maxReconsumeTimes: 3 #消息消費失敗時的最大重試次數
  mqttClientTokenServer: mqauth.aliyuncs.com # 公網token服務器
  mqttClientTokenExpireTime: 2592000000 # token 過期時間1個月
  mqttAction: R,W # 讀寫操作
  clientId: FEI_JAVA #客戶端名稱
  qosLevel: 0 #QoS參數代表傳輸質量,可選0,1,2
  timeToWait: 5000 #客戶端超時時間
spring.main.allow-bean-definition-overriding: true

最后貼上簽名方法:

/**
 * 計算簽名,參數分別是參數對以及密鑰
 *
 * @param requestParams 參數對,即參與計算簽名的參數
 * @param secretKey 密鑰
 * @return 簽名字符串
 * @throws NoSuchAlgorithmException
 * @throws InvalidKeyException
 */
public static String doHttpSignature(Map<String, String> requestParams,
    String secretKey) throws NoSuchAlgorithmException, InvalidKeyException {
    List<String> paramList = new ArrayList<String>();
    for (Map.Entry<String, String> entry : requestParams.entrySet()) {
        paramList.add(entry.getKey() + "=" + entry.getValue());
    }
    Collections.sort(paramList);
    StringBuffer sb = new StringBuffer();
    for (int i = 0; i < paramList.size(); i++) {
        if (i > 0) {
            sb.append('&');
        }
        sb.append(paramList.get(i));
    }
    return macSignature(sb.toString(), secretKey);
}

/**
 * @param text 要簽名的文本
 * @param secretKey 阿里雲MQ secretKey
 * @return 加密后的字符串
 * @throws InvalidKeyException
 * @throws NoSuchAlgorithmException
 */
public static String macSignature(String text,
    String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
    Charset charset = Charset.forName("UTF-8");
    String algorithm = "HmacSHA1";
    Mac mac = Mac.getInstance(algorithm);
    mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
    byte[] bytes = mac.doFinal(text.getBytes(charset));
    return new String(Base64.encodeBase64(bytes), charset);
}

阿里雲mqtt支持Token 模式:

獲取token以及銷毀token:

private static final String applyTokenUrl = "/token/apply";
private static final String revokeTokenUrl = "/token/revoke";



/**
 * 申請 Token 接口,具體參數參考鏈接
 * https://help.aliyun.com/document_detail/54276.html?spm=a2c4g.11186623.6.562.f12033f5ay6nu5
 *
 * @param apiUrl token 服務器地址,參考文檔設置正確的地址
 * @param accessKey 賬號 AccessKey,由控制台獲取
 * @param secretKey 賬號 SecretKey,由控制台獲取
 * @param topics 申請的 topic 列表
 * @param action Token類型
 * @param expireTime Token 過期的時間戳
 * @param instanceId MQ4IoT 實例 Id
 * @return 如果申請成功則返回 token 內容
 * @throws InvalidKeyException
 * @throws NoSuchAlgorithmException
 * @throws IOException
 * @throws KeyStoreException
 * @throws UnrecoverableKeyException
 * @throws KeyManagementException
 */
public String applyToken(String apiUrl, String accessKey, String secretKey, List<String> topics,
                                String action,
                                long expireTime,
                                String instanceId) throws InvalidKeyException, NoSuchAlgorithmException, IOException, KeyStoreException, UnrecoverableKeyException, KeyManagementException {
    Map<String, String> paramMap = new HashMap<>();
    Collections.sort(topics);
    StringBuilder builder = new StringBuilder();
    for (String topic : topics) {
        builder.append(topic).append(",");
    }
    if (builder.length() > 0) {
        builder.setLength(builder.length() - 1);
    }
    paramMap.put("resources", builder.toString());
    paramMap.put("actions", action);
    paramMap.put("serviceName", "mq");
    paramMap.put("expireTime", String.valueOf(System.currentTimeMillis() + expireTime));
    paramMap.put("instanceId", instanceId);
    String signature = Tools.doHttpSignature(paramMap, secretKey);
    paramMap.put("proxyType", "MQTT");
    paramMap.put("accessKey", accessKey);
    paramMap.put("signature", signature);
    JSONObject object = Tools.httpsPost("http://"+apiUrl + applyTokenUrl, paramMap);
    if (object != null) {
        return (String) object.get("tokenData");
    }
    return null;
}

/**
 * 提前注銷 token,一般在 token 泄露出現安全問題時,提前禁用特定的客戶端
 *
 * @param apiUrl token 服務器地址,參考文檔設置正確的地址
 * @param accessKey 賬號 AccessKey,由控制台獲取
 * @param secretKey 賬號 SecretKey,由控制台獲取
 * @param token 禁用的 token 內容
 * @throws InvalidKeyException
 * @throws NoSuchAlgorithmException
 * @throws IOException
 * @throws UnrecoverableKeyException
 * @throws KeyStoreException
 * @throws KeyManagementException
 */
public void revokeToken(String apiUrl, String accessKey, String secretKey,
                               String token) throws InvalidKeyException, NoSuchAlgorithmException, IOException, UnrecoverableKeyException, KeyStoreException, KeyManagementException {
    Map<String, String> paramMap = new HashMap<String, String>();
    paramMap.put("token", token);
    String signature = Tools.doHttpSignature(paramMap, secretKey);
    paramMap.put("signature", signature);
    paramMap.put("accessKey", accessKey);
    JSONObject object = Tools.httpsPost("http://"+apiUrl + revokeTokenUrl, paramMap);
}

Token模式客戶端使用方式:

在構建ConnectionOptionWrapper的時候使用簽發的token:

String token="LzMT+XLFl5u**********************************KhCznZx";
Map<String, String> tokenData = new HashMap<String, String>();
tokenData.put("RW", token);
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, clientId, tokenData);

碼雲地址:
https://gitee.com/ioso/mqtt-demo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM