Java並發——任務執行(Executors、線程池)


本篇博文是Java並發編程實戰的筆記。

直接構建線程的問題

無論在單處理器還是多處理器系統中,多線程都能夠提高程序的整體性能,但是如果我們在程序中直接的構建線程,可能會出現一些問題:

public class DirectRunInNewThreadServer {
    public void serve() throws IOException {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(80));
        while (true) {
            Socket socket = serverSocket.accept();
            new Thread(
                    () -> handleRequest(socket)
            ).start();
        }
    }
}

上面簡單的服務器程序為每個請求連接的客戶端都創建一個線程來處理它的請求,在高負載的情況下,它的性能不僅不會向預想中的高,而且還會造成很多風險,甚至造成服務器宕機。

該創建多少個線程

首先,無論你創建多少個線程,機器上的處理器資源是有限的,如果使用你的個人電腦,可能只有8個CPU(核)可用,也就是說最多同時只有8個線程在運行,就算加上超線程技術還有服務器上的CPU數量加成,那么最多也不會有很多個線程在同時運行。

那么只創建CPU個數個線程夠嗎?不一定,如果你處理用戶請求時需要執行的任務都是計算任務,那么夠了,因為CPU個數個線程足以讓全部CPU在高負載情況下全部保持忙碌狀態,如果你要執行的任務中包含IO任務呢(比如連接數據庫,讀寫網絡IO)?那就不夠了!因為這類任務的特性是,它們通常由於要等待外部系統的返回所以不能立即執行,在等待時,這個線程只能空等,這時如果再加入一個或一批線程,就能夠在它等待的同時繼續處理其它任務。

所以該創建多少個線程沒有一個萬全的策略,這通常和你要處理的任務類型有關。只是如果線程創建過多,CPU資源不僅不能得到更多的利用,而且頻繁創建切換線程的開銷還可能會蓋過它們所帶來的收益,而如果線程創建的過少,CPU資源可能不能被充分利用而導致它們時常處於空閑狀態,而實際上還有任務要處理

無限制創建線程的問題

  1. 如果handleRequest中要執行的內容很快就會返回(實際上在大部分事務型應用中是這樣),那么創建並銷毀線程的開銷甚至要比執行任務的開銷還大
  2. 如果線程數量超過了JVM或操作系統的最大線程數,程序可能會宕機
  3. 更多的線程占用更多的內存

線程池

通過上面的描述,我們也知道了線程的創建和銷毀是有開銷的,並且還不小,因為者涉及到系統調用,並且無限制的創建線程除了可能帶來性能的負收益以外,還有可能引入風險。

那能不能有一種容器使得我們可以復用已經創建的線程,並且限制容器內最大的可用線程數?那就是線程池。

現在我們要修改我們以往代碼中,任務的執行方式,要把下面的代碼改成線程池版本

Runnable task = ...;
new Thread(task).start();
Runnable task = ...;
// 將任務提交到線程池中

將上面的代碼修改會很麻煩,而且我們的代碼中可能還有一些地方就是需要直接創建線程,或者使用不同的線程池,或者干脆讓任務以單線程的方式執行,如果有一種同一一致的接口可以將輕松在這些種執行方式中替換就好了。

Executor

Executor是這樣的接口,它是一個可以執行已提交的Runnable任務的對象,它提供了一種將任務的提交和任務的執行方式解耦的手段,線程將如何被創建、使用和調度的細節將都由Executor來管控,你只需要提交任務對象即可。

你可以簡單的理解為Executor是任務的執行器,它會執行你提交的任務,你不用關心其內部是如何執行你的任務的

比如下面,我們對一種計算密集型的任務使用了能夠容納與當前CPU數量相同的線程的線程池來執行,以保證我們的程序能最大的利用CPU的處理能力並且不會創建出更多的無用線程。

private final static int NCPU =
        Runtime.getRuntime().availableProcessors();

@Test
void testFixedExecutor() throws InterruptedException {
    System.out.println("CPU COUNT : " + NCPU);
    // 創建具有與CPU個數相同個線程的線程池來執行任務
    Executor threadPool = Executors.newFixedThreadPool(NCPU);


    Runnable computeTask = () -> {
        int sum = 0;
        for (int i=0; i<100; i++) sum += i;
        System.out.println(Thread.currentThread() + " : " + sum);
    };

    // 執行計算任務100次
    for (int i=0; i<100; i++) {
        threadPool.execute(computeTask);
    }

}

假設有一天,我突然希望這些計算任務能夠串行被執行,只需要改動一行代碼即可:

Executor threadPool = Executors.newSingleThreadExecutor();

Executors中提供了一些工廠方法來創建不同類型的Executor,你也可以實現自己的Executor,該類的注釋中給出了一些例子:

第一個通過直接調用rrun方法來同步的執行每一個任務,它甚至沒有引入線程,這相當於SingleThreadExecutor,只不過比它更加簡單。第二個為每個任務創建一個新線程,相當於最初我們的簡單Web服務器的寫法。

下面是Executor接口的全部代碼

public interface Executor {
    /**
    * 在未來的某一時間執行command,這個command可能
    * 在一個新線程、一個被池化的線程或者調用者線程被執行
    * 具體取決於不同的Executor實現
    * 
    * Throws:
    *  RejectedExecutionException —— 如果command不能被接受(即Executor有權拒絕一個任務)
    *  NullPointerExecption —— 如果command == null */
    void execute(Runnable command);
}

之前直接使用線程的時候,或者單線程的時候,我們總有一些辦法來結束任務的執行,但是Executor接口並沒有給我們相關的方法來結束任務或者查看任務的相關狀態。

ExecutorService

JVM提供的Executor實際上都實現自ExecutorService接口,這個接口提供了一些擴展功能。

ExecutorService是一個提供了管理任務中斷的方法和一些產生用於跟蹤一個或多個異步任務進度的Future對象的方法的Executor

ExecutorService可以被結束,這將導致它將拒絕新來的所有任務。有兩個方法可以用於結束ExecutorServiceshutdown方法允許之前已提交的任務在ExecutorService進入中斷狀態(isTerminated()==true)之前執行,shutdownNow方法會阻止那些之前已提交但尚在等待狀態的任務被執行並嘗試關閉當前正在執行的任務。

一旦ExecutorService進入了中斷狀態,那么代表它當前沒有正在執行中的任務,沒有任務正在等待執行並且沒有新任務可以提交進來。

submit方法擴展了Executor.execute方法,它返回一個Future,這個Future可以被用來取消執行或等待執行完成,invokeAnyinvokeAll是批量執行任務的常用方式,執行一系列任務並且等待至少一個(Any)或是全部(All)執行完成。

ThreadPoolExecutor

一個使用多個被池化的線程中的一個來執行已提交任務的ExecutorService,該類一般不會直接被創建,通常是通過Executors中的工廠方法進行配置並創建。

該類提供了許多可調節的參數和擴展鈎子,這讓該類可以在很多情況下可以使用。盡管這樣,程序員還是被推薦使用更便捷的Executors工廠方法來創建該類的實例。

下面介紹該類的一些基礎配置參數,內容截取自官方文檔。

核心和最大池大小

一個線程池會自動的根據corePoolSizemaximumPoolSize自動調整池大小。當一個新的任務被提交,如果小於corePoolSize個線程正在運行,一個新的線程將被創建用來處理這個請求,盡管其它工作線程處於空閑狀態。否則,如果小於maximumPoolSize個線程正在運行並且僅當隊列已滿時,一個新的線程將被創建用來處理這個請求。

當你設置一致的corePoolSizemaximumPoolSize時,你將得到一個固定大小的線程池。通過將線程池的maximumPoolSize設為一個基本無界的值(如Integer.MAX_VALUE),代表你允許線程池容納任意數量的並發任務。一般來說,core和maximum pool大小在構造時就已經被創建,但是它們可以通過setCorePoolSizesetMaximumPoolSize被動態更改。

按需構造

默認情況下,每一個核心線程僅在新任務到達時被創建並start,但是這個行為可以通過prestartCoreThreadprestartAllCoreThreads被動態的覆蓋。如果你使用非空隊列構造池,那么你可能想要prestart線程。

創建新線程

新線程通過ThreadFactory來創建,如果未指定則使用Executors.defaultThreadFactory。它會將所有線程創建到一個ThreadGroup中,並且它們都具有NORM_PRIORITY優先級並且都是非守護線程。如果你想修改這個行為,提供自己的ThreadFactory即可。

keep-alive time

如果池中具有多於corePoolSize個線程,那么剩余的線程將在它們的空閑時間多於keepAliveTime時被終結。這是為了當任務不重時減少資源的占用。

getKeepAliveTimesetKeepAliveTime可以獲取和設置這個值,allowCoreThreadTimeout用於設定是否允許這個動態縮減線程池中線程的操作可以在池中線程小於等於corePoolSize時依舊縮減。

隊列

任何BlockingQueue都可以用來傳輸和保存提交的任務:

  1. 如果小於corePoolSize個線程正在運行,Executor總是傾向於創建一個新線程而非讓任務排隊
  2. 如果大於corePoolSize或更多個線程正在運行,Executor總是傾向於讓任務排隊,而非創建新線程
  3. 如果一個請求不能被排隊,一個新線程將被創建除非超出maximumPoolSize。這種情況下,這個任務將被拒絕。

三種通用的排隊策略:

  1. 直接交接:使用SynchronousQueue。如果沒有可用線程運行一個任務,那么這個任務的排隊嘗試將失敗,新線程將被創建。通常和無界的maximumPoolSize一起設置以避免拒絕新任務,但一旦任務到達的速度大於任務處理的速度,這樣做會耗盡系統資源。
  2. 無界隊列:使用無界隊列(LinkedBlockingQueue)將導致新任務在corePoolSize個線程都忙時等待它們中的一個變空閑,所以,池中不會有多於corePoolSize個線程。
  3. 有界隊列:如ArrayBlockingQueue。與有限的maximumPoolSize一起使用時,它可以防止系統資源被耗盡,但可能更難以調節和控制。隊列大小和maximumPoolSize的選擇應該折衷。使用大的隊列和小的池使得CPU和系統資源利用率,上下文切換開銷變小,但會造成人為的吞吐量降低;而使用小隊列和大的池則會遇到不可接受的調度開銷,這也會降低吞吐量。

核心參數的總結

所以一個線程池在創建一個任務時,會經歷如下階段:

  1. 判斷是否當前池中正在運行線程數小於corePoolSize,如果是就新建一個線程
  2. 否則,判斷池中是否具有空閑的線程,如果有就讓它執行
  3. 否則,判斷當前任務排隊隊列是否未滿,如果是則讓任務排隊
  4. 否則,判斷是否當前池中正在運行的線程數小於maximumPoolSize,如果是就創建一個線程
  5. 否則,拒絕該任務

上面五條是我在閱讀了官方文檔后自己總結的結論,如果不對歡迎指正。

Executors

下面看看Executors中的工廠方法。

newFixedThreadPool

該方法將corePoolSizemaximumPoolSize設成了一致的,說明池中最大具有nThreads個線程,keepAliveTime雖然被設置成了0毫秒,但是沒用,因為在沒有使用allowCoreThreadTimeout之前,空閑線程銷毀只對corePoolSize之外的線程有效,而這里沒有之外的線程。LinkedBlockingQueue代表該線程池永遠不會拒絕我們的任務。

newCachedThreadPool

corePoolSize是0,maximumPoolSize是Int最大值,這說明該線程池是一個無界線程池,keepAliveTime為60秒,這意味着任何線程如果處於空閑狀態60秒就會被銷毀(因為核心池大小為0),SynchronousQueue與無界線程池配合代表它不會拒絕用戶任務,線程池將為任何無法立即得到線程執行的任務將直接創建新線程。

newSingleThreadExecutor


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM