深入Callable及Runnable兩個接口 獲取線程返回結果


今天碰到一個需要獲取線程返回結果的業務場景,所以了解到了Callable接口。

先來看下下面這個例子:

public class ThreadTest {

    public static void main(String[] args) throws Exception {
        ExecutorService exc = Executors.newCachedThreadPool();
        try {
            String result = null;
            FutureTask<String> task = (FutureTask<String>) exc.submit(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10; i++) {
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(this.getClass() + "::線程執行中.." + i);
                    }
                }
            }, result);

            System.out.println("task return value:" + task.get());

            FutureTask<String> callableTask = (FutureTask<String>) exc.submit(new Callable<String>() {

                @Override
                public String call() throws InterruptedException {
                    for (int i = 0; i < 10; i++) {
                        Thread.sleep(100L);
                        System.out.println(this.getClass() + "::線程執行中.." + i);
                    }
                    return "success";
                }

            });

            System.out.println("提前出結果了 task return value:" + task.get());

            System.out.println("callableTask return value:" + callableTask.get());

        } finally {
            exc.shutdown();
        }

    }

}

 

運行結果如下:

class thread.ThreadTest$1::線程執行中..0
class thread.ThreadTest$1::線程執行中..1
class thread.ThreadTest$1::線程執行中..2
class thread.ThreadTest$1::線程執行中..3
class thread.ThreadTest$1::線程執行中..4
class thread.ThreadTest$1::線程執行中..5
class thread.ThreadTest$1::線程執行中..6
class thread.ThreadTest$1::線程執行中..7
class thread.ThreadTest$1::線程執行中..8
class thread.ThreadTest$1::線程執行中..9
task return value:null 提前出結果了 task return value:null
class thread.ThreadTest$2::線程執行中..0
class thread.ThreadTest$2::線程執行中..1
class thread.ThreadTest$2::線程執行中..2
class thread.ThreadTest$2::線程執行中..3
class thread.ThreadTest$2::線程執行中..4
class thread.ThreadTest$2::線程執行中..5
class thread.ThreadTest$2::線程執行中..6
class thread.ThreadTest$2::線程執行中..7
class thread.ThreadTest$2::線程執行中..8
class thread.ThreadTest$2::線程執行中..9
callableTask return value:success

 

可以得到以下幾點:

1 Runnable,Callable兩個接口方法體不一樣,前者為run,后者為call,且返回值也不一樣;

2 Runnable接口由於run方法返回void所以無法解決線程成功后返回相應結果的問題;但是實現Callable接口的線程類可以,因為Callable的執行方法體call方法

 可以返回對象。

3 由於runnable接口沒有返回值,所以FutureTask為了解決此問題將runnable線程類通過支配器轉換為callable線程。

4 當通過task對象調用get方法時,已經執行完成的現成可以立刻得到返回結果,但是還沒執行完的線程一直在等待。 

 

下面進入源碼看看:

線程池執行submit方法時進入AbstractExecutorService類中的submit
  public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

這里好理解,將線程放入任務,由線程池的execute方法去執行。

執行完成后,當調用get方法時,會進入FutureTask的get方法:

public V get() throws InterruptedException, ExecutionException {
        int s = state;
     //當線程狀態為新建活着執行中時一直調用awaitDone方法 if (s
<= COMPLETING)
       //循環判斷線程狀態是否已經執行成功,如果執行成功返回線程狀態;其中還包括線程取消,中斷等情況的判斷。可參見下方源碼。
       //所以這里便是上面例子中為什么線程執行成功后即可立即得到結果,如果還沒有執行成功
s = awaitDone(false, 0L);
       //線程狀態正常返回結果
return report(s);
}
awaitDone源碼
  private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
 @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

 

然后我們來看看FutureTask是如何對runnable線程進行轉換的。代碼也很簡單:

public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM