在dubbo調用過程中被調用方有兩個線程池:io線程池,業務線程池。
這也是dubbo調優的點。
配置信息:
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
Dispatcher
- all 所有消息都派發到線程池,包括請求,響應,連接事件,斷開事件,心跳等。
- direct 所有消息都不派發到線程池,全部在 IO 線程上直接執行。
- message 只有請求響應消息派發到線程池,其它連接斷開事件,心跳等消息,直接在 IO 線程上執行。
- execution 只請求消息派發到線程池,不含響應,響應和其它連接斷開事件,心跳等消息,直接在 IO 線程上執行。
- connection 在 IO 線程上,將連接斷開事件放入隊列,有序逐個執行,其它消息派發到線程池。
ThreadPool
- fixed 固定大小線程池,啟動時建立線程,不關閉,一直持有。(缺省)
public class FixedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
- cached 緩存線程池,空閑一分鍾自動刪除,需要時重建。
public class CachedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
- limited 可伸縮線程池,但池中的線程數只會增長不會收縮。只增長不收縮的目的是為了避免收縮時突然來了大流量引起的性能問題。
public class LimitedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }