Java多線程之Executor、ExecutorService、Executors、Callable、Future與FutureTask


1. 引子

初學Java多線程,常使用ThreadRunnable創建、啟動線程。如下例:

Thread t1 = new Thread(new Runnable() {
    @Override
    public void run() {
 		System.out.println(Thread.currentThread().getName());
    }
});
t1.start();

我們需要自己創建、啟動Thread對象。

重要概念:

  1. 實現Runnable的類應該被看作一項任務,而不是一個線程。在Java多線程中我們一定要有一個明確的理解,任務和線程是不同的概念。可以使用線程(Thread)執行任務(比如Runnable),但任務不是線程。
  2. Java多線程中有兩種不同類型的任務,Runnable類型任務(無返回值)與Callable類型任務(有返回值)。

2. 使用Executor執行線程

一些已有的執行器可以幫我們管理Thread對象。你無需自己創建與控制Thread對象。比如,你不用在代碼中編寫new Thread或者thread1.start()也一樣可以使用多線程。如下例:

ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {//5個任務
    exec.submit(new Runnable() {
        @Override
        public void run() {            
          	System.out.println(Thread.currentThread().getName()+" doing task");
         }
     });
}
exec.shutdown();  //關閉線程池

輸出如下:

pool-1-thread-2 doing task
pool-1-thread-1 doing task
pool-1-thread-3 doing task
pool-1-thread-4 doing task
pool-1-thread-5 doing task

從輸出我們可以看到,exec使用了線程池1中的5個線程做了這幾個任務。

這個例子中exec這個Executor負責管理任務,所謂的任務在這里就是實現了Runnable接口的匿名內部類。至於要使用幾個線程,什么時候啟動這些線程,是用線程池還是用單個線程來完成這些任務,我們無需操心。完全由exec這個執行器來負責。在這里exec(newCachedThreadPool)指向是一個可以根據需求創建新線程的線程池。

Executors相當於執行器的工廠類,包含各種常用執行器的工廠方法,可以直接創建常用的執行器。幾種常用的執行器如下:

Executors.newCachedThreadPool,根據需要可以創建新線程的線程池。線程池中曾經創建的線程,在完成某個任務后也許會被用來完成另外一項任務。

Executors.newFixedThreadPool(int nThreads) ,創建一個可重用固定線程數的線程池。這個線程池里最多包含nThread個線程。

Executors.newSingleThreadExecutor() ,創建一個使用單個 worker 線程的 Executor。即使任務再多,也只用1個線程完成任務。

Executors.newSingleThreadScheduledExecutor() ,創建一個單線程執行程序,它可安排在給定延遲后運行命令或者定期執行

newSingleThreadExecutor例子如下:

ExecutorService exec = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
    exec.execute(new Runnable() {//execute方法接收Runnable對象,無返回值
        @Override
        public void run() {
          	System.out.println(Thread.currentThread().getName());
        }
    });
}
exec.shutdown();

輸出如下:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

可以看出,雖然有5個任務(5個new Runnable),但是只由1個線程來完成。

最佳實踐:我們應該使用現有Executor或ExecutorService實現類。比如前面說的newCachedThreadPool可以使用線程池幫我們降低開銷(創建一個新的線程是有一定代價的),而newFixedThreadPool則可以限制並發線程數。即,我們一般使用Executors的工廠方法來創建我們需要的執行器。

Executor與ExecutorService的常用方法

execute方法:

Executor接口只有void execute(Runnable command)方法。從方法聲明中我們可以看到入參為Runnable類型對象。常用的例子如下:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());

但里面具體怎么執行,是否調用線程執行由相應的Executor接口實現類決定。比如前面的newCachedThreadPool使用線程池來進行執行。Executor將任務提交與每個任務如何運行(如何使用線程、調度)相分離。

submit方法:

ExecutorService接口繼承自Executor接口,擴展了父接口中的execute方法。有兩個常用的submit方法

Future<?> submit(Runnable task) 
<T> Future<T> submit(Callable<T> task)

可以看到這兩個常用方法一個接收Runnable類型入參,一個接收Callable類型入參。Callable入參允許任務返回值,而Runnable無返回值。也就是說如果我們希望線程有一個返回結果,我們應該使用Callable類型入參。

invokeAll與invokeAny方法:

批量執行一組Callable任務。其中invokeAll是等所有任務完成后返回代表結果的Future列表。而invokeAny是等這一批任務中的任何一個任務完成后就返回。從兩個方法的返回結果我們也可以看出兩個方法的不同:

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)

invokeAll返回的是List<Future > ,而invoke返回的是T

shutdown()方法:

啟動一次順序關閉,執行以前提交的任務,但不接受新任務。執行此方法后,線程池等待任務結束后就關閉,同時不再接收新的任務。如果執行完shutdown()方法后,再去執行execute方法則直接拋出RejectedExecutionException。不要問我為什么知道...剛從坑里爬出來。

原則:只要ExecutorService(線程池)不再使用,就應該關閉,以回收資源。要注意這個不再使用

上述方法較多,可以配合后面的實例進行理解。可以先記住execute方法與shutdown方法。

3. 使用Callable與Future

Callable接口

Runnable接口中的public void run()方法無返回值,如果我們希望線程運算后將結果返回,使用Runnable就無能為力。這時候我們應使用CallableCallable代表有返回值的任務。一個實現Callable接口的類如下所示:

class CalcTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        return Thread.currentThread().getName();
    }
}

這個任務比較簡單,就是返回當前線程的名字。與Runnable相比較有一個返回值,在這里返回值類型為String,也可以為其他類型。

使用如下代碼進行調用:

ExecutorService exec = Executors.newCachedThreadPool();
List<Callable<String>> taskList = new ArrayList<Callable<String>>();
/* 往任務列表中添加5個任務 */
for (int i = 0; i < 5; i++) {
    taskList.add(new CalcTask());
}
/* 結果列表:存放任務完成返回的值 */
List<Future<String>> resultList = new ArrayList<Future<String>>();
try {
    /*invokeAll批量運行所有任務, submit提交單個任務*/
    resultList = exec.invokeAll(taskList);
} catch (InterruptedException e) {
  	e.printStackTrace();
}
try {
    /*從future中輸出每個任務的返回值*/
    for (Future<String> future : resultList) {
      	System.out.println(future.get());//get方法會阻塞直到結果返回
    }
} catch (InterruptedException e) {
  	e.printStackTrace();
} catch (ExecutionException e) {
  	e.printStackTrace();
}

輸出如下:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5

Future接口

上面的例子中我們使用了Future接口。Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。上面的例子中exec執行器執行了一個Callable類型的任務列表然后得到了Futuer類型的結果列表resultList。

get方法

等待計算完成,然后獲取其結果。

isDone方法

用來查詢任務是否做完,例子如下:

/*新建一個Callable任務*/
Callable<Integer> callableTask = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        System.out.println("Calculating 1+1!");
        TimeUnit.SECONDS.sleep(2);//休眠2秒
        return 2;
    }
}; 
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result = executor.submit(callableTask);
executor.shutdown();
while(!result.isDone()){//isDone()方法可以查詢子線程是否做完
    System.out.println("子線程正在執行");
    TimeUnit.SECONDS.sleep(1);//休眠1秒
}
try {
  	System.out.println("子線程執行結果:"+result.get());
} catch (InterruptedException | ExecutionException e) {
  	e.printStackTrace();
}

輸出如下:

Calculating 1+1!
子線程正在執行
子線程正在執行
子線程執行結果:2

4.FutureTask

FutureTask類是 Future 接口的一個實現。FutureTask類實現了RunnableFuture接口,RunnableFuture繼承了Runnable接口和Future接口,所以:

  1. FutureTask可以作為Runnable被線程執行
  2. 可以作為Future得到傳入的Callable對象的返回值
    例子如下:
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
   @Override
   public Integer call() throws Exception {
     	System.out.println("futureTask is wokring 1+1!");
     	return 2;
   }
});
Thread t1 = new Thread(futureTask);//1.可以作為Runnable類型對象使用
t1.start();
try {
   System.out.println(futureTask.get());//2.可以作為Future類型對象得到線程運算返回值
} catch (ExecutionException e) {
   e.printStackTrace();
}

輸出如下:

futureTask is wokring 1+1!
2

可以看出FutureTask可以當作一個有返回值的Runnable任務來用。

分析:FutureTask<Integer> futureTask = new FutureTask<>(new Callable...)相當於把Callable任務轉換為Runnable任務,就可以使用線程來執行該任務。而futureTask.get()相當於將Callable轉化為Future,從而得到異步運算的結果。

ExecutorService執行器除了接收Runnable與Callable類型的入參,也可以接收FutureTask類型,例子如下:

FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        System.out.println("futureTask is wokring 1+1!");
        TimeUnit.SECONDS.sleep(2);
        return 2;
    }
});
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(futureTask);//也可以使用execute,證明其是一個Runnable類型對象
executor.shutdown();
while(!futureTask.isDone()){
    System.out.println("子線程還沒做完,我再睡會");
    TimeUnit.SECONDS.sleep(1);
}
try {
  	System.out.println("子線程運行的結果:"+futureTask.get());
} catch (InterruptedException | ExecutionException e) {
  	e.printStackTrace();
}

聲明:

本文部分內容來資源jdk文檔與Java編程思想

參考資料

線程池,這一篇或許就夠了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM