學習這個很長時間了一直沒有去做個總結,現在大致總結一下並發包的線程池。
首先,任何代碼都是解決問題的,線程池解決什么問題?
如果我們不用線程池,每次需要跑一個線程的時候自己new一個,會導致幾個問題:
1,不好統一管理線程和它們的相互之間的依賴關系,尤其是有的程序要做的事情很多的時候,線程的處理就顯得很雜亂,更雪上加霜的是,線程本身就是不可預期的,不是說先跑的線程就一直在后跑的線程前面,一旦形成復雜的依賴關系,也就會形成復雜的狀態(由所有線程的狀態共同決定)。
2,效率低下,有可能你的每次跑的線程(thread.start())就做一點點小工作,很快做完很快消亡。可是跑一個線程的系統上下文切換的開銷是很大的(相對來說)。當需要大量線程的時候這個問題會很突出。
那么線程池為什么可以解決以上兩個問題呢?對於第一個問題,毫無疑問對線程進行了統一管理。第二個問題,注意線程池的底層實現不是new一個線程然后start,而是有一個名字叫worker的線程,它首先持有要跑的runnable的對象的引用,然后在它的run()方法里面直接調用這個對象的run方法。換句話說,你放進去的線程並沒有真正注冊為一個線程跑起來。那么,通過控制worker的數量和運行模式,就可以節約很多的開銷。(worker線程不一定會因為你的runnable跑完而被銷毀,會接着去跑別的線程,實現了線程的復用)
接下來要分為兩個方向講這個問題,第一個是ThreadPoolExecutor的執行模式。第二個是來自Executors的給你裝配好的執行模式。
首先分析線程池的繼承結構。
頂層是一個接口(刪除了代碼注釋):
1 public interface Executor { 2 3 void execute(Runnable command); 4 5 }
很顯然,任何線程池(也可以理解為線程執行器)的最核心的功能就是執行線程。我個人感覺這個地方其實隱隱約約使用了命令模式(雖然是runnable,但是底層並不是start(),而是run(),我覺得可以理解為把一系列操作封裝成了一個command)。
接下來還是一個接口:
1 public interface ExecutorService extends Executor { 2 3 void shutdown(); 4 5 List<Runnable> shutdownNow(); 6 7 boolean isShutdown(); 8 9 boolean isTerminated(); 10 11 boolean awaitTermination(long timeout, TimeUnit unit) 12 throws InterruptedException; 13 14 <T> Future<T> submit(Callable<T> task); 15 16 <T> Future<T> submit(Runnable task, T result); 17 18 Future<?> submit(Runnable task); 19 20 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 21 throws InterruptedException; 22 23 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 24 long timeout, TimeUnit unit) 25 throws InterruptedException; 26 27 <T> T invokeAny(Collection<? extends Callable<T>> tasks) 28 throws InterruptedException, ExecutionException; 29 30 <T> T invokeAny(Collection<? extends Callable<T>> tasks, 31 long timeout, TimeUnit unit) 32 throws InterruptedException, ExecutionException, TimeoutException; 33 }
可以看到這個更進一步的接口更多地約束了線程池的行為,能夠關閉,能夠遞交Future。很多人喜歡吧ThreadPoolExecutor向上轉型為ExecutorService來使用。
接下來是一個抽象類:
1 public abstract class AbstractExecutorService implements ExecutorService { 2 //代碼省略 3 }
里面封裝了一些大致差不多的代碼。注意這個地方使用了模板方法模式(比如很多地方的代碼實現還是依賴於execute()方法,而這個抽象類本身沒有實現這個方法)
所以目前我們可以認為ThreadPoolExecutor最重要的方法是execute(),事實上這個方法貫穿了整個線程池。
接下來就是重頭戲:ThreadPoolExecutor的具體實現
當然這個類的代碼很多,但是我們只抓最關鍵(執行整個流程)的execute()方法。
1 public void execute(Runnable command) { 2 if (command == null) //防御性編程,檢查輸入的參數是否為null 3 throw new NullPointerException(); 4 5 int c = ctl.get(); //這個ctl是狀態變量,表示了目前線程池的狀態 6 if (workerCountOf(c) < corePoolSize) { //如果當前worker數量低於核心池大小 7 if (addWorker(command, true)) //直接新建worker並且讓他run這個線程 8 return; 9 c = ctl.get(); //運行到這里說明前面的addWorker失敗,也許線程池狀態變化 10 } 11 12 if (isRunning(c) && workQueue.offer(command)) { //接下來首先看線程池狀態,是不是在跑。如果是,再看能不能添加到阻塞隊列里面去 13 int recheck = ctl.get(); //再次檢查線程池狀態 14 if (! isRunning(recheck) && remove(command)) //如果發現線程池狀態變化(比如終止)則嘗試從阻塞隊列中移除這個runnable 15 reject(command); //發生線程被拒絕時指向的方法(由構造參數決定) 16 17 //最后一種情況,發現目前是線程池關閉了,但是阻塞隊列還有線程,這個時候新建工人去完成阻塞隊列里面沒有完成的工作(所以其直接任務參數為null) 18 else if (workerCountOf(recheck) == 0) 19 addWorker(null, false); 20 } 21 22 else if (!addWorker(command, false)) //第三種情況,如果阻塞隊列都滿了(線程太多),那還是去新建工人完成任務吧 23 reject(command); 24 }
以上是我打注釋的源碼。在這里還是有好幾個點要說。
首先第一個是源碼大量是用了一種小技巧。如:
1 if (isRunning(c) && workQueue.offer(command))
在if里面通過條件判斷運行函數(比如上面這句話先判斷isRunning(c),如果true再執行后面的,不是就不執行了,同時如果后面的運行成功,則進入if語句)。這么寫可以讓程序本身很精簡。
然后,要談的是這個的整體的運行邏輯,不妨舉個例子。某個工廠要招工人完成訂單。有兩個重要的參數:1,長工的數目(假設為10),2,總工人的最大數目(假設為20)。
一開始,沒有工人,所以要招長工。現在開始,每有一個訂單,就去招一個工人當長工。直到長工數為10。接下來,老板考慮訂單雖然多了,可是再招工人成本就太貴了,於是把新來的訂單放在流水線上暫存起來,叫目前的工人做完手上的活之后趕緊去流水線上繼續完成別的訂單。接下來訂單繼續增長,老板發現現在的情況是長工已經完不成積壓的訂單。這怎么辦呢?沒辦法還是要工人,要不然違約的代價更大。繼續開始招短工。這些短工一般是帶着任務進廠,也就是客戶一來訂單就立馬招個短工完成這個訂單從而保證任務不積壓。接下來客戶的訂單突然少了,以至於沒有訂單了。老板發現很多工人無事可做,在他觀察一段時間后,開始裁員,當然是先裁短工了,因為一開始的打算就是招你們進去渡過這個火爆期的,現在任務完成了,你們就該走了。於是工廠的工人數又回到了長工數。一般情況下的老板不會把長工一起裁掉,即使他們不干活,也就讓他們睡覺。可是還是有很黑心的老板直接把長工都裁掉了。
以上的故事中,工廠就是線程池,工人就是worker線程,訂單就是調用execute()傳進來的runnable參數,長工數就是corePoolSize,總工數是maximumPoolSize,流水線就是阻塞隊列,老板觀察工人不干活的時間是keepAliveTime,至於老板裁不裁長工是由allowCoreThreadTimeOut參數控制的。
這就是整個線程池的運行邏輯。而工人的運行邏輯是:手上有任務就先完成手上的任務(工人的構造函數傳進來的runnable),如果沒有或者已經完成了就去阻塞隊列找任務做,當然如果沒有任務就阻塞自己。
接下來研究一下addWorker的運行邏輯。
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); //獲得當前的線程池狀態 5 int rs = runStateOf(c); //這個函數就是進行一下檢查 6 7 if (rs >= SHUTDOWN && //線程池已經關閉,並且不是派進來清場的工人就直接返回false,添加工人失敗。 8 ! (rs == SHUTDOWN && 9 firstTask == null && 10 ! workQueue.isEmpty())) 11 return false; 12 13 for (;;) { 14 int wc = workerCountOf(c); //當前工人數 15 if (wc >= CAPACITY || //當前工人太多而添加失敗(長工不可超過corePoolSize,短工不可超過maximumPoolSize) 16 wc >= (core ? corePoolSize : maximumPoolSize)) 17 return false; 18 if (compareAndIncrementWorkerCount(c)) //CAS操作增加c,如果失敗則繼續循環 19 break retry; //運行到這里就說明發現是確實需要招工人,並且CAS操作成功 20 c = ctl.get(); //CAS失敗,重新獲得當前狀態 21 if (runStateOf(c) != rs) //這個時候發現不只是CAS失敗,而是狀態就發生了變化(比如線程池關閉) 22 continue retry; //那么重頭再來 23 } 24 } 25 26 //現在確認要添加工人了 27 boolean workerStarted = false; //工人還沒有開始工作 28 boolean workerAdded = false; //工人還沒有真正添加進去 29 Worker w = null; //目前工人還是一個null 30 try { 31 final ReentrantLock mainLock = this.mainLock; //獲得這個線程池的鎖 32 w = new Worker(firstTask); //新建一個工人,同時告訴他第一個任務是什么,注意這個地方內部很重要 33 final Thread t = w.thread; //這個w.therad,是把工人這個runnable包裝成一個thread,也就是工人線程自己 34 if (t != null) { //成功獲得工人線程(注意此時工人線程已經持有了firstTask這個參數,能夠運行它) 35 mainLock.lock(); //拿鎖,注意下滿的操作和目前線程池狀態有關,如果不加鎖,有可能發生讀到錯誤的狀態。 36 try { 37 int c = ctl.get(); //得到目前狀態 38 int rs = runStateOf(c); //檢查一下 39 40 if (rs < SHUTDOWN || //保證現在線程池沒有被關閉,或者不是清理worker 41 (rs == SHUTDOWN && firstTask == null)) { 42 if (t.isAlive()) //t線程當然不能是個在跑的線程 43 throw new IllegalThreadStateException(); 44 workers.add(w); //儲存了所有的worker的一個hashset容器添加這個worker 45 int s = workers.size(); //目前一共有多少worker 46 if (s > largestPoolSize) //超過最大值 47 largestPoolSize = s; //記錄最大值 48 workerAdded = true; //修改標志,告訴別人已經添加worker了 49 } 50 } finally { 51 mainLock.unlock(); //最后一定要放鎖 52 } 53 if (workerAdded) { //檢查是否成功添加worker 54 t.start(); //添加成功就讓新加入的worker開始運行吧 55 workerStarted = true; //修改worker是否開始運行的標志位 56 } 57 } 58 } finally { 59 if (! workerStarted) //失敗的處理 60 addWorkerFailed(w); 61 } 62 return workerStarted; //返回值告訴調用者是否成功 63 }
這就是添加一個工人的代碼和我寫的注釋。基本已經寫的很清楚了
接下來解釋一下worker的邏輯,也就是以上代碼32行33行發生的事。以下代碼有刪減
1 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { 2 3 final Thread thread; //這個worker要跑的線程,注意底層是這個worker調用這個線程的run方法 4 5 Runnable firstTask; //接受傳進來的參數 6 7 Worker(Runnable firstTask) { 8 setState(-1); 9 this.firstTask = firstTask; //外面來了task 10 this.thread = getThreadFactory().newThread(this); //通過threadFactory把這個runnable包裝成一個thread 11 } 12 13 public void run() { 14 runWorker(this); //注意這個時候這個worker的thread字段已經有實際意義,所以調用這個方法的時候能正確run這個thread 15 } 16 17 }
好了,原來是這樣,先告訴worker要干什么,然后用適配器模式直接把runnable適配成一個thread(threadFactory做這個)
最后記得上面寫過,在addWorker的時候會調用這個start()。
那么worker的run方法又是怎么實現的呢?
這個run方法其實調用了runWorker(this),實際上這個runWorker是個不典型的模板方法模式的應用。因為這個方法在類ThreadPoolExecutor里面。一般的模板方法模式是子類實現不同的執行細節,然后調用父類的方法(父類通過虛函數使用子類的實現)。這個地方子類倒是沒有實現什么方法,可是把自己當參數傳給了父類的方法,而且調用了父類的final方法,當時實際上這個參數也包含了重要信息(到底跑什么runnable)。所以個人覺得可以說是模板方法模式的一個變形。
runWorker方法:
1 final void runWorker(Worker w) { //注意傳進來的worker包含了重要信息 2 Thread wt = Thread.currentThread(); 3 Runnable task = w.firstTask; 4 w.firstTask = null; 5 w.unlock(); //這個地方允許被打斷 6 boolean completedAbruptly = true; 7 try { 8 while (task != null || (task = getTask()) != null) { //getTask不停去阻塞隊列拿線程任務 9 w.lock(); //拿鎖 10 // 如果線程池已經停止了,允許這個線程被打斷 11 // 否者保證這個線程不被打斷 12 // 需要一個第二次檢查來應對shutdownnow被調用 13 if ((runStateAtLeast(ctl.get(), STOP) || 14 (Thread.interrupted() && 15 runStateAtLeast(ctl.get(), STOP))) && 16 !wt.isInterrupted()) 17 wt.interrupt(); 18 try { 19 beforeExecute(wt, task); //這個交給子類來實現,這里什么都沒有,如果自己寫的線程需要,則重寫這個方法 20 Throwable thrown = null; 21 try { 22 task.run(); //最最最底層的調用,就是把worker的task拿出來運行,下一次則從阻塞隊列中拿 23 } catch (RuntimeException x) { 24 thrown = x; throw x; 25 } catch (Error x) { 26 thrown = x; throw x; 27 } catch (Throwable x) { 28 thrown = x; throw new Error(x); 29 } finally { 30 afterExecute(task, thrown); //同beforeExecute 31 } 32 } finally { 33 task = null; 34 w.completedTasks++; //這個worker完成的任務加一 35 w.unlock(); //釋放鎖 36 } 37 } 38 completedAbruptly = false; 39 } finally { 40 processWorkerExit(w, completedAbruptly); 41 } 42 }
以上就是ThreadPoolExecutor執行線程的全部核心過程。