rocketMQ(十一) rocketMQ使用 重復消費、消息堆積、順序消費、消息丟失等問題


一:順序發送

通過一定算法,將一組順序消息發送到同一個broker下面的同一個隊列,消費者進行順序監聽即可。
例如:一條信息的唯一標識 通過一定算法 路由到 同一個 broker 下到 某一個隊列下。 通過業務層面處理。

// RocketMQ通過MessageQueueSelector中實現的算法來確定消息發送到哪一個隊列上
// RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash
// 當然你可以根據業務實現自己的MessageQueueSelector來決定消息按照何種策略發送到消息隊列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

備注:send方法帶有參數MessageQueueSelector,MessageQueueSelector是讓用戶自己決定消息發送到哪一個隊列,如果是局部消息的話,用來決定消息與隊列的對應關系。
細節:https://www.jianshu.com/p/f8d5a5f37595

二:重復消費

RocketMQ不保證消息不重復,如果你的業務需要保證嚴格的不重復消息,需要你自己在業務端去重

官方對comsumerMessage方法的實現建議是:
It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
無論如何,都不要拋出異常,如果需要重新消費,可以返回RECONSUME_LATER主動要求重新消費。

解決方案:
1: 采用分布式鎖
2:采用數據表操作
以下鏈接 分析很到位。可以好好看一下
https://blog.csdn.net/xzq19920203/article/details/98532304?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.nonecase

三:消息堆積

  1. 其實對於一個原本正常的消息系統來說消息積壓,只會出現兩種情況:要么生產者消息數量增加導致的積壓;要么就是消費者消費變慢導致的消息積壓。對於一個消息隊列我們肯定在上線前就預估好,單節點最大承受流量與系統目前最大峰值流量的數據,一般情況下消息隊列收發性能是遠大於業務處理性能的,一旦出現的話問題也很顯而易見:要么就是流量突然增加,要么就是業務邏輯異常。我能應該從三個方面來查找問題:

  2. 生產端:一般當生產端發生積壓(Broker正常的情況下)就要查看你的業務邏輯是否有異常的耗時步驟導致的。是否需要改並行化操作等。
    Broker端:當Broker端發生積壓我們首先要查看,消息隊列內存使用情況,如果有分區的的話還得看每個分區積壓的消息數量差異。當每個分區的消息積壓數據量相對均勻的話,我們大致可以認為是流量激增。需要在消費端做優化,或者同時需要增加Broker節點(相當於存儲擴容),如果分區加壓消息數量差異很大的話(有的隊列滿了,有的隊列可能還是空閑狀態),我們這時候就要檢查我們的路由轉發規則是否合理,

  3. 消費端:在使用消息隊列的時候大部分的問題都出在消費端,當消費速度小於生產速度很快就會出現積壓,導致消息延遲,以至於丟失。這里需要重點說明一點的是,當消費速度小於生產速度的時候,僅增加消費者是沒有用處的,因為多個消費者在同一個分區上實際是單線程資源競爭關系(當然還有一些冒險的單隊列多消費者並行方式就是:消費者接到消息就ack成功再去處理業務邏輯,這樣你就要承受消息丟失的代價),我們需要同時增加Broker上的分區數量才能解決這一問題。
    那么上面我們說到消息積壓的問題所在,那么遇到這樣問題我們怎么能夠快速的解決呢?我們需要查看是否有無限重發的消息或者有進入死鎖的程序等等,當確定是流量激增的話,我們需要評估是否需要增加資源還是通過限流的方式解決,當短時間大量消息需要處理時,在資源允許的情況下,我們可以新啟一批消費者與消息隊列,將原來的消費者中的消息直接作為生產者轉發到臨時應急隊列中,這樣大概率的能夠快速解決消息積壓。與其事后處理不如我們在設計之初就要把積壓考慮進來,對於數據量非常大,但是實時性要求不高的場景,可以設計出批量消息發送,當隊列積累到一定閥值再做批量消費消費,這里需要注意的就是重復消費帶來的影響,設計不好就是一場災難。
    來源: https://zhuanlan.zhihu.com/p/112681372

四:消息丟失(消息重試)

主要分為生產端和消費端
生產端:可以指定發送次數,失敗后 異常處理
消費端:重試-進行重新消費問題(注意冪等性問題,數據庫或者分布式鎖) 。 一般指定次數。超過限制則記錄 日志表 進行人工補償機制
細節看 https://www.jianshu.com/p/09d13c70ddad

注意:默認情況下CommitLog在每天4點刪除超過48小時的文件或者當磁盤水位線達到75%。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM