ActiveMQ 的線程池


ActiveMQ 的線程池實質上也是 ThreadPoolExecutor,不過它的任務模型有自己的特點,我們先看一個例子:

public static void main(String[] args) throws InterruptedException {
    // TaskRunnerFactory 的作用是創建線程池
    TaskRunnerFactory factory = new TaskRunnerFactory();
    factory.init();
    // 創建 PooledTaskRunner
    TaskRunner taskRunner = factory.createTaskRunner(new Task() {
        // iterate 的返回值很重要,true表示繼續,false表示停止
        public boolean iterate() {
            System.out.println("hello zhang");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return true;
        }
    }, "zhang");
    
    // 調用線程池的 execute(runnable)
    taskRunner.wakeup();
    
    LockSupport.park();
}

Task 接口真正處理業務邏輯。factory.createTaskRunner 的作用只是創建一個命名的 PooledTaskRunner。

PooledTaskRunner 封裝了線程池 executor 和任務 runnable,只有在調用 PooledTaskRunner.wakeup() 時,才會調用 executor.execute(runnable),即真正執行任務。

 

以 Queue 類為例,它繼承了 Task 接口,並且有自己的 taskRunner:

// org.apache.activemq.broker.region.Queue
public void initialize() throws Exception {
    // 省略其他代碼
    // 創建queue的taskRunner
    this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
}

 queue 的 iterate:

//org.apache.activemq.broker.region.Queue
@Override
public boolean iterate() {
    MDC.put("activemq.destination", getName());
    boolean pageInMoreMessages = false;
    synchronized (iteratingMutex) {

        // If optimize dispatch is on or this is a slave this method could be called recursively
        // we set this state value to short-circuit wakeup in those cases to avoid that as it
        // could lead to errors.
        iterationRunning = true;

        // do early to allow dispatch of these waiting messages
        synchronized (messagesWaitingForSpace) {
            Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
            while (it.hasNext()) {
                if (!memoryUsage.isFull()) {
                    Runnable op = it.next();
                    it.remove();
                    op.run();
                } else {
                    registerCallbackForNotFullNotification();
                    break;
                }
            }
        }

        if (firstConsumer) {
            firstConsumer = false;
            try {
                if (consumersBeforeDispatchStarts > 0) {
                    int timeout = 1000; // wait one second by default if
                                        // consumer count isn't reached
                    if (timeBeforeDispatchStarts > 0) {
                        timeout = timeBeforeDispatchStarts;
                    }
                    if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                        LOG.debug("{} consumers subscribed. Starting dispatch.", consumers.size());
                    } else {
                        LOG.debug("{} ms elapsed and {} consumers subscribed. Starting dispatch.", timeout, consumers.size());
                    }
                }
                if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
                    iteratingMutex.wait(timeBeforeDispatchStarts);
                    LOG.debug("{} ms elapsed. Starting dispatch.", timeBeforeDispatchStarts);
                }
            } catch (Exception e) {
                LOG.error(e.toString());
            }
        }

        messagesLock.readLock().lock();
        try{
            pageInMoreMessages |= !messages.isEmpty();
        } finally {
            messagesLock.readLock().unlock();
        }

        pagedInPendingDispatchLock.readLock().lock();
        try {
            pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
        } finally {
            pagedInPendingDispatchLock.readLock().unlock();
        }

        // Perhaps we should page always into the pagedInPendingDispatch
        // list if
        // !messages.isEmpty(), and then if
        // !pagedInPendingDispatch.isEmpty()
        // then we do a dispatch.
        boolean hasBrowsers = browserDispatches.size() > 0;

        if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) {
            try {
          //分發消息
pageInMessages(hasBrowsers); }
catch (Throwable e) { LOG.error("Failed to page in more queue messages ", e); } } if (hasBrowsers) { ArrayList<MessageReference> alreadyDispatchedMessages = null; pagedInMessagesLock.readLock().lock(); try{ alreadyDispatchedMessages = new ArrayList<MessageReference>(pagedInMessages.values()); }finally { pagedInMessagesLock.readLock().unlock(); } Iterator<BrowserDispatch> browsers = browserDispatches.iterator(); while (browsers.hasNext()) { BrowserDispatch browserDispatch = browsers.next(); try { MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); msgContext.setDestination(destination); QueueBrowserSubscription browser = browserDispatch.getBrowser(); LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size()); boolean added = false; for (MessageReference node : alreadyDispatchedMessages) { if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) { msgContext.setMessageReference(node); if (browser.matches(node, msgContext)) { browser.add(node); added = true; } } } // are we done browsing? no new messages paged if (!added || browser.atMax()) { browser.decrementQueueRef(); browserDispatches.remove(browserDispatch); } } catch (Exception e) { LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e); } } } if (pendingWakeups.get() > 0) { pendingWakeups.decrementAndGet(); } MDC.remove("activemq.destination"); iterationRunning = false; return pendingWakeups.get() > 0; } }

 隊列分發消息:

protected void pageInMessages(boolean force) throws Exception {
    doDispatch(doPageInForDispatch(force, true));
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM