Callable、Future和線程池(ThreadPoolExecutor)的基礎學習


本文介紹另外兩種創建多線程的方式,這兩種方式我們在實際中會用的多一點,尤其是線程池。而在前面文章中我們講述了創建線程最基本的兩種方式:一種是直接繼承Thread,另外一種就是實現Runnable接口。但是這兩種方式創建線程有一個缺陷,那就是無法獲取到線程運行后的結果,因為這兩個方式都是重寫了 run()方法,而run()方法是用void修飾的。所以后來就有了Callable和Future這兩個接口,它們能夠獲取線程執行的結果。

1、Callable介紹

Callable是在JDK1.5中出現的接口,它和Runnable接口很相似,所以可以認為:Callable接口是Runnable接口的增強版,因為Runnable有的功能Callable都有,而且還能獲取任務執行的結果。所以下面來看一下Callable和Runnable接口的對比:

先來看一下Runnable接口的源碼:

public interface Runnable {
    public abstract void run();
}

Callable接口的源代碼:

public interface Callable<V> {
    V call() throws Exception;
}

可以很明顯的看出它們二者的區別:

  1. Callable使用的是call(),而Runnable中使用的是run()。
  2. Callable的call()可以拋出異常,而Runnable的run()不會拋出異常。
  3. Callable能接受一個泛型,然后在call()中返回一個這個類型的值。而Runnable的run()沒有返回值。
  4. 補充:Callable不能直接替換Runnable,因為Thread類的構造方法根本沒有Callable。

上面說Callable是可以返回任務執行結果的,而獲取返回結果需使用到Future。所以下面要介紹一下Future。

2、Future介紹

Future也是一個接口,通過它可以獲得任務執行的返回值。該接口的內部源碼如下:

public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);

  boolean isCancelled();

  boolean isDone();

  V get() throws InterruptedException, ExecutionException;

  V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

可以發現在Future接口中聲明了5個方法,下面依次解釋每個方法的作用:

  • cancel(boolean mayInterruptIfRunning):用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。參數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設置true,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayInterruptIfRunning為true還是false,此方法肯定返回false,即如果取消已經完成的任務會返回false;如果任務正在執行,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false,則返回false;如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,肯定返回true。
  • isCancelled():表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true。
  • isDone():表示任務是否已經完成,若任務完成,則返回true;
  • get():用來獲取執行結果,這個方法會產生阻塞,阻塞的線程為調用get()方法的線程,會一直等到任務執行完畢返回結果,之后阻塞的主線程才能夠往后執行。
  • get(long timeout, TimeUnit unit):用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就會拋出TimeoutException異常(慎用這個方法,因為有很多坑)。

下面是Future接口中在java.util.concurrent包下類的結構圖:

image

由於Future只是一個接口,所以是無法直接用來創建對象使用的,所以真正獲取結果用到的是FutureTask這個類。


『FutureTask』

通過上面的圖片發現FutureTask類是實現了RunnableFuture接口,而這個接口又繼承了Future接口,我們具體點開其源碼來看。

public class FutureTask<V> implements RunnableFuture<V>{
    code...
}

打開RunnableFuture接口的實現:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

可以看出RunnableFuture繼承了Runnable和Future接口,而FutureTask實現了RunnableFuture接口。所以它既可以作為Runnable被線程執行,又可以作為Future得到Callable的返回值。


上面說了這么多,接下來使用Callable+FutureTask創建線程並獲取執行結果的一個栗子如下:

  1. 創建一個實現Callable接口的類。
  2. 重寫call方法,將線程要執行的操作定義在call()中。
  3. 創建Callable接口實現類的對象。
  4. 創建FutureTask對象,並將上面Callable接口實現類的對象傳入FutureTask構造器中。
  5. 將FutureTask的對象作為參數傳入Thread類的構造器中,創建Thread類對象,並且啟動線程。
  6. 獲取Callable中call方法的返回值。
package com.thr;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * @author Administrator
 * @date 2020-04-09
 * @desc Callable+Future創建並獲取線程執行結果
 */
//1、創建一個實現Callable接口的類
class MyCallable implements Callable<Integer>{

    //2、重寫call方法,將線程要執行的操作定義在call()中
    @Override
    public Integer call() throws Exception {
        int num=0;
        for (int i = 1; i <= 100; i++) {
            num+=i;
        }
        return num;
    }
}

public class CallableFutureDemo {

    public static void main(String[] args) {
        //3、創建Callable接口實現類的對象
        MyCallable callable = new MyCallable();
        //4、創建FutureTask對象,並將上面Callable接口實現類的對象傳入FutureTask構造器中
        FutureTask<Integer> task = new FutureTask<Integer>(callable);
        //5、將FutureTask的對象作為參數傳入Thread類的構造器中,創建Thread類對象,並且啟動線程
        new Thread(task).start();
        try {
            //6、獲取Callable中call方法的返回值,調用get()時,main線程會阻塞,知道任務線程返回結果
            Integer integer = task.get();
            System.out.println(integer);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

關於FutureTask的一些小結:

  • 在主線程中需要執行比較耗時的操作時,但又不想阻塞主線程時,可以把這些作業交給Future對象在后台完成。
  • 當主線程將來需要時,就可以通過Future對象獲得后台作業的計算結果或者執行狀態。
  • 一般FutureTask多用於耗時的計算,主線程可以在完成自己的任務后,再去獲取結果。
  • 僅在計算完成時才能檢索結果;如果計算尚未完成,則阻塞 get 方法。
  • 一旦計算完成,就不能再重新開始或取消計算。
  • get方法而獲取結果只有在計算完成時獲取,否則會一直阻塞直到任務轉入完成狀態,然后會返回結果或者拋出異常。
  • get()只會計算一次,並且會導致主線程阻塞,所以get()方法一般發在最后。

我們知道Callable用於產生結果,Future用於獲取結果。不過Callable和Future一般都和線程池搭配使用,所以下面再來簡單介紹一下線程池的使用。

3、線程池的介紹

在前面的文章中介紹了Thread、Runnable和Callable這三種方式創建線程,我們在創建少量線程的時候使用它們是非常的簡單方便的,但是如果我們需要創建成百上千的線程時,那么豈不是要創建成百上千個線程對象,調用成百上千的start()方法,可見這樣是非常浪費時間、消耗資源和降低程序效率的,因為線程的創建和銷毀需要耗費大量的系統資源。那為了解決這一問題出現了線程池。


線程池顧名思義,就是由很多線程構成的池子。在有任務的時候隨時取用線程,當任務完成后又將線程放回池中。

所以合理利用線程池能夠帶來三個好處。

  1. 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
  2. 提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
  3. 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

線程池的創建一般用Executors這個工具類來創建,常見的有以下四種方式:

  • newFixedThreadPool(int nThreads):創建一個固定線程數目的線程池,超出的線程處理數量的任務會在隊列中等待。
  • newSingleThreadExecutor():創建一個單線程化的線程池。它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
  • newCacheThreadPool():創建一個可緩存的線程池。如果現有任務沒有線程來處理,則創建一個新線程並添加到緩存池中。如果有被使用完但是還沒銷毀的線程,就復用該線程。如果有線程60s未被使用的話就會從緩存中移出並終止(銷毀)。因此,長時間保持空閑的線程池不會使用任何資源。
  • newScheduledThreadPool(int corePoolSize):創建一個大小無限的支持定時及周期性的任務執行的線程池,多數情況下可用來替代Time類。

『一般不推薦使用Executors的方式來創建線程池,因為可能會出現OOM(Out Of Memory,內存溢出)的情況,下面我們依次詳細的分析這四個方式:』


①、Executors.newFixedThreadPool(int nThread)

 public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
 }

可以發現最后一行使用了LinkedBlockingQueue,泛型是Runnable類型,這里的隊列是用來存放線程任務的。我們再來看看這個LinkedBlockingQueue部分源碼:

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
}

在上一章博客中提過LinkedBlockingQueue是鏈表實現的有界阻塞隊列,其capacity是可以選擇進行設置的,如果不設置的話,將是一個無邊界的阻塞隊列,隊列的最大長度為Integer.MAX_VALUE。而上面newFixedThreadPool的源碼中,我們可以很清晰的看到LinkedBlockingQueue是沒有指定capacity的。所以此時LinkedBlockingQueue就是一個無邊界隊列,對於一個無邊界隊列來說,是可以不斷的向隊列中加入任務的,這種情況下就有可能因為隊列中等待的線程數太多而導致OOM。

下面我們來一個簡單的例子,模擬一下使用Executors導致OOM的情況:

首先將JVM參數調一下:-Xmx8m –Xms8m

image

示例代碼如下:

package com.thr;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author Administrator
 * @date 2020-04-11
 * @desc Excutors出現OOM舉例
 */
public class ExecutorsDemo {
    private static ExecutorService service = Executors.newFixedThreadPool(15);

    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            service.execute(new SubThread());
        }
    }
}

class SubThread implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            //do nothing
        }
    }
}

運行結果:

image


②、Executors.newSingleThreadExexutor()

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

可以發現還是使用阻塞隊列LinkedBlockingQueue,所以問題是一樣的。


③、Executors.newCacheThreadPool()

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

可以發現ThreadPoolExecutor對象中的第二個參數為Integer.MAX_VALUE,而這個位置參數的意思為線程池最大線程數。所以還是會出現OOM的情況。


④、Executors.newScheduleThreadPool()

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

        code...

        }

通過上面三段代碼可以發現newScheduleThreadPool()方法返回了ScheduledThreadPoolExecutor對象,而它又繼承了ThreadPoolExecutor類,並且調用的是父類的構造器,而構造器中的第二個參數為Integer.MAX_VALUE,所以還是同樣的問題。

newScheduleThreadPool()的簡單案例(由於前面三個的使用非常的簡單,所有就不舉例了) 代碼如下:

[1]、延遲執行,線程在延遲5秒后執行。

/**
 * newScheduleThreadPool()的簡單案例
 */
public class ExecutorDemo {
    public static void main(String[] args) {

        // 創建一個定時處理任務的線程池
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);

        // 設置延遲5秒執行,時間到了就會執行線程
        executorService.schedule(() -> {
            System.out.println("newScheduledThreadPool---延遲5秒后打印");
        }, 5, TimeUnit.SECONDS);

        // 釋放資源
        executorService.shutdown();
    }
}

[2]、定期執行,表示延遲5秒后每3秒執行一次。特別注意:線程池不能關閉

/**
 * newScheduleThreadPool()的簡單案例
 */
public class ExecutorDemo {
    public static void main(String[] args) {

        // 創建一個定時處理任務的線程池
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);

        // 設置延遲5秒,然后沒過3秒打印一次
        executorService.scheduleAtFixedRate(() -> {
            System.out.println("newScheduledThreadPool---延遲5秒后每過3秒打印一次");
        }, 5,3, TimeUnit.SECONDS);

        // 注意這里不能釋放資源
        //executorService.shutdown();
    }
}

這就是使用Executors工具類創建線程池的缺陷所在,在《阿里巴巴開發手冊》中是不建議使用這種方式創建線程池的,而是推薦使用new ThreadPoolExecutor構造函數來創建線程池。如果你細心一點會發現上面四種方式中其實最終都是使用ThreadPoolExecutor這個類,所以這個類才是線程池的核心,我們只有徹底了解這個類才能真正的理解線程池。

image

4、ThreadPoolExecutor

上面既然說推薦使用ThreadPoolExecutor來創建線程池,那么先來看一下ThreadPoolExecutor的內容。在ThreadPoolExecutor類中提供了四個構造器,由於前三個構造器其實都是調用了第四個構造器來完成初始化的,所以這里就列出第四個構造器:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

可以發現構造器有7個參數,這7個參數特別的重要,下面分別解釋下構造器中各個參數的含義:

  • corePoolSize:核心池的大小。在創建了線程池后,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務。除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。所以在默認情況下,創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中,例如LinkedBlockingQueue,如果緩存隊列中存放的任務滿了,則會繼續創建新的線程來執行任務,直到創建maximumPoolSize個線程為止,也就是下面要定義的參數。
  • maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程,所以它的數值肯定是不能小於corePoolSize的,從源碼中也可以看到,如果這樣做在運行時會拋出異常:IllegalArgumentException。
  • keepAliveTime:空閑線程的存活時間。就是當線程的數量大於corePoolSize時,如果等待了keepAliveTime時長還沒有任務可執行,則線程終止(前提是線程池中的線程數必須大於corePoolSize時,keepAliveTime才會起作用,否則是沒用的),直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0。
  • unit:它為參數keepAliveTime的時間單位,它在TimeUnit類中有7種靜態屬性可取。
    • 天:TimeUnit.DAYS;
    • 小時:TimeUnit.HOURS;
    • 分鍾:TimeUnit.MINUTES;
    • 秒:TimeUnit.SECONDS;
    • 毫秒:TimeUnit.MILLISECONDS;
    • 微妙:TimeUnit.MICROSECONDS;
    • 納秒:TimeUnit.NANOSECONDS;
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
    • ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序,可以指定緩存隊列的大小。
    • LinkedBlockingQueue:一個基於鏈表結構的無界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序,吞吐量通常要高於ArrayBlockingQueue,所以這個比較常用,靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
    • SynchronousQueue:它沒有容量,是無緩沖等待隊列,是一個不存儲元素的阻塞隊列,會直接將任務交給消費者,必須等隊列中的添加元素被消費后才能繼續添加新的元素,否則插入操作一直處於阻塞狀態。擁有公平(FIFO)和非公平(LIFO,SynchronousQueue默認)兩種策略模式,非公平模式很容易出現飢渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。但是它的吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個列。
    • PriorityBlockingQueue:一個具有優先級的無限阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定)。但需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。
    • 注:其中ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和SynchronousQueue。
  • threadFactory:線程工廠,它用於創建新的線程。threadFactory創建的線程也是采用new Thread()方式,threadFactory創建的線程名都具有統一的風格:pool-m-thread-n(m為線程池的編號,n為線程池內的線程編號)。
  • handler:線程飽和策略或拒絕策略。當線程池和隊列都滿了,再加入的任務會執行此策略,它有四種策略。
    • AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。線程池默認的拒絕策略。如何使用new ThreadPoolExecutor.AbortPolicy()
    • DiscardPolicy:也是丟棄任務,但是不拋出異常。
    • DiscardOldestPolicy:丟棄隊列最前面的任務,也就是隊列頭的元素,然后重新嘗試執行任務(重復此過程)。如果此時阻塞隊列使用PriorityBlockingQueue優先級隊列,將會導致優先級最高的任務被拋棄。
    • CallerRunsPolicy:既不拋棄任務也不拋出異常,而是由調用線程的主線程來處理該任務。換言之就是由調用線程池的主線程自己來執行任務(例如:是有main線程啟動的線程池,當觸發次策略時,多余的任務就會交由main線程來執行),因此在執行任務的這段時間里主線程無法再提交新任務,從而使線程池中工作線程有時間將正在處理的任務處理完成,所以對性能和效率必然是極大的損耗。

上面既然介紹完了線程池構造方法中的各個參數,那么再來介紹線程池的工作流程:

image

  • 當線程池中的線程個數小於corePoolSize,每次提交一個新任務到線程池時,都會創建一個新的工作線程來執行任務,直到當前線程數等於corePoolSize;
  • 當線程池中的線程個數等於corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;
  • 當線程池中的線程個數等於corePoolSize時,並且隊列也滿了(有界隊列),這個時候再來新的任務,就會繼續創建新的線程去處理(非核心線程),直到線程池中的線程數達到maximumPoolSize。這個時候創建的線程,當線程空閑下來的時候經過keepAliveTime的時間就會被銷毀;
  • 當線程池中的線程數達到maximumPoolSize時,這個時候再來新的任務,就由飽和策略來處理提交的任務;

注:如果存儲任務的隊列滿了,並且線程數量大於corePoolSize,小於maximumPoolSize,此時創建的線程會優先執行新來的任務,之后在執行隊列中的。


我們在打開ThreadPoolExecutor類的代碼可以看到,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:

public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。我們接着看ExecutorService接口的實現:

public interface ExecutorService extends Executor {
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:

public interface Executor {
    void execute(Runnable command);
}

所以到這里,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。

image

Executor是一個線程池頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,它就是用來執行傳進去的任務的,但沒有返回值;

然后ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutdown等;

抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;

然后ThreadPoolExecutor繼承了類AbstractExecutorService。

在ThreadPoolExecutor類中有幾個非常重要的方法:

  • execute(Runnable command):用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。
  • submit(Callable task)/submit(Runnable task):用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit),在指定的時間內會等待任務執行,超時則拋出超時異常,等待時候會阻塞當前線程。
  • shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務。
  • awaitTermination(long timeout, TimeUnit unit):用於設定超時時間及單位。當等待超過設定時間時,會監測ExecutorService是否已經關閉,若關閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用。
  • shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務。

所以通過上面的講解大家應該知道創建線程池的正確姿勢了吧:

ExecutorService es = new ThreadPoolExecutor(5,,20,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(10));

最后簡單的舉個創建線程池的例子吧:

package com.thr;

import java.util.concurrent.*;
/**
 * @author Administrator
 * @date 2020-04-11
 * @desc 使用自定義參數ThreadPoolExecutor創建線程池
 */
public class ExecutorServiceDemo {
    public static void main(String[] args) {
    //定義線程池參數
        ExecutorService es = new ThreadPoolExecutor(5, 20,0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10));
        //創建Callable和Future對象
        MyCallable myCallable = new MyCallable();
        Future<Integer> future = es.submit(myCallable);
        try {
            //獲取結果並打印
            Integer num = future.get();
            System.out.println(num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }finally {
            //關閉線程池
            es.shutdown();
        }
    }
}

class MyCallable implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        int sum=0;
        for (int i = 1; i <= 100; i++) {
            sum+=i;
        }
        return sum;
    }
}

當然除了自己定義ThreadPoolExecutor外。還有其他方法。比如各種開源工具如Guava等。這里推薦使用Guava提供的ThreadFactoryBuilder來創建線程池。因為當我們需要給新創建的線程取名字、或者設置為守護線程、錯誤處理器等操作時,它的好處就體現出來了。簡單舉例:(注意使用Guava需要引入包)

public class ThreadFactoryBuilderTest {

    public static void main(String[] args) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("線程名稱-%s").build();
        // 創建一個線程對象
        Thread newThread = threadFactory.newThread(()->{

        });
        System.out.println(newThread.getName());
    }
}

參考資料:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM