深入理解Java自帶的線程池和緩沖隊列


前言

線程池是什么

線程池的概念是初始化線程池時在池中創建空閑的線程,一但有工作任務,可直接使用線程池中的線程進行執行工作任務,任務執行完成后又返回線程池中成為空閑線程。使用線程池可以減少線程的創建和銷毀,提高性能。

舉個例子:我是一個包工頭,代表線程池,手底下有若干工人代表線程池中的線程。如果我沒接到項目,那么工人就相當於線程池中的空閑線程,一但我接到了項目,我可以立刻讓我手下的工人去工作,每個工人同一時間執行只執行一個工作任務,執行完了就去

執行另一個工作任務,知道沒有工作任務了,這時工人就可以休息了(原諒我讓工人無休止的工作),也就是又變成了線程池中的空閑線程池。

隊列是什么

隊列作為一個緩沖的工具,當沒有足夠的線程去處理任務時,可以將任務放進隊列中,以隊列先進先出的特性來執行工作任務

舉個例子,我又是一個包工頭,一開始我只接了一個小項目,所以只有三個工作任務,但我手底下有四個工人,那么其中三人各領一個工作任務去執行就好了,剩下一個人就先休息。但突然我又接到了幾個大項目,那么有現在有很多工作任務了,但手底下的工人不夠啊。

那么我有兩個選擇:

(1)雇佣更多的工人

(2)把工作任務記錄下來,按先來后到的順序執行

但雇佣更多等工人需要成本啊,對應到計算機就是資源的不足,所以我只能把工作任務先記錄下來,這樣就成了一個隊列了。

為什么要使用線程池

假設我又是一個包工頭,我現在手底下沒有工人了,但我接到了一個項目,有了工作任務要執行,那我肯定要去找工人了,但招人成本是很高的,工作完成后還要給遣散費,這樣算起來好像不值,所以我事先雇佣了固定的幾個工人作為我的長期員工,有工作任務就干活,沒有就休息,如果工作任務實在太

多,那我也可以再臨時雇佣幾個工人。一來二去工作效率高了,付出的成本也低了。Java自帶的線程池的原理也是如此。

Java自帶的線程池

Executor接口是Executor的父接口,基於生產者--消費者模式,提交任務的操作相當於生產者,執行任務的線程則相當於消費者,如果要在程序中實現一個生產者--消費者的設計,那么最簡單的方式通常是使用Executor。

ExecutorService接口是對Executor接口的擴展,提供了對生命周期的支持,以及統計信息收集、應用程序管理機制和性能監視等機制。

常用的使用方法是調用Executors中的靜態方法來創建一個連接池。

(1)newFixedThreadPool

代碼演示:

 1 public class ThreadPoolTest {
 2     public static void main(String[] args){
 3         ExecutorService executor = Executors.newFixedThreadPool(3);
 4         for (int i = 0; i < 4; i++){
 5             Runnable runnable = new Runnable() {
 6                 public void run() {
 7                     CountDownLatch countDownLatch = new CountDownLatch(1); //計數器,用於阻塞線程
 8                     System.out.println(Thread.currentThread().getName() + "正在執行");
 9                     try {
10                         countDownLatch.await();
11                     } catch (InterruptedException e) {
12                         e.printStackTrace();
13                     }
14                 }
15             };
16             executor.execute(runnable);
17         }
18     }
19 }

測試結果:

pool-1-thread-1正在執行
pool-1-thread-3正在執行
pool-1-thread-2正在執行

newFixedThreadPool將創建一個固定長度的線程池,每當提交一個任務時就會創建一個線程,直到達線程池的最大數量,這時線程池的規模不再變化(如果某個線程由於發生了未預期的Exception而結束,那么線程池會補充一個新的線程)。上述代碼中最大的線程數是3,但我提交了4個任務,而且每個任務都阻塞住,所以前三個任務占用了線程池所有的線程,那么第四個任務永遠也不會執行,因此該線程池配套使用的隊列也是無界的。所以在使用該方法創建線程池時要根據實際情況看需要執行的任務是否占用過多時間,會不會影響后面任務的執行。

(2)newCachedThreadPool

測試代碼:

 1 public class ThreadPoolTest {
 2     public static void main(String[] args){
 3         ExecutorService executor = Executors.newCachedThreadPool();
 4         for (int i = 0; i < 4; i++){
 5             Runnable runnable = new Runnable() {
 6                 public void run() {
 7                     CountDownLatch countDownLatch = new CountDownLatch(1); //計數器,用於阻塞線程
 8                     System.out.println(Thread.currentThread().getName() + "正在執行");
 9                     try {
10                         countDownLatch.await();
11                     } catch (InterruptedException e) {
12                         e.printStackTrace();
13                     }
14                 }
15             };
16             executor.execute(runnable);
17         }
18     }
19 }

測試結果:

pool-1-thread-1正在執行
pool-1-thread-3正在執行
pool-1-thread-2正在執行
pool-1-thread-4正在執行

newCachedThreadPool將創建一個可緩存的線程池。如果線程池的當前規模超過了處理需求時,那么就會回收部分空閑的線程(根據空閑時間來回收),當需求增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。

(3)newSingleThreadExecutor

測試代碼:

 1 public class ThreadPoolTest {
 2     public static void main(String[] args){
 3         ExecutorService executor = Executors.newSingleThreadExecutor();
 4         for (int i = 0; i < 4; i++){
 5             final int index = i;
 6             Runnable runnable = new Runnable() {
 7                 public void run() {
 8                     System.out.println(Thread.currentThread().getName() + "正在執行工作任務--- >" + index);
 9                 }
10             };
11             executor.execute(runnable);
12         }
13     }
14 }

測試結果:

pool-1-thread-1正在執行工作任務--- >0
pool-1-thread-1正在執行工作任務--- >1
pool-1-thread-1正在執行工作任務--- >2
pool-1-thread-1正在執行工作任務--- >3

newSingleThreadExecutor是一個單線程的Executor,它創建單個工作者線程來執行任務,如果這個線程異常結束,會創建另一個線程來代替。newSingleThreadExecutor能確保依照任務在隊列中的順序來串行執行。

(4)newScheduledThreadPool

測試代碼:

 1 public class ThreadPoolTest {
 2     public static void main(String[] args){
 3         ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
 4         for (int i = 0; i < 3; i++){
 5             final int index = i;
 6             Runnable runnable = new Runnable() {
 7                 public void run() {
 8                     System.out.println(Thread.currentThread().getName() + "延時1s后,每5s執行一次工作任務--- >" + index);
 9                 }
10             };
11             executor.scheduleAtFixedRate(runnable,1,5,TimeUnit.SECONDS);
12         }
13     }
14 }

測試結果:

pool-1-thread-1延時1s后,每5s執行一次工作任務--- >0
pool-1-thread-2延時1s后,每5s執行一次工作任務--- >1
pool-1-thread-3延時1s后,每5s執行一次工作任務--- >2
pool-1-thread-1延時1s后,每5s執行一次工作任務--- >0
pool-1-thread-3延時1s后,每5s執行一次工作任務--- >2
pool-1-thread-2延時1s后,每5s執行一次工作任務--- >1

newScheduledThreadPool創建了一個固定長度的線程池,而且以延遲或定時或周期的方式來執行任務,類似於Timer。可應用於重發機制。

以上四種創建線程池的方法其實都是調用以下這個方法,只是參數不一樣

corePoolSize  ---------------------> 核心線程數

maximumPoolSize ---------------> 最大線程數

keepAliveTime --------------------> 當線程數大於核心時,此為終止前多余的空閑線程等待新任務的最長時間

unit -----------------------------------> 時間單位

workQueue ------------------------> 用於存儲工作工人的隊列

threadFactory ---------------------> 創建線程的工廠

handler ------------------------------> 由於超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序

常用的幾種隊列

(1)ArrayBlockingQueue:規定大小的BlockingQueue,其構造必須指定大小。其所含的對象是FIFO順序排序的。

(2)LinkedBlockingQueue:大小不固定的BlockingQueue,若其構造時指定大小,生成的BlockingQueue有大小限制,不指定大小,其大小有Integer.MAX_VALUE來決定。其所含的對象是FIFO順序排序的。

(3)PriorityBlockingQueue:類似於LinkedBlockingQueue,但是其所含對象的排序不是FIFO,而是依據對象的自然順序或者構造函數的Comparator決定。

(4)SynchronizedQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成。

排隊策略(以下排隊策略文字來自------->https://www.oschina.net/question/565065_86540

排隊有三種通用策略:

直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

有界隊列。當使用有限的 maximumPoolSizes時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

我們選其中的LinkedBlockingQueue隊列來解析

在上述Java自帶的創建線程池的方法中,newFixedThreadPool使用的隊列就是LinkedBlockingQueue

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

如果需要執行的工作任務少於核心線程數,那么直接使用線程池的空閑線程執行任務,如果任務不斷增加,超過核心線程數,那么任務將被放進隊列中,而且是沒有限制的,線程池中的線程也不會增加。

其他線程池的工作隊列也是根據排隊的通用策略來進行工作,看客們可以自己分析。

總結:

沒有創建連接池的方式,只有最適合的方式,使用Java自帶的方式創建或者自己創建連接池都是可行的,但都要依照自身的業務情況選擇合適的方式。

如果你的工作任務的數量在不同時間差距很大,那么如果使用newFixedThreadPool創建固定的線程就不合適,創建少了到時隊列里會塞進太多的工作任務導致處理不及時,創建多了會導致工作任務少時有太多的線程處於空閑狀態造成資源浪費。

所以還是需要根據實際情況使用適合的創建方式。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM