RocketMQ——ack機制保證消費成功


ACK簡介 在實際使用RocketMQ的時候我們並不能保證每次發送的消息都剛好能被消費者一次性正常消費成功,可能會存在需要多次消費才能成功或者一直消費失敗的情況,那作為發送者該做如何處理呢? RocketMQ提供了ack機制,以保證消息能夠被正常消費。發送者為了保證消息肯定消費成功,只有使用方明確表示消費成功,RocketMQ才會認為消息消費成功。中途斷電,拋出異常等都不會認為成功——即都會重新投遞。 DEMO 當然,如果消費者不告知發送者我這邊消費信息異常,那么發送者是不會知道的,所以消費者在設置監聽的時候需要給個回調,具體代碼如下: consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { String messageBody = new String(messageExt.getBody()); System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date())+"消費響應:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//輸出消息內容 } //返回消費狀態 //CONSUME_SUCCESS 消費成功 //RECONSUME_LATER 消費失敗,需要稍后重新消費 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); 業務實現消費回調的時候,當且僅當此回調函數返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才會認為這批消息(默認是1條)是消費完成的。如果這時候消息消費失敗,例如數據庫異常,余額不足扣款失敗等一切業務認為消息需要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認為這批消息消費失敗了。 為了保證消息是肯定被至少消費成功一次,RocketMQ會把這批消息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)后,再次投遞到這個ConsumerGroup。而如果一直這樣重復消費都持續失敗到一定次數(默認16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工干預。 修改重試時間 重試的時間默認如下,這個可以通過查看broker的日志, messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 當然,這個重復的時間間隔是可以在配置文件內設置的,由於我這邊配置的雙master模式,所以在128服務器的broker-a.properties和129的broker-b.properties中分別配置,如下圖,設置好后務必將之前的數據清理,具體查看RocketMQ雙Master環境部署 messageDelayLevel = 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 重試消息的處理 一般情況下我們在實際生產中是不需要重試16次,這樣既浪費時間又浪費性能,理論上當嘗試重復次數達到我們想要的結果時如果還是消費失敗,那么我們需要將對應的消息進行記錄,並且結束重復嘗試。 package com.gwd.rocketmq; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; /** * @FileName Consumer.java * @Description: * @author gu.weidong * @version V1.0 * @createtime 2018年6月25日 上午9:49:39 * 修改歷史: * 時間 作者 版本 描述 *==================================================== * */ public class Consumer { public static void main(String[] args) throws MQClientException { //聲明並初始化一個consumer //需要一個consumer group名字作為構造方法的參數,這里為consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); //同樣也要設置NameServer地址 consumer.setNamesrvAddr("192.168.159.128:9876;192.168.159.129:9876"); //這里設置的是一個consumer的消費策略 //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息 //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍 //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //設置consumer所訂閱的Topic和Tag,*代表全部的Tag consumer.subscribe("TopicTest", "*"); //設置一個Listener,主要進行消息的邏輯處理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { if(messageExt.getReconsumeTimes()==3) { //可以將對應的數據保存到數據庫,以便人工干預 System.out.println(messageExt.getMsgId()+","+messageExt.getBody()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } //返回消費狀態 //CONSUME_SUCCESS 消費成功 //RECONSUME_LATER 消費失敗,需要稍后重新消費 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); //調用start()方法啟動consumer consumer.start(); System.out.println("Consumer Started."); } } 所以任何異常都要捕獲返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq會放到重試隊列,這個重試TOPIC的名字是%RETRY%+consumergroup的名字,如下圖: 注意點 1.如果業務的回調沒有處理好而拋出異常,會認為是消費失敗當ConsumeConcurrentlyStatus.RECONSUME_LATER處理。 2.當使用順序消費的回調MessageListenerOrderly時,由於順序消費是要前者消費成功才能繼續消費,所以沒有ConsumeConcurrentlyStatus.RECONSUME_LATER的這個狀態,只有ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT來暫停隊列的其余消費,直到原消息不斷重試成功為止才能繼續消費。 測試案例 (1)producer package com.gwd.rocketmq; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; /** * @FileName Producer.java * @Description: * @author gu.weidong * @version V1.0 * @createtime 2018年6月25日 上午9:48:37 * 修改歷史: * 時間 作者 版本 描述 *==================================================== * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //聲明並初始化一個producer //需要一個producer group名字作為構造方法的參數,這里為producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); //設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔 //NameServer的地址必須有,但是也可以通過環境變量的方式設置,不一定非得寫死在代碼里 producer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876"); // producer.setVipChannelEnabled(false);//3.2。6的版本沒有該設置,在更新或者最新的版本中務必將其設置為false,否則會有問題 //調用start()方法啟動一個producer實例 producer.start(); //發送10條消息到Topic為TopicTest,tag為TagA,消息內容為“Hello RocketMQ”拼接上i的值 for (int i = 0; i < 5; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes("utf-8")// body ); //調用producer的send()方法發送消息 //這里調用的是同步的方式,所以會有返回結果 SendResult sendResult = producer.send(msg); //打印返回結果,可以看到消息發送的狀態以及一些相關信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //發送完消息之后,調用shutdown()方法關閉producer producer.shutdown(); } } (2)Consumer package com.gwd.rocketmq; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; /** * @FileName Consumer.java * @Description: * @author gu.weidong * @version V1.0 * @createtime 2018年6月25日 上午9:49:39 * 修改歷史: * 時間 作者 版本 描述 *==================================================== * */ public class Consumer { public static void main(String[] args) throws MQClientException { //聲明並初始化一個consumer //需要一個consumer group名字作為構造方法的參數,這里為consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); //同樣也要設置NameServer地址 consumer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876"); //這里設置的是一個consumer的消費策略 //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息 //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍 //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //設置consumer所訂閱的Topic和Tag,*代表全部的Tag consumer.subscribe("TopicTest", "*"); //設置一個Listener,主要進行消息的邏輯處理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { String messageBody = new String(messageExt.getBody()); System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format( new Date())+"消費響應:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//輸出消息內容 } //返回消費狀態 //CONSUME_SUCCESS 消費成功 //RECONSUME_LATER 消費失敗,需要稍后重新消費 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); //調用start()方法啟動consumer consumer.start(); System.out.println("Consumer Started."); } } (3)測試結果如下


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM