異步任務執行器Executor簡介


以前線程Thread既表示執行的任務,又表示執行的機制。在JDK1.5中,java並發框架提供了一種“執行服務”的相關API,它將"任務的執行"和"任務的提交“相分離,”執行服務“封裝了任務執行的細節,對於任務提交者來說,它可進一步聚焦於任務本身,如任務提交、獲取任務執行后的結果、取消任務而不需要關注任務執行的細節,如線程的創建、任務的調試、線程的復用或關閉等。

1.基本接口

任務執行服務主要涉及4個接口

  • Runnable和Callable: 表示要執行的異步任務

  • Executor和ExecutorService:表示執行任務的服務

  • Future : 表示任務執行后的結果

Runnable是我們最常用的異步任務接口,這個接口的方法沒有返回值,不能拋出異常。而Callable接口就是為了解決Runnable的不足而在JDK1.5引入的接口,此接口的方法有返回值,且可拋出異常。

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

1)Executor接口

Executor接口也只有一個方法,這個方法接受一個Runnable類型的參數,這是個抽象方法,它無法指定任務該如何執行。它可能是新建一個線程執行任務,也可能是利用線程池中的一個線程,還可能是在調用者線程中執行。

public interface Executor {
    void execute(Runnable command);
}

ExecutorService擴展了Executor接口,它添加了一些新功能,如支持有返回結果的任務、支持超時任務、支持取消任務、支持批量提交任務。這里的submit方法,返回類型是Future,返回后,只表示任務已提交,不代表已經執行,具體執行與否要看”執行服務“如何調度,通過Future可以查詢異步任務的狀態、獲取最終的結果、取消任務等。

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)  throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

2)Future接口

Future接口是對任務的結果做了進一步封裝,字面上”future"是未來的意思,這里確實是表示“未來(或最終)的結果”,“結果”需要等待。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException;
}

cancel()方法用於取消異步任務,若任務已完成、已取消或因其他原因不能取消等原因而導致任務取消失敗就會返回false,反之返回true. 如果任務還未開始,則不再運行,但若任務已經在運行,則不一定能取消這個任務。參數mayInterruptIfRunning表示,如果任務正在執行,是否調用Thread.interrupt()方法中斷線程,而interrupt()方法只是設置線程中斷標志,它不一定能真的中斷線程。

isCancelled() 和isDone()分別返回任務是否被取消、任務是否已完成的布爾值。只要cancel方法返回true,那么即使執行任務的線程還未結束,isCancelled方法也一定會返回true。不管什么原因,無論是任務正常結束、任務拋出異常或任務被取消,只要任務結束了,isDone都會返回true.

get()方法用於返回異步任務的結果,若任務未完成,當前線程則會阻塞待。get(long,TimeUnit)方法需要設定等待時長,若在給定的時間內還未完成任務,則會拋出TimeoutException異常。

get方法的最終結果大致有3種: ①任務正常完成,get方法返回任務的執行結果,若任務是Runnable且入參未提供結果,最終返回null ②任務被取消了,get方法會拋出CancellationException. ③任務執行過程中拋出了異常,get方法會將異常包裝為ExecutionException重新拋出,此異常的getCause方法可獲得原始異常。

Future是實現”任務的提交“與”任務的執行“相分離的關鍵,它是兩者的橋梁,它使任務的提交者和任務的執行器的關注點相隔離,同時又讓兩者彼此聯系。

2.用法示例

下面的例子中,使用異步任務,計算0-300的累加結果,在計算出結果前Future的get()方法將阻塞等待。

這里使用了Executors工具類的newSingleThreadeExecutor方法創建一了個執行服務。Executors有很多靜態方法,分別創建各種線程池執行服務。

class SimpleTaskTest {
    static class Accumlation implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            int sum =0;
            for (int i = 0; i < 300; i++) {
                sum += i;
                Thread.sleep(10);
            }
            return sum;
        }
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> future = executor.submit(new Accumlation());
        long start = System.currentTimeMillis();
        int result = future.get();
        System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒才返回,計算結果:" + result);
    }
}

打印的結果

Future.get方法等待了3147毫秒才返回,計算結果:44850

3.基本原理

1)ExecutorService實現

ExecutorService是Executor的子接口,它添加了一些新功能,如支持有返回結果的任務、支持超時任務、支持取消任務、支持批量提交任務。

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)  throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

①ExecutorService有兩個關閉執行服務的方法,分別是shutdown()和shutdownNow. 兩者關閉執行服務的方式有所差別,shutdown()方法調用后不會接受新的任務,但已提交的任務將繼續執行(即使任務還未真正開始執行);而shutdownNow()方法不僅不會接受新任務,而且還會終止已經提交但未執行的任務,對於正在執行的任務,一般調用Thread.interrupt()方法設置中斷標志,不過線程可能不響應中斷,shutdownNow會返回已提交但未執行的任務列表。shutdown和shutdownNow不會阻塞等待,它們返回后不代表所有任務都結束。

②isShutdown返回執行服務是否被關閉的布爾值(不會等待),只要shutdown或shutdownNow任意一方法被調用后,isShutdwon都將返回true.

③awaitTermination方法用於等待執行服務中的所有任務完成,此方法需要設置超時時間,如果在限時間內所有任務都結束了(允許非正常結束),

④isTerminated 返回在執行服務關閉后所有任務是否已完成的布爾值,如果在此之前shutdownNow或shutdown沒有被調用,這里永不可能返回true.

⑤三個submit方法都用於提交單任務,submit(Callable<T> )方法中入參Callable本身有返回結果 ;submit(Runnable,T)方法在設定任務的同時可以提供一個結果,在任務結束時將返回這個結果;submit(Runnable )方法入參沒有提供結果,最終返回的結果是null 。

⑥ExecutorService有兩類批量提交任務的方法,invokeAll和invokeAny,它們都有兩個版本,一個不限時版本、一個超時版本。

invokeAll等待(給定的)所有任務完成,返回的Future集合中,每個Future的isDone方法都返回true,但isDone是true並不代表任務完成了,也有可能是因任務被取消而導致任務非正常結束。invokeAll的超時版本方法,需要指定等待時間的時間,若超時后還有任務還未完成,這些任務就會被取消。而invokeAny,只要有一個任務正常完成(沒拋出異常)后,它就返回此任務的結果;在正常返回或異常拋出返回后,其他任務則會被取消。對於invokeAny的超時版本,如果在限時內有一任務正常(沒拋出異常)完成,就返回此任務的結果 ,其他將任務會被取消;如果沒有任務能在限時內成功完成返回,就拋出TimeoutException; 沒有任務正常成功返回(可能是因發生某種異常而返回),將拋出ExecutionException.

在了解ExecutorService接口的相關抽象方法定義后,我們來進一步分析它的實現類和實現原理。

ExecutorService的主要實現類是ThreadPoolExecutor,它是基於線程池實現的,ExecutorService有一個很重要的抽象類AbstractExecutorService, 而且ThreadPoolExecutor就是直接繼承於AbstractExecutorService。

 

我們可以基於此抽象類實現一個簡易的ExecutorService。AbstractExecutorService提供了submit 、invokeAll和invokeAny的默認實現,子類只需要實現其他方法就行了。shutdown與isShutdown 等方法與生命周期管理有關,我暫時可以不用去管它,其實它的子類最關鍵在於實現execute方法,因為submit、invokeAll、invokeAny等方法底層主要還是調用execute方法。

import java.util.List;
import java.util.concurrent.*;

public class CustomizeExecutorService extends AbstractExecutorService {
    @Override
    public void shutdown() {
        System.out.println("=====shutdown=====");
    }
    @Override
    public List<Runnable> shutdownNow() {
        System.out.println("=====shutdownNow=====");
        return null;
    }
    @Override
    public boolean isShutdown() {
        return false;
    }
    @Override
    public boolean isTerminated() {
        return false;
    }
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return false;
    }
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}
class Accumlation implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 0; i < 300; i++) {
            sum += i;
            Thread.sleep(10);
        }
        return sum;
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = new CustomizeExecutorService();
        Future<Integer> future = executor.submit(new Accumlation());
        long start = System.currentTimeMillis();
        int result = future.get();
        System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒,計算結果:" + result);
    }
}
CustomizeExecutorService

打印結果

Future.get方法等待了3142毫秒,計算結果:44850

AbstractExecutorService的基本方法是submit,其他方法的實現可以此為參照。可以看下其submit系列方法的實現是怎樣的。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

不論入參是Runnable還是Callable,最終都會將其封裝成RunnableFuture,而RunnableFuture又是Runnable的子接口,而后去調用execute(Runnable),最后返回RunnableFuture 。這里有點奇怪的地方在於對入參的封裝,其實質相當於將Runnable或Callable任務封裝成Runnable結果。這里也再次體現出了RunnableFuture的作用,它是連接“任務的提交”和“任務的執行”的橋梁。

上面的submit方法體中調用newTaskFor,它實際上返回一個FutureTask類型的實例對象。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

2)Future實現

上面的提到的RunnableFuture同時繼承了Runnable和Future接口,RunnableFuture本身沒有增加任何抽象方法.

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();//此方法是父接口Runnable的抽象方法
}

而FutureTask又是RunnableFutrue接口的主要實現類,我們可以來看看其的實現細節

它有一個成員變量state表示狀態

private volatile int state;

它有這些可能取值

private static final int NEW          = 0;//剛開始的狀態或任務在運行中
private static final int COMPLETING   = 1;//臨時狀態,任務即將結束,正在設置結果
private static final int NORMAL       = 2;//任務正常完成
private static final int EXCEPTIONAL  = 3;//因拋出異常而結束任務
private static final int CANCELLED    = 4;//任務被取消
private static final int INTERRUPTING = 5;//任務正在被中斷
private static final int INTERRUPTED  = 6;//任務被中斷(中斷的最終狀態)

其他成員變量

    private Callable<V> callable;
    private Object outcome; // non-volatile, protected by state reads/writes
    private volatile Thread runner;
    private volatile WaitNode waiters;

成員變量callable表示要執行的任務。

成員變量outcome表示任務的結果或任務非正常結束的異常

成員變量runner表示執行此任務的線程

成員變量waiter表示等待任務執行結果的等待棧表(數據結構是單向鏈表,先進后出)。WaitNode是一個簡單的靜態內部,一個成員變量thread表示等待結果的線程,另一個成員變量next表示下一個等待節點(線程)。

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

FutureTask的構造方法會初始化callable和state。FutureTask的構造方法可以接受Runnable類型參數,它會調用Executors.callable將Runnable轉換為Callable類型實例,以便於統一處理。

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

任務執行服務會使用一個線程執行FutureTask的run方法,run方法的實現

public void run() {
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            //將當前線程設置為執行任務的線程
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();//執行任務
                    ran = true;
                } catch (Throwable ex) {
                    //運行時有異常,設置異常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);//設置結果
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null; //state已是最終狀態,不再變化,將runer設為null,防止run方法被並發調用
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state; //清空運行線程runner后再重新獲取state,防止遺漏掉對中斷的處理
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

其主要邏輯是:

①檢查狀態,設置運行任務的線程

②調用callable的call方法去執行任務,並捕獲運行中可能出現的異常

③如果任務正常完成,調用set設置任務的結果,將state設為NORMAL, 將結果保存到outcome ,喚醒所有等待結果的線程

④若執行任務過程中發生了異常,調用setException設置異常,將state設為EXCEPTIONAL ,將此異常也保存到outcome ,喚醒所有等待結果的線程

⑤最后將運行線程runner清空,若狀態可能是任務被取消的中斷還要處理此中斷。

 

對於任務提交者,可通過get方法獲取任務的最終結果,它的超時版本是

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING && //還未完成
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //等待完成
        //到了限定時間,任務仍未完成,拋出超時異常TimeoutException
        throw new TimeoutException();
    return report(s);//報告結果
}

其主要邏輯是:若任務未完成就等待任務完成,最后調用report報告結果,report會根據狀態返回結果或拋出異常。

而report方法的基本邏輯也很簡單:若是任務正常結束就返回這個任務的結果,若是任務被取消,就拋出任務取消異常CancellationException,若是在執行任務過程中發生了異常就統一將其封裝成ExecutionException並拋出。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

說到任務被取消,我們可以看看cancel(boolean)方法如何實現的

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        //①不是NEW狀態,表示任務至少是COMPLETING(即將結束)狀態,返回false
        //②CAS更新state為INTERRUPTING或CANCELLED失敗,返回false
        //只有state狀態更新成功,才能取消任務(防止被並發調用)
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {//允許中斷就設置中斷標志
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();//設置中斷標志
            } finally { // final state 設置中斷的最終狀態
                //INTERRUPTING -> INTERRUPTED ,將state由“正在中斷”更新為”已經中斷“
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //從等待棧中喚醒並移除所有的線程(節點)
        finishCompletion();
    }
    return true;
}

其基本邏輯:

①任務已結束或被取消,返回false

②若mayInterruptIfRunning為true,調用interrupt設置中斷標志,將state設置為INTERRUPTED,若mayInterruptIfRunning為false,將state設為CANCELLED.

③調用finishCompletion喚醒並移除等待棧中的所有線程

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM