最近工作不算太忙,抽時間學習了下java線程池底層源碼,廢話不多說,馬上“去片”!
Executors類是java線程池的工具類,此類位於java.util.concurrent包下。在日常項目開發中,我們使用得比較多的主要有CachedThreadPool、FixedThreadPool、SingleThreadExecutor、ScheduledThreadPool這4個線程池,這些線程池底層均調用new ThreadPoolExecutor()方法來新建一個線程池。
現在簡單介紹這4個池:
CachedThreadPool
CachedThreadPool默認最大線程數量為int的最大值,狹義來說已經可以說是不限制數量了。默認的空閑線程終止時間60秒。一般在項目開發中如果不明確線程池中運行任務的數量不建議使用此線程池,因為如果大量的並發任務運行時,會大量占用CPU資源,導致應用程序崩潰甚至服務器崩潰等嚴重問題。
FixedThreadPool
FixedThreadPool需要指定線程數量,工作任務超過線程池總線程數后會進入等待狀態 ,直到已經在工作的線程完成任務后釋放后再進行另一任務的工作。此線程池本人在日常開發中使用得較多,可以有效控制線程的數量,防止大量占用CPU資源,導致應用程序崩潰甚至服務器崩潰等問題的發生。
SingleThreadExecutor
SingleThreadExecutor根據命名就可以猜這是一個單線程線程池,如果任務超過一個時,則會排隊等待線程池唯一的線程完成任務再運行其它任務。
ScheduledThreadPool
ScheduledThreadPool為一個延時任務線程池。(在此不作詳細介紹,此線程池將作為另一篇文章單獨講解)
以上簡單介紹了java常用的4個線程池,下面將深入分析線程池創建后,是如何執行任務及線程池的底層是如何實現的。
剛剛也提到了所有線程池都要使用new ThreadPoolExecutor()方法來新建,讓我們先看一下JDK的源碼。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 ||
9 maximumPoolSize <= 0 ||
10 maximumPoolSize < corePoolSize ||
11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
先看一下需要傳遞的參數:
corePoolSize 線程池核心線程數,如果當前運行線程數量小於核心線程數量的話,處理新任務的時候,線程池還是會新建一個線程執行任務,直到當前運行線程數量等於核心線程數量;
maximumPoolSize: 線程池最大線程數,當運行線程數量等於線程池最大線程數時,再來新的任務會觸發異常;
keepAliveTime: 空閑線程終止時間,線程執行完任務空閑時間超過指定時間后,線程將會被終止;
Unit: 空閑線程終止時間單位;
workQueue: 線程工作任務隊列,線程主要使用ArrayBlockingQueue,DelayQueue,DelayedWorkQueue,LinkedBlockingDeque,PriorityBlockingQueue,SynchronousQueue,LinkedTransferQueue這些隊列,這里不作詳細介紹,有興趣的童鞋可以自行谷歌學習;
threadFactory: 線程工廠;
Handler: 線程任務拒絕策略處理器,默認使用AbortPolicy策略,當任務超過最大線程時,會拋出異常。除此之外還有另外幾種異常處理策略,CallerRunsPolicy使用調用者本身的線程來執行任務,DiscardPolicy丟棄當前任務,DiscardOldestPolicy丟棄最舊的任務,執行當前任務;
介紹完新建線程池的基本參數傳遞,現在到了本文最核心部分,線程池調用 execute方法時,內部究竟是如何實現的。
我們先看execute方法的源碼:
1 public void execute(Runnable command) { 2 //先判斷任務是否為空,如果為空則拋出異常; 3 4 if (command == null) 5 throw new NullPointerException(); 6 7 //獲取當前運行線程數,與線程池核心線程數作對比,如果當前運行線程數小於核心線程數,則調用addWorker方法將任務加入隊列執行任務。這是一個任務加入線程池運行最基本的過程。 8 int c = ctl.get(); 9 if (workerCountOf(c) < corePoolSize) { 10 if (addWorker(command, true)) 11 return; 12 c = ctl.get(); 13 } 14 if (isRunning(c) && workQueue.offer(command)) { 15 int recheck = ctl.get(); 16 if (! isRunning(recheck) && remove(command)) 17 reject(command); 18 else if (workerCountOf(recheck) == 0) 19 addWorker(null, false); 20 } 21 else if (!addWorker(command, false)) 22 reject(command); 23 }
后面一段源碼我們待會再分析,先看addWorker方法做了什么,以下為源碼:
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 //首先看到這是一個無限循環 3 retry: 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 //檢查任務和隊列是否為空,如果為空則返回false 9 if (rs >= SHUTDOWN && 10 ! (rs == SHUTDOWN && 11 firstTask == null && 12 ! workQueue.isEmpty())) 13 return false; 14 15 for (;;) { 16 17 //此處檢查當前運行的任務數,參數core在此起作用,如果true則比較當前任務數是否大於等於線程池核心線程數,false則比較當前任務數是否大於等於線程池總線程數。 18 int wc = workerCountOf(c); 19 if (wc >= CAPACITY || 20 wc >= (core ? corePoolSize : maximumPoolSize)) 21 return false; 22 23 //此處使用AtomicInteger類compareAndSet方法(此方法為原子性)將當前運行線程數加1,成功加1則退出循環 24 if (compareAndIncrementWorkerCount(c)) 25 break retry; 26 27 //由於此循環代碼段沒有加鎖,為防止多個線程並發引起數據問題,這里再次檢查當前運行任務數 28 c = ctl.get(); 29 if (runStateOf(c) != rs) 30 continue retry; 31 } 32 } 33 34 boolean workerStarted = false; 35 boolean workerAdded = false; 36 Worker w = null; 37 try { 38 final ReentrantLock mainLock = this.mainLock; 39 //此處新建一個Worker,Worker繼承了Runnable接口,用作執行實際任務用 40 41 w = new Worker(firstTask); 42 final Thread t = w.thread; 43 if (t != null) { 44 45 //此處加鎖,防止並發 46 mainLock.lock(); 47 try { 48 int c = ctl.get(); 49 int rs = runStateOf(c); 50 //檢查任務是否為空 51 if (rs < SHUTDOWN || 52 (rs == SHUTDOWN && firstTask == null)) { 53 54 //檢查線程是否已經啟動,如果是則拋出異常 55 if (t.isAlive()) 56 57 throw new IllegalThreadStateException(); 58 workers.add(w); 59 int s = workers.size(); 60 if (s > largestPoolSize) 61 largestPoolSize = s; 62 workerAdded = true; 63 } 64 } finally { 65 mainLock.unlock(); 66 } 67 68 //最后啟動線程,執行任務 69 if (workerAdded) { 70 t.start(); 71 workerStarted = true; 72 } 73 } 74 } finally { 75 if (! workerStarted) 76 addWorkerFailed(w); 77 } 78 return workerStarted; 79 }
以上就是線程池新建一個線程,成功執行一個任務的過程。那當線程池中的線程已經達到最大數量了,線程池又會是如何工作,並觸發任務拒絕策略的呢?請看以下源碼:
1 //當前工作任務數如果大於等於核心線程數的話,將會執行以下代碼 2 3 if (isRunning(c) && workQueue.offer(command)) { 4 int recheck = ctl.get(); 5 6 //如果任務不是正在運行並且能將任務從隊列移除,執行reject方法,根據任務拒絕策略進行處理 7 if (! isRunning(recheck) && remove(command)) 8 reject(command); 9 10 //如果當前運行的任務為0,傳入一個空的任務 11 else if (workerCountOf(recheck) == 0) 12 addWorker(null, false); 13 } 14 15 //如果添加任務到隊列失敗,執行reject方法,根據任務拒絕策略進行處理 16 else if (!addWorker(command, false)) 17 18 reject(command); 19 }
以上就是我最近在學習線程池及分析其源碼得到的“成果”,如有錯漏這處,歡迎指正!
(如轉載請附上原文地址:https://www.cnblogs.com/kenblog/p/9366691.html )