RocketMQ冪等性順序性實戰, 及消息積壓解決方案


1. rocketmq源碼安裝
參考官方文檔:http://rocketmq.apache.org/docs/quick-start/
安裝好jdk和maven rocketmq安裝包:https://pan.baidu.com/s/1I3CqWaxFnxtUX1kJpIJkcQ 密碼: vu5m

代碼:https://github.com/2466845324/repository/tree/master/rocketmq

1. 解壓源碼包 unzip rocketmq-all-4.4.0-source-release.zip
2. 重命名並進目錄 mv rocketmq-all-4.4.0 rocketmq && cd rocketmq
3. 編譯打包(會下載很多依賴,大概10分鍾) mvn -Prelease-all -DskipTests clean install -U
4. 啟動nameserver服務 cd distribution/target/apache-rocketmq/bin && sh mqnamesrv
      * 默認配置是4g, 如果你的服務器配置較低則會報錯, 則要修改
      vim runbroker.sh 和 vim runserver.sh 這兩個文件,具體參數自己設定。
      例如配置:JAVA_OPTAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"
5. 后台啟動 nohup sh mqnamesrv & 查看啟動日志 tail -f nohup.out
6. 啟動brokerserver服務 nohup sh mqbroker -n localhost:9876 &
7. 查看狀態 jps

rocketmq控制台安裝
1. 解壓安裝包 unzip rocketmq-externals-master.zip && cd rocketmq-externals-master
2. 修改pom文件版本號 cd rocketmq-console && vim pom.xml 修改為 <rocketmq.version>4.4.0</rocketmq.version>
3. 指定機器地址 vim src/main/resources/application.properties 修改為rocketmq.config.namesrvAddr=127.0.0.1:9876
4. 在rocketmq-console目錄下編譯 mvn clean package -Dmaven.test.skip=true
5. 啟動 cd target && nohup java -jar rocketmq-console-ng-1.0.0.jar &
6. 查看日志 tail -f nohup.out 啟動測試 192.168.100.100:8080

2. 消息冪等性

  生產者發送消息之后,為了確保消費者消費成功 我們通常會采用手動簽收方式確認消費,MQ就是使用了消息超時、重傳、確認機制來保證消息必達。

場景:
 1. 訂單服務(生產者),點擊結算訂單之后需要付款,這時就會發送一條“結算”的消息到mq的broker中。
 2. 此時支付服務(消費者)監聽到這條消息之后就會處理結算扣款的邏輯,然后手動簽收訂單告訴mq我已經消費完成了。
 3. 如果在結算的過程中程序出現了異常,我們就返回mq一個消費失敗的狀態,此時mq就會重新發送這條消息;
  或者是由於網絡波動支付服務一直沒有響應消息的消費狀態,mq也照樣會重新發送這條消息。
 4. 那么這種情況下,支付服務(消費者)就會重復收到這條消息,如果不做任何判斷就有可能會重復消費出現多次扣款的情況。
解決方案:
  在發送消息的時候,我們可以生成一個唯一ID標識每一條消息,將消息處理成功和去重日志通過事物的形式寫入去重表或緩存中。每次消費之前都先查一遍,如果存在就說明消費過了直接返回消費成功。

3. 消息順序性

  消息有序是指按照消息的發送順序來消費。例如我們有這樣一個需求:我們通過讀取sql的日志文件得到它的所有sql,然后通過發送消息給其他服務去執行這些sql來同步數據。這個時候順序性就顯得尤為重要了。比如我們先修改這條數據,然后刪除數據。倘若消費的時候先刪除了再修改這時候得到的數據就不一致了。

  

解決方案:
  我們往一個topic里面發送消息時,它默認會維護幾個隊列,是隨機發送到這些隊列里面的。消費者集群消費時,實際上是一個服務監聽一個隊列。我們發送消息時,對於同個數據的操作指定發送到一個隊列就好了,這時消費者就是按照順序來消費的。

4. 順序冪等案例實戰

   比如我們現在我們有下面9條消息要發送,分別是對用戶、訂單、商品的操作,最終的結果應該是“用戶被刪除”,“訂單已完成”,“商品被下架”。如果就條消息隨機指定隊列發送的話,就可能用戶先被刪除了,然后進行修改;也有可能支付訂單的過程比較慢一直沒反饋,從而收到多條消息而重復支付。

