線程池使用拒絕策略時需要注意的坑


轉載自:
http://ifeve.com/%E7%BA%BF%E7%A8%8B%E6%B1%A0%E4%BD%BF%E7%94%A8futuretask%E6%97%B6%E5%80%99%E9%9C%80%E8%A6%81%E6%B3%A8%E6%84%8F%E7%9A%84%E4%B8%80%E7%82%B9%E4%BA%8B/

  線程池使用FutureTask的時候如果拒絕策略設置為了DiscardPolicy和DiscardOldestPolicy並且在被拒絕的任務的Future對象上調用無參get方法那么調用線程會一直被阻塞。

問題復現

    public static void main(String[] args) throws Exception {

        //(1)線程池單個線程,線程池隊列元素個數為1
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1,
                1L, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy());

        //(2)添加任務one
        Future futureOne = executorService.submit(new Runnable() {
            @Override public void run() {
                System.out.println("start runable one");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        //(3)添加任務two
        Future futureTwo = executorService.submit(new Runnable() {
            @Override public void run() {
                System.out.println("start runable two");
            }
        });

        //(4)添加任務three
        Future futureThree = null;
        try {
            futureThree = executorService.submit(new Runnable() {
                @Override public void run() {
                    System.out.println("start runable three");
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getLocalizedMessage());
        }

        System.out.println("task one finish " + futureOne.get());//(5)等待任務one執行完畢
        System.out.println("task two finish " + futureTwo.get());//(6)等待任務two執行完畢
        System.out.println("task three finish " + (futureThree == null ? null : futureThree.get()));// (7)等待任務three執行完畢

        executorService.shutdown();//(8)關閉線程池,阻塞直到所有任務執行完畢
    }
   
   
  
  
          

  運行結果:

start runable one
task one finish null
start runable two
task two finish null
   
   
  
  
          

  創建了一個單線程並且隊列元素個數為1的線程池,並且拒絕策略設置為了DiscardPolicy;先提交任務one,這個任務會使用唯一的一個線程進行執行,任務在打印 start runable one后會阻塞該線程5s;再向線程池提交了一個任務two,這時候會把任務two放入到阻塞隊列;提交任務three時,由於隊列已經滿了則會觸發拒絕策略丟棄任務three。
  從運行結果看在任務one阻塞的5s內,主線程執行到了代碼(5)等待任務one執行完畢,當任務one執行完畢后代碼(5)返回,主線程打印出task one finish null。之后線程池的唯一線程會去隊列里面取出任務two並執行所以輸出start runable two然后代碼(6)會返回,這時候主線程輸出task two finish null,然后執行代碼(7)等待任務three執行完畢,從執行結果看代碼(7)會一直阻塞不會返回。
  至此問題產生,如果把拒絕策略修改為DiscardOldestPolicy也會存在有一個任務的get方法一直阻塞只是現在是任務two被阻塞:

start runable one
task one finish null
start runable three
   
   
  
  
          

  但是如果拒絕策略設置為默認的AbortPolicy則會拋出RejectedExecutionException並正常返回。

問題分析

  要分析這個問題需要看下線程池的submit方法里面做了什么,submit方法代碼如下:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
   
   
  
  
          

  newTaskFor()把Runnable轉為FutureTask對象,FutureTask實現RunnableFuture接口,繼續跟execute:

public void execute(Runnable command) {
        // ...
        // 如果線程個數消息核心線程數則新增處理線程處理
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果當前線程個數已經達到核心線程數則任務放入隊列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 嘗試新增處理線程進行處理
        else if (!addWorker(command, false))
            reject(command);// 新增失敗則調用拒絕策略
    }
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
   
   
  
  
          

  再來看下拒絕策略DiscardPolicy的代碼:

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
   
   
  
  
          

  這里rejectedExecution方法里面什么都沒做,所以代碼(4)調用submit后會返回一個future對象,即FutureTask:

public class FutureTask<V> implements RunnableFuture<V> {
    /** * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
   
   
  
  
          

  state標識FutureTask的狀態,初始狀態是New。因此使用DiscardPolicy策略提交后返回了一個狀態為NEW的FutureTask對象。
  那么下面就需要看下當調用future的無參get方法時候當future變為什么狀態時候才會返回:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)  // 狀態值為NORMAL正常返回
            return (V)x;
        if (s >= CANCELLED) // 狀態值大於等於CANCELLED則拋異常
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
   
   
  
  
          

  也就是說當future的狀態>COMPLETING時候調用get方法才會返回,而明顯DiscardPolicy策略在拒絕元素的時候並沒有設置該future的狀態,后面也沒有其他機會可以設置該future的狀態,所以future的狀態一直是NEW,所以一直不會返回,同理DiscardOldestPolicy策略也是這樣的問題,最老的任務被淘汰時候沒有設置被淘汰任務對於future的狀態,也會導致一直不會返回。
  那么默認的AbortPolicy策略為啥沒問題那?來看AbortPolicy策略代碼:

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
   
   
  
  
          

  看代碼就應該明白了吧。

  所以當使用Future的時候,盡量使用帶超時時間的get方法,這樣即使使用了DiscardPolicy拒絕策略也不至於一直等待,等待超時時間到了會自動返回的,如果非要使用不帶參數的get方法則可以重寫DiscardPolicy的拒絕策略在執行策略時候設置該Future的狀態大於COMPLETING即可,但是查看FutureTask提供的方法發現只有cancel方法是public的並且可以設置FutureTask的狀態大於COMPLETING,重寫拒絕策略具體代碼可以如下:

    public static void main(String[] args) throws Exception {
        //(1)線程池單個線程,線程池隊列元素個數為1
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1,
                1L, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.DiscardPolicy() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                        if (!e.isShutdown()) {
                            if (r != null && r instanceof FutureTask) {
                                ((FutureTask) r).cancel(true);
                            }
                        }
                    }
                }
        );

        //(2)添加任務one
        Future futureOne = executorService.submit(new Runnable() {
            @Override public void run() {
                System.out.println("start runable one");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        //(3)添加任務two
        Future futureTwo = executorService.submit(new Runnable() {
            @Override public void run() {
                System.out.println("start runable two");
            }
        });

        //(4)添加任務three
        Future futureThree = null;
        try {
            futureThree = executorService.submit(new Runnable() {
                @Override public void run() {
                    System.out.println("start runable three");
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getLocalizedMessage());
        }

        try {
            System.out.println("task one finish " + futureOne.get());//(5)等待任務one執行完畢
            System.out.println("task two finish " + futureTwo.get());//(6)等待任務two執行完畢
            System.out.println("task three finish " + (futureThree == null ? null : futureThree.get()));// (7)等待任務three執行完畢
        } catch (Exception e) {
            e.printStackTrace();
        }
        executorService.shutdown();//(8)關閉線程池,阻塞直到所有任務執行完畢
    }
   
   
  
  
          

  使用這個策略時候,由於report方法中對cancel的任務上會拋出CancellationException異常,所以在get()時使用try-catch捕獲異常。運行后發現程序能正常退出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM