Executors類創建四種常見線程池


 

 

線程池架構

在這里插入圖片描述

上圖是線程池的架構圖。Java里面線程池的頂級接口是Executor,Executor並不是一個線程

池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。

比較重要的幾個類:

類/接口 描述
ExecutorService 真正的線程池接口
ScheduledExecutorService 能和Timer/TimerTask類似,解決那些需要任務重復執行的問題
ThreadPoolExecutor ExecutorService的默認實現
ScheduledThreadPoolExecutor 繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,周期性任務調度的類實現

要配置一個線程池是比較復雜的,尤其是對於線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,因此在Executors類里面提供了一些靜態工廠,生成一些常用的線程池。

Java通過Executors工廠類提供四種線程池,分別為:

  1. newCachedThreadPool :創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,否則新建線程。(線程最大並發數不可控制)
  2. newFixedThreadPool:創建一個固定大小的線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
  3. newScheduledThreadPool : 創建一個定時線程池,支持定時及周期性任務執行。
  4. newSingleThreadExecutor :創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

我們先創建一個統一的線程任務,方便測試四種線程池

public class MyRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is running...");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

newSingleThreadExecutor

public class SingleThreadExecutorTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        MyRunnable myRunnable = new MyRunnable();
        for (int i = 0; i < 5; i++) {
            executorService.execute(myRunnable);
        }

        System.out.println("線程任務開始執行");
        executorService.shutdown();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

輸出結果

線程任務開始執行
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

底層實現

/** * 核心線程池大小=1 * 最大線程池大小為1 * 線程過期時間為0ms * LinkedBlockingQueue作為工作隊列 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

從參數可以看出來,SingleThreadExecutor 相當於特殊的 FixedThreadPool,它的執行流程如下:

  1. 線程池中沒有線程時,新建一個線程執行任務
  2. 有一個線程以后,將任務加入阻塞隊列,不停的加
  3. 唯一的這一個線程不停地去隊列里取任務執行

SingleThreadExecutor 用於串行執行任務的場景,每個任務必須按順序執行,不需要並發執行

newFixedThreadPool

public class FixedThreadPoolTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        MyRunnable myRunnable = new MyRunnable();
        for (int i = 0; i < 5; i++) {
            executorService.execute(myRunnable);
        }

        System.out.println("線程任務開始執行");
        executorService.shutdown();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

輸出結果

線程任務開始執行
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-2 is running...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

底層實現

/** * 核心線程池大小=傳入參數 * 最大線程池大小為傳入參數 * 線程過期時間為0ms * LinkedBlockingQueue作為工作隊列 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

可以看到,FixedThreadPool 的核心線程數和最大線程數都是指定值,也就是說當線程池中的線程數超過核心線程數后,任務都會被放到阻塞隊列中。

此外 keepAliveTime 為 0,也就是多余的空余線程會被立即終止(由於這里沒有多余線程,這個參數也沒什么意義了)。

而這里選用的阻塞隊列是 LinkedBlockingQueue,使用的是默認容量 Integer.MAX_VALUE,相當於沒有上限。

因此這個線程池執行任務的流程如下:

  1. 線程數少於核心線程數,也就是設置的線程數時,新建線程執行任務
  2. 線程數等於核心線程數后,將任務加入阻塞隊列
  3. 由於隊列容量非常大,可以一直加
  4. 執行完任務的線程反復去隊列中取任務執行

FixedThreadPool 用於負載比較重的服務器,為了資源的合理利用,需要限制當前線程數量

newCachedThreadPool

public class CachedThreadPoolTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        MyRunnable myRunnable = new MyRunnable();
        for (int i = 0; i < 5; i++) {
            executorService.execute(myRunnable);
        }

        System.out.println("線程任務開始執行");
        executorService.shutdown();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

輸出結果

線程任務開始執行
pool-1-thread-1 is running...
pool-1-thread-4 is running...
pool-1-thread-2 is running...
pool-1-thread-5 is running...
pool-1-thread-3 is running...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

底層實現

/** * 核心線程池大小=0 * 最大線程池大小為Integer.MAX_VALUE * 線程過期時間為60s * 使用SynchronousQueue作為工作隊列 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

可以看到,CachedThreadPool 沒有核心線程,非核心線程數無上限,也就是全部使用外包,但是每個外包空閑的時間只有 60 秒,超過后就會被回收。

CachedThreadPool 使用的隊列是 SynchronousQueue,這個隊列的作用就是傳遞任務,並不會保存。

因此當提交任務的速度大於處理任務的速度時,每次提交一個任務,就會創建一個線程。極端情況下會創建過多的線程,耗盡 CPU 和內存資源。

它的執行流程如下:

  1. 沒有核心線程,直接向 SynchronousQueue 中提交任務
  2. 如果有空閑線程,就去取出任務執行;如果沒有空閑線程,就新建一個
  3. 執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就拜拜
  4. 由於空閑 60 秒的線程會被終止,長時間保持空閑的 CachedThreadPool 不會占用任何資源。

CachedThreadPool 用於並發執行大量短期的小任務,或者是負載較輕的服務器

newScheduledThreadPool

public class ScheduledThreadPoolTest {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        MyRunnable myRunnable = new MyRunnable();
        for (int i = 0; i < 5; i++) {
            // 參數1:目標對象,參數2:隔多長時間開始執行線程,參數3:執行周期,參數4:時間單位
            scheduledExecutorService.scheduleAtFixedRate(myRunnable, 1, 2, TimeUnit.SECONDS);
        }

        System.out.println("線程任務開始執行");
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

輸出結果

線程任務開始執行
// 打印【線程任務開始執行】后1秒輸出
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-3 is running...
pool-1-thread-2 is running...
// 2秒后輸出
pool-1-thread-1 is running...
pool-1-thread-3 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-3 is running...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

底層實現

/** * 核心線程池大小=傳入參數 * 最大線程池大小為Integer.MAX_VALUE * 線程過期時間為0ms * DelayedWorkQueue作為工作隊列 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

ScheduledThreadPoolExecutor 的執行流程如下:

  1. 添加一個任務
  2. 線程池中的線程從 DelayQueue 中取任務
  3. 然后執行任務

具體執行任務的步驟也比較復雜:

  1. 線程從 DelayQueue 中獲取 time 大於等於當前時間的 ScheduledFutureTask

  2. 執行完后修改這個 task 的 time 為下次被執行的時間

  3. 然后再把這個 task 放回隊列中

ScheduledThreadPoolExecutor 用於需要多個后台線程執行周期任務,同時需要限制線程數量的場景

Executors和ThreaPoolExecutor創建線程池的區別

Executors 各個方法的弊端:

  1. newFixedThreadPool 和 newSingleThreadExecutor:
    主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至 OOM。
  2. newCachedThreadPool 和 newScheduledThreadPool:
    主要問題是線程數最大數是 Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至 OOM。

ThreaPoolExecutor

  1. 創建線程池方式只有一種,就是走它的構造函數,參數自己指定

兩種提交任務的方法

ExecutorService 提供了兩種提交任務的方法:

  • execute():提交不需要返回值的任務
  • submit():提交需要返回值的任務

execute

void execute(Runnable command);
  • 1

execute() 的參數是一個 Runnable,也沒有返回值。因此提交后無法判斷該任務是否被線程池執行成功。

ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
    @Override
    public void run() {
        //do something
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

submit

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
  • 1
  • 2
  • 3

submit() 有三種重載,參數可以是 Callable 也可以是 Runnable。

同時它會返回一個 Funture 對象,通過它我們可以判斷任務是否執行成功。

獲得執行結果調用 Future.get() 方法,這個方法會阻塞當前線程直到任務完成。

提交一個 Callable 任務時,需要使用 FutureTask 包一層:

FutureTask futureTask = new FutureTask(new Callable<String>() {    //創建 Callable 任務
    @Override
    public String call() throws Exception {
        String result = "";
        //do something
        return result;
    }
});
Future<?> submit = executor.submit(futureTask);    //提交到線程池
try {
    Object result = submit.get();    //獲取結果
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM