首先簡要介紹一下prefetch機制。ActiveMQ通過prefetch機制來提高性能,這意味這 客戶端的內存里可能會緩存一定數量的消息。緩存消息的數量由prefetch limit來控 制。當某個consumer的prefetch buffer已經達到上限,那么broker不會再向consumer 分發消息,直到consumer向broker發送消息的確認。可以通過在 ActiveMQConnectionFactory或者ActiveMQConnection上設置 ActiveMQPrefetchPolicy對象來配置prefetch policy。也可以通過connection options或者destination options來配置。例如:
tcp://localhost:61616?jms.prefetchPolicy.all=50 tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1 queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10"); prefetch size 的缺省值如下: • persistent queues (default value: 1000) • non-persistent queues (default value: 1000) • persistent topics (default value: 100) • non-persistent topics (default value: Short.MAX_VALUE -1)
慢消費者會在非持久的 topics 和 queue上導致問題:一旦消息積壓起來,會導致 broker 把大量消息保存在內存中,broker 也會因此而變慢。ActiveMQ采用一定的緩存策略來進行控制,例如
<policyEntry topic="PRICES.>"> <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="50"/> // constantPendingMessageLimitStrategy 限制內存中只保留50條最新消息,其余將會被剔除或保存在temp store中 </pendingMessageLimitStrategy> </policyEntry> <prefetchRatePendingMessageLimitStrategy multiplier="2.5"/> // prefetchRatePendingMessageLimitStrategy是倍數策略,如果prefetchSize為100,則保留2.5 * 100條消息