RocketMQ的使用


1.如何發布?

1.1.業務層調用發送mq消息方法  asyncService.sendAliMqttMsg();

//相關配置
public static final String TOPIC_TEST = "qbkj_test_mqtt";
public static final String TOPIC_QUEUE = "qbkj_queue_mqtt";
public static final String TOPIC_OBSERVE = "qbkj_observe_mqtt";
/** 排號類型-登記台 */
public static final String QUEUE_TYPE_REG = "1";
/** 排號類型-接種台 */
public static final String QUEUE_TYPE_INJECT = "2";


//
TODO:登記台叫號 Map<String, Object> param = new HashMap<String, Object>(); param.put("type", WebSocket.QUEUE_TYPE_REG); param.put("reg", callList); param.put("queueno", bsRegQueue.getQueue()); param.put("childname", bsRegQueue.getName()); param.put("roomcode", bsRegQueue.getRoom()); param.put("localcode", bsRegQueue.getLocalCode2()); if("mqtt".equals(Global.getConfig("quene_mode"))){ asyncService.sendAliMqttMsg(AliMqManager.TOPIC_QUEUE,bsRegQueue.getLocalCode2(), JsonMapper.toJsonString(param)); }else{ WebSocket.sendBroadCast(JsonMapper.toJsonString(param), bsRegQueue.getLocalCode2()); }

1.2.sendAliMqttMsg()具體方法

public void sendAliMqttMsg(String topic, String tag, String txt){
    try {
      AliMqManager.sendMqttMsg(topic, tag, txt);
    } catch (UnsupportedEncodingException e) {
      logger.error("sendAliMqttMsg失敗{}",e.getMessage());
    }
}

1.3.再進入AliMqManager(阿里雲 消息隊列mq工具類)

    public static boolean sendMqttMsg(String topic, String tag, String txt) throws UnsupportedEncodingException{
        logger.debug("開始發送mq消息{topic:{},tag:{},txt:{}}",topic, tag, txt);
        //循環發送消息
        Message msg =  new Message( //
                // Message 所屬的 Topic
                topic,
                // Message Tag 可理解為 Gmail 中的標簽,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾
                "MQ2MQTT",
                // Message Body 可以是任何二進制形式的數據, MQ 不做任何干預,
                // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
                txt.getBytes("UTF-8"));
        
        // 設置代表消息的業務關鍵屬性,請盡可能全局唯一。
        // 以方便您在無法正常收到消息情況下,可通過阿里雲服務器管理控制台查詢消息並補發
        // 注意:不設置也不會影響消息正常收發
        msg.setKey("msg_"+ topic + "_" + tag +System.currentTimeMillis());
//        msg.putUserProperties("mqttSecondTopic", topic+"/queue/"+tag);
        
        try {
            SendResult sendResult = producer.send(msg);
            // 同步發送消息,只要不拋異常就是成功
            if (sendResult != null) {
                logger.info("mq消息發送成功 topic:{} msgId:{}",msg.getTopic(),sendResult.getMessageId());
            }
            return true;
        }catch (Exception e) {
            // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理
            logger.error("mq消息發送失敗 topic:{}",msg.getTopic());
            logger.error("mq發送消息失敗{}",e.getMessage());
        }
        return false;
    }

這里注意一點:上面標記處的producer是靜態定義的對象,且被初始化過

private static Producer producer;

/**
 * 初始化
 *
 */
public static void init(){
        try {
            Properties properties = new Properties();
            //您在控制台創建的 Producer ID
            properties.put(PropertyKeyConst.ProducerId, Global.getConfig("mq.producerid"));
            // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
            properties.put(PropertyKeyConst.AccessKey,Global.getConfig("mq.accesskey"));
            // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制台創建
            properties.put(PropertyKeyConst.SecretKey, Global.getConfig("mq.secretkey"));
            //設置發送超時時間,單位毫秒
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, Global.getConfig("mq.sendmsgtimeoutmillis"));
            // 設置 TCP 接入域名(此處以公共雲生產環境為例)
            properties.put(PropertyKeyConst.ONSAddr, Global.getConfig("mq.onsaddr"));
            producer = ONSFactory.createProducer(properties);
            // 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可
            producer.start();
            logger.info("============阿里雲消息隊列mq初始化成功================");
        } catch (Exception e) {
            logger.error("阿里雲消息隊列mq初始化失敗,{}",e.getMessage());
        }
        
    }

 配置文件,jeesite.properties

#========MQ===============================================
mq.producerid=PID_qbkj_queue_mqtt
mq.accesskey=******
mq.secretkey=******
mq.sendmsgtimeoutmillis=3000
mq.onsaddr=http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet

 

 

2.如何訂閱?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM