Java並發編程筆記之FutureTask源碼分析


FutureTask可用於異步獲取執行結果或取消執行任務的場景。通過傳入Runnable或者Callable的任務給FutureTask,直接調用其run方法或者放入線程池執行,之后可以在外部通過FutureTask的get方法異步獲取執行結果,因此,FutureTask非常適合用於耗時的計算,主線程可以在完成自己的任務后,再去獲取結果。另外,FutureTask還可以確保即使調用了多次run方法,它都只會執行一次Runnable或者Callable任務,或者通過cancel取消FutureTask的執行等。

類圖結構如下所示:

線程池使用 FutureTask 時候需要注意的一點事,FutureTask 使用不當可能會造成調用線程一直阻塞,如何避免?

線程池使用 FutureTask 的時候如果拒絕策略設置為了 DiscardPolicyDiscardOldestPolicy並且在被拒絕的任務的 Future 對象上調用無參 get 方法那么調用線程會一直被阻塞。

下面先通過一個簡單的例子來復現問題,代碼如下:

public class FutureTest {

    //(1)線程池單個線程,線程池隊列元素個數為1
        private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {

        //(2)添加任務one
        Future futureOne = executorService.submit(new Runnable() {
            @Override
            public void run() {

                System.out.println("start runable one");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        //(3)添加任務two
        Future futureTwo = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("start runable two");
            }
        });

        //(4)添加任務three
        Future futureThree=null;
        try {
            futureThree = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("start runable three");
                }
            });
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
        }

        System.out.println("task one " + futureOne.get());//(5)等待任務one執行完畢
        System.out.println("task two " + futureTwo.get());//(6)等待任務two執行完畢
        System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任務three執行完畢

        executorService.shutdown();//(8)關閉線程池,阻塞直到所有任務執行完畢
 }

運行結果如下:

代碼 (1) 創建了一個單線程並且隊列元素個數為 1 的線程池,並且拒絕策略設置為了DiscardPolicy

代碼(2)向線程池提交了一個任務 one,那么這個任務會使用唯一的一個線程進行執行,任務在打印 start runable one后會阻塞該線程 5s.

代碼(3)向線程池提交了一個任務 two,這時候會把任務 two 放入到阻塞隊列

代碼(4)向線程池提交任務 three,由於隊列已經滿了則會觸發拒絕策略丟棄任務 three, 從執行結果看在任務 one 阻塞的 5s 內,主線程執行到了代碼 (5) 等待任務 one 執行完畢,當任務 one 執行完畢后代碼(5)返回,主線程打印出 task one null。任務 one 執行完成后線程池的唯一線程會去隊列里面取出任務 two 並執行所以輸出 start runable two 然后代碼(6)會返回,這時候主線程輸出 task two null,然后執行代碼(7)等待任務 three 執行完畢,從執行結果看代碼(7)會一直阻塞不會返回,至此問題產生,如果把拒絕策略修改為 DiscardOldestPolicy 也會存在有一個任務的 get 方法一直阻塞只是現在是任務 two 被阻塞。但是如果拒絕策略設置為默認的 AbortPolicy 則會正常返回,並且會輸出如下結果:

 

要分析這個問題需要看下線程池的 submit 方法里面做了什么,submit 方法源碼如下:

public Future<?> submit(Runnable task) {
        ...
        //(1)裝飾Runnable為Future對象
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        //(6)返回future對象
        return ftask;
}

 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
 }
public void execute(Runnable command) { ... //(2) 如果線程個數消息核心線程數則新增處理線程處理 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //(3)如果當前線程個數已經達到核心線程數則任務放入隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //(4)嘗試新增處理線程進行處理 else if (!addWorker(command, false)) reject(command);//(5)新增失敗則調用拒絕策略 }

代碼(1)裝飾 Runnable 為 FutureTask 對象,然后調用線程池的 execute 方法。

代碼 (2) 如果線程個數消息核心線程數則新增處理線程處理

代碼(3)如果當前線程個數已經達到核心線程數則任務放入隊列

代碼(4)嘗試新增處理線程進行處理,失敗則進行代碼(5),否者直接使用新線程處理

代碼(5)執行具體拒絕策略,從這里也可以看出拒絕策略執行是使用的業務線程。

所以要分析上面例子中問題所在只需要看步驟(5)對被拒絕任務的影響,這里先看下拒絕策略 DiscardPolicy 的源碼,如下:

public static class DiscardPolicy implements RejectedExecutionHandler {
  
public DiscardPolicy() { }
  
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}

可知拒絕策略 rejectedExecution 方法里面什么都沒做,所以代碼(4)調用 submit 后會返回一個 future 對象,這里有必要在重新說 future 是有狀態的,FutureTask 內部有一個state用來展示任務的狀態,並且是volatile修飾的,future 的狀態枚舉值如下:

/** Possible state transitions:
 * NEW -> COMPLETING -> NORMAL 正常的狀態轉移
 * NEW -> COMPLETING -> EXCEPTIONAL 異常
 * NEW -> CANCELLED 取消
 * NEW -> INTERRUPTING -> INTERRUPTED 中斷
 */
 
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

在代碼(1)的時候使用 newTaskFor 方法轉換 Runnable 任務為 FutureTask,而 FutureTask 的構造函數里面設置的狀態就是 New。FutureTask的構造函數源碼如下:

public FutureTask(Runnable runnable, V result) {
     this.callable = Executors.callable(runnable, result);
     this.state = NEW;       // ensure visibility of callable
}

 

把FutureTask提交到線程池或者線程執行start時候會調用run方法,源碼如下:

public void run() {

    //如果當前不是new狀態,或者當前cas設置當前線程失敗則返回,只有一個線程可以成功。
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        //當前狀態為new 則調用任務的call方法執行任務
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);完成NEW -> COMPLETING -> EXCEPTIONAL 狀態轉移
            }

            //執行任務成功則保存結果更新狀態,unpark所有等待線程。
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}


protected void set(V v) {
    //狀態從new->COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        //狀態從COMPLETING-》NORMAL
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        //unpark所有等待線程。
        finishCompletion();
    }
}

 

所以使用 DiscardPolicy 策略提交任務后返回了一個狀態值為NEW的future對象。那么我們下面就要看下當future的無參get()方法的時候,future變為什么狀態才會返回,這時候就要看一下FutureTask的get方法的源碼,源碼如下:

  public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //當狀態值<=COMPLETING時候需要等待,否者調用report返回
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {

            //如果被中斷,則拋異常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            //組建單列表
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            else if (timed) {


                nanos = deadline - System.nanoTime();
                //超時則返回
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //否者設置park超時時間
                LockSupport.parkNanos(this, nanos);
            }
            else
                //直接掛起當前線程
                LockSupport.park(this);
        }
    }
    
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        //狀態值為NORMAL正常返回
        if (s == NORMAL)
            return (V)x;
        //狀態值大於等於CANCELLED則拋異常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

也就是說當 future 的狀態 > COMPLETING 時候調用 get 方法才會返回,而明顯 DiscardPolicy 策略在拒絕元素的時候並沒有設置該 future 的狀態,后面也沒有其他機會可以設置該 future 的狀態,所以 future 的狀態一直是 NEW,所以一直不會返回,同理 DiscardOldestPolicy 策略也是這樣的問題,最老的任務被淘汰時候沒有設置被淘汰任務對於 future 的狀態。、

在submit任務后還可以調用futuretask的cancel來取消任務:

 

public boolean cancel(boolean mayInterruptIfRunning) {
        //只有任務是new的才能取消
        if (state != NEW)
            return false;
       //運行時允許中斷
        if (mayInterruptIfRunning) {
           //完成new->INTERRUPTING
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
                return false;
            Thread t = runner;
            if (t != null)
                t.interrupt();
            //完成INTERRUPTING->INTERRUPTED
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
       //不允許中斷則直接new->CANCELLED
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
}

 

那么默認的 AbortPolicy 策略為啥沒問題呢?

也就是說當 future 的狀態 > COMPLETING 時候調用 get 方法才會返回,而明顯 DiscardPolicy 策略在拒絕元素的時候並沒有設置該 future 的狀態,后面也沒有其他機會可以設置該 future 的狀態,所以 future 的狀態一直是 NEW,所以一直不會返回,同理 DiscardOldestPolicy 策略也是這樣的問題,最老的任務被淘汰時候沒有設置被淘汰任務對於 future 的狀態。

所以當使用 Future 的時候,盡量使用帶超時時間的 get 方法,這樣即使使用了 DiscardPolicy 拒絕策略也不至於一直等待,等待超時時間到了會自動返回的,如果非要使用不帶參數的 get 方法則可以重寫 DiscardPolicy 的拒絕策略在執行策略時候設置該 Future 的狀態大於 COMPLETING 即可,但是查看 FutureTask 提供的方法發現只有 cancel 方法是 public 的並且可以設置 FutureTask 的狀態大於 COMPLETING,重寫拒絕策略具體代碼可以如下:

/**
 * Created by cong on 2018/7/13.
 */
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
        if (!threadPoolExecutor.isShutdown()) {
            if(null != runnable && runnable instanceof FutureTask){
                ((FutureTask) runnable).cancel(true);
            }
        }
    }
}

使用這個策略時候由於從 report 方法知道在 cancel 的任務上調用 get() 方法會拋出異常所以代碼(7)需要使用 try-catch 捕獲異常代碼(7)修改為如下:

package com.hjc;

import java.util.concurrent.*;

/**
 * Created by cong on 2018/7/13.
 */
public class FutureTest {

    //(1)線程池單個線程,線程池隊列元素個數為1
    private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(1), new MyRejectedExecutionHandler());

    public static void main(String[] args) throws Exception {

        //(2)添加任務one
        Future futureOne = executorService.submit(new Runnable() {
           
            public void run() {

                System.out.println("start runable one");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        //(3)添加任務two
        Future futureTwo = executorService.submit(new Runnable() {
            
            public void run() {
                System.out.println("start runable two");
            }
        });

        //(4)添加任務three
        Future futureThree = null;
        try {
            futureThree = executorService.submit(new Runnable() {
                
                public void run() {
                    System.out.println("start runable three");
                }
            });
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
        }

        System.out.println("task one " + futureOne.get());//(5)等待任務one執行完畢
        System.out.println("task two " + futureTwo.get());//(6)等待任務two執行完畢
        try{
            System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任務three
        }catch(Exception e){
            System.out.println(e.getLocalizedMessage());
        }

        executorService.shutdown();//(8)關閉線程池,阻塞直到所有任務執行完畢
    }
}

 

運行結果如下:

當然這相比正常情況下多了一個異常捕獲,其實最好的情況是重寫拒絕策略時候設置 FutureTask 的狀態為 NORMAL,但是這需要重寫 FutureTask 方法了,因為 FutureTask 並沒有提供接口進行設置。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM