dubbo提供了四種線程池。其實我理解還是還是根據ThreadPoolExecutor這個JDK提供的線程池類,只不過適應性的改變了其中的參數。dubbo分別提供了1. 緩存線程池 2。固定大小線程池 3. 上屆線程池 4.定時線程池。下面具體的說一說這些線程池。
1. 公共行為
首先這些線程池類均繼承了ThreadPool接口。該接口中的定義了getExecutor
/**
* Thread pool
*
* @param url URL contains thread parameter
* @return thread pool
*/
@Adaptive({THREADPOOL_KEY})
Executor getExecutor(URL url); // 實際上還是對JDKExcutor的封裝
可以看到其返回值還是Executor。並且需要采用什么樣的線程池,可以從URL中進行設置。
2. CachedThreadPool
public class CachedThreadPool implements ThreadPool {
// 可以看到這里設置了alive這個參數
// 那么也就是說用這個存活時間來控制整個線程池的時間
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
從源碼中可以看到,將隊列設置為同步隊列,只要沒有沒有達到threads的數量,就會一直增加線程。
3.FixedThreadPool
public class FixedThreadPool implements ThreadPool {
// 開啟固定大小的線程數
// 就是將核心池以及最大池大小都調整為一致。同時阻塞隊列設置為同步隊列
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
固定大小線程池,顧名思義就是線程池中的線程數量維持着固定大小。其原理就是將其中的隊列設置為同步隊列,同時將最大池和核心池的數量都設定為一致就行。
4. LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {
// 如果queue == 0 則創建同步隊列
// queue < 0 就創建無界隊列
// queue > 0 就創建上屆隊列
// 可擴張線程池,就是把阻塞隊列改成同步隊列。這樣有任務的時候就會一直開辟新的線程
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
顧名思義,可擴張線程池就是通過上屆隊列存儲任務。
4.EagerThreadPoolExecutor
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* task count
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* @return current tasks which are executed
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
這個池子的比較不同的是:還是通過ThreadPoolExecutor實現任務的管理,唯一不同的是當任務執行失敗的時候,會將任務存儲到自定義的taskqueue中。同時維持這一個當前池子的任務的計數。
線程池中的所有核心線程都在忙碌時,此時如果再添加新的任務不會放入阻塞隊列,而且創建新的線程,直到達到最大線程限制,此時如果還有任務,才會放入阻塞隊列。
