activemq-重發、去重


activemq的consumer端也有窗口機制,通過prefetchSize就可以設置窗口大小。加入窗口是為了批量獲取數據,同時可以設置optimizeAcknowledge來優化確認回復,優化確認一方面可以減輕client負擔(不需要頻繁的確認消息)、減少通信開銷,另一方面由於延遲了確認(默認ack了0.65*prefetchSize個消息才確認),broker再次發送消息時又可以批量發送,如果只是開啟了prefetchSize,每條消息都去確認的話,broker在收到確認后也只是發送一條消息,當然也可以手動延遲確認。

consumer會維護兩個隊列,pendingList和dispatchedList,前者存放從broker已接受但未消費(未回調onMessage)的message,后者用於存放已消費但未確認的message(可用於recover,即redelivery)。

activemq的重發機制是session為單位的,並且重發只發生在client端,並不會向broker請求重發消息,只會在重發后向broker發送一個redelivered命令,如果某消息的redelivered次數達到閾值,這條消息就會被清除並送入DLQ。

 1 public void recover() throws JMSException {
 2 
 3   checkClosed();
 4   if (getTransacted()) {
 5     throw new IllegalStateException("This session is transacted");
 6   }
 7    //該session的每個consumer都會recover
 8   for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
 9     ActiveMQMessageConsumer c = iter.next();
10     c.rollback();
11   }
12 
13 }

同樣的,message的確認也是session級別的

1 public void acknowledge() throws JMSException {
2   for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
3     ActiveMQMessageConsumer c = iter.next();
4     c.acknowledge();
5   }
6 }

 

通過持久化、確認機制,broker可以保證消息不丟失,即如果consumer未確認消息,consumer都可以再次得到該消息,但broker並不擔保消息被client唯一消費。onMessage處理消息時出錯,consumer會自動發起recover;重啟consumer后,consumer會得到之前未確認的消息;consumer回復了確認,但確認命令還未得到broker處理時,broker掛掉了,broker重啟后,consumer依舊會收到之前確認過的消息。這些情況都會產生重復消息,消息的去重需要client自己保證,最簡單直接的方式就是處理完消息時,將消息業務唯一標識符入庫,每次處理消息時都檢查是否存在該標識符。

 

參考:http://activemq.apache.org/message-redelivery-and-dlq-handling.html

  http://activemq.apache.org/redelivery-policy.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM