因為原來使用了MQ作為rpc機制,隨着客戶交易量越來越大,很多服務器推送行情的壓力很大,最近打算重寫為批量模式,又重新看了下qos和prefetch設置的作用以確定優化的具體細節。
消費者在開啟acknowledge的情況下,對接收到的消息可以根據業務的需要異步對消息進行確認。
然而在實際使用過程中,由於消費者自身處理能力有限,從rabbitmq獲取一定數量的消息后,希望rabbitmq不再將隊列中的消息推送過來,當對消息處理完后(即對消息進行了ack,並且有能力處理更多的消息)再接收來自隊列的消息。在這種場景下,我們可以通過設置basic.qos信令中的prefetch_count來達到這種效果。
1. rabbitmq對basic.qos信令的處理
首先,basic.qos是針對channel進行設置的,也就是說只有在channel建立之后才能發送basic.qos信令。
在rabbitmq的實現中,每個channel都對應會有一個rabbit_limiter進程,當收到basic.qos信令后,在rabbit_limiter進程中記錄信令中prefetch_count的值,同時記錄的還有該channel未ack的消息個數。
注:其實basic.qos里還有另外兩個參數可進行設置,prefetch_size和global,但是RabbitMQ沒有實現prefetch_size,並在3.3.0版本中對global這個參數的含義進行了重新定義,即glotal=true時表示在當前channel上所有的consumer都生效,否則只對設置了之后新建的consumer生效
global | Meaning of prefetch_count in AMQP 0-9-1 | Meaning of prefetch_count in RabbitMQ |
---|---|---|
false | shared across all consumers on the channel | applied separately to each new consumer on the channel |
true | shared across all consumers on the connection | shared across all consumers on the channel |
一個 queue 中消息最大保存量可以在聲明
queue 的時候通過設置 x-max-length 參數為非負整數進行指定。Queue 長度的選取需要考量 就緒消息量、被忽略的未確認消息量,以及消息大小。當
queue 中的消息量達到了設定的上限時,為了給新消息騰出空間,將會從該 queue 用於保存消息的隊列的前端將“老”消息丟棄或者 dead-lettered 。
其實從這樣來看的話,如果MQ和應用服務器之間的網絡延時比較大並且應用服務器處理消息的邏輯比較慢,這時候優化一些prefetch count是有意義的具體可以設置1、5、10、30、50、100、1000看consumer的utilization,越接近100%越佳,反之延時很低其作用就沒這么大了。
一條消息從客戶端到MQ中一般經過如下環節:
Network
↓
Connection process - AMQP parsing, channel multiplexing
↓
Channel process - routing, security, coordination
↓
Queue process - in-memory messages, persistent queue indexing
↓
Message store - message persistence
每個環節都可能會成為瓶頸。
具體參見http://www.rabbitmq.com/consumer-prefetch.html。