Rocket重試機制,消息模式,刷盤方式


一、Consumer 批量消費(推模式)

可以通過

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條  

這里需要分為2種情況

  •  Consumer端先啟動  
  •  Consumer端后啟動.   正常情況下:應該是Consumer需要先啟動

注意:如果broker采用推模式的話,consumer先啟動,會一條一條消息的消費,consumer后啟動會才用批量消費 

Consumer端先啟動

1、Consumer.java

package quickstart;   
import java.util.List;    
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.MessageExt;    
/** 
 * Consumer,訂閱消息 
 */  
public class Consumer {    
    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        /** 
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> 
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費 ,(消費順序消息的時候設置)
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);    
        consumer.subscribe("TopicTest", "*");    
        consumer.registerMessageListener(new MessageListenerConcurrently() {    
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
                  
                try {  
                    System.out.println("msgs的長度" + msgs.size());  
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
                } catch (Exception e) {  
                    e.printStackTrace();  
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
                }  
                          
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer Started.");  
    }  
}  

由於這里是Consumer先啟動,所以他回去輪詢MQ上是否有訂閱隊列的消息,由於每次producer插入一條,Consumer就拿一條所以測試結果如下(每次size都是1)

2、Consumer端后啟動,也就是Producer先啟動

由於這里是Consumer后啟動,所以MQ上也就堆積了一堆數據,Consumer的

consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條    

所以這段代碼就生效了測試結果如下(每次size最多是10):

二、消息重試機制:消息重試分為2種

  • 1、Producer端重試

  • 2、Consumer端重試

1、Producer端重試 

也就是Producer往MQ上發消息沒有發送成功,我們可以設置發送失敗重試的次數,發送並觸發回調函數

          //設置重試的次數  
            producer.setRetryTimesWhenSendFailed(3);  
            //開啟生產者  
            producer.start();  
            //創建一條消息  
            Message msg = new Message("PushTopic", "push", "1",   "我是一條普通消息".getBytes());  
            //發送消息  
            SendResult result = producer.send(msg);  
            //發送,並觸發回調函數  
            producer.send(msg, new SendCallback() {  
                  
                @Override  
                //成功的回調函數  
                public void onSuccess(SendResult sendResult) {  
                    System.out.println(sendResult.getSendStatus());  
                    System.out.println("成功了");  
                }  
                  
                @Override  
                //出現異常的回調函數  
                public void onException(Throwable e) {  
                System.out.println("失敗了"+e.getMessage());  
                      
                }  
            }); 

2、Consumer端重試

2.1、exception的情況,一般重復16次 10s、30s、1分鍾、2分鍾、3分鍾等等

上面的代碼中消費異常的情況返回

return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重試

正常則返回:

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

 
package quickstart;  
  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * Consumer,訂閱消息 
 */  
public class Consumer {  
  
    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        /** 
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> 
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicTest", "*");  
  
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  
                try {  
                    // System.out.println("msgs的長度" + msgs.size());  
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
                    for (MessageExt msg : msgs) {  
                        String msgbody = new String(msg.getBody(), "utf-8");  
                        if (msgbody.equals("Hello RocketMQ 4")) {  
                            System.out.println("======錯誤=======");  
                            int a = 1 / 0;  
                        }  
                    }  
  
                } catch (Exception e) {  
                    e.printStackTrace();  
                    if(msgs.get(0).getReconsumeTimes()==3){  
                        //記錄日志  
                          
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
                    }else{  
                          
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試  
                    }  
                }  
  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer Started.");  
    }  
}  

打印結果:



假如超過了多少次之后我們可以讓他不再重試記錄 日志。

if(msgs.get(0).getReconsumeTimes()==3){
//記錄日志  
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}

2.2超時的情況,這種情況MQ會無限制的發送給消費端。

就是由於網絡的情況,MQ發送數據之后,Consumer端並沒有收到導致超時。也就是消費端沒有給我返回return 任何狀態,這樣的就認為沒有到達Consumer端。

這里模擬Producer只發送一條數據。consumer端暫停1分鍾並且不發送接收狀態給MQ

 

package model;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * Consumer,訂閱消息 
 */  
public class Consumer {  
  
    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        /** 
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> 
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicTest", "*");  
  
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  
                try {  
  
                    // 表示業務處理時間  
                    System.out.println("=========開始暫停===============");  
                    Thread.sleep(60000);  
  
                    for (MessageExt msg : msgs) {  
                        System.out.println(" Receive New Messages: " + msg);  
                    }  
  
                } catch (Exception e) {  
                    e.printStackTrace();  
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試  
                }  
  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer Started.");  
    }  
}  
 


三、消費模式

1、集群消費

2、廣播消費

rocketMQ默認是集群消費,我們可以通過在Consumer來支持廣播消費

consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費
 
package model;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import com.alibaba.rocketmq.client.exception.MQClientException;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.MessageExt;  
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;  
  
/** 
 * Consumer,訂閱消息 
 */  
public class Consumer2 {  
  
    public static void main(String[] args) throws InterruptedException, MQClientException {  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");  
        consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");  
        consumer.setConsumeMessageBatchMaxSize(10);  
        consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費  
      
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        consumer.subscribe("TopicTest", "*");  
  
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  
                try {  
  
                    for (MessageExt msg : msgs) {  
                        System.out.println(" Receive New Messages: " + msg);  
                    }  
  
                } catch (Exception e) {  
                    e.printStackTrace();  
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試  
                }  
  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
            }  
        });  
  
        consumer.start();  
  
        System.out.println("Consumer Started.");  
    }  
}  

四、conf下的配置文件說明

異步復制和同步雙寫主要是主和從的關系。消息需要實時消費的,就需要采用主從模式部署

異步復制:比如這里有一主一從,我們發送一條消息到主節點之后,這樣消息就算從producer端發送成功了,然后通過異步復制的方法將數據復制到從節點

同步雙寫:比如這里有一主一從,我們發送一條消息到主節點之后,這樣消息就並不算從producer端發送成功了,需要通過同步雙寫的方法將數據同步到從節點后, 才算數據發送成功。

 如果rocketMq才用雙master部署,Producer往MQ上寫入20條數據 其中Master1中拉取了12條 。Master2中拉取了8 條,這種情況下,Master1宕機,那么我們消費數據的時候,只能消費到Master2中的8條,Master1中的12條默認持久化,不會丟失消息,需要Master1恢復之后這12條數據才能繼續被消費,如果想保證消息實時消費,就才用雙Master雙Slave的模式

五、刷盤方式

同步刷盤:在消息到達MQ后,RocketMQ需要將數據持久化,同步刷盤是指數據到達內存之后,必須刷到commitlog日志之后才算成功,然后返回producer數據已經發送成功。

異步刷盤:,同步刷盤是指數據到達內存之后,返回producer說數據已經發送成功。,然后再寫入commitlog日志。

commitlog:

commitlog就是來存儲所有的元信息,包含消息體,類似於MySQLOracle的redolog,所以主要有CommitLog在,Consume Queue即使數據丟失,仍然可以恢復出來。

consumequeue:記錄數據的位置,以便Consume快速通過consumequeue找到commitlog中的數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM