說線程池必須說隊列,因為線程池跟隊列有着莫大的關系
一、阻塞隊列(7個):
數組阻塞隊列、鏈表阻塞隊列、優先級排序隊列,還有對應的無界阻塞隊列,另外還有雙向阻塞隊列,排序規則分為先進先出FIFO 與先進后出LIFO兩種。
對於阻塞隊列,針對插入與移除有有4種操作方式。如下:
| 方法 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
| 插入 | add(e) | offer | put | offer(e,time,unit) |
| 移除 | remove | poll | take | poll(time,unit) |
| 檢查 | element | peek | 不可用 | 不可用 |
測試(有界隊列):
1.拋出異常:

輸出:

第一次正常:

輸出:

第二次隊列設置長度為1,add方法調用了2次,結果拋出異常。
2.返回特殊值

輸出:

由結果看出,使用offer方法不會拋出異常,在添加元素時如果隊列已滿,返回失敗標識,當使用移除方法poll時,如果隊列已空,則會返回null
3.一直阻塞:put/take

輸出:

由結果看出,因為有界隊列為長度為1,主線程執行了2次put方法,第二次因為隊列已滿,所以會一直阻塞當前線程知道隊列不滿時才會繼續執行,修改一下,如下:

輸出:

執行完畢,由結果看出,上面代碼執行開始執行后,主線程與子線程亂序輸出,但是可以看出當主線程執行第二次put方法后會等待子線程take后,主線程再執行take,也就是說當線程滿后put方法會導致當前線程阻塞,當線程空了也會當作當前線程阻塞,我們可以再原先的代碼上再多加一句。如下:

輸出:

首先主線程等待子線程先執行,子線程首先執行take方法,因為隊列中沒有元素,所以子線程等待,2秒后主線程開始執行,第一次put后,子線程發現隊列中有元素了,所以不再阻塞,進入runable狀態,接着線程再執行take 方法。結束。
總之,關於上面3類方法,各有特色。(當然也有可能主線程會先執行take方法,但是這種可能性為0,想一想,因為put方法是連續執行的,再執行第二個put時,因為隊列已滿,所以必須要等待隊列非滿時,主線程才不會阻塞)

1.ArrayBlockingQueue 數組有界阻塞隊列FIFO
按照阻塞的先后順序訪問隊列,默認情況下不保證線程公平的訪問隊列~如果要保證公平性,會降低一定的吞吐量,源碼如下:


2.LinkedBlockingQueue
鏈表有界阻塞隊列,默認最大長度為Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
3.PriorityBlockingQueue
優先級隊列,可以自定義排序方法,但是對於同級元素不能保證順序
4.DelayQueue
延遲獲取元素隊列,指定時間后獲取,為無界阻塞隊列。
5.SynchronousQueue
不存儲元素的阻塞隊列。每一個put操作必須訂單tabke 操作,否則不能繼續添加元素。
6.LinkedTransfetQueue
無界阻塞隊列,多了tryTransfer 和transfet方法
7.LinkedBlockingQueue
鏈表結構組成的雙向阻塞隊列。可以從隊列的兩端插入和移除元素。
另外:非阻塞算法的安全隊列- ConcurrentLinkedQueue(CAS 實現 compare and swap)
首先它是一個基礎鏈接節點的無界的線程安全隊列,有head和tail組成~沒了,個人對線程理解比較淺,有興趣的小伙伴可以研究一下~
二、線程池
使用線程池的好處:
1.降低資源消耗:重復利用已創建的線程降低線程創建和銷毀造成的消耗
2.提高響應速度:任務到達時,任務不需要等待線程創建
3.提高線程的可管理性:可以對線程統一分配、調優和監控。
關於線程池的實現原理:
1.判斷核心線程線程是否都在執行任務,如不是,則創建一個工作線程來執行任務,否則進入2
2.判斷隊列是不是滿了,如果沒有則提交任務到工作隊列,否則進入3
3.判斷線程池是否都在工作,如果沒有則創建一個新的工作線程執行任務。否則,交給飽和策略來處理這個任務。
線程池中創建新線程執行任務的時候需要獲取全局鎖,所以java在執行任務的時候,如果在第二個步驟盡量進入到隊列中,因為其不需要獲取全局鎖,在執行execute方法時,工作線程在執行完任務后會從隊列中獲取工作任務執行。
構造方法:

參數:

1. corePoolSize:線程池基本大小,當提交一個新任務到工作線程時,線程池會創建一個線程來執行任務,即使空閑的基本線程能夠執行新任務也會創建新線程,等到任務數大於線程基本大小時就不再創建,如果
調用prestartAllCoreThreads方法,線程池會提前創建並啟動所有基本線程。
2.maximumPoolSize:線程池中允許的最大線程數。如果核心線程池滿了,會丟到隊列中,如果隊列也滿了(這里說的是有界隊列),會創建一個新的線程執行這個工作任務。打個比方:過節回家的時候,高速和國道,假設高速允許100輛車同時上高速,
允許有100輛車可以在入口等待,國道允許200輛車通行,那么也就是說允許同時出現100+100+200 輛車,也就是400輛,這就是最大線程數(不是很貼切,理解就好)
3.keeyAliveTime:線程活動保持時間,線程空閑后,保持的時間。如果超過時間則銷毀線程。
4.TimeUnit:時間單位
5.阻塞隊列:用於保存等待執行任務的阻塞隊列,數組有界隊列、鏈表隊列、同步隊列、優先級無界隊列。newFixedThreadPool 使用的是鏈表隊列。
6.ThreadFactory:創建線程的工廠。
7.RejectedRxecutionHandler:飽和策略。也就是說當超過了最大線程池數量,那么會執行飽和策略。以2中的例子接着說,如果車輛超過了400輛,那么從第401輛車開始需要給出另外的解決方法,可以是在國道入口排隊,或則直接讓你坐火車去~
提交任務的方法:
1.execute:不返回值
2.submit:返回值(future類型對象)
如下:
public static void main(String[] args) { // 創建線程池 ExecutorService executor = Executors.newFixedThreadPool(10); Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }; Callable callable = new Callable() { @Override public Object call() throws Exception { return "hi 我是返回值"; } }; executor.execute(runnable); Future future = executor.submit(callable); try { Object o = future.get(); System.out.println(o); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } executor.shutdown(); }
輸出結果:

future 是一個接口類,其實現類是FutureTask該類源碼如下:


Callable 類似Runnable 但是它會有返回值。
關於關閉線程:
推薦使用shutdown 而不是shutdownNiw,shutdown 會遍歷線程池中的線程,並逐個關閉其狀態,對於正在執行線程會等待其執行結束(interrupt),shutdownNow類似立即停止,所以~推薦使用shutdown就好,當然如果有特殊情況除外。
監控:
通過繼承線程池可以自定義線程池,重寫beforeExecute、afterExecute、terminated方法,在開始前、執行后、中止的時候調用。如:
static class TestCustomMyPool extends ThreadPoolExecutor { public TestCustomMyPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); System.out.println("開始執行"); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); System.out.println("執行結束"); } @Override protected void terminated() { super.terminated(); System.out.println("中止"); } } public static void main(String[] args) { TestCustomMyPool testCustomMyPool = new TestCustomMyPool(10, 10, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>()); Runnable runnable = new Runnable() { @Override public void run() { System.out.println("正在執行線程"); } }; testCustomMyPool.execute(runnable); testCustomMyPool.shutdown(); }
輸出:

由結果看出,線程在執行前后執行了重寫的方法。
另外,除了這些,線程池還有其他一些方法可以來監控線程池的情況,如:
public static void main(String[] args) { TestCustomMyPool testCustomMyPool = new TestCustomMyPool(10, 10, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>()); Runnable runnable = new Runnable() { @Override public void run() { System.out.println("正在執行線程"); } }; testCustomMyPool.execute(runnable); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // 需要執行的任務數量 System.out.println("需要執行的任務數量:"+testCustomMyPool.getTaskCount()); // 已完成的任務數量 System.out.println("已完成的任務數量:"+testCustomMyPool.getCompletedTaskCount()); // 創建最大線程數量 System.out.println("創建最大線程數量:"+testCustomMyPool.getLargestPoolSize()); // 線程池的線程數量 System.out.println("線程池的線程數量:"+testCustomMyPool.getPoolSize()); // 線程池活動的線程數 System.out.println("線程池活動的線程數:"+testCustomMyPool.getActiveCount()); testCustomMyPool.shutdown(); }
輸出:

關於future,這里有個參考鏈接,有興趣的小伙伴也可以參考這位博主的文章 https://www.cnblogs.com/dolphin0520/p/3949310.html
