血的教訓--如何正確使用線程池submit和execute方法


血的教訓之背景:使用線程池對存量數據進行遷移,但是總有一批數據遷移失敗,無異常日志打印

凶案起因

​ 聽說parallelStream並行流是個好東西,由於日常開發stream串行流的場景比較多,這次需要寫遷移程序剛好可以用得上,那還不趕緊拿來裝*一下,此時不裝更待何時。機智的我還知道在 JVM 的后台,使用通用的 fork/join 池來完成上述功能,該池是所有並行流共享的,默認情況,fork/join 池會為每個處理器分配一個線程,對應的變通方案就是創建自己的線程池如

ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.submit(() -> {
            list.parallelStream().collect(Collectors.toList());
        });

​ 於是地雷就是從這里埋下的。

submit還是execute

  public static void main(String[] args) throws InterruptedException, ExecutionException {
        final ExecutorService pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        List<Integer> list = Lists.newArrayList(1, 2, 3, null);
        //1.使用submit
        pool.submit(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        });
        TimeUnit.SECONDS.sleep(3);
        //2.使用 execute
        pool.execute(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        });
        //3.使用submit,調用get()
        pool.submit(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        }).get();
        TimeUnit.SECONDS.sleep(3);
    }

​ 讀者自行跑一下上面的用例,會發現單獨使用submit方法的並不會打印出錯誤日志,而使用execute方法打印出了錯誤日志,但是對submit返回的FutureJoinTask調用get()方法,又會拋出異常。於是真相大白,部分批次中的數據存在臟數據,為null值,遍歷到該null值的時候出現了異常,但是異常日志在submit方法中給catch住,沒有打印出來(心痛的感覺),而被捕獲的異常,被包裝在返回的結果類FutureJoinTask中,並沒有再次拋出。

如果不需要異步返回結果,請不要用submit 方法

​ 結論先行,我犯的錯誤就是,淺顯的認為submitexecute的區別就只是一個有返回異步結果,一個沒有返回一步結果,但是事實是殘酷的。submit()中邏輯一定包含了將異步任務拋出的異常捕獲,而因為使用方法不當而導致該異常沒有再次拋出。

​ 現在提出一個問題,ForkJoinPool#submit()中返回的ForkJoinTask可以獲取異步任務的結果,現這個異步拋出了異常,我們嘗試獲取該任務的結果會是如何? 我們直接看ForkJoinTask#get()的源碼。

public final V get() throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
        doJoin() : externalInterruptibleAwaitDone();
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        throw new CancellationException();
    //這里可以直接看到,異步任務出現異常會在調用get()獲取結果的時候,會被包裝成ExecutionException再次拋出
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        throw new ExecutionException(ex);
    return getRawResult();
}

​ 異步任務出現異常會在調用get()獲取結果的時候,會被包裝成ExecutionException再次拋出,但是異常是在哪里被捕獲的呢?萬變不離其宗,所有線程的線程都需要重寫Thread#run()方法, 投遞到ForkJoinPool的線程會被包裝成ForkJoinWorkerThread,因此我們看一下ForkJoinWorkerThread#run()的實現.

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            //出現異常,捕獲,再次拋出會在調用ForkJoinTask#get()的時候
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

​ 上面的分析是基於ForkJoinPool的,是不是所有的線程池的submitexecute方法的實現都是類似這樣,我們常用的線程池ThreadPoolThread實現會是怎樣的,同樣的思路,我們需要找到投遞到ThreadPoolThread的異步任務最終被包裝為哪個Thread的子類或者是實現java.lang.Runnable#run,答案就是java.util.concurrent.FutureTask

 public void run() {
      ...
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    //捕獲異常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } 
     ....
    }

總結

java.util.concurrent.ExecutorService#submit(java.lang.Runnable)為何線程池會有這種設定,實際上我們的思路不應該局限於線程池,而是放在獲取異步任務結果,異常是否也是屬於異步結果FutureTask作為JDK提供的並發工具類的實現中,已經給出了很好的答案,即獲取異步任務結果,異常也是屬於異步結果,如果異步任務出現運行時異常,那么在獲取該任務的結果時,該異常會被重新包裝拋出

​ 作者:plz叫我紅領巾

​ 出處:https://juejin.im/post/5d15c430f265da1bab29c1fe

本博客歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。碼子不易,您的點贊是我習作最大的動力


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM