dubbo提供了四種線程池。其實我理解還是還是根據ThreadPoolExecutor這個JDK提供的線程池類,只不過適應性的改變了其中的參數。dubbo分別提供了1. 緩存線程池 2。固定大小線程池 3. 上屆線程池 4.定時線程池。下面具體的說一說這些線程池。
1. 公共行為
首先這些線程池類均繼承了ThreadPool接口。該接口中的定義了getExecutor
/** * Thread pool * * @param url URL contains thread parameter * @return thread pool */ @Adaptive({THREADPOOL_KEY}) Executor getExecutor(URL url); // 實際上還是對JDKExcutor的封裝
可以看到其返回值還是Executor。並且需要采用什么樣的線程池,可以從URL中進行設置。
2. CachedThreadPool
public class CachedThreadPool implements ThreadPool { // 可以看到這里設置了alive這個參數 // 那么也就是說用這個存活時間來控制整個線程池的時間 @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
從源碼中可以看到,將隊列設置為同步隊列,只要沒有沒有達到threads的數量,就會一直增加線程。
3.FixedThreadPool
public class FixedThreadPool implements ThreadPool { // 開啟固定大小的線程數 // 就是將核心池以及最大池大小都調整為一致。同時阻塞隊列設置為同步隊列 @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
固定大小線程池,顧名思義就是線程池中的線程數量維持着固定大小。其原理就是將其中的隊列設置為同步隊列,同時將最大池和核心池的數量都設定為一致就行。
4. LimitedThreadPool
public class LimitedThreadPool implements ThreadPool { // 如果queue == 0 則創建同步隊列 // queue < 0 就創建無界隊列 // queue > 0 就創建上屆隊列 // 可擴張線程池,就是把阻塞隊列改成同步隊列。這樣有任務的時候就會一直開辟新的線程 @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
顧名思義,可擴張線程池就是通過上屆隊列存儲任務。
4.EagerThreadPoolExecutor
public class EagerThreadPoolExecutor extends ThreadPoolExecutor { /** * task count */ private final AtomicInteger submittedTaskCount = new AtomicInteger(0); public EagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } /** * @return current tasks which are executed */ public int getSubmittedTaskCount() { return submittedTaskCount.get(); } @Override protected void afterExecute(Runnable r, Throwable t) { submittedTaskCount.decrementAndGet(); } @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // do not increment in method beforeExecute! submittedTaskCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { // retry to offer the task into queue. final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full.", rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { // decrease any way submittedTaskCount.decrementAndGet(); throw t; } } }
這個池子的比較不同的是:還是通過ThreadPoolExecutor實現任務的管理,唯一不同的是當任務執行失敗的時候,會將任務存儲到自定義的taskqueue中。同時維持這一個當前池子的任務的計數。
線程池中的所有核心線程都在忙碌時,此時如果再添加新的任務不會放入阻塞隊列,而且創建新的線程,直到達到最大線程限制,此時如果還有任務,才會放入阻塞隊列。