理解線程池到走進dubbo源碼


引言

合理利用線程池能夠帶來三個好處。

​ 第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。

​ 第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。

​ 第三:提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。但是要做到合理的利用線程池,必須對其原理了如執掌。

 


 

 

 

線程池的使用

線程池的創建

​ 我們可以通過ThreadPoolExecutor來創建一個線程池

1 new  ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

 

創建一個線程池需要輸入幾個參數:

  • corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前創建並啟動所有基本線程。
  • runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列。
  1. ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
  2. LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
  3. SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
  4. PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
  • maximumPoolSize(線程池最大大小):線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什么效果。
  • ThreadFactory:用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。
  • RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。以下是JDK1.5提供的四種策略。
  1. AbortPolicy:直接拋出異常。
  2. CallerRunsPolicy:只用調用者所在線程來運行任務。
  3. DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
  4. DiscardPolicy:不處理,丟棄掉。
  5. 當然也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務。
  • keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高線程的利用率。
  • TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鍾(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

 

向線程池提交任務

​ 我們可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務是否被線程池執行成功。通過以下代碼可知execute方法輸入的任務是一個Runnable類的實例。

1 threadsPool.execute(new Runnable() {
2             @Override
3             public void run() {
4                 // TODO Auto-generated method stub
5             }
6         });

我們也可以使用submit 方法來提交任務,它會返回一個future,那么我們可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間后立即返回,這時有可能任務沒有執行完。

 1 Future<Object> future = executor.submit(harReturnValuetask);
 2 try {
 3      Object s = future.get();
 4 } catch (InterruptedException e) {
 5     // 處理中斷異常
 6 } catch (ExecutionException e) {
 7     // 處理無法執行任務異常
 8 } finally {
 9     // 關閉線程池
10     executor.shutdown();
11 }

 

線程池的關閉

​ 我們可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池,它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。

​ 只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow。

 

線程池的分析

​ 流程分析: 線程池的主要工作流程如下圖:

從上圖我們可以看出,當提交一個新任務到線程池時,線程池的處理流程如下:

  1. 首先線程池判斷基本線程池是否已滿?沒滿,創建一個工作線程來執行任務。滿了,則進入下個流程。
  2. 其次線程池判斷工作隊列是否已滿?沒滿,則將新提交的任務存儲在工作隊列里。滿了,則進入下個流程。
  3. 最后線程池判斷整個線程池是否已滿?沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略來處理這個任務。

 

源碼分析。上面的流程分析讓我們很直觀的了解了線程池的工作原理,讓我們再通過源代碼來看看是如何實現的。線程池執行任務的方法如下:

 1 public void execute(Runnable command) {
 2     if (command == null)
 3        throw new NullPointerException();
 4     //如果線程數小於基本線程數,則創建線程並執行當前任務 
 5     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 6     //如線程數大於等於基本線程數或線程創建失敗,則將當前任務放到工作隊列中。
 7         if (runState == RUNNING && workQueue.offer(command)) {
 8             if (runState != RUNNING || poolSize == 0)
 9                       ensureQueuedTaskHandled(command);
10         }
11     //如果線程池不處於運行中或任務無法放入隊列,並且當前線程數量小於最大允許的線程數量,
12 則創建一個線程執行任務。
13         else if (!addIfUnderMaximumPoolSize(command))
14         //拋出RejectedExecutionException異常
15             reject(command); // is shutdown or saturated
16     }
17 }

 

工作線程。線程池創建線程時,會將線程封裝成工作線程Worker,Worker在執行完任務后,還會無限循環獲取工作隊列里的任務來執行。我們可以從Worker的run方法里看到這點:

 1 public void run() {
 2      try {
 3            Runnable task = firstTask;
 4            firstTask = null;
 5             while (task != null || (task = getTask()) != null) {
 6                     runTask(task);
 7                     task = null;
 8             }
 9       } finally {
10              workerDone(this);
11       }
12 }

 

合理的配置線程池

​ 要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。

  2. 任務的優先級:高,中和低。

  3. 任務的執行時間:長,中和短。

  4. 任務的依賴性:是否依賴其他系統資源,如數據庫連接。

    ​ 任務性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務配置盡可能小的線程,如配置Ncpu+1個線程的線程池。IO密集型任務則由於線程並不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。

    ​ 優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。

    ​ 執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。

    ​ 依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那么線程數應該設置越大,這樣才能更好的利用CPU。

    ​ 建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。有一次我們組使用的后台任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后台任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞住,任務積壓在線程池里。如果當時我們設置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后台任務出現問題。當然我們的系統所有的任務是用的單獨的服務器部署的,而我們使用不同規模的線程池跑不同類型的任務,但是出現這樣問題時也會影響到其他任務。

 

線程池的監控

通過線程池提供的參數進行監控。線程池里有一些屬性在監控線程池的時候可以使用

  • taskCount:線程池需要執行的任務數量。
  • completedTaskCount:線程池在運行過程中已完成的任務數量。小於或等於taskCount。
  • largestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過。如等於線程池的最大大小,則表示線程池曾經滿了。
  • getPoolSize:線程池的線程數量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不+ getActiveCount:獲取活動的線程數。

通過擴展線程池進行監控。通過繼承線程池並重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行后和線程池關閉前干一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池里是空方法。如下:

 1 protected void beforeExecute(Thread t, Runnable r) { } 

 

dubbo對線程池的使用

dubbo-common 模塊的threadpool包下體現,如下圖所示:

 

ThreadPool

com.alibaba.dubbo.common.threadpool.ThreadPool ,線程池接口。代碼如下:

 1 //@SPI("fixed")注解,Dubbo SPI擴展點,默認為"fixed"。
 2 @SPI("fixed")
 3 public interface ThreadPool {
 4     /**
 5      * @Adaptive({Constants.THREADPOOL_KEY}) 注解,基於Dubbo SPI Adaptive機制,加載對應的線程池實現,使用URL.threadpool屬性。
 6      * getExecutor(url)方法,獲得對應的線程池的執行器
 7      *
 8      */
 9     @Adaptive({Constants.THREADPOOL_KEY})
10     Executor getExecutor(URL url);
11 
12 }

 

FixedThreadPool

com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool ,實現ThreadPool接口,固定大小線程池,啟動時建立線程,不關閉,一直持有。代碼如下:

 1 public class FixedThreadPool implements ThreadPool {
 2 
 3     @Override
 4     public Executor getExecutor(URL url) {
 5         //線程名
 6         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
 7         //線程數
 8         int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
 9         //隊列數
10         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
11         //創建執行器
12         return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
13                 /**
14                  * 根據不同的隊列數,使用不同的隊列實現:
15                  * queues == 0,SynchronousQueue對象。
16                  * queues < 0,LinkedBlockingQueue對象。
17                  * queues > 0,帶隊列數的LinkedBlockingQueue對象。
18                  */
19                 queues == 0 ? new SynchronousQueue<Runnable>() :
20                         (queues < 0 ? new LinkedBlockingQueue<Runnable>()
21                                 : new LinkedBlockingQueue<Runnable>(queues)),
22                 /**
23                  * 創建NamedThreadFactory對象,用於生成線程名
24                  * 創建AbortPolicyWithReport對象,用於當任務添加到線程池中被拒絕時。
25                  */
26                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
27     }
28 }

 

推薦閱讀:

 

CachedThreadPool

com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool ,實現ThreadPool接口,緩存線程池,空閑一定時長,自動刪除,需要時重建。代碼如下:

 1 public class CachedThreadPool implements ThreadPool {
 2 
 3     @Override
 4     public Executor getExecutor(URL url) {
 5         //線程池名
 6         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
 7         //核心線程數
 8         int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
 9         //最大線程數
10         int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
11         //隊列數
12         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
13         //線程存活時長
14         int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
15         //創建執行器
16         return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
17                 queues == 0 ? new SynchronousQueue<Runnable>() :
18                         (queues < 0 ? new LinkedBlockingQueue<Runnable>()
19                                 : new LinkedBlockingQueue<Runnable>(queues)),
20                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
21     }
22 }

 

LimitedThreadPool

com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool ,實現ThreadPool接口,可伸縮線程池,但池中的線程池只會增長不會收縮。只增長不收縮的目的是為了避免收縮時突然來了大流量引起的性能問題。代碼如下:

 1 public class LimitedThreadPool implements ThreadPool {
 2 
 3     @Override
 4     public Executor getExecutor(URL url) {
 5         //線程池名
 6         String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
 7         //核心線程數
 8         int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
 9         //最大線程數
10         int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
11         //隊列數
12         int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
13         /**
14          * 和CachedThreadPool實現是基本一致的,差異點在alive == Integer.MAX_VALUE,空閑時間無限大,即不會刪除。
15          */
16         return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
17                 queues == 0 ? new SynchronousQueue<Runnable>() :
18                         (queues < 0 ? new LinkedBlockingQueue<Runnable>()
19                                 : new LinkedBlockingQueue<Runnable>(queues)),
20                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
21     }
22 
23 }

 

AbortPolicyWithReport

com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport ,實現 java.util.concurrent.ThreadPoolExecutor.AbortPolicy,拒絕策略實現類。打印JStack,分析線程狀態 代碼如下:

  1 /**
  2  * AbortPolicyWithReport實現自ThreadPoolExecutor.AbortPolicy,拒絕策略實現類,
  3  * 打印JStack,分析線程狀態。
  4  */
  5 public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
  6 
  7 
  8     protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
  9     /**
 10      * 線程名
 11      */
 12     private final String threadName;
 13 
 14     /**
 15      * URL 對象
 16      */
 17     private final URL url;
 18 
 19     /**
 20      * 最后打印時間
 21      */
 22     private static volatile long lastPrintTime = 0;
 23 
 24     /**
 25      * 信號量,大小為1。
 26      */
 27     private static Semaphore guard = new Semaphore(1);
 28 
 29     public AbortPolicyWithReport(String threadName, URL url) {
 30         this.threadName = threadName;
 31         this.url = url;
 32     }
 33 
 34     @Override
 35     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 36         /**
 37          * 打印告警日志
 38          */
 39         String msg = String.format("Thread pool is EXHAUSTED!" +
 40                         " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
 41                         " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
 42                 threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
 43                 e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
 44                 url.getProtocol(), url.getIp(), url.getPort());
 45         logger.warn(msg);
 46         // 打印 JStack,分析線程狀態。
 47         dumpJStack();
 48         //拋出 RejectedExecutionException 異常
 49         throw new RejectedExecutionException(msg);
 50     }
 51 
 52     private void dumpJStack() {
 53         long now = System.currentTimeMillis();
 54         //每 10 分鍾,打印一次。
 55         //dump every 10 minutes
 56         if (now - lastPrintTime < 10 * 60 * 1000) {
 57             return;
 58         }
 59         //獲得信號量
 60         if (!guard.tryAcquire()) {
 61             return;
 62         }
 63         //創建線程池,后台執行打印JStack
 64         Executors.newSingleThreadExecutor().execute(new Runnable() {
 65             @Override
 66             public void run() {
 67                 //獲得路徑
 68                 String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
 69 
 70                 SimpleDateFormat sdf;
 71                 //獲得系統
 72                 String OS = System.getProperty("os.name").toLowerCase();
 73 
 74                 // window system don't support ":" in file name
 75                 if(OS.contains("win")){
 76                     sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
 77                 }else {
 78                     sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
 79                 }
 80 
 81                 String dateStr = sdf.format(new Date());
 82                 //獲得輸出流
 83                 FileOutputStream jstackStream = null;
 84                 try {
 85                     jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr));
 86                     //打印JStack
 87                     JVMUtil.jstack(jstackStream);
 88                 } catch (Throwable t) {
 89                     logger.error("dump jstack error", t);
 90                 } finally {
 91                     //釋放信號量
 92                     guard.release();
 93                     //釋放輸出流
 94                     if (jstackStream != null) {
 95                         try {
 96                             jstackStream.flush();
 97                             jstackStream.close();
 98                         } catch (IOException e) {
 99                         }
100                     }
101                 }
102                 //記錄最后打印時間
103                 lastPrintTime = System.currentTimeMillis();
104             }
105         });
106 
107     }
108 
109 }

推薦閱讀:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM