淺談Java多線程之FutureTask


Runnable和Callable是多線程中的兩個任務接口,實現接口的類將擁有多線程的功能,FutureTask類與這兩個類是息息相關!

FutureTask繼承體系

 

看下這張圖,原來FutureTask類實現了Runnable和Future,既然是Runnable的實現類,我們可以寫如下的代碼:

public static void main(String[] args) {

    FutureTask task = new FutureTask(new Callable() {
        @Override
        public Object call() throws Exception {
            System.out.println(Thread.currentThread().getName() + "========>正在執行!");
            return "SUCCESS";
        }
    });

    new Thread(task).start();

}

因為FutureTask是Runnable的實現類嘛,根據多態的特性,肯定可以傳到Thread的構造器中。

 

FutureTask的構造方法    

構造方法1 接收Callable對象  

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

構造方法2 接收Runnable對象  和一個泛型的result

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

原來,FutureTask內部維護Callable類型的成員變量,對於Callable任務,直接賦值即可。而對於Runnable任務,需要先調用Executors#callable()把Runnable先包裝成Callable。

Executors.callable(runnable, result);

這行代碼用了適配器模式,你給我一個runnable對象,我還你一個callable對象。

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new Executors.RunnableAdapter<T>(task, result);
}

RunnableAdapter是Executors中的靜態內部類,上面代碼意思是調用該靜態內部類的構造方法,生成RunnableAdapter對象,而RunnableAdapter對象實現了Callable接口,根據多態也就相當於得到了一個Callable對象。

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

RunnableAdapter作為Callable的適配器,也擁有call方法,這就是適配器模式。

如果你是用第二種方式來構造FutureTask對象,因為傳入的是Runnable,Runnable的run方法是沒有返回值的,而Callable的call方法是有返回值的,所以這邊就折中一下,返回值需要你在構建FutureTask對象時自己傳進去,最后再原封不動地還給你。

如果你是用第一種方式來構造FutureTask對象,那就簡單多了,直接傳入一個Callable對象即可,返回值你自己決定。

總而言之,FutureTask的構造方法就為了做一件事,即統一Callable和Runnable

為什么FutureTask要花這么大的精力去搞定Callable和Runnable呢?就是因為統一了好辦事啊,以后在線程池的章節中,你還會頻繁看到這個類。

捋一捋思路,為什么要用FutureTask?

多線程是Java進階的難點,也是面試的重災區,請確保你把上面的代碼都理解了之后再來看這一節。

我們再回過頭來想想,如何使用多線程呢,是不是有3個方法?如果記不得了請回過去看看上一個章節【線程類】。

 

第1種方法是直接繼承Thread類,重寫run方法。

第2種方法是實現Runnable接口,然后還是要靠Thread類的構造器,把Runnable傳進去,最終調用的就是Runnable的run方法。

第3種方法是用線程池技術,用ExecutorService去提交Runnable對象/Callable對象,區別是Runnable沒有返回值,Callable對象有返回值。

 

你發現沒有,不管你用哪種方式,最終都是要靠Thread類去開啟線程的。因為,有且僅有Thread類能通過start0()方法向操作系統申請線程資源(本地方法)

 

第一種方法因為耦合性太高,很少會使用,實際開發中我們一般都會使用線程池技術,所以第3種方法是有實戰意義的。那么問題來了,Runnable和Callable對象都可以被用作線程池的任務,就有人會亂用了啊,有的人喜歡Runnable,有的喜歡Callable,到時候項目的代碼就亂成一鍋粥啦!

 

所以,我私以為Java的創始人意識到這一點,就干脆搞一個FutureTask出來一統江湖。我說的這么白,應該都明白了吧,嘿嘿。

FutureTask的7種狀態

既然FutureTask是子類,那么必然有比Callable和Runnable強悍的地方,比如FutureTask的7種狀態

 

private volatile int state;
private static final int NEW          = 0; 
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

狀態含義分別是:

●     0-剛創建 

●     1-即將完成

●     2-完成 

●     3-拋異常 

●     4-任務取消 

●     5-任務即將被打斷

●     6-任務被打斷

為什么要設置這些狀態呢,那是因為FutureTask=任務+結果,調用者何時可以去獲取這個結果result呢?FutureTask在調用get方法時,會去判斷當前任務的狀態,只有當任務完成才會給你實際的result,因此get方法是阻塞的。

 

 FutureTask的get() 方法

先看下FutureTask的get() 方法是如何使用的:

FutureTask task = new FutureTask(new Callable() {
    @Override
    public Object call() throws Exception {
        System.out.println(Thread.currentThread().getName() + "========>正在執行!");
        Thread.sleep(2000); //執行耗時操作
        return "SUCCESS";
    }
});

new Thread(task).start();

System.out.println(task.get());

效果:

Thread-0========>正在執行!
SUCCESS

過了兩秒后才打印出SUCCESS,說明get確實是阻塞的。再來一個線程池的例子:

ExecutorService executorService = Executors.newSingleThreadExecutor();

/**
 * 往線程池中提交一個Callable,立刻返回Future對象,但是該Future對象里面的返回值目前還是null
 * 只有當你調用get方法時,才會阻塞地獲取該任務真實的返回值
 */
Future<Object> objectFuture = executorService.submit(new Callable<Object>() {
    @Override
    public Object call() throws Exception {
        System.out.println(Thread.currentThread().getName() + "========>正在執行!");
        Thread.sleep(2000); //執行耗時操作
        return "SUCCESS";
    }
});

Object result = objectFuture.get();
System.out.println(result);
executorService.shutdownNow();

FutureTask的run方法

先來看看源代碼吧:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

FutureTask的run方法第一步果然是獲取callable對象,這個callable對象也可能是runnable偽裝的,上面介紹了適配器模式,這邊就不再贅述了。

最終是存儲到outcome對象了,簡而言之,FutureTask的run方法的作用就是運行callable的call方法,拿到返回值保存到outcome對象,等待有人來取。

薛定諤的FutureTask

為什么說是薛定諤的FutureTask呢?那是因為,當你把FutureTask跑起來的時候,里面的outcome可能沒有值,也可能有值。

但是又因為outcome在FutureTask源碼中被設置成private,所以如果你要獲取這個數據,只能通過get方法。而get方法是阻塞的,當你調用get方法時,一定是等到任務執行成功后,才會返回真實的值。

這就有點像薛定諤的貓,你不去觀察它,兩種狀態皆有可能,一旦你去觀察了(調用get方法),就只有一種明確的狀態。

其實這真的只是一個小技巧,相信你也能辦到,我們用代碼來模擬一下這個過程。

首先,新建一個MyFutureTask類:

/**
 * 自定義任務類
 */
public class MyFutureTask implements Runnable{

    /**
     * 為了看到效果,outcome設置為Object
     */
    public Object outcome;

    public void run(){
        try {
            /**
             * 執行耗時操作
             */
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /**
         * 給outcome賦值
         */
        this.outcome = "SUCCESS";
    }

    public Object get(){
        return outcome;
    }

}

為了模擬線程池,新建一個MyExecutorService類:

class MyExecutorService {

    /**
     * 提交任務
     * @param myFutureTask
     * @return
     */
    public MyFutureTask submit(MyFutureTask myFutureTask){
        /**
         * 開啟一個線程把myFutureTask跑掉
         */
        new Thread(myFutureTask).start();
        /**
         * 線程有沒有跑完不關心,直接把myFutureTask返回
         * 此時myFutureTask很可能不是最終結果,但其中的outcome一定指向最終結果
         */
        return myFutureTask;
    }
}

測試:

public static void main(String[] args) throws InterruptedException {
    MyExecutorService myExecutorService = new MyExecutorService();
    MyFutureTask myFutureTask = myExecutorService.submit(new MyFutureTask());
    /**
     * 線程開啟立刻查看outcome
     */
    System.out.println(myFutureTask.outcome);
    /**
     * 主線程繼續運作
     */
    Thread.sleep(1100);
    /**
     * 再次查看outcome是否有值
     */
    System.out.println(myFutureTask.get());
}

結果:

null
SUCCESS

總結一下,ExecutorService的submit方法只是提交Runnable或Callable任務到線程池,直接返回FutureTask給你,這個FutureTask是薛定諤的FutureTask,里面的outcome現在可能有值,也可能沒有。只有當你主動調用get方法,才可以得到確切的值。

 FutureTask的get方法阻塞原理

FutureTask的get方法是阻塞的,當你調用這個方法就一定要等該線程跑完,那么為什么能做到這樣呢?

接下來我們看看get方法的阻塞原理是什么,我重新寫了一個例子,注釋能寫的都寫了,代碼如下:

package com.javaxbfs.thread;

import java.util.concurrent.locks.LockSupport;

class MyExecutorService {

  /**
     * 提交任務
     * @param myFutureTask
     * @return
     */
    public MyFutureTask submit(MyFutureTask myFutureTask){
      /**
         * 開啟一個線程把myFutureTask跑掉
         */
        Thread thread = new Thread(myFutureTask);
        thread.start();
      /**
         * 把線程賦給myFutureTask的runner屬性,以方便查看線程狀態
         */
        myFutureTask.runner = thread;
      /**
         * 線程有沒有跑完不關心,直接把myFutureTask返回
         * 此時myFutureTask很可能不是最終結果,但其中的outcome一定指向最終結果
         */
        return myFutureTask;
    }
}

/**
 * 自定義任務類
 */
public class MyFutureTask implements Runnable{

  /**
     * 為了看到效果,outcome設置為Object
     */
    public Object outcome;
  /**
     * 當前任務所在的線程
     */
    public volatile Thread runner;

    public void run(){
        try {

          /**
             * 執行耗時操作
             */
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
      /**
         * 給outcome賦值
         */
        this.outcome = "SUCCESS";
    }

    public Object get() throws InterruptedException {
        if(awaitDone())
            return outcome;
        return null;
    }

    private boolean awaitDone() throws InterruptedException {
      /**
         * 做一個死循環,輪詢檢查當前線程狀態
         */
        for(;;){
          /**
             * 如果當前線程被打斷,則拋異常結束任務
             */
            if(runner.isInterrupted()){
                throw new InterruptedException();
            }
            if(runner.getState() == Thread.State.NEW){
                System.out.println(Thread.currentThread().getName() + "新建!");
            }
            if(runner.getState() == Thread.State.RUNNABLE){
                System.out.println(Thread.currentThread().getName() + "准備就緒!");
            }

            if(runner.getState() == Thread.State.TERMINATED){
                System.out.println(Thread.currentThread().getName() + "執行完畢!");
                return true;
            }
            Thread.sleep(200);
        }
    }


    public static void main(String[] args) throws InterruptedException {
        MyExecutorService myExecutorService = new MyExecutorService();
        MyFutureTask myFutureTask = myExecutorService.submit(new MyFutureTask());
      /**
         * 線程開啟立刻查看outcome
         */
        System.out.println(myFutureTask.outcome);
      /**
         * 主線程繼續運作
         */
        //Thread.sleep(1100);
      /**
         * 再次查看outcome是否有值,現在是阻塞的
         */
        System.out.println(myFutureTask.get());
    }
}

關鍵就在於這個 awaitDone 方法(源碼也叫這個名字),它里面是一個死循環,不斷去檢查當前FutureTask所在線程的狀態,當線程執行結束,就返回true,表示可以給出精確的result了。

真實的get方法實現非常復雜,不過思路是差不多的,有興趣的童鞋可以去百度了解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM