記一次消息總線的打造


雖說消息隊列的用法很簡單:PUB/SUB, PRODUCER/CONSUMER,不過真做起來還真麻煩。

先說下原始需求:

  • Web前端發送命令消息,后端Consumer處理,然后前端得到結果
  • 需要支持Windows服務

很快,下圖就出來了:

先來分析分析:

    • 前端怎么知道后端已經處理完成?
    • 前端如何在處理完后的第一時間被觸發去執行某些callback呢?
    • Web前端很可能會通過ajax來定時查看某消息的處理狀態

  第一反應是增加應答隊列,此時:

    • 前端能夠很及時的被通知到(后端處理完觸發),來執行callback
    • 但是
      • ajax類型的定時查看怎么做?在ResponseQueue中查?顯然不行(隊列中數據越多,性能越差)
      • 如果前端關閉一段時間,消息會積壓下來,性能越變越差

  因此決定增加一個DB來解決這些消息的保存以及后續的ajax類型的多次查詢,如下圖:

 再來分析分析,此時

    • 前端ajax類型的不定時、多次的查詢某消息處理狀態是解決了
    • 如果消息量很大,也可以將RabbitMQ以及DB分別做集群以及切片
    • 但,似乎還是得增加應答隊列進去,因為現在CONSUMER處理完成后,針對前端的通知很麻煩,理由如下
      • 基於DB行記錄的通知效率低
    • 但,即便增加了這個應答隊列,也會出現如下問題
      • 如果前端崩掉后有段時間未on service,此時應答隊列就會積壓消息...性能會變差

  此時該咋辦?

  答:用PUB/SUB機制來做這個應答隊列,此時如果前端崩掉,就不會SUB了,只要online時才會有消息被通知到

  因此,繼續出一張圖

    

  圖中的Notifier, NotifierPublisher是前端和后端的BROKER,考慮到有些線程需要主動監聽,因此畫在了上面。

 

  再來談談后端,由於沒有特別高要求,對后端的要求也就是這么幾點:

    • 在業務邏輯角度,消息只能被無錯處理一次
    • 如果出現了Exception, 則需要后續人工介入,消息不能丟失,但也不應該造成無限循環的報Exception
    • 對於報Exception的消息,人工處理要方便

  分別分析    

    • 在業務邏輯角度,消息只能被無錯處理一次
      • 在業務處理沒有報錯的情況下,將RabbitMQ消息的Ack動作與DB的消息狀態回寫做成一個TRANSACTION,如下
      • 這樣就能保證此消息“從RABBITMQ Server中remove、寫入查詢DB”同時確保
      • 但是,如果存在下述情況時,會出現即便業務邏輯沒有報錯情況下多次執行
        • 那就是:如果業務邏輯執行完畢,沒有報錯,此時,即將觸發上述代碼,卻還沒有觸發的時刻,服務crash了...
        • 解決方法
          • 在業務邏輯代碼中加入冪等性
          • 在業務邏輯代碼中加入檢查性質代碼
          • 告訴我下其他簡便的方法吧(記錄本地文件日志能解決,就是比較復雜)
    • 如果出現了Exception, 則需要后續人工介入,消息不能丟失,但也不應該造成無限循環的報Exception
      • 增加相應的Exception隊列,實際中是增加了2個:如:
        • 比如目前有隊列messages.CommandA,則異常隊列有:
          • messages.CommandA.exceptions1
          • messages.CommandA.exceptions2
          • 為啥是2個?看后續
    • 對於報Exception的消息,人工處理要方便

      • 在配置文件中增加一個參數,表示運行級別:普通、異常1、異常2
      • 如果是普通級別,則CONSUMER會從messages.CommandA中獲取消息進行處理,報錯后會將消息move到exception1中
      • 如果是異常1級別,則CONSUMER會從messages.CommandA.exception1中獲取消息進行處理,報錯后會將消息move到exception2中
      • 如果是異常2級別,則CONSUMER會從messages.CommandA.exception2中獲取消息進行處理,報錯后將消息move到exception1中
      • 這里還有個問題,就是上面的這些RABBITMQ級別的消息從exception1移動到exception2中,都是分成PUBLISH和BASICACK兩個CHANNEL上的動作完成的,不能套RABBITMQ的TX,也就是存在一致性問題
        • 解決起來同HandleSuccessfulMessage類似,都是通過本地db事務來做,都是借助了BASE思想來實現的

 

剩下的一個問題,RABBITMQ有優先級隊列特性嗎?答案是有:

 

注意:RABBITMQ默認是不支持客戶端消息的優先級的,默認只支持Consumer的優先級,那如何實現客戶端消息的優先級呢?

    1. 發送消息時,設置properties屬性,記得設置Priority屬性值
    2. 安裝插件rabbitmq_priority_queue
    3. DeclareQueue時,加入x-max-priority特性值

 

DONE.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM