Java 多線程(五)—— 線程池基礎 之 FutureTask源碼解析


FutureTask是一個支持取消行為的異步任務執行器。該類實現了Future接口的方法。
如:

  1. 取消任務執行
  2. 查詢任務是否執行完成
  3. 獲取任務執行結果(”get“任務必須得執行完成才能獲取結果,否則會阻塞直至任務完成)。
    注意:一旦任務執行完成或取消任務,則不能執行取消任務或者重新啟動任務。(除非一開始就使用runAndReset模式運行任務)

FutureTask實現了Runnable接口和Future接口,因此FutureTask可以傳遞到線程對象Thread或Excutor(線程池)來執行。

如果在當前線程中需要執行比較耗時的操作,但又不想阻塞當前線程時,可以把這些作業交給FutureTask,另開一個線程在后台完成,當當前線程將來需要時,就可以通過FutureTask對象獲得后台作業的計算結果或者執行狀態。

示例

 1 public class FutureTaskDemo {
 2     public static void main(String[]args)throws InterruptedException {
 3         FutureTask < Integer > ft = new FutureTask <  > (new Callable < Integer > () {
 4                  @Override 
 5                  public Integer call()throws Exception {
 6                     int num = new Random().nextInt(10);
 7                     TimeUnit.SECONDS.sleep(num);
 8                     return num;
 9                 }
10             });
11         Thread t = new Thread(ft);
12         t.start(); 
13         //這里可以做一些其它的事情,跟futureTask任務並行,等需要futureTask的運行結果時,可以調用get方法獲取
14         try { 
15             //等待任務執行完成,獲取返回值
16             Integer num = ft.get();
17             System.out.println(num);
18         } catch (Exception e) {
19             e.printStackTrace();
20         }
21     }
22 }

FutureTask 源碼分析

JDK1.8自己實現了一個同步等待隊列,在結果返回之前,所有的線程都被阻塞,存放到等待隊列中。

下面我們來分析下JDK1.8的FutureTask 源碼

FutureTask 類結構

 1 public class FutureTask<V> implements RunnableFuture<V> { 
 2 /** * 當前任務的運行狀態。 
 3 * 
 4 * 可能存在的狀態轉換 
 5 * NEW -> COMPLETING -> NORMAL(有正常結果) 
 6 * NEW -> COMPLETING -> EXCEPTIONAL(結果為異常) 
 7 * NEW -> CANCELLED(無結果) 
 8 * NEW -> INTERRUPTING -> INTERRUPTED(無結果) 
 9 */ 
10 private volatile int state; 
11 private static final int NEW = 0; //初始狀態 
12 private static final int COMPLETING = 1; //結果計算完成或響應中斷到賦值給返回值之間的狀態。 
13 private static final int NORMAL = 2; //任務正常完成,結果被set 
14 private static final int EXCEPTIONAL = 3; //任務拋出異常 
15 private static final int CANCELLED = 4; //任務已被取消 
16 private static final int INTERRUPTING = 5; //線程中斷狀態被設置ture,但線程未響應中斷 
17 private static final int INTERRUPTED = 6; //線程已被中斷 
18 
19 //將要執行的任務 
20 private Callable<V> callable; //用於get()返回的結果,也可能是用於get()方法拋出的異常 
21 private Object outcome; // non-volatile, protected by state reads/writes //執行callable的線程,調用FutureTask.run()方法通過CAS設置 
22 private volatile Thread runner; //棧結構的等待隊列,該節點是棧中的最頂層節點。 
23 private volatile WaitNode waiters; 
24 .... 

FutureTask實現的接口信息如下:

RunnableFuture 接口

1 public interface RunnableFuture<V> extends Runnable, Future<V> {
2     void run();
3 }

RunnableFuture 接口基礎了Runnable和Future接口

Future 接口

 1 public interface Future<V> { 
 2     //取消任務 
 3     boolean cancel(boolean mayInterruptIfRunning); 
 4     //判斷任務是否已經取消 
 5     boolean isCancelled(); 
 6     //判斷任務是否結束(執行完成或取消) 
 7     boolean isDone(); 
 8     //阻塞式獲取任務執行結果 
 9     V get() throws InterruptedException, ExecutionException; 
10     //支持超時獲取任務執行結果 
11     V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 
12 }

run 方法

 1 public void run() {
 2     //保證callable任務只被運行一次
 3     if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
 4         return;
 5     try {
 6         Callable < V > c = callable;
 7         if (c != null && state == NEW) {
 8             V result;
 9             boolean ran;
10             try { 
11                 //執行任務,上面的例子我們可以看出,call()里面可能是一個耗時的操作,不過這里是同步的
12                 result = c.call();
13                 //上面的call()是同步的,只有上面的result有了結果才會繼續執行
14                 ran = true;
15             } catch (Throwable ex) {
16                 result = null;
17                 ran = false;
18                 setException(ex);
19             }
20             if (ran)
21                 //執行完了,設置result
22                 set(result);
23         }
24     }
25     finally {
26         runner = null;
27         int s = state;
28         //判斷該任務是否正在響應中斷,如果中斷沒有完成,則等待中斷操作完成
29         if (s >= INTERRUPTING)
30             handlePossibleCancellationInterrupt(s);
31     }
32 }

1.如果state狀態不為New或者設置運行線程runner失敗則直接返回false,說明線程已經啟動過,保證任務在同一時刻只被一個線程執行。
2.調用callable.call()方法,如果調用成功則執行set(result)方法,將state狀態設置成NORMAL。如果調用失敗拋出異常則執行setException(ex)方法,將state狀態設置成EXCEPTIONAL,喚醒所有在get()方法上等待的線程。
3.如果當前狀態為INTERRUPTING(步驟2已CAS失敗),則一直調用Thread.yield()直至狀態不為INTERRUPTING

set方法

1 protected void set(V v) {
2     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
3         outcome = v;
4         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
5         finishCompletion();
6     }
7 }
  1. 首先通過CAS把state的NEW狀態修改成COMPLETING狀態。
  2. 修改成功則把v值賦給outcome變量。然后再把state狀態修改成NORMAL,表示現在可以獲取返回值。
  3. 最后調用finishCompletion()方法,喚醒等待隊列中的所有節點。

finishCompletion方法

 1 private void finishCompletion() {
 2     for (WaitNode q; (q = waiters) != null; ) { 
 3         //通過CAS把棧頂的元素置為null,相當於彈出棧頂元素
 4         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
 5             for (; ; ) {
 6                 Thread t = q.thread;
 7                 if (t != null) {
 8                     q.thread = null;
 9                     LockSupport.unpark(t);
10                 }
11                 WaitNode next = q.next;
12                 if (next == null)
13                     break;
14                 q.next = null; // unlink to help gc
15                 q = next;
16             }
17             break;
18         }
19     }
20     done();
21     callable = null; // to reduce footprint
22 }

把棧中的元素一個一個彈出,並通過 LockSupport.unpark(t)喚醒每一個節點,通知每個線程,該任務執行完成(可能是執行完成,也可能cancel,異常等)

runAndReset 方法

 1 protected boolean runAndReset() {
 2     if (state != NEW ||
 3         !UNSAFE.compareAndSwapObject(this, runnerOffset,
 4                                      null, Thread.currentThread()))
 5         return false;
 6     boolean ran = false;
 7     int s = state;
 8     try {
 9         Callable<V> c = callable;
10         if (c != null && s == NEW) {
11             try {
12                 // 執行任務,和run方法不同的是這里不需要設置返回值
13                 c.call(); // don't set result
14                 ran = true;
15             } catch (Throwable ex) {
16                 setException(ex);
17             }
18         }
19     } finally {
20         // runner must be non-null until state is settled to
21         // prevent concurrent calls to run()
22         runner = null;
23         // state must be re-read after nulling runner to prevent
24         // leaked interrupts
25         s = state;
26         if (s >= INTERRUPTING)
27             handlePossibleCancellationInterrupt(s);
28     }
29     //這里並沒有改變state的狀態,還是NEW狀態
30     return ran && s == NEW;
31 }
runAndReset()和run()方法最大的區別是 runAndReset 不需要設置返回值,並且不需要改變任務的狀態,也就是不改變state的狀態,一直是NEW狀態。

get方法

1 public V get()throws InterruptedException, ExecutionException {
2     int s = state;
3     if (s <= COMPLETING)
4         s = awaitDone(false, 0L);
5     return report(s);
6 }

如果state狀態小於等於COMPLETING,說明任務還沒開始執行或還未執行完成,然后調用awaitDone方法阻塞該調用線程。

如果state的狀態大於COMPLETING,則說明任務執行完成,或發生異常、中斷、取消狀態。直接通過report方法返回執行結果。

awaitDone 方法

 1 private int awaitDone(boolean timed, long nanos)throws InterruptedException {
 2     final long deadline = timed ? System.nanoTime() + nanos : 0L;
 3     WaitNode q = null;
 4     boolean queued = false;
 5     for (; ; ) { 
 6         //如果該線程執行interrupt()方法,則從隊列中移除該節點,並拋出異常
 7         if (Thread.interrupted()) {
 8             removeWaiter(q);
 9             throw new InterruptedException();
10         }
11         int s = state; 
12         //如果state狀態大於COMPLETING 則說明任務執行完成,或取消
13         if (s > COMPLETING) {
14             if (q != null)
15                 q.thread = null;
16             return s;
17         } 
18         //如果state=COMPLETING,則使用yield,因為此狀態的時間特別短,通過yield比掛起響應更快。
19         else if (s == COMPLETING) // cannot time out yet
20             Thread.yield(); 
21         //構建節點
22         else if (q == null)
23             q = new WaitNode();
24         //把當前節點入棧
25         else if (!queued)
26             queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
27         //如果需要阻塞指定時間,則使用LockSupport.parkNanos阻塞指定時間
28         //如果到指定時間還沒執行完,則從隊列中移除該節點,並返回當前狀態
29         else if (timed) {
30             nanos = deadline - System.nanoTime();
31             if (nanos <= 0L) {
32                 removeWaiter(q);
33                 return state;
34             }
35             LockSupport.parkNanos(this, nanos);
36         }
37         //阻塞當前線程
38         else
39             LockSupport.park(this);
40     }
41 }

構建棧鏈表的節點元素,並將該節點入棧,同時阻塞當前線程等待運行主任務的線程喚醒該節點。

report方法

1 private V report(int s)throws ExecutionException {
2     Object x = outcome;
3     if (s == NORMAL)
4         return (V)x;
5     if (s >= CANCELLED)
6         throw new CancellationException();
7     throw new ExecutionException((Throwable)x);
8 }

如果state的狀態為NORMAL,說明任務正確執行完成,直接返回計算后的值。
如果state的狀態大於等於CANCELLED,說明任務被成功取消執行、或響應中斷,直接返回CancellationException異常
否則返回ExecutionException異常。

總結

  1.任務開始運行后,不能在次運行,保證只運行一次(runAndReset 方法除外)
  2.任務還未開始,或者任務已被運行,但未結束,這兩種情況下都可以取消; 如果任務已經結束,則不可以被取消 。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM