Java當中的線程池是通過Executor這個框架接口來實現的,該框架當中用到了Executor,Executors工具類,ExecutorService,ThreadPoolExecutor
Executors創建線程的三種方法:
ExecutorService threadPool = Executors.newFixedThreadPool(5); //固定容量 ExecutorService threadPool = Executors.newSingleThreadExecutor(); //單例的、單個線程的線程池 ExecutorService threadPool = Executors.newCachedThreadPool(); //緩存的 即超出就自動創建線程的
接下來講解一下這三個的區別:
固定容量的線程池
首先我們看的是第一個固定容量的線程池Executors.newFixedThreadPool(5);
:
首先看代碼:
/** * 主要特點:線程復用;控制最大並發數;管理線程。 * * @author Cocowwy * @create 2020-05-05-20:20 * Executor/ExecutorServic(Interface) * Executors 線程池的工具類 */ public class MyThreadPoolDemo { public static void main(String[] args) { //一池五個受理線程 ExecutorService threadPool = Executors.newFixedThreadPool(5); //看源碼是LinkedBlockingQueue<Runnable>() try { //模擬10個用戶辦理業務,但是只有5個受理窗口 for (int i = 0; i < 10; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務"); }); Thread.sleep(400); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); //關閉線程池 } } }
結果如下:
接着我們加上一句線程睡眠一小會的代碼:
public class MyThreadPoolDemo { public static void main(String[] args) { //一池五個受理線程 ExecutorService threadPool = Executors.newFixedThreadPool(5); //看源碼是LinkedBlockingQueue<Runnable>() try { //模擬10個用戶辦理業務,但是只有5個受理窗口 for (int i = 0; i < 10; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務"); }); Thread.sleep(400); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); //關閉線程池 } } }
在這里我們可以看到有序辦理了每個業務。可以看出這個是固定了大小的線程池,每次都是從這個線程池中取的線程。
單例的線程池
這是第二個,單例的線程池:
ExecutorService threadPool = Executors.newSingleThreadExecutor(); //一池1個受理線程
public class MyThreadPoolDemo { public static void main(String[] args) { // ExecutorService threadPool = Executors.newFixedThreadPool(5); //一池五個受理線程,看源碼是LinkedBlockingQueue<Runnable>() ExecutorService threadPool = Executors.newSingleThreadExecutor(); //一池1個受理線程 try { for (int i = 0; i < 10; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); //關閉線程池 } } }
我們可以看到一直是一個線程在受理業務。
可擴展的線程池
接下來是第三個線程池:ExecutorService threadPool = Executors.newCachedThreadPool(); //一池N個受理線程 可擴展的
ExecutorService threadPool = Executors.newCachedThreadPool(); //一池N個受理線程 可擴展的
接下來上代碼:
public class MyThreadPoolDemo { public static void main(String[] args) { // ExecutorService threadPool = Executors.newFixedThreadPool(5); //一池五個受理線程,看源碼是LinkedBlockingQueue<Runnable>() // ExecutorService threadPool = Executors.newSingleThreadExecutor(); //一池1個受理線程 ExecutorService threadPool = Executors.newCachedThreadPool(); //一池N個受理線程 try { for (int i = 0; i < 10; i++) { threadPool.execute(() -> { try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); //關閉線程池 } } }
先看看效果:
在上面的代碼中,我們可以發現的是,對從線程池取的線程睡了0.4s,然而卻可以發現創建出了3,6,9,10,1,8,7,4,5.....這么多的線程
因為我們睡眠的時間太短了,表明需要受理的業務頻率太多,所以才開辟了這么多的線程去處理。
tips:我們可以設置睡眠時間,來控制線程池的開辟數量。當我們將睡眠時間設置的盡可能的大,那么開辟的線程數自然而然的就少了下來,證明需要受理的業務不那么頻繁
所以我們可以發現當請求過多,過於頻繁的時候使用可擴展的線程池newCachedThreadPool將會創建更多的線程。
線程池的源碼
首先點進newFixedThreadPool()的源碼可以看到:
接下來點進去newSingleThreadExecutor()的源碼可以看到:
接下來點進去newCachedThreadPool()的源碼可以看到:
綜上所述,返回的實際上只是一個ThreadPoolExecutor
(可以看看繼承圖),利用構造器傳入的不同的參數而已,而且我們也能發現底層是阻塞隊列。
同時說明我們也可以通過ThreadPoolExecutor`來創建線程池,Executors只是一個創建線程池的工具類,實際上返回的還是ThreadPoolExecutor。
ThreadPoolExecutor的七大參數
接着我們繼續點進ThreadPoolExecutor
:
接着再點進這this,我們可以看到它有七個參數,:corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler
下圖是這七大參數的解釋:
線程池的底層工作原理圖
接下來結合下圖理解理解上述的7大參數:
首先看看線程池的底層工作原理圖:
看上圖以及參數解析對照我們可以知道maximumPool包含corePool,maximumPool表示最多能放的線程數,而corePool表示的就是線程的常駐數,可以理解為銀行的有最多有5個受理窗口,但是常用的卻只有2個。
而候客區就相當於我們的阻塞隊列(BlockingQueue),那當我們的阻塞隊列滿了之后,handle拒絕策略出來了,相當於銀行門口立了塊牌子,上面寫着不辦理后面的業務了!
然后當客戶都辦理的差不多了,此時多出來(在corePool的基礎上擴容的窗口)的窗口在經過keepAliveTime的時間后就關閉了,重新恢復到corePool個受理窗口。
總結一下線程池的工作流程:
首先線程池接收到任務,先判斷核心線程數是否滿了,如果corepool沒有滿接客則核心(常駐)線程處理。
常駐線程滿了就放到阻塞隊列,如果阻塞隊列沒滿,這些任務放在阻塞隊列。
如果阻塞隊列也滿了,就擴容線程數到最大線程數。
如果最大線程數也滿了,就是我們的拒絕策略。
這就是線程池四大步驟。 接客、放入隊列,擴容線程,拒絕策略!
也可以看下圖流程解釋:太妙了!!
實際開發當中如何合適的使用線程池
為什么不建議使用Executors工具類去創建線程池?
舉個例子,回到之前講的 newSingleThreadExecutor(); ;
以及Executors.newCachedThreadPool( );
創建的線程池,看看源碼,
正如源碼中看到的那樣:
如果用Executors去創建,默認的Integer.MAX_VALUE的大小是21億............極大的消耗內存,線程池永遠不會慢,內存會被你壓爆
又如下面源碼:
//這是Single的 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } } //點進去LinkedBlockQueue public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //這是Cahed的 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
對於newSingleThreadExecutor()而言,LinkedBlockQueue的長度是Integer.MAX_VALUE,
對於newCachedThreadPool()而言,maximumPool的值竟然為Integer.MAX_VALUE!!
兩者均會導致OOM異常!
自定義線程池
public class MyThreadPoolDemo { public static void main(String[] args) { //自定義線程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), //不寫的話默認也是Integer.MAX_VALUE Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());//默認的拒絕策略 try { //模擬10個用戶辦理業務,但是只有5個受理窗口 for (int i = 0; i < 9; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "辦理業務"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); //關閉線程池 } }
threadPool 是我們自定義的線程池,連接過上面的參數的應該都知道。
該線程池最大支持的並發量就應該是maximumPool+Queue的大小,即5+3=8,而超過了大小之后就會報錯:java.util.concurrent.RejectedExecutionException
拒絕執行異常
線程池的四大拒絕策略
接下來我們看看線程池的四大拒絕策略,上述是JDK默認的拒絕策略:
接下來看看另外三種策略的運行結果
將上述代碼的拒絕策略改成第二種new ThreadPoolExecutor.CallerRunsPolicy(),回退到原始調用者,這里之main線程
第三種new ThreadPoolExecutor.DiscardOldestPolicy()
:不報錯。
第四種new ThreadPoolExecutor.DiscardPolicy()
: 同樣不報錯。
以上策略均繼承自RejectedExecutionHandler
接口。
怎么設置maximumPoolSize合理
最后提一句怎么設置maximumPoolSize合理,
了解:IO密集型,CPU密集型:(調優)
1、CPU 密集型,一般設置為CPU核數加1,可以保持CPu的效率最高!
System.out.println(Runtime.getRuntime().availableProcessors()); //獲取CPU的核數,8核
2、IO 密集型, 判斷你程序中十分耗IO的線程