ActiveMQ是一個消息中間件,對於消費者而言有兩種方式從消息中間件獲取消息:
①Push方式:由消息中間件主動地將消息推送給消費者;②Pull方式:由消費者主動向消息中間件拉取消息。看一段官網對Push方式的解釋:
To be able to achieve high performance it is important to stream messages to consumers as fast as possible
so that the consumer always has a buffer of messages, in RAM, ready to process
- rather than have them explicitly pull messages from the server which adds significant latency per message.
采用Push方式,可以盡可能快地將消息發送給消費者(stream messages to consumers as fast as possible)
而采用Pull方式,會增加消息的延遲,即消息到達消費者的時間有點長(adds significant latency per message)。
但是,Push方式會有一個壞處:如果消費者的處理消息的能力很弱(一條消息需要很長的時間處理),而消息中間件不斷地向消費者Push消息,消費者的緩沖區可能會溢出。
那ActiveMQ是怎么解決這個問題的呢?那就是 prefetch limit
prefetch limit 規定了一次可以向消費者Push(推送)多少條消息。
Once the prefetch limit is reached, no more messages are dispatched to the consumer
until the consumer starts sending back acknowledgements of messages (to indicate that the message has been processed)
當推送消息的數量到達了perfetch limit規定的數值時,消費者還沒有向消息中間件返回ACK,消息中間件將不再繼續向消費者推送消息。
那prefetch limit的值設置為多少合適?視具體的應用場景而定。
If you have very few messages and each message takes a very long time to process you might want to set the prefetch value to 1 so that a consumer is given one message at a time.
如果消息的數量很少(生產者生產消息的速率不快),但是每條消息 消費者需要很長的時間處理,那么prefetch limit設置為1比較合適。這樣,消費者每次只會收到一條消息,當它處理完這條消息之后,向消息中間件發送ACK,此時消息中間件再向消費者推送下一條消息。
prefetch limit 設置成0意味着什么?
Specifying a prefetch limit of zero means the consumer will poll for more messages, one at a time,
instead of the message being pushed to the consumer.
意味着此時,消費者去輪詢消息中間件獲取消息。不再是Push方式了,而是Pull方式了。即消費者主動去消息中間件拉取消息。
perfetch limit是“消息預取”的值,這是針對消息中間件如何向消費者發消息 而設置的。與之相關的還有針對 消費者以何種方式向消息中間件返回確認ACK(響應):比如消費者是每次消費一條消息之后就向消息中間件確認呢?還是采用“延遲確認”---即采用批量確認的方式(消費了若干條消息之后,統一再發ACK)。這就是 Optimized Acknowledge
ActiveMQ can acknowledge receipt of messages back to the broker in batches (to improve performance).
引用 一段話:“如果prefetchACK為true,那么prefetch必須大於0;當prefetchACK為false時,你可以指定prefetch為0以及任意大小的正數。
不過,當prefetch=0是,表示consumer將使用PULL(拉取)的方式從broker端獲取消息,broker端將不會主動push消息給client端,直到client端發送PullCommand時;
當prefetch>0時,就開啟了broker push模式,此后只要當client端消費且ACK了一定的消息之后,會立即push給client端多條消息。”
那么,在程序中如何采用Push方式或者Pull方式呢?
從是否阻塞來看,消費者有兩種方式獲取消息。同步方式和異步方式。
同步方式使用的是ActiveMQMessageConsumer的receive()方法。而異步方式則是采用消費者實現MessageListener接口,監聽消息。
使用同步方式receive()方法獲取消息時,prefetch limit即可以設置為0,也可以設置為大於0
prefetch limit為零 意味着:“receive()方法將會首先發送一個PULL指令並阻塞,直到broker端返回消息為止,這也意味着消息只能逐個獲取(類似於Request<->Response)”
prefetch limit 大於零 意味着:“broker端將會批量push給client 一定數量的消息(<= prefetch),client端會把這些消息(unconsumedMessage)放入到本地的隊列中,只要此隊列有消息,那么receive方法將會立即返回,當一定量的消息ACK之后,broker端會繼續批量push消息給client端。”
當使用MessageListener異步獲取消息時,prefetch limit必須大於零了。因為,prefetch limit 等於零 意味着消息中間件不會主動給消費者Push消息,而此時消費者又用MessageListener被動獲取消息(不會主動去輪詢消息)。這二者是矛盾的。
此外,還有一個要注意的地方,即消費者采用同步獲取消息(receive方法) 與 異步獲取消息的方法(MessageListener) ,對消息的確認時機是不同的。
具體可參考:這篇文章。
參考資料: ActiveMQ消息傳送機制以及ACK機制詳解