雖說消息隊列的用法很簡單:PUB/SUB, PRODUCER/CONSUMER,不過真做起來還真麻煩。
先說下原始需求:
- Web前端發送命令消息,后端Consumer處理,然后前端得到結果
- 需要支持Windows服務
很快,下圖就出來了:
先來分析分析:
-
- 前端怎么知道后端已經處理完成?
- 前端如何在處理完后的第一時間被觸發去執行某些callback呢?
- Web前端很可能會通過ajax來定時查看某消息的處理狀態
第一反應是增加應答隊列,此時:
-
- 前端能夠很及時的被通知到(后端處理完觸發),來執行callback
- 但是
- ajax類型的定時查看怎么做?在ResponseQueue中查?顯然不行(隊列中數據越多,性能越差)
- 如果前端關閉一段時間,消息會積壓下來,性能越變越差
因此決定增加一個DB來解決這些消息的保存以及后續的ajax類型的多次查詢,如下圖:
再來分析分析,此時
-
- 前端ajax類型的不定時、多次的查詢某消息處理狀態是解決了
- 如果消息量很大,也可以將RabbitMQ以及DB分別做集群以及切片
- 但,似乎還是得增加應答隊列進去,因為現在CONSUMER處理完成后,針對前端的通知很麻煩,理由如下
- 基於DB行記錄的通知效率低
- 但,即便增加了這個應答隊列,也會出現如下問題
- 如果前端崩掉后有段時間未on service,此時應答隊列就會積壓消息...性能會變差
此時該咋辦?
答:用PUB/SUB機制來做這個應答隊列,此時如果前端崩掉,就不會SUB了,只要online時才會有消息被通知到
因此,繼續出一張圖
圖中的Notifier, NotifierPublisher是前端和后端的BROKER,考慮到有些線程需要主動監聽,因此畫在了上面。
再來談談后端,由於沒有特別高要求,對后端的要求也就是這么幾點:
-
- 在業務邏輯角度,消息只能被無錯處理一次
- 如果出現了Exception, 則需要后續人工介入,消息不能丟失,但也不應該造成無限循環的報Exception
- 對於報Exception的消息,人工處理要方便
分別分析
-
- 在業務邏輯角度,消息只能被無錯處理一次
- 在業務處理沒有報錯的情況下,將RabbitMQ消息的Ack動作與DB的消息狀態回寫做成一個TRANSACTION,如下
- 這樣就能保證此消息“從RABBITMQ Server中remove、寫入查詢DB”同時確保
- 但是,如果存在下述情況時,會出現即便業務邏輯沒有報錯情況下多次執行
- 那就是:如果業務邏輯執行完畢,沒有報錯,此時,即將觸發上述代碼,卻還沒有觸發的時刻,服務crash了...
- 解決方法
- 在業務邏輯代碼中加入冪等性
- 或
- 在業務邏輯代碼中加入檢查性質代碼
- 或
- 告訴我下其他簡便的方法吧(記錄本地文件日志能解決,就是比較復雜)
- 如果出現了Exception, 則需要后續人工介入,消息不能丟失,但也不應該造成無限循環的報Exception
- 增加相應的Exception隊列,實際中是增加了2個:如:
- 比如目前有隊列messages.CommandA,則異常隊列有:
- messages.CommandA.exceptions1
- messages.CommandA.exceptions2
- 為啥是2個?看后續
- 比如目前有隊列messages.CommandA,則異常隊列有:
- 增加相應的Exception隊列,實際中是增加了2個:如:
-
對於報Exception的消息,人工處理要方便
- 在配置文件中增加一個參數,表示運行級別:普通、異常1、異常2
- 如果是普通級別,則CONSUMER會從messages.CommandA中獲取消息進行處理,報錯后會將消息move到exception1中
- 如果是異常1級別,則CONSUMER會從messages.CommandA.exception1中獲取消息進行處理,報錯后會將消息move到exception2中
- 如果是異常2級別,則CONSUMER會從messages.CommandA.exception2中獲取消息進行處理,報錯后將消息move到exception1中
- 這里還有個問題,就是上面的這些RABBITMQ級別的消息從exception1移動到exception2中,都是分成PUBLISH和BASICACK兩個CHANNEL上的動作完成的,不能套RABBITMQ的TX,也就是存在一致性問題
- 解決起來同HandleSuccessfulMessage類似,都是通過本地db事務來做,都是借助了BASE思想來實現的
- 在業務邏輯角度,消息只能被無錯處理一次
剩下的一個問題,RABBITMQ有優先級隊列特性嗎?答案是有:
注意:RABBITMQ默認是不支持客戶端消息的優先級的,默認只支持Consumer的優先級,那如何實現客戶端消息的優先級呢?
-
- 發送消息時,設置properties屬性,記得設置Priority屬性值
- 安裝插件rabbitmq_priority_queue
- DeclareQueue時,加入x-max-priority特性值
DONE.