ActiveMQ 的線程池實質上也是 ThreadPoolExecutor,不過它的任務模型有自己的特點,我們先看一個例子:
public static void main(String[] args) throws InterruptedException { // TaskRunnerFactory 的作用是創建線程池 TaskRunnerFactory factory = new TaskRunnerFactory(); factory.init(); // 創建 PooledTaskRunner TaskRunner taskRunner = factory.createTaskRunner(new Task() { // iterate 的返回值很重要,true表示繼續,false表示停止 public boolean iterate() { System.out.println("hello zhang"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return true; } }, "zhang"); // 調用線程池的 execute(runnable) taskRunner.wakeup(); LockSupport.park(); }
Task 接口真正處理業務邏輯。factory.createTaskRunner 的作用只是創建一個命名的 PooledTaskRunner。
PooledTaskRunner 封裝了線程池 executor 和任務 runnable,只有在調用 PooledTaskRunner.wakeup() 時,才會調用 executor.execute(runnable),即真正執行任務。
以 Queue 類為例,它實現了 Task 接口,並且有自己的 taskRunner:
// org.apache.activemq.broker.region.Queue public void initialize() throws Exception { // 省略其他代碼 // 創建queue的taskRunner this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName()); }
queue 的 iterate:
//org.apache.activemq.broker.region.Queue
/**
 * One iteration of this queue's task, executed while holding {@code iteratingMutex}.
 *
 * Steps, in order:
 *   1. run the operations parked in {@code messagesWaitingForSpace} while memory is not full;
 *   2. on the first pass only, optionally wait for consumers and/or a start delay;
 *   3. if there are stored messages, pending dispatches, browsers or redeliveries,
 *      call {@code pageInMessages};
 *   4. feed already paged-in messages to any registered browsers;
 *   5. consume one pending wakeup.
 *
 * @return true if wakeups are still pending after this pass (the task should run again),
 *         false otherwise
 */
@Override
public boolean iterate() {
    // Tag log output produced during this iteration with the destination name.
    MDC.put("activemq.destination", getName());
    boolean pageInMoreMessages = false;
    synchronized (iteratingMutex) {

        // If optimize dispatch is on or this is a slave this method could be called recursively
        // we set this state value to short-circuit wakeup in those cases to avoid that as it
        // could lead to errors.
        iterationRunning = true;

        // do early to allow dispatch of these waiting messages
        synchronized (messagesWaitingForSpace) {
            Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
            while (it.hasNext()) {
                if (!memoryUsage.isFull()) {
                    // Memory available: take the next parked operation out of the map and run it.
                    Runnable op = it.next();
                    it.remove();
                    op.run();
                } else {
                    // Still full: ask to be notified when space frees up and stop draining.
                    registerCallbackForNotFullNotification();
                    break;
                }
            }
        }

        // Executed only once: the flag is cleared immediately.
        if (firstConsumer) {
            firstConsumer = false;
            try {
                if (consumersBeforeDispatchStarts > 0) {
                    int timeout = 1000; // wait one second by default if
                                        // consumer count isn't reached
                    if (timeBeforeDispatchStarts > 0) {
                        timeout = timeBeforeDispatchStarts;
                    }
                    if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                        LOG.debug("{} consumers subscribed. Starting dispatch.", consumers.size());
                    } else {
                        LOG.debug("{} ms elapsed and {} consumers subscribed. Starting dispatch.", timeout, consumers.size());
                    }
                }
                // Pure time-based delay: only when no consumer-count threshold is configured.
                if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
                    iteratingMutex.wait(timeBeforeDispatchStarts);
                    LOG.debug("{} ms elapsed. Starting dispatch.", timeBeforeDispatchStarts);
                }
            } catch (Exception e) {
                // NOTE(review): this also catches InterruptedException from await()/wait() without
                // restoring the interrupt flag, and logs only e.toString() (no stack trace).
                LOG.error(e.toString());
            }
        }

        // Is there anything left in the message store/cursor?
        messagesLock.readLock().lock();
        try{
            pageInMoreMessages |= !messages.isEmpty();
        } finally {
            messagesLock.readLock().unlock();
        }

        // Is there anything already paged in but not yet dispatched?
        pagedInPendingDispatchLock.readLock().lock();
        try {
            pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
        } finally {
            pagedInPendingDispatchLock.readLock().unlock();
        }

        // Perhaps we should page always into the pagedInPendingDispatch
        // list if
        // !messages.isEmpty(), and then if
        // !pagedInPendingDispatch.isEmpty()
        // then we do a dispatch.
        boolean hasBrowsers = browserDispatches.size() > 0;

        if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) {
            try { // dispatch messages: page in (forced when browsers are present) and dispatch
                pageInMessages(hasBrowsers);
            } catch (Throwable e) {
                LOG.error("Failed to page in more queue messages ", e);
            }
        }

        if (hasBrowsers) {
            // Snapshot the paged-in messages under the read lock so browsers iterate a stable copy.
            ArrayList<MessageReference> alreadyDispatchedMessages = null;
            pagedInMessagesLock.readLock().lock();
            try{
                alreadyDispatchedMessages = new ArrayList<MessageReference>(pagedInMessages.values());
            }finally {
                pagedInMessagesLock.readLock().unlock();
            }

            Iterator<BrowserDispatch> browsers = browserDispatches.iterator();
            while (browsers.hasNext()) {
                BrowserDispatch browserDispatch = browsers.next();
                try {
                    MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                    msgContext.setDestination(destination);

                    QueueBrowserSubscription browser = browserDispatch.getBrowser();

                    LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size());
                    boolean added = false;
                    for (MessageReference node : alreadyDispatchedMessages) {
                        // Offer only messages that are un-acked, not yet seen by this browser,
                        // and only while the browser has room.
                        if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
                            msgContext.setMessageReference(node);
                            if (browser.matches(node, msgContext)) {
                                browser.add(node);
                                added = true;
                            }
                        }
                    }
                    // are we done browsing? no new messages paged
                    if (!added || browser.atMax()) {
                        browser.decrementQueueRef();
                        // NOTE(review): removes from browserDispatches while iterating it; presumably
                        // a concurrent collection that tolerates this - confirm against its declaration.
                        browserDispatches.remove(browserDispatch);
                    }
                } catch (Exception e) {
                    LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);
                }
            }
        }

        // This pass accounts for one pending wakeup, if any was recorded.
        if (pendingWakeups.get() > 0) {
            pendingWakeups.decrementAndGet();
        }
        MDC.remove("activemq.destination");
        iterationRunning = false;

        // true => more wakeups arrived meanwhile, so the task should be run again.
        return pendingWakeups.get() > 0;
    }
}
隊列分發消息:
/**
 * Pages messages in and dispatches them in one step: the result of
 * {@code doPageInForDispatch(force, true)} is handed directly to {@code doDispatch}.
 *
 * @param force forwarded as the first argument of {@code doPageInForDispatch};
 *              {@code iterate()} passes whether browsers are present
 * @throws Exception propagated from {@code doPageInForDispatch} or {@code doDispatch}
 */
protected void pageInMessages(boolean force) throws Exception {
    doDispatch(doPageInForDispatch(force, true));
}