一 異步用new Thread? 大寫的"low"!!
new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start();
你還在像上面這么用嗎,太low 了。弊端多多:
1. 每次new Thread新建對象性能差。
2. 線程缺乏統一管理,可能無限制新建線程,相互之間競爭,及可能占用過多系統資源導致死機或oom。
3. 缺乏更多功能,如定時執行、定期執行、線程中斷。
相比new Thread,Java提供的四種線程池的好處與此相對,在於:
1. 重用存在的線程,減少對象創建、消亡的開銷,性能佳。
2. 可有效控制最大並發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
3. 提供定時執行、定期執行、單線程、並發數控制等功能。
二 底層java.util.concurrent.ThreadPoolExecutor
無論創建哪種線程池 必須要調用ThreadPoolExecutor, 👇列舉一個構造方法
當一個任務通過execute(Runnable)方法欲添加到線程池時:
如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
如果此時線程池中的數量等於corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。
也就是:處理任務的優先級為:
核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
拒絕策略:
三 幾種典型線程池的源碼分析
Java通過Executors提供四種線程池,分別為(juc.Executors包下):
newCachedThreadPool 創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程(60s不執行任務),若無可回收,則新建線程。
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來串行執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
1. FixedThreadPool
- FixedThreadPool的corePoolSize和maxiumPoolSize都被設置為創建FixedThreadPool時指定的參數nThreads。
- FixedThreadPool使用了無界隊列LinkedBlockingQueue作為線程池的工作隊列,由於是無界的,當線程池的線程數達到corePoolSize后,新任務將在無界隊列中等待,因此線程池的線程數量不會超過corePoolSize,同時maxiumPoolSize也就變成了一個無效的參數,並且運行中的線程池並不會拒絕任務。
-
FixedThreadPool運行圖如下
執行過程如下:
1.如果當前工作中的線程數量少於corePool的數量,就創建新的線程來執行任務。
2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。
3.線程執行完1中的任務后會從隊列中取任務。
注意LinkedBlockingQueue是無界隊列,所以可以一直添加新任務到線程池。
2. SingleThreadExecutor
SingleThreadExecutor是使用單個worker線程的Executor。特點是使用單個工作線程執行任務。它的構造源碼如下:
執行過程如下:
1.如果當前工作中的線程數量少於corePool的數量,就創建一個新的線程來執行任務。
2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。
3.線程執行完1中的任務后會從隊列中取任務。
注意:由於在線程池中只有一個工作線程,所以任務可以按照添加順序執行。
3. CachedThreadPool
CachedThreadPool是一個”無限“容量的線程池,它會根據需要創建新線程。特點是可以根據需要來創建新的線程執行任務,沒有特定的corePool。下面是它的構造方法:

1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的線程池中有空閑的線程正在執行SynchronousQueue.poll(),那么主線程執行的offer操作與空閑線程執行的poll操作配對成功,主線程把任務交給空閑線程執行。execute()方法執行成功,否則執行步驟2
2.當線程池為空(初始maximumPool為空)或沒有空閑線程時,配對失敗,將沒有線程執行SynchronousQueue.poll操作。這種情況下,線程池會創建一個新的線程執行任務。
3.在創建完新的線程以后,將會執行poll操作。當步驟2的線程執行完成后,將等待60秒,如果此時主線程提交了一個新任務,那么這個空閑線程將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會占用系統資源。
SynchronousQueue是一個不存儲元素阻塞隊列,每次要進行offer操作時必須等待poll操作,否則不能繼續添加元素。
4. 手動調用new ThreadPoolExecutor
四 引申的幾個問題
1. 無限大CachedThreadPool的OOM
ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 1; i <= 2100; i++) { executorService.submit(() -> { try { Thread.sleep(5000); } catch (Exception e) { //ignore } }); }
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.Thread.start(Thread.java:717) at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at com.balfish.hotel.train.concurrent.countDownLatch.CountDownLatchMain.main(CountDownLatchMain.java:31)
可以看出來是堆外內存溢出,因為我們新建的線程都在工作(代碼中用sleep模擬在工作中),newCachedThreadPool 只會重用空閑並且可用的線程,所以上述代碼只能不停地創建新線程,
在 64-bit JDK 1.7 中 -Xss 默認是 1024k,也就是 1M,那就是需要 2100*1M = 2.1G 的堆外內存空間來給線程使用,機器內存分配不夠創建新的線程,所以就 OOM 了。
newCachedThreadPool最大值初始化時默認為Integer.MAX_VALUE,一般來說機器都沒那么大內存給它不斷使用。那么我們一般去重寫一個方法限制一下這個最大值,或者看下newFixedThreadPool是否滿足
2. ArrayBlockingQueue 和 LinkedBlockingQueue 的區別
(1)一把鎖 vs 分離鎖
ArrayBlockingQueue中的鎖是沒有分離的,即生產者和消費者用的是一個鎖
LinkedBlockingQueue的鎖是分離的,即生產用的是putLock,消費用的是takeLock
(2)數組 vs 鏈表
ArrayBlockingQueue基於數組,生產和消費的時候,直接將枚舉對象插入或移除,不會有額外的對象實例的空間開銷
LinkedBlockingQueue基於鏈表,生產和消費的時候,需要把枚舉轉換為Node<E>進行插入或移除,有額外的Node對象開銷。這在大批量並發處理數據時,對GC有一定影響
(3)隊列長度
ArrayBlockingQueue是有界的,必須指定隊列大小
LinkedBlockingQueue是無界的,可以不指定隊列的大小,默認是Integer.MAX_VALUE。(也可以指定隊列大小,從而成為有界的)
(4)隊列效率
ArrayBlockingQueue快。LinkedBlockingQueue用默認大小且生產速度大於消費速度時候,可能會OOM
3. shutdown和shutdownNow
可以調用線程池的shutdown或者shutdownNow方法來關閉線程池。他們的原理是遍歷線程池的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法停止。
區別:
shutdown方法將執行平緩的關閉過程:不再接收新的任務,同時等待已提交的任務執行完成, 包括那些還未開始執行的任務。
shutdownNow方法將執行粗暴的關閉過程:它將嘗試取消所有運行中的任務,並且不再啟動隊列中尚未開始執行的任務。
只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true,當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminated方法會返回true。至於應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。
4. 繼承覆寫ThreadPoolExecutor
可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecute, afterExecute和terminated方法。在執行任務的線程中將調用beforeExecute和afterExecute等方法,在這些方法中還可以添加日志、計時、監視或者統計信息收集的功能。無論任務是從run中正常返回,還是拋出一個異常而返回,afterExecute都會被調用。如果任務在完成后帶有一個Error,那么就不會調用afterExecute。如果beforeExecute拋出一個RuntimeException,那么任務將不被執行,並且afterExecute也不會被調用。在線程池完成關閉時調用terminated,也就是在所有任務都已經完成並且所有工作者線程也已經關閉后,terminated可以用來釋放Executor在其生命周期里分配的各種資源,此外還可以執行發送通知、記錄日志或者finalize統計等操作