dubbo中的線程池


dubbo提供了四種線程池。其實我理解還是還是根據ThreadPoolExecutor這個JDK提供的線程池類,只不過適應性的改變了其中的參數。dubbo分別提供了1. 緩存線程池 2。固定大小線程池 3. 上屆線程池 4.定時線程池。下面具體的說一說這些線程池。

1. 公共行為

首先這些線程池類均繼承了ThreadPool接口。該接口中的定義了getExecutor

/**
 * Thread pool
 *
 * @param url URL contains thread parameter
 * @return thread pool
 */
@Adaptive({THREADPOOL_KEY})
Executor getExecutor(URL url); // 實際上還是對JDKExcutor的封裝

  可以看到其返回值還是Executor。並且需要采用什么樣的線程池,可以從URL中進行設置。

2. CachedThreadPool

public class CachedThreadPool implements ThreadPool {

    // 可以看到這里設置了alive這個參數
    // 那么也就是說用這個存活時間來控制整個線程池的時間
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

 從源碼中可以看到,將隊列設置為同步隊列,只要沒有沒有達到threads的數量,就會一直增加線程。

3.FixedThreadPool

public class FixedThreadPool implements ThreadPool {

    // 開啟固定大小的線程數
    // 就是將核心池以及最大池大小都調整為一致。同時阻塞隊列設置為同步隊列
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

  固定大小線程池,顧名思義就是線程池中的線程數量維持着固定大小。其原理就是將其中的隊列設置為同步隊列,同時將最大池和核心池的數量都設定為一致就行。

4. LimitedThreadPool

public class LimitedThreadPool implements ThreadPool {

    // 如果queue == 0 則創建同步隊列
    // queue < 0 就創建無界隊列
    // queue > 0 就創建上屆隊列

    // 可擴張線程池,就是把阻塞隊列改成同步隊列。這樣有任務的時候就會一直開辟新的線程
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}  


顧名思義,可擴張線程池就是通過上屆隊列存儲任務。

4.EagerThreadPoolExecutor
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

  

這個池子的比較不同的是:還是通過ThreadPoolExecutor實現任務的管理,唯一不同的是當任務執行失敗的時候,會將任務存儲到自定義的taskqueue中。同時維持這一個當前池子的任務的計數。
線程池中的所有核心線程都在忙碌時,此時如果再添加新的任務不會放入阻塞隊列,而且創建新的線程,直到達到最大線程限制,此時如果還有任務,才會放入阻塞隊列。
 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM