messageModel有兩種方式:BROADCASTING 和 CLUSTERING,
消費者收到消息也有兩種消費方式:orderly和concurrently,
1、BROADCASTING模式下,所有注冊的消費者都會消費,而這些消費者通常是集群部署的一個個微服務,這樣就會多台機器重復消費。
2、在CLUSTERING模式下,如果一個topic被多個consumerGroup消費,也會重復消費。
3、即使是在CLUSTERING模式下,同一個consumerGroup下,一個隊列只會分配給一個消費者,看起來好像是不會重復消費。但是,有個特殊情況:一個消費者上線后,同組的所有消費者要重新負載均衡(反之一個消費者掉線后,也一樣)。一個隊列所對應的新的消費者要獲取之前消費的offset,此時之前的消費者可能已經消費了一條消息,但是並沒有把offset提交給broker,那么新的消費者可能會重新消費一次。雖然orderly是前一個消費者先解鎖,后一個消費者加鎖再消費的模式,比起concurrently要嚴格了,但是加鎖的線程和提交offset的線程不是同一個,所以還是會出現極端情況下的重復消費。
4、默認情況下orderly和concurrently模式都是一條一條的消費,但是如果在消費一條消息的時候要進行比如AB兩個操作,在沒有事務控制的情況下,如果A操作成功而B操作失敗,就會重新消費導致A操作會再執行一次,這樣雖然不是重復的消費整條消息,但是一部分的操作會重復。如果放寬條件到一次消費多條的情況,orderly模式是一批中有一條消費失敗,一批統一重新消費,直到達到最大消費次數的限制,發送到死信隊列。而concurrently情況下,在返回成功(CONSUME_SUCCESS)的前提下,有個ackIndex可以分隔成功和失敗的消息,失敗的、沒有消費的消息發送到retry隊列,不會造成重復消費,而如果返回的是(RECONSUME_LATER),仍然是和orderly一樣的同批次全部重新消費。(其實orderly也可以這樣做,但是不知道為什么沒有,而是全部重新放入隊列消費)。
5、消費者pullRequest發出去,如果長時間收不到請求,是會被取出來重新再放入隊列再請求一次的,所以也是會重復拉取消息的。
注:orderly和concurrently消費失敗的處理(假設在集群模式下):
ConsumeMessageOrderlyService的內部類ConsumeRequest的run方法中,messageListener.consumeMessage的返回值status的值有兩種:SUSPEND_CURRENT_QUEUE_A_MOMENT和 SUCCESS,如果拋出異常返回null,隨后仍會被賦值SUSPEND_CURRENT_QUEUE_A_MOMENT,接下來的processConsumeResult方法中,在isAutoCommit為true的判斷中,進入SUSPEND_CURRENT_QUEUE_A_MOMENT的判斷,假設把msgs的size簡化為1(也就是說一次只從processQueue的msgTreeMap中取出offset最小的那條),默認的maxReconsumeTimes是Integer.maxValue,一般是需要自己設置一次的,到達最大失敗次數,還是沒成功,進入sendMessageBack方法,在調用send方法發送消息,看起來是把消息發送到topic為retry開頭的重試隊列,但是,由於broker這里SendMessageProcessor.sendMessage方法中,有handleRetryAndDLQ方法判斷當前消息的重試次數有沒有達到最大重試次數,如果到達,替換topic為DLQ死信隊列,將消息存儲到死信隊列中。剛才說的是每次從processQueue獲取一條消息的情況,如果一次獲取多條,那么如果status不是成功,那么會針對每一條消息判斷是不是到達最大重試次數,如果到達,送到死信隊列,如果沒有,重試次數加一,再次放到processQueue中。而這次成功消費的offset要保存,因為之前的前提是集群模式,那么要提交給messageId對應的broker,具體代碼邏輯是:RemoteBrokerOffsetStore的updateOffset是存到自己的offsetTable屬性中,而MQClientInstance.start---startScheduledTask---persistAllConsumerOffset---RemoteBrokerOffsetStore.persistAll---updateConsumeOffsetToBroker,把offset提交。
而ConsumeMessageConcurrentlyService的submitConsumeRequest方法,把這次取到的消息分批(如果需要的話),組裝成內部類ConsumeRequest,所以concurrently也存在一次消費多條消息的情況,這時候會在context參數中保存一個ackIndex,如果成功了,ackIndex不變,如果失敗了,ackIndex為-1,如果ackIndex小於這次分批msgs的size,那么之后的消息要發送回重試隊列,這里和orderly不同,orderly是直接send方法,而這里的requestCode都不一樣了。這里也會判斷最大消費次數的問題,發送到重試隊列或者死信隊列。注意和orderly不同的是,orderly是本地重試,不會發送到重試隊列,只會最后發送到死信隊列。而concurrently會先發送到重試隊列。至於本地消息的刪除以及commitoffset的處理,因為並發消費在目前的業務中不太能用得到,等以后用到了再看。