http://activemq.apache.org/destination-options.html
1. consumer 的配置參數如下圖:
配置consumer的示例:
public void run() { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); connection.setExceptionListener(this); // Create a Session ActiveMQSession session =
(ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) // 此時加入參數 ActiveMQQueue destination =
(ActiveMQQueue) session.createQueue("TEST.FOO?consumer.prefetchSize=10"); // Create a MessageConsumer from the Session to the Topic or Queue ActiveMQMessageConsumer consumer =
(ActiveMQMessageConsumer) session.createConsumer(destination); // 打印出prefetchSize參數值 System.out.println("prefetchSize=" + consumer.getPrefetchNumber()); // Wait for a message Message message = consumer.receive(); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println("Received: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } }
在創建Queue的時候,配置以url形式跟在隊列名后面:session.createQueue("TEST.FOO?consumer.prefetchSize=10")
consumer的prefetchSize參數默認為1000。
consumer 有推和拉2種方式獲取消息:當 prefetchSize = 0 時,pull;當 prefetchSize > 0 時,push。
2. broker分發消息的邏輯在org.apache.activemq.broker.region.Queue.doActualDispatch方法中:
private PendingList doActualDispatch(PendingList list) throws Exception { List<Subscription> consumers; consumersLock.writeLock().lock(); try { if (this.consumers.isEmpty()) { // slave dispatch happens in processDispatchNotification return list; } consumers = new ArrayList<Subscription>(this.consumers); } finally { consumersLock.writeLock().unlock(); } Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size()); for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); Subscription target = null; for (Subscription s : consumers) { if (s instanceof QueueBrowserSubscription) { continue; } if (!fullConsumers.contains(s)) { if (!s.isFull()) { if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) { // Dispatch it. s.add(node); LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId()); iterator.remove(); target = s; break; } } else { // no further dispatch of list to a full consumer to // avoid out of order message receipt fullConsumers.add(s); LOG.trace("Subscription full {}", s); } } } if (target == null && node.isDropped()) { iterator.remove(); } // return if there are no consumers or all consumers are full if (target == null && consumers.size() == fullConsumers.size()) { return list; } // If it got dispatched, rotate the consumer list to get round robin // distribution. if (target != null && !strictOrderDispatch && consumers.size() > 1 && !dispatchSelector.isExclusiveConsumer(target)) { consumersLock.writeLock().lock(); try { if (removeFromConsumerList(target)) { addToConsumerList(target); consumers = new ArrayList<Subscription>(this.consumers); } } finally { consumersLock.writeLock().unlock(); } } } return list; }
2層for循環,外面是消息,里面是consumer,只要consumer沒有飽和,broker一直會給consumer分發消息。
對於一個consumer而言,未確認的消息數大於等於prefetchSize,則認為該consumer是飽的
// PrefetchSubscription public boolean isFull() { // 未確認的消息數 = 已發送給該consumer的消息數 - 收到確認的消息數 return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); }
因為 consumer 的 prefetchSize 參數默認為1000,所以 activeMQ 默認是推。而且是一條一條地推。
3. consumer獲取消息有同步和異步兩種方式:consumer.receive() 或 consumer.setMessageListener(listener)
對於 receive 方式,如果prefetchSize = 0 並且本地沒有緩存消息,則發送一個pull 命令給broker;
否則,則從本地緩存中取消息。
// ActiveMQMessageConsumer public Message receive() throws JMSException { checkClosed(); checkMessageListener(); sendPullCommand(0); MessageDispatch md = dequeue(-1); if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); return createActiveMQMessage(md); } protected void sendPullCommand(long timeout) throws JMSException { clearDeliveredList(); if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { MessagePull messagePull = new MessagePull(); messagePull.configure(info); messagePull.setTimeout(timeout); session.asyncSendPacket(messagePull); } }
consumer 本地消息緩存在
// These are the messages waiting to be delivered to the client protected final MessageDispatchChannel unconsumedMessages;
消息進入緩存有2條路線,調用棧分別如下:
(1)
(2)
consumer.setMessageListener 異步獲取消息的調用棧如下: