為什么會有線程池??
之前的時候,我們每使用一次線程就去創建一個線程,這樣雖然實現起來非常簡便,但是會有一個問題,如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間,而且還容易造成線程池溢出。於是人們想到了使用線程池的方法,這樣使得線程就可以被復用,就是說當一個線程執行完一個任務后,並不會被銷毀,而是放到線程池中,這樣再次需要使用線程時,就可以直接從線程池中拿取一個線程使用,從而降低資源消耗,提高利用效率。
- 線程的創建和銷毀是需要時間的。加入一個服務完成需要的時間:T1表示創建線程時間,T2表示線程執行任務的耗時,T3表示線程消耗的時間,如果:T1+T3遠大於T2,得不償失。
- 線程需要占用內存資源,大量線程的創建會占用寶貴內存資源,可以導致OOM(Out Of Memory)異常。
- 大量的線程回收時也會給GC操作帶來很大的壓力,延長GC的停頓時間。
- 線程搶占CPU資源,CPU不停的在各個線程中進行上下文切換,上下文的切換也是耗時的過程。
線程池溢出:頻繁創建多線程,非常占用CPU內存,一個線程占1MB內存。
線程池的目的:
1.通過重復利用線程池中已創建的代碼,來降低創建和銷毀線程時的開銷,從而降低資源消耗,提高利用效率。
2.提高響應速度。當任務到達時,任務不需要來等待線程創建就能立即執行,因為線程池中有已創建好的線程可以直接供使用。
3.提高線程的可管理性。線程池時稀缺資源,如果無限制的創建,當線程數到達一定數量時,不僅會消耗系統資源,還會降低系統的穩定性,所以使用線程池,就可以進行統一分配,調優和監控。
4. 提供定時執行、定期執行、單線程、並發數控制等功能。
線程池的繼承:
Java里面線程池的頂級接口是Executor,但是嚴格意義上講Executor並不是一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService,ExecutorService接口繼承了Excutor接口,提供了一些對線程池操作的擴展方法:shoudown,submit,是真正的線程池接口。Executors是線程池的靜態工廠,其提供了快捷創建線程池的靜態方法。AbstractExecutorService抽象類實現了ExecutorService接口提供的大部分方法
比較重要的幾個類:
ExecutorService接口 |
真正的線程池接口。 |
ScheduledExecutorService接口 |
能和Timer/TimerTask類似,解決那些需要任務重復執行的問題。 |
ThreadPoolExecutor類 |
ExecutorService的默認實現。 |
ScheduledThreadPoolExecutor類 |
繼承ThreadPoolExecutor和實現ScheduledExecutorService接口,周期性任務調度的類實現。
|
Executors慎用:
在《阿里巴巴Java開發手冊》中有一條
【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
Executors 返回的線程池對象的弊端如下:
FixedThreadPool 和 SingleThreadPool : 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
CachedThreadPool 和 ScheduledThreadPool : 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
要配置一個線程池是比較復雜的,尤其是對於線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,因此在Executors類里面提供了一些靜態工廠,生成一些常用的線程池。
線程池的四種創建方式:
Java通過Executors(java1.5並發包中的類)來創建四種線程池,分別為:
newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則創建線程。
newFixedThreadPool創建一個定長線程池,可控制線程池最大並發數,超出的線程會在隊列中等待。
newScheduledThreadPool創建一個 定時線程池,支持定時以及周期性任務執行。
newSingleThreadExecutor創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序執行。(FIFO,LIFO,優先級)
無論創建哪種線程池在其底層都 必須要調用ThreadPoolExecutor:
- corePoolSize: 核心線程數---線程池中實際運行的線程數
- maximumPoolSize:最大線程數---線程池創建線程的最大數量 。值得注意的是,如果使用了無界的任務隊列這個參數就沒什么效果。
- keepAliveTime: 線程池維護線程所允許的空閑時間
- unit: 線程池維護線程所允許的空閑時間的單位 。可選的單位有天(DAYS)、小時(HOURS)、分鍾(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。
- workQueue: 線程池所使用的緩沖隊列
- handler: 線程池對拒絕任務的處理策略。當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。在JDK 1.5中Java線程池框架提供了以下4種策略。
·AbortPolicy:直接拋出異常。·CallerRunsPolicy:只用調用者所在線程來運行任務。·DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。·DiscardPolicy:不處理,丟棄掉
線程池原理:
提交一個任務到線程池中,線程池的處理流程如下:
1.判斷線程池里的核心線程數是否都在執行任務,如果不是(核心線程空閑或者還有核心線程沒有被創建),則創建一個新的工作線程來執行任務。如果核心線程數都在執行任務,則進入下一個流程。
2.線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里。如果工作隊列滿了,則進入下一個流程。
3.判斷線程池里的線程是否都處於工作狀態,如果沒有,則創建一個新的工作流程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。
如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;如果當前線程池中的線程數目大於等於corePoolSize,則每來一個任務,就會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般情況是因為任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務,如果此時線程池中總線程數小於maximumPoolSize,則會創建新的線程,如果當前線程池中的線程數目大於等於maximumPoolSize,則會采取任務拒絕策略進行處理。
注意:線程數目統計的是正在在執行任務的線程,而execute提交的只是任務,不一定每次提交都會創建出線程來。
1:newCachedThreadPool創建一個可緩存線程池。
♦CachedThreadPool的corePoolSize被設置為0,即corePool為空;
♦maximumPoolSize被設置為Integer.MAX_VALUE,即maximum是無界的。
♦這里keepAliveTime設置為60秒,意味着空閑的線程最多可以等待任務60秒,否則將被回收。
♦CachedThreadPool使用沒有容量的SynchronousQueue作為主線程池的工作隊列,它是一個沒有容量的阻塞隊列。每個插入操作必須等待另一個線程的對應移除操作。這意味着,如果主線程提交任務的速度高於線程池中處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU資源。
1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的線程池中有空閑的線程正在執行SynchronousQueue.poll(),那么主線程執行的offer()插入操作與空閑線程執行的poll()彈出操作配對成功,主線程把任務交給空閑線程執行,則execute()方法執行成功,否則執行步驟2
2.當線程池為空(初始maximumPool為空)或沒有空閑線程時,將沒有線程執行SynchronousQueue.poll操作,則配對失敗。這種情況下,線程池會創建一個新的線程執行任務。
3.在創建完新的線程以后,將會執行poll操作,與執行SynchronousQueue.offer(Runnable task)的任務相匹配。當步驟2的線程執行完成后,將等待60秒,如果此時主線程提交了一個新任務,那么這個空閑線程將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會占用系統資源。
SynchronousQueue是一個不存儲元素阻塞隊列,每次要進行offer插入操作時必須等待poll彈出操作,否則不能繼續添加元素。
【代碼演示】:耗時短,不需要考慮同步的情況。
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 public class CachedThreadDemo { 4 public static void main(String[] args) { 5 ExecutorService cachedthreadpool=Executors.newCachedThreadPool(); 6 for (int i = 0; i <10; i++) { 7 cachedthreadpool.execute(new Runnable() { 8 @Override 9 public void run() { 10 System.out.println(Thread.currentThread().getName()); 11 } 12 }); 13 } 14 cachedthreadpool.shutdown(); 15 } 16 }
運行結果:
2:newFixedThreadPool創建一個定長線程池。(指定core和max,且core=max=nThraed)
通過構造函數可知,該線程池的核心線程數和最大線程數是一樣的。
- FixedThreadPool的corePoolSize的值和maxiumPoolSize的值都被設置為創建FixedThreadPool時指定的參數nThreads的大小。
- 0L則表示當線程池中的線程數量超過核心線程的數量時,多余的線程將被立即停止
- 最后一個參數表示FixedThreadPool使用了無界隊列LinkedBlockingQueue作為線程池的做工隊列,由於是無界的,當線程池的線程數達到corePoolSize后,新任務將在無界隊列中等待,因此線程池的線程數量不會超過maxiumPoolSize,同時maxiumPoolSize也就變成了一個無效的參數,並且運行中的線程池也並不會拒絕任務。
執行過程如下:
1.如果當前工作中的線程數量少於corePool的數量,就創建新的線程來執行任務。
2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。
3.線程執行完1中的任務后會從隊列中去取任務。
注意LinkedBlockingQueue是無界隊列,所以可以一直添加新任務到線程池。
【代碼演示】:newFixedThreadPool創建一個定長線程池,可控制線程池最大並發數,超出的線程會在隊列中等待。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 public class FixedThreadDemo { 4 public static void main(String[] args) { 5 ExecutorService fixedThreadpool= Executors.newFixedThreadPool(3);//參數是線程池中線程的數目 6 for (int i = 0; i <10 ; i++) { 7 int tmp=i; 8 fixedThreadpool.execute(new Runnable() { 9 //execute方法表示啟動線程 10 @Override 11 public void run() { 12 System.out.println(Thread.currentThread().getName()+", "+tmp); 13 } 14 }); 15 } 16 fixedThreadpool.shutdown();//停止線程池的運行 17 } 18 }
運行結果:
不論運行多少次,我們都可以看到,整個結果中其實只有3個線程,而且被反復利用着。
3:newScheduledThreadPool創建一個周期性的線程池。(指定core,max為max)
執行過程如下:
1.如果當前工作中的線程數量少於corePool的數量,就創建一個新的線程來執行任務。
2.當線程池的工作中的線程數量達到了corePool,則將任務加入DelayedWorkQueue。
3.線程執行完1中的任務后會從隊列中去任務。
注意該線程池的最大線程數為MAX_VALUE。
【代碼演示】:創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。
1 import java.util.concurrent.Executors; 2 import java.util.concurrent.ScheduledExecutorService; 3 import java.util.concurrent.TimeUnit; 4 public class ScheduledThreadDemo { 5 public static void main(String[] args) { 6 ScheduledExecutorService scheduledThreadpool=Executors.newScheduledThreadPool(2);//核心線程數 7 //注意返回的是ScheduledExecutorService 類型的 8 for (int i = 0; i <4; i++) { 9 int tmp=i; 10 /*scheduledThreadpool.schedule(new Runnable() { 11 @Override 12 public void run() { 13 System.out.println(Thread.currentThread().getName()+", "+tmp); 14 } 15 }, 2, TimeUnit.SECONDS); 16 //傳進一個線程,2表示線程開始時的延時*/ 17 scheduledThreadpool.scheduleAtFixedRate(new Runnable() { 18 @Override 19 public void run() { 20 System.out.println(Thread.currentThread().getName()+", "+tmp); 21 } 22 }, 2,3, TimeUnit.SECONDS); 23 //傳進一個線程,2表示線程開始時的延遲時間,3表示周期即每3秒運行一次,TimeUnit.SECONDS 24 } 25 try { 26 Thread.sleep(30000);//先睡眠會,在關閉線程池,否則其他線程運行不了 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 }finally { 30 scheduledThreadpool.shutdown();//關閉線程池 31 } 32 33 } 34 }
運行結果:
通過結果我可以發現它是周期性運行的,且在開始打印時延時了2秒,然后每過3秒重復一次。
4:newSingleThreadExecutor創建一個單線程化的線程池。(core和max都默認為1)
- SingleThreadExecutor的corePoolSize和maxiumPoolSize都被設置1。
執行過程如下:
1.如果當前工作中的線程數量少於corePool的數量,就創建一個新的線程來執行任務。
2.當線程池的工作中的線程數量達到了corePool,則將任務加入LinkedBlockingQueue。
3.線程執行完1中的任務后會從隊列中去任務。
注意:由於在線程池中只有一個工作線程,所以任務可以按照添加順序執行。
【代碼演示】:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。
1 import java.util.concurrent.ExecutorService; 2 import java.util.concurrent.Executors; 3 public class SingleThreadDemo { 4 public static void main(String[] args) { 5 ExecutorService singlethread=Executors.newSingleThreadExecutor();//單線程池 6 for (int i = 0; i <5; i++) { 7 singlethread.execute(new Runnable() { 8 @Override 9 public void run() { 10 System.out.println(Thread.currentThread().getName()); 11 } 12 }); 13 } 14 singlethread.shutdown();//關閉線程池 15 } 16 }
運行結果:
通過運行結果可以發現,始終只有一個線程在被反復運行着。
線程池的狀態:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; //32-3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; }//獲取高3位的線程狀態 private static int workerCountOf(int c) { return c & CAPACITY; }//獲取低29位的線程數量 private static int ctlOf(int rs, int wc) { return rs | wc; } //rs表示線程池狀態,wc表示線程池中線程數量,通過兩個參數可獲取ctl值
其中ctl是一個AtomicInteger類型的屬性,其提供了線程池的狀態及線程池中線程的數量的統計,其中高3位用於維護線程池的運行狀態,低29位用來維護線程池中線程的數量
RUNNING = -1 << COUNT_BITS;即高3位為1,低29位0,該狀態的線程池可以接受新任務,也會處理阻塞隊列中的任務 SHUTDOWN = 0 << COUNT_BITS:即高3位是0,低29位為0,該狀態的線程池不會接受新的任務,但是會處理阻塞隊列中的任務 STOP = 1 << COUNT_BITS:即高3位為001,低29位為0,該狀態的線程池不會接受新任務,也不會處理阻塞隊列中的任務,並且還會終止正在執行的任務 TIDYING = 2 << COUNT_BITS:即高3位為010,低29位為0,所有的任務都被終止了,線程的數量為0,還要調用terminated()方法 TERMINATED = 3 << COUNT_BITS:即高3位為011,低29位為0,terminated()方法執行之后的狀態 這些狀態是int類型,大小關系為RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED
任務提交過程:
execute():提交任務
public void execute(Runnable command) { //提交的任務不能為null if (command == null) throw new NullPointerException(); int c = ctl.get(); //判斷工作線程數量小於核心線程數量 if (workerCountOf(c) < corePoolSize) { //新創建一個線程來執行任務體====addWorker(command, true) if (addWorker(command, true)) //添加成功,返回 return; //添加任務失敗;線程狀態已經到showdown,不在接收新任務/工作線程數量大於核心線程數量 c = ctl.get(); } //線程是運行狀態,將任務添加到阻塞隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
Java並發包中的阻塞隊列一共7個,當然他們都是線程安全的。
DealyQueue:一個使用優先級隊列實現的無界阻塞隊列。
LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
1.submit和execute的區別。
submit即可以提交Runnable類型的,也可以提交Callable類型的,而execute只能提交Runnable類型的。
submit方法具有返回值,Runnable返回的是null,Callable返回的是call方法的返回值。而execute方法不具有返回值,所以說execute方法也不能判斷線程是否執行完畢。
2.shutdown和shutdownNow的區別。
可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。