一、Consumer 批量消費(推模式)
可以通過
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條
這里需要分為2種情況
- Consumer端先啟動
- Consumer端后啟動. 正常情況下:應該是Consumer需要先啟動
注意:如果broker采用推模式的話,consumer先啟動,會一條一條消息的消費,consumer后啟動會才用批量消費
Consumer端先啟動
1、Consumer.java
package quickstart; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,訂閱消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876"); consumer.setConsumeMessageBatchMaxSize(10); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 ,(消費順序消息的時候設置) */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { System.out.println("msgs的長度" + msgs.size()); System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
由於這里是Consumer先啟動,所以他回去輪詢MQ上是否有訂閱隊列的消息,由於每次producer插入一條,Consumer就拿一條所以測試結果如下(每次size都是1)
2、Consumer端后啟動,也就是Producer先啟動
由於這里是Consumer后啟動,所以MQ上也就堆積了一堆數據,Consumer的
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條
所以這段代碼就生效了測試結果如下(每次size最多是10):
二、消息重試機制:消息重試分為2種
-
1、Producer端重試
-
2、Consumer端重試
1、Producer端重試
也就是Producer往MQ上發消息沒有發送成功,我們可以設置發送失敗重試的次數,發送並觸發回調函數
//設置重試的次數 producer.setRetryTimesWhenSendFailed(3); //開啟生產者 producer.start(); //創建一條消息 Message msg = new Message("PushTopic", "push", "1", "我是一條普通消息".getBytes()); //發送消息 SendResult result = producer.send(msg); //發送,並觸發回調函數 producer.send(msg, new SendCallback() { @Override //成功的回調函數 public void onSuccess(SendResult sendResult) { System.out.println(sendResult.getSendStatus()); System.out.println("成功了"); } @Override //出現異常的回調函數 public void onException(Throwable e) { System.out.println("失敗了"+e.getMessage()); } });
2、Consumer端重試
2.1、exception的情況,一般重復16次 10s、30s、1分鍾、2分鍾、3分鍾等等
上面的代碼中消費異常的情況返回
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重試
正常則返回:
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
package quickstart; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,訂閱消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876"); consumer.setConsumeMessageBatchMaxSize(10); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { // System.out.println("msgs的長度" + msgs.size()); System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); for (MessageExt msg : msgs) { String msgbody = new String(msg.getBody(), "utf-8"); if (msgbody.equals("Hello RocketMQ 4")) { System.out.println("======錯誤======="); int a = 1 / 0; } } } catch (Exception e) { e.printStackTrace(); if(msgs.get(0).getReconsumeTimes()==3){ //記錄日志 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功 }else{ return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試 } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功 } }); consumer.start(); System.out.println("Consumer Started."); } }
打印結果:
假如超過了多少次之后我們可以讓他不再重試記錄 日志。
if(msgs.get(0).getReconsumeTimes()==3){
//記錄日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
2.2超時的情況,這種情況MQ會無限制的發送給消費端。
就是由於網絡的情況,MQ發送數據之后,Consumer端並沒有收到導致超時。也就是消費端沒有給我返回return 任何狀態,這樣的就認為沒有到達Consumer端。
這里模擬Producer只發送一條數據。consumer端暫停1分鍾並且不發送接收狀態給MQ
package model; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,訂閱消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876"); consumer.setConsumeMessageBatchMaxSize(10); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { // 表示業務處理時間 System.out.println("=========開始暫停==============="); Thread.sleep(60000); for (MessageExt msg : msgs) { System.out.println(" Receive New Messages: " + msg); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功 } }); consumer.start(); System.out.println("Consumer Started."); } }
三、消費模式
1、集群消費
2、廣播消費
rocketMQ默認是集群消費,我們可以通過在Consumer來支持廣播消費
consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費
package model; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; /** * Consumer,訂閱消息 */ public class Consumer2 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876"); consumer.setConsumeMessageBatchMaxSize(10); consumer.setMessageModel(MessageModel.BROADCASTING);// 廣播消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println(" Receive New Messages: " + msg); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功 } }); consumer.start(); System.out.println("Consumer Started."); } }
四、conf下的配置文件說明
異步復制和同步雙寫主要是主和從的關系。消息需要實時消費的,就需要采用主從模式部署
異步復制:比如這里有一主一從,我們發送一條消息到主節點之后,這樣消息就算從producer端發送成功了,然后通過異步復制的方法將數據復制到從節點
同步雙寫:比如這里有一主一從,我們發送一條消息到主節點之后,這樣消息就並不算從producer端發送成功了,需要通過同步雙寫的方法將數據同步到從節點后, 才算數據發送成功。
如果rocketMq才用雙master部署,Producer往MQ上寫入20條數據 其中Master1中拉取了12條 。Master2中拉取了8 條,這種情況下,Master1宕機,那么我們消費數據的時候,只能消費到Master2中的8條,Master1中的12條默認持久化,不會丟失消息,需要Master1恢復之后這12條數據才能繼續被消費,如果想保證消息實時消費,就才用雙Master雙Slave的模式
五、刷盤方式
同步刷盤:在消息到達MQ后,RocketMQ需要將數據持久化,同步刷盤是指數據到達內存之后,必須刷到commitlog日志之后才算成功,然后返回producer數據已經發送成功。
異步刷盤:,同步刷盤是指數據到達內存之后,返回producer說數據已經發送成功。,然后再寫入commitlog日志。
commitlog:
commitlog就是來存儲所有的元信息,包含消息體,類似於MySQL、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使數據丟失,仍然可以恢復出來。
consumequeue:記錄數據的位置,以便Consume快速通過consumequeue找到commitlog中的數據