首先簡要介紹一下prefetch機制。ActiveMQ通過prefetch機制來提高性能,這意味這 客戶端的內存里可能會緩存一定數量的消息。緩存消息的數量由prefetch limit來控 制。當某個consumer的prefetch buffer已經達到上限,那么broker不會再向consumer 分發消息,直到consumer向broker發送消息的確認。可以通過在 ActiveMQConnectionFactory或者ActiveMQConnection上設置 ActiveMQPrefetchPolicy對象來配置prefetch policy。也可以通過connection options或者destination options來配置。例如:
tcp://localhost:61616?jms.prefetchPolicy.all=50
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
prefetch size 的缺省值如下:
• persistent queues (default value: 1000)
• non-persistent queues (default value: 1000)
• persistent topics (default value: 100)
• non-persistent topics (default value: Short.MAX_VALUE -1)
慢消費者會在非持久的 topics 和 queue上導致問題:一旦消息積壓起來,會導致 broker 把大量消息保存在內存中,broker 也會因此而變慢。ActiveMQ采用一定的緩存策略來進行控制,例如
<policyEntry topic="PRICES.>">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="50"/> // constantPendingMessageLimitStrategy 限制內存中只保留50條最新消息,其余將會被剔除或保存在temp store中
</pendingMessageLimitStrategy>
</policyEntry>
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/> // prefetchRatePendingMessageLimitStrategy是倍數策略,如果prefetchSize為100,則保留2.5 * 100條消息
