介紹:
RocketMQ的消息重試機制分為兩種,一種是Producer端重試,一種是Consumer端重試。
1、Producer端重試 :
1.1 消息沒有發送成功時會重試,默認情況下重試2次(retryTimesWhenSendFailed=2,即最多發送3次)。
2、Consumer端重試:
2.1 exception的情況,一般重試16次,間隔依次為10s、30s、1min、2min、3min等。注意reconsumeTimes這個參數;
2.2 超時情況,這種情況MQ會無限制的發送給消費端。這種情況就是Consumer端沒有返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,也沒有返回ConsumeConcurrentlyStatus.RECONSUME_LATER。Consumer超時的情況我們還分為一個Producer和一個Consumer的場景和一個Producer和多個Consumer(Consumer集群)的場景。下面的實例中我們會詳細的做實驗。
問題,Consumer的默認超時時間是多少?
示例:
Producer端重試
producer端重試的機制比較簡單,我們通過看一下源碼可以發現,通過設置retryTimesWhenSendFailed定義重試次數,通過設置sendMsgTimeout來定義超時時間
producer.setRetryTimesWhenSendFailed(3); //默認是2
producer.setSendMsgTimeout(6000); //默認是3000
package org.hope.lee.producer; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendCallback; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; public class Producer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("push_consumer"); // producer.setNamesrvAddr("192.168.31.176:9876"); producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); try { // 設置實例名稱 producer.setInstanceName("quick_start_producer"); // 設置重試次數,默認2 producer.setRetryTimesWhenSendFailed(3); //設置發送超時時間,默認是3000 producer.setSendMsgTimeout(6000); // 開啟生產者 producer.start(); // 創建一條消息 Message msg = new Message("PushTopic_tt1", "TagB", "OrderID0034", "uniform_just_for_test".getBytes()); SendResult send = producer.send(msg); System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus()); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } producer.shutdown(); } }
Consumer端exception的情況。
Producer端發送消息:
package org.hope.lee.producer; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendCallback; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; public class ProducerException { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("push_consumer"); producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); try { producer.setInstanceName("quick_start_producer"); producer.setRetryTimesWhenSendFailed(3); producer.start(); for(int i = 0; i < 10; i++) { Message msg = new Message("PushTopic_tt1", "TagA", "OrderID0034", ("message" + i).getBytes()); //目前發現3.2.6版本沒有這個方法,3.5.3版本有這個方法,並且必須要設置為false否則會報錯 // producer.setVipChannelEnabled(false); SendResult send = producer.send(msg); System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus()); } } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } producer.shutdown(); } }
Consumer端接收消息
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListener; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.sun.org.apache.xpath.internal.SourceTree; /** * 消費端重試的情況 :異常情況 */ public class ConsumerException { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); // 批量消費,每次拉取10條 consumer.setConsumeMessageBatchMaxSize(10);// ① // consumer.setInstanceName("quick_start_consumer"); // 3.2.6這個版本沒有這個方法,3.5.3版本要設置這個方法為false,否則取不到topic // consumer.setVipChannelEnabled(false); // 程序第一次啟動從消息隊列頭取數據 // 如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 訂閱PushTopic下Tag為push的消息 consumer.subscribe("PushTopic_tt1", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); try { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到消息:topic:" + topic + ",tags:" + tags + ",msg:" + msg + "msgBody:" + msgBody); if ("message4".equals(msgBody)) { System.out.println("====失敗消息開始====="); System.out.println("msg:" + msg); System.out.println("msgBody:" + msgBody); 
System.out.println("====失敗消息結束====="); // 發生異常 int i = 1 / 0; System.out.println(i); } } catch (Exception e) { e.printStackTrace(); // ②如果重試了三次就返回成功 if (msg.getReconsumeTimes() == 3) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); // consumer.suspend(); } }
我們先啟動ConsumerException再啟動ProducerException。我們看到消費message4時在除零語句(int i = 1 / 0)處發生了異常,這個時候消息就會進行重試,我們看看Consumer端的輸出來分析一下。
Consumer Started.
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=149, queueOffset=4, sysFlag=0, bornTimestamp=1515158266232, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293401, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001603, commitLogOffset=5635, bodyCRC=886509244, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=8]]msgBody:message0
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=1, storeSize=149, queueOffset=3, sysFlag=0, bornTimestamp=1515158266249, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293411, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001698, commitLogOffset=5784, bodyCRC=1137720874, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=4, MIN_OFFSET=0}, body=8]]msgBody:message1
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=3, storeSize=149, queueOffset=2, sysFlag=0, bornTimestamp=1515158266267, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293428, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F00000000000017C2, commitLogOffset=6082, bodyCRC=769548038, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=8]]msgBody:message3
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=2, storeSize=149, queueOffset=2, sysFlag=0, bornTimestamp=1515158266259, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293420, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F000000000000172D, commitLogOffset=5933, bodyCRC=1524199312, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=8]]msgBody:message2
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=149, queueOffset=5, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293445, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001857, commitLogOffset=6231, bodyCRC=867879589, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=6, MIN_OFFSET=0}, body=8]]msgBody:message4
====失敗消息開始=====
msg:MessageExt [queueId=0, storeSize=149, queueOffset=5, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293445, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001857, commitLogOffset=6231, bodyCRC=867879589, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=6, MIN_OFFSET=0}, body=8]]
msgBody:message4
====失敗消息結束=====
java.lang.ArithmeticException: / by zero
at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=1, storeSize=149, queueOffset=4, sysFlag=0, bornTimestamp=1515158266292, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293452, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F00000000000018EC, commitLogOffset=6380, bodyCRC=1153301043, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=5, MIN_OFFSET=0}, body=8]]msgBody:message5
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=2, storeSize=149, queueOffset=3, sysFlag=0, bornTimestamp=1515158266300, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293482, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001A9D, commitLogOffset=6813, bodyCRC=1572121481, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=4, MIN_OFFSET=0}, body=8]]msgBody:message6
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=3, storeSize=149, queueOffset=3, sysFlag=0, bornTimestamp=1515158266328, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293491, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001B32, commitLogOffset=6962, bodyCRC=716413727, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=4, MIN_OFFSET=0}, body=8]]msgBody:message7
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=149, queueOffset=6, sysFlag=0, bornTimestamp=1515158266337, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293497, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001BC7, commitLogOffset=7111, bodyCRC=973899406, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=7, MIN_OFFSET=0}, body=8]]msgBody:message8
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=1, storeSize=149, queueOffset=5, sysFlag=0, bornTimestamp=1515158266348, bornHost=/192.168.31.38:7309, storeTimestamp=1515158293508, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001C5C, commitLogOffset=7260, bodyCRC=1292613144, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={TAGS=TagA, KEYS=OrderID0034, WAIT=true, MAX_OFFSET=6, MIN_OFFSET=0}, body=8]]msgBody:message9
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=285, queueOffset=7, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158303482, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001CF1, commitLogOffset=7409, bodyCRC=867879589, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=3, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=8, MIN_OFFSET=0, REAL_QID=0}, body=8]]msgBody:message4
====失敗消息開始=====
msg:MessageExt [queueId=0, storeSize=285, queueOffset=7, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158303482, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001CF1, commitLogOffset=7409, bodyCRC=867879589, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=3, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=8, MIN_OFFSET=0, REAL_QID=0}, body=8]]
msgBody:message4
====失敗消息結束=====
java.lang.ArithmeticException: / by zero
at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=285, queueOffset=8, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158333701, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001F2A, commitLogOffset=7978, bodyCRC=867879589, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=4, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=9, MIN_OFFSET=0, REAL_QID=0}, body=8]]msgBody:message4
====失敗消息開始=====
msg:MessageExt [queueId=0, storeSize=285, queueOffset=8, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158333701, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000001F2A, commitLogOffset=7978, bodyCRC=867879589, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=4, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=9, MIN_OFFSET=0, REAL_QID=0}, body=8]]
msgBody:message4
====失敗消息結束=====
java.lang.ArithmeticException: / by zero
at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=285, queueOffset=9, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158394718, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000002163, commitLogOffset=8547, bodyCRC=867879589, reconsumeTimes=3, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=5, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=10, MIN_OFFSET=0, REAL_QID=0}, body=8]]msgBody:message4
====失敗消息開始=====
msg:MessageExt [queueId=0, storeSize=285, queueOffset=9, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158394718, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F0000000000002163, commitLogOffset=8547, bodyCRC=867879589, reconsumeTimes=3, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=5, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=10, MIN_OFFSET=0, REAL_QID=0}, body=8]]
msgBody:message4
====失敗消息結束=====
java.lang.ArithmeticException: / by zero
at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
收到消息:topic:PushTopic_tt1,tags:TagA,msg:MessageExt [queueId=0, storeSize=285, queueOffset=10, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158514742, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F000000000000239C, commitLogOffset=9116, bodyCRC=867879589, reconsumeTimes=4, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=6, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=11, MIN_OFFSET=0, REAL_QID=0}, body=8]]msgBody:message4
====失敗消息開始=====
msg:MessageExt [queueId=0, storeSize=285, queueOffset=10, sysFlag=0, bornTimestamp=1515158266276, bornHost=/192.168.31.38:7309, storeTimestamp=1515158514742, storeHost=/192.168.31.176:10911, msgId=C0A81FB000002A9F000000000000239C, commitLogOffset=9116, bodyCRC=867879589, reconsumeTimes=4, preparedTransactionOffset=0, toString()=Message [topic=PushTopic_tt1, flag=0, properties={DELAY=6, ORIGIN_MESSAGE_ID=C0A81FB000002A9F0000000000001857, REAL_TOPIC=%RETRY%push_consumer, TAGS=TagA, WAIT=false, KEYS=OrderID0034, RETRY_TOPIC=PushTopic_tt1, MAX_OFFSET=11, MIN_OFFSET=0, REAL_QID=0}, body=8]]
msgBody:message4
====失敗消息結束=====
java.lang.ArithmeticException: / by zero
at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
java.lang.ArithmeticException: / by zero
at org.hope.lee.consumer.ConsumerException$1.consumeMessage(ConsumerException.java:55)
at com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:142)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
我們看到最初消費到message4(第五條消息)的時候拋出異常,然后繼續消費了剩下的5條消息,過了一段時間MQ又重試發送消息給Consumer端,又拋出異常,過一段時間MQ還是會發送消息過來。我們看一下輸出中的reconsumeTimes字段(原文中用藍色標注),這個就是記錄重試次數的字段。我們可以利用這個字段進行業務邏輯處理,比如重試了3次就返回SUCCESS了,就不再重試了。比如Consumer代碼中的②處
Consumer端發生超時的情況討論。
在消費端,我們有兩個Consumer來組成一個集群。ConsumerClusterMember1中我們設置一個Thread.sleep()來模擬一個消費超時的狀態。
實驗的步驟如下:
一、先啟動ConsumerClusterMember1
二、再啟動ConsumerClusterMember2。
三、然后啟動ProducerCluster來發送一條消息。
四、這個時候ConsumerClusterMember1可能會先接收到這條消息,然后sleep等待了。這個時候我們停掉它的JVM
五、過了一段時間,ConsumerClusterMember2就會重新收到Producer端發過來的消息。完成了Consumer端集群的消息超時。
下面我們看看代碼
Producer端:
package org.hope.lee.producer; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendCallback; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * 配合ConsumerClusterMember1,2做測試 */ public class ProducerCluster { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("producer_cluster"); producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); try { // 設置實例名稱 producer.setInstanceName("producer_cluster_name"); // 開啟生產者 producer.start(); // 創建一條消息 Message msg = new Message("cluster_timeout_test", "TagA", "OrderID0034", "customer_cluster_test".getBytes()); SendResult send = producer.send(msg); System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus()); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } producer.shutdown(); } }
消費端集群:
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; public class ConsumerClusterMember1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_cluster"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); // 批量消費,每次拉取10條 consumer.setConsumeMessageBatchMaxSize(10); // 如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 訂閱PushTopic下Tag為push的消息 consumer.subscribe("cluster_timeout_test", "TagA || Tag B || Tage C"); consumer.registerMessageListener(new MqMessageListener1()); consumer.start(); System.out.println("Consumer Started."); } } class MqMessageListener1 implements MessageListenerConcurrently{ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { MessageExt msg = msgs.get(0); System.out.println("------消息處理中--------"); Thread.sleep(60000); String msgBody = new String(msg.getBody(), "utf-8"); System.out.println("msgBody:" + msgBody); } catch(Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; public class ConsumerClusterMember2 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_cluster"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); // 批量消費,每次拉取10條 consumer.setConsumeMessageBatchMaxSize(10); // 如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 訂閱PushTopic下Tag為push的消息 consumer.subscribe("cluster_timeout_test", "TagA || Tag B || Tage C"); consumer.registerMessageListener(new MqMessageListener2()); consumer.start(); System.out.println("Consumer Started."); } } class MqMessageListener2 implements MessageListenerConcurrently{ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { MessageExt msg = msgs.get(0); String msgBody = new String(msg.getBody(), "utf-8"); System.out.println("msgBody:" + msgBody); } catch(Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api
參考:
[1]博客,http://blog.csdn.net/u010634288/article/details/56049305