生產者對同一數據的操作發送到同一隊列

public List<Data> getOrderList(){
        List<Data> list = new ArrayList<Data>();
        list.add(new Data(1,"注冊用戶"));
        list.add(new Data(2,"創建訂單"));
        list.add(new Data(1,"修改用戶"));
        list.add(new Data(2,"支付訂單"));
        list.add(new Data(1,"刪除用戶"));
        list.add(new Data(3,"商品上架"));
        list.add(new Data(2,"完成訂單"));
        list.add(new Data(3,"商品打折"));
        list.add(new Data(3,"商品下架"));
        return list;
    }
    
    /**
     * @Title:順序發送
     * @author:吳磊  
     * @date:2019年5月4日 下午10:28:31
     */
    @RequestMapping("/send")
    public Object send() throws Exception{
        // 1. 獲取操作數據
        List<Data> list = getOrderList();
        
        Message message = null;
        for(int i=0; i<list.size(); i++) {
            Data data = list.get(i);
            // keys是唯一標識, 用作重試冪等判斷.
            String keys = data.getId()+"";
            // 2. 裝載消息 (topic, 標簽, key, 消息體)
            message = new Message(PayOrderlyProducer.TOPIC,"test_tag",keys,data.getType().getBytes());
            /* 3. 投遞消息
             *       每一個topic默認為4個queue, 我們可以指訂隊列發送.
             *   send(Message msg, MessageQueueSelector selector, Object arg),會將arg作為隊列下標傳
             *   給MessageQueueSelector中MessageQueue的arg,從而選出具體的queue.
             */
            SendResult sendResult =  payOrderlyProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    int id = (int) arg;
                    int index = id % mqs.size();//對queue總數取模
                    return mqs.get(index);
                }
            },data.getId());
         System.out.printf("發送結果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(),data.getId(),data.getType());
        }
        return "OK";
    }
View Code

消費者手動簽收消息,如果消費失敗就重復消費,如果重試次數過多就通知運營人員手動處理。

public PayOrderlyConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(consumerGroup);
        // 指定mq服務器地址
        consumer.setNamesrvAddr(NAME_SERVER);
        // 指定消費策略:這里是從最后一條消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //默認是集群方式,可以更改為廣播,但是廣播方式不支持重試
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe(TOPIC, "test_tag");

        /**
         * MessageListenerOrderly是單線程的消費
         */
        consumer.registerMessageListener( new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt msg = msgs.get(0);
                int times = msg.getReconsumeTimes();
                try {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                    //模擬異常
                    String str = new String(msgs.get(0).getBody());
                    if(str.contains("訂單")) {
                        int i=1/0;
                    }
                    
                    //做業務邏輯操作
                    System.out.println("消費成功");
                    return ConsumeOrderlyStatus.SUCCESS;
    
                } catch (Exception e) {
                    System.out.println("重試次數"+times);
                    //如果重試2次不成功,則記錄,人工介入
                    if(times >= 2){
                        System.out.println("重試次數大於2,記錄數據庫,發短信通知開發人員或者運營人員");
                        return ConsumeOrderlyStatus.SUCCESS;
                    }
                    e.printStackTrace();
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
View Code

執行結果:可以看到id相同的數據(對同一數據的操作),都發送到同一隊列了,然后消費者對每一個隊列進行順序消費,消費失敗就會觸發重試機制。

 5. 消息積壓解決方案  

  如果生產環境中consumer半夜掛掉了,項目數據量大的情況下第二天可能有一千萬條消息積壓在broker中。過多的數據積壓不僅占用磁盤空間,還會影響MQ性能。那么這個時候應該怎么辦呢。
  修復consumer,然后慢慢消費嗎?比如rocketmq默認一個topic是4個queue,假如處理一條消息需要1秒,用4個consumer同時進行消費,也需要10000000÷4÷60=42分鍾才能消費完。那新的消息怎么辦呢?
解決方案:
  由於堆積的topic里面message queue數量固定,即使我們這時增加consumer的數量,它也不能分配到message queue。這時我們可以寫一個分發程序做一個臨時topic隊列擴充,來提高消費者能力。程序從舊的topic中讀取到新的topic,只是新的topic的queue可以指定多一點(理論上可以無限擴充,正常1000以內),然后啟動更多的consumer在臨時新的topic里面消費。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM