RabbitMQ雙活實踐(轉)


有貨RabbitMQ雙活實踐

 

消息服務中間件在日常工作中用途很多,如業務之間的解耦,其中 RabbitMQ 是比較容易上手且企業使用比較廣泛的一種,本文主要介紹有貨在使用 RabbitMQ 的一些實踐與嘗試。

有貨的 RabbitMQ 部署架構采用雙中心模式,在兩套數據中心中各部署一套 RabbitMQ 集群,各中心的 RabbitMQ 服務除了需要為業務提供正常的消息服務外,中心之間還需要實現部分隊列消息共享。

消息傳遞的可靠性

場景 1:生產者與消費者互不感知,怎么確認生產者已將消息投遞到 RabbitMQ 服務端,又如何確認消費者已經消費了該消息?

 消息 Publish 可靠性

首先來談談 Publisher 的可靠發送,如果使用標准 AMQP 0-9-1,保證消息不丟失的唯一方法是使用事務,但使用事務模式會導致服務端吞吐量急劇下降。為了彌補這一點,AMQP 引入了確認機制。它模仿了協議中已經存在的消費者 ACK 確認機制。Publisher 通過發送 confirm.select 命令開啟確認模式,隨后 RabbitMQ 服務端在收到消息后會進行確認,Publisher 會收到服務端發送的確認回復。要注意:無法在通道中同時使用確認模式與事務模式,只可二選一。

 消息 Consume 可靠性

再說說如何保證隊列中消息至少被消費一次。當 RabbitMQ 交付消息給 Consumer 時,需要確認 Message 已被投遞到 Consumer。Acknowledgements 作用,Consumer 發送確認消息通知 RabbitMQ 服務端已收到消息或已成功消費消息。看下消息生產、消費的流程圖:

在 1 號的位置需要開啟 Channel 的 Confirm 模式,接收 RabbitMQ 服務端發送的確認消息已到達的 Ack 信息;在 3 號的位置,消費者在成功消費或者業務處理失敗后,需要告訴 RabbitMQ 服務端,消息已被消費成功或者失敗;當然在某些網絡故障中,數據包丟失可能意味着中斷的 TCP 連接需要較長時間才能夠被操作系統檢測到。通過心跳功能,確保應用程序層及時發現連接中斷。

在我們的部署架構中,ELB 與 RabbitMQ 之間就是通過此機制來判斷服務是否存活,通知消息生產者服務端已掛,異步等待 Confirm 的消息直接進入 Unconfirm 的處理環節。另外為了避免在代理中丟失消息,AMQP 標准具有交換、隊列和持久消息的耐久性概念,要求持久對象或持久消息將在重新啟動后生存,這些特性同樣也是可靠性的基礎。

實現消息的延遲重試機制(重試隊列)

場景 2:在某些情況下,業務系統在處理消息時可能會失敗,此時需要做的是重試,而不是直接丟棄;當然重試也不能直接重試,一旦有任務長時間失敗,會導致后面的消息無法被正常處理,此時可以借助死信機制(消息在隊列中存活時間超出隊列 ttl 的設定)轉發投遞到特定的重試隊列后,隨后再嘗試重新處理該消息。

下面介紹具體操作:

首先創建兩個隊列,工作隊列命名為“yoho_test_retry”,重試隊列命名為“yoho_test_retry.retry“。

再看下工作隊列的參數配置:

  • x-dead-letter-exchange:死信轉發的 Exchange

  • x-dead-letter-routing-key:死信轉發時的 Routing-key

  • yoho_test_retry 綁定到名為“amp.topic”的 topic 類型 Exchange,接收 Routing-key 為“yoho_test_retry”的消息

再看下重試隊列的參數配置:

  • 死信轉發到“amp.topic”的 Exchange

  • Routing-key 為“yoho_test_retry”(即工作隊列 yoho_test_retry 接收該主題消息)

  • x-message-ttl:message 在重試隊列中存活的時間,也就是延遲多久重試。該隊列綁定到“amp.topic”的 Exchange,接收 Routing-key 為“retry.yoho_test_retry”的消息(即接收工作隊列的死信),這樣就可以實現消息重試隊列的機制了

當然還有別的方式,如通過聲明 Retry 的 Exchange 來中轉到 Retry 隊列中,不需要指定 x-dead-letter-routing-key,再指定 Retry 隊列的 dead-letter-exchange 為“amp.topic”即可,這種方式不需要每個隊列都生成一個 Retry 隊列,大家可以自己動手嘗試下。

實現消息的延時消費(延時隊列)

場景 3:如何實現消息的延時消費也是一種常見的需求,可以讓某些任務延時執行,其實同樣也可以借助死信機制來實現。

隊列 A 用於接收暫存 Producer 的消息,隊列 B 用於 Consumer 的消費,在隊列 A 中指定消息的 ttl 即生命周期時長,同時指定其死信交換機 DLXs,一旦消息在隊列中存活時長超過 ttl 的設定值,那么消息會被轉發到 DLXs,將隊列 B 綁定到 DLXs,即可接收到隊列 A 的死信。

具體操作流程,與場景 2 一樣,首先創建兩個隊列:工作隊列名為“yoho_test_delay” ,延遲隊列名為“yoho_test_delay.delay“。

再看下工作隊列的配置參數:

  • 從“amp.topic”的 Exchange 中接收 Routing-key 為“delay.yoho_test_delay”的消息。

延遲隊列“yoho_test_delay.delay”的配置:

  • x-dead-letter-exchange 死信轉到交換機“amp.topic”

  • 死信消息的 Routing-key 為“delay.yoho_test_delay”(即工作隊列接收消息的 Routing-key)

  • 消息在延遲隊列中存活時間 ttl

  • 該隊列綁定到“amp.topic”交換機,接收 Routing-key 為“yoho_test_delay”的消息(即生產者發送消息指定的 topic)。如此一來延遲隊列接收消息后,等待 ttl 時長后將消息轉發到工作隊列中,即可實現延遲隊列機制

同樣還有別的方法,大家可以靈活實現。

實現跨數據中心的消息共享

場景 4:有時跨中心業務需要共享消息,如緩存清理等,在業務代碼中分別向多個中心的 RabbitMQ 發布消費消息顯然不是一種比較好的解決方案,那還有什么好的方法呢?

RabbitMQ 為此提供了 Federation 插件來很好地解決此類問題,有貨跨中心部署 Federation 架構圖:

Federation 插件是一個不需要構建 Cluster,而在 Brokers 之間傳輸消息的高性能插件,Federation 插件可以在 Brokers 或者 Cluster 之間傳輸消息,連接的雙方可以使用不同的 users 和 virtual hosts,雙方也可以使用版本不同的 RabbitMQ 和 Erlang。Federation 插件使用 AMQP 協議通訊,可以接受不連續的傳輸。

Federation Exchanges,可以看成 Downstream 從 Upstream 主動拉取消息,但並不是拉取所有消息,必須是在 Downstream 上已經明確定義 Bindings 關系的 Exchange,也就是有實際的物理 Queue 來接收消息,才會從 Upstream 拉取消息到 Downstream。使用 AMQP 協議實施代理間通信,Downstream 會將綁定關系組合在一起,綁定 / 解除綁定命令將發送到 Upstream 交換機。因此,Federation Exchange 只接收具有訂閱的消息,本處貼出官方圖來說明;

但是注意,由於綁定是異步發送的 Upstream 的,所以添加或刪除綁定的效果並不立即生效,消息被緩沖在 Upstream 交換機的所在 Broker 創建的隊列中,這被稱為 Upstream 隊列。任何 Upstream Exchange 接收到的消息都可能被 Downstream 中 Federation Exchange 接收到,但直接發送給 Federation Exchange 的消息是不能被 Upstream 中所綁定的 Exchange 接收到的。

下面動手創建名為“fed_test”的 Federation Exchange,配置 Federation 策略“fed_ex”,Federation-upstream-set 可以簡單的配置為“all”,表示與所有的 Upstream 都建立點對點的 Federation 連接。

此時在 Downstream 上可以看到建立了一個 Running-Links 連接到 Upstream,該 Exchange 就可以收到 Upstream 中同名 Exchange 收到的所有消息(前提是 Downstream 中有物理隊列接收)。

大家應該都知道 RabbitMQ 中單 Queue 能夠對外提供的服務能力有局限性,如何通過 Federation 來滿足跨中心同時高並發的場景呢,此時就需要自己編寫插件了,結合后面會介紹的 Sharding 分片機制,創建多個 Federation 緩沖隊列分攤壓力,本人的想法僅供參考。

實現消息隊列的高可用(HA 容災)

場景 5:需要保證消息隊列高可用的場景有很多,比如核心業務的訂單服務、erp 服務。

默認情況下,RabbitMQ 群集中的隊列位於單個節點上(首次被聲明的節點上),而 Exchanges 和 Bindings 可以認為在所有節點上存在,但也可以將 Queue 在 Cluster 節點之間配置為鏡像隊列。每個鏡像隊列由一個 Master 和一個或多個 Slave 組成,如果 Master 因為某些原因失效,則將從 Slave 中選擇一個提升為 Master。

發布到隊列的消息將復制到所有鏡像,消費者連接到主機,無論它們連接到哪個節點,鏡像會丟棄已在主設備上確認的消息,隊列鏡像因此增強了可用性,但不跨節點分配負載。

如上圖創建名為“ha_test_queue”的隊列,同時為該隊列配置了策略 Policy,Ha-mode 簡單配置為 all,當然可以使用 Ha-node 參數選擇節點制作鏡像。

此時隊列已被配置為鏡像,master 節點位於 server5,slave 節點位於 server6,此時,隨意關閉任意一台 RabbitMQ 節點,該隊列都可以正常對外提供服務。

當然在高可用的場景下,隊列的性能會受到一定的影響,此時可以借助后面提到的 Sharding 機制(根據場景選擇 x-modulus-hash 還是 consistent-hash ),解決單隊列的性能瓶頸,在高可用、高並發下尋求一個動態的平衡。

RabbitMQ 分片機制

在解決 RabbitMQ 單 Queue 性能問題時可以用到 RabbitMQ Sharding 插件,該插件可以提供消息的自動分片能力:自動創建分片隊列,同時交換機將在隊列中分區或分片消息。

在某些情況下,你可能希望發送到交換機的消息一致並且均勻地分布在多個不同的隊列。在上面的插件中如果隊列數量發生變化,則不能確保新的拓撲結構仍然在不同隊列之間均勻分配消息,此時就可以借助 Consistent-sharding 類型 Exchange,與 Sharding 插件的主要區別是,該類 Exchange 不能自動創建分片隊列,需要手動創建並配置 Binding 關系,且支持一致性 hash。

RabbitMQ 高並發實踐

衡量消息服務的性能最重要的指標之一就是吞吐量,那 RabbitMQ 的高並發到底可以到多少呢?首先使用了 32 台 8 核 30G 內存的虛擬機,構建了相對來說比較龐大的 RabbitMQ 集群,各虛擬機的作用分配如下:

  • 30 RabbitMQ RAM 節點(正常 RAM 節點,RabbitMQ 元數據和定義僅保存在 RAM 中);

  • 1 RabbitMQ Disc 節點(元數據持久化節點,其中 RabbitMQ 代理元數據和定義也保留在光盤上);

  • 1 RabbitMQ Stats 節點(統計信息節點,運行 RabbitMQ 管理插件,不帶任何隊列)。

測試環境架構結構圖,大致如下:

在 RabbitMQ 群集節點的前面,掛載負載均衡器,負載均衡器配置中包含了除統計信息節點以外的所有節點。來自連接的 AMQP 客戶端的請求在目標池中的節點之間進行了平衡。從目標池排除統計信息節點有助於確保消息隊列和傳送工作不會最終與管理節點發生資源競爭。在較低吞吐量的情況下,用戶可以選擇將統計信息節點包含在負載平衡器后台服務池中。實驗結果如下:

在這種高負載的生產(1345531 msgs/pers)消費(1413840 msgs/pers)壓力下,RabbitMQ 僅有 2343 條消息暫時在其等待發送的隊列中累積,在這樣的負載下,RabbitMQ 節點也沒有顯示內存壓力,或者需要基於資源限制啟動流控機制。我們在 AWS 上搭建了同等規模與配置的環境,驗證了上述 Google 提供的測試方案及結果后又做了一些別的嘗試,如使用 RabbitMQ Sharding 插件、Consistent-hash Sharding Exchange 來更加靈活地動態均衡隊列壓力,可以更從容地達到百萬並發的性能。

高可靠與高可用從來都是性能殺手,那 RabbitMQ 的表現如何,實際生產應用中應該中如何做權衡?最后通過一組數據來說明,RabbitMQ 環境配置:

  • 單虛擬機:8C8G ;

  • 單位消息大小:1KB;

  • 壓測工具:rabbitmq-perf-tes(官方提供)。

上圖中,我們在單節點 RabbitMQ 上對消息持久化、Consume-Ack、Publish-Confirm 三個特性做了壓測,消息持久化對性能影響最大,Consum-Ack 其次,Publish-Confirm 最小。

Prefetch,可以理解為 Consumer 一次最多獲取多少消息進行消費,可以看到,當 Prefetch 為 10 時,性能最差,當 Prefetch 放大到一定閾值如 10000,其對性能的影響也就微乎其微了。

再考慮下,多消費者多生產者對性能的影響,如上圖適當增加消費者,保持隊列的空閑,可以增加吞吐量,但當到達某一瓶頸時,效果不太明顯了。

在集群場景下,Ack 對性能影響明顯。

上圖為鏡像場景的壓測結果,對比普通集群,鏡像對性能的影響很明顯,消息持久化也拉低了集群的性能,適當增加 Prefetch 可以提高集群性能。

性能與高可靠、高可用,魚和熊掌不可兼得,所以想提升 RabbitMQ 集群或單節點服務的性能,可以犧牲可靠性(根據場景來),在消費能力范圍內,盡量提高 Prefetch 的數量;其次就是簡單粗暴型(加機器加配置,隊列實際存儲節點性能未榨干,建議隊列均衡分配到各節點)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM