Elasticsearch中各種線程池分析


Elasticsearch中各種線程池分析

最近看完了ElasticSearch線程池模塊的源碼,感觸頗深,然后也自不量力地借鑒ES的 EsThreadPoolExecutor 重新造了一把輪子(源碼在這里),對線程池的理解又加深了一些。在繼承 ThreadPoolExecutor實現自定義的線程池時,ES先重寫了Runnable接口,提供了更靈活的任務運行過程中出現異常處理邏輯。簡而言之,它采用回調機制實現了線程在運行過程中拋出未受檢異常的統一處理邏輯,非常優美。實在忍不住把源碼copy下來:

/**
 * An extension to runnable.
 */
public abstract class AbstractRunnable implements Runnable {

    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {
        return false;
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {
        // nothing by default
    }

    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);

    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {
        onFailure(e);
    }

    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;
}
  1. 統一的任務執行入口方法doRun(),由各個子類實現doRun()執行具體的業務邏輯

  2. try-catch中統一處理線程執行任務過程中拋出的異常,由onFailure()處理

  3. 任務執行完成(不管是正常結束還是運行過程中拋出了異常),統一由onAfter()處理

  4. isForceExecution方法,用來支持任務在提交給線程池被拒絕了,強制執行。當然了,這需要線程池的任務隊列提供相關的支持。我也是受這種方式的啟發,實現了一個線程在執行任務過程中拋出未受檢異常時,先判斷該任務是否允許強制執行isForceExecution,然后再重新提交任務運行的線程池

此外,ES內置了好幾個默認實現的線程池,比如 EsThreadPoolExecutor 、QueueResizingEsThreadPoolExecutor 和 PrioritizedEsThreadPoolExecutor。

  1. QueueResizingEsThreadPoolExecutor

    在創建線程池時會指定一個任務隊列(BlockingQueue),平常都是直接用 LinkedBlockingQueue,它是一個無界隊列,當然也可以在構造方法中指定隊列的長度。但是,ES中幾乎不用 LinkedBlockingQueue 作為任務隊列,而是使用 LinkedTransferQueue ,但是 LinkedTransferQueue 又是一個無界隊列,於是ES又基於LinkedTransferQueue 封裝了一個任務隊列,類名稱為 ResizableBlockingQueue,它能夠限制任務隊列的長度

    那么問題來了,對於一個線程池,任務隊列設置為多長合適呢?

    答案就是Little's Law。在QueueResizingEsThreadPoolExecutor 線程池中重寫了afterExecute()方法,里面統計了每個任務的運行時間、等待時間(入隊列到執行)。所以,你想知道如何統計一個任務的運行時間嗎?你想統計線程池一共提交了多少個任務,所有任務的運行時間嗎?看看QueueResizingEsThreadPoolExecutor 源碼就明白了。

    另外再提一個問題,為什么ES用 LinkedTransferQueue 作為任務隊列而不用 LinkedBlockingQueue 呢?

    我想:很重要的一個原因是LinkedBlockingQueue 是基於重量級的鎖(ReentrantLock)實現的入隊操作,而LinkedTransferQueue 是基於CAS原子指令實現的入隊操作。LinkedBlockingQueue#offer()當隊列長度達到最大值,此時不能提交任務給隊列了,直接返回false,否則通過加鎖方式將任務提交給隊列。LinkedTransferQueue本身是無界的,因此添加任務到LinkedTransferQueue時,通過CAS實現避免了加鎖帶來的上下文開銷的切換,在大部分競爭情況下,是會提升性能的。

  2. PrioritizedEsThreadPoolExecutor

    優先級任務的線程池,任務提交給線程池后是在任務隊列里面排隊,FIFO模式。而這個線程池則允許任務定義一個優先級,優先級高的任務先執行。

  3. EsThreadPoolExecutor

    這個線程池非常像JDK里面的ThreadPoolExecutor,不過,它實現了一些拒絕處理邏輯,提交任務若被拒絕(會拋出EsRejectedExecutionException異常),則進行相關處理

        @Override
        public void execute(final Runnable command) {
            doExecute(wrapRunnable(command));
        }
    
        protected void doExecute(final Runnable command) {
            try {
                super.execute(command);
            } catch (EsRejectedExecutionException ex) {
                if (command instanceof AbstractRunnable) {
                    // If we are an abstract runnable we can handle the rejection
                    // directly and don't need to rethrow it.
                    try {
                        ((AbstractRunnable) command).onRejection(ex);
                    } finally {
                        ((AbstractRunnable) command).onAfter();
    
                    }
                } else {
                    throw ex;
                }
            }
        }
    

講完了ES中常用的三個線程池實現,還想結合JDK源碼,記錄一下線程在執行任務過程中拋出運行時異常,是如何處理的。我覺得有二種方式(或者說有2個地方)來處理運行時異常。一種方式是:java.util.concurrent.ThreadPoolExecutor#afterExecute方法,另一種方式是:java.lang.Thread.UncaughtExceptionHandler#uncaughtException

  1. afterExecute

    看ThreadPoolExecutor#afterExecute(Runnable r, Throwable t) 的源碼注釋:

    Method invoked upon completion of execution of the given Runnable.This method is invoked by the thread that executed the task. If non-null, the Throwable is the uncaught RuntimeException or Error that caused execution to terminate abruptly.

    提交給線程池的任務,執行完(不管是正常結束,還是執行過程中出現了異常)后都會自動調用afterExecute()方法。如果執行過程中出現了異常,那么Throwable t 就不為null,並且導致執行終止(terminate abruptly.)。

    This implementation does nothing, but may be customized in subclasses. Note: To properly nest multiple overridings, subclasses should generally invoke super.afterExecute at the beginning of this method.

    默認的afterExecute(Runnable r, Throwable t) 方法是一個空實現,什么也沒有。因此,在繼承ThreadPoolExecutor實現自己的線程池時,如果重寫該方法,則要記住:先調用 super.afterExecute

    比如說這樣干:

     @Override
     protected void afterExecute(Runnable r, Throwable t) {
         super.afterExecute(r, t);
         if (t != null) {
             //出現了異常
             if (r instanceof AbstractRunnable && ((AbstractRunnable)r).isForceExecution()) {
                 //AbstractRunnable 設置為強制執行時重新拉起任務
                 execute(r);
                 logger.error("AbstractRunnable task run time error:{}, restarted", t.getMessage());
             }
         }
     }
    

    看,重寫afterExecute方法,當 Throwable 不為null時,表明線程執行任務過程中出現了異常,這時就重新提交任務。

    有個時候,在實現 Kafka 消費者線程的時候(while true循環),經常因為解析消息出錯導致線程拋出異常,就會導致 Kafka消費者線程掛掉,這樣就永久丟失了一個消費者了。而通過這種方式,當消費者線程掛了時,可重新拉起一個新任務。

  2. uncaughtException

    創建 ThreadPoolExecutor時,要傳入ThreadFactory 作為參數,在而創建ThreadFactory 對象時,就可以設置線程的異常處理器java.lang.Thread.UncaughtExceptionHandler。

    在用Google Guava包的時候,一般這么干:

    //先 new  Thread.UncaughtExceptionHandler對象 exceptionHandler
    private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread_name-%d").setUncaughtExceptionHandler(exceptionHandler).build();
    
    

    在線程執行任務過程中,如果拋出了異常,就會由JVM調用 Thread.UncaughtExceptionHandler 中實現的異常處理邏輯。看Thread.UncaughtExceptionHandler的JDK源碼注釋:

    Interface for handlers invoked when a Thread abruptly. terminates due to an uncaught exception.

    When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using getUncaughtExceptionHandler and will invoke the handler's uncaughtException method, passing the thread and the exception as arguments.

    其大意就是:如果線程在執行Runnable任務過程因為 uncaught exception 而終止了,那么 JVM 就會調用getUncaughtExceptionHandler 方法查找是否設置了異常處理器,如果設置了,那就就會調用異常處理器的java.lang.Thread.UncaughtExceptionHandler#uncaughtException方法,這樣我們就可以在這個方法里面定義異常處理邏輯了。

總結

ES的ThreadPool 模塊是學習線程池的非常好的一個示例,實踐出真知。它告訴你如何自定義線程池(用什么任務隊列?cpu核數、任務隊列長度等參數如何配置?)。在實現自定義任務隊列過程中,也進一步理解了CAS操作的原理,如何巧妙地使用CAS?是失敗重試呢?還是直接返回?。我想,這也是CAS與synchronized鎖、ReentrantLock鎖的一個最重要應用區別:多個線程在競執行 synchronized鎖 或者 ReentrantLock鎖 鎖住的代碼(術語叫臨界區)時,未搶到鎖的進程會被掛起,會伴隨上下文切換,而若可以把臨界區中的代碼邏輯基於CAS原子指令來實現,如果某個線程執行CAS操作失敗了,它可以選擇繼續重試,還是執行其它的處理邏輯,還是sleep若干毫秒。因此,它把線程執行的主動權交回給了程序員。比如基於CAS實現自增操作,失敗時繼續重試(這里自增操作邏輯本身要求"失敗重試直到加1成功"),直到加1成功,代碼是這樣的:

do{
    v = value.get();
}while(v!=value.compareAndSwap(v,v+1));

有個時候,代碼里面CAS失敗,並不一定就需要立即重試,因為,CAS失敗了,意味着此時有其他線程也在競爭,說明資源的競爭較激烈,那我們是不是可以先 sleep 一下再重試呢?這樣是不是更好?

  • 線程在執行Runnable任務過程中拋出了異常如何處理?這里提到了Thread.UncaughtExceptionHandler#uncaughtException 和 ThreadPoolExecutor#afterExecute。前者是由JVM自動調用的,后者則是在每個任務執行結束后都會被調用。

  • Thread.UncaughtExceptionHandler#uncaughtException 和 RejectedExecutionHandler#rejectedExecution 是不同的。RejectedExecutionHandler 用來處理任務在提交的時候,被線程池拒絕了,該怎么辦的問題,默認是AbortPolicy,即:直接丟棄。

  • 等下次有時間,好好地寫一篇分析ElasticSearch6.3.2的線程池模塊。😃

  • Lucene 源碼 org.apache.lucene.util.CloseableThreadLocal 解決了使用JDK ThreadLocal 時 JAVA對象 長期駐留內存得不到及時清除的問題,也值得好好分析一番 😃
    原文:https://www.cnblogs.com/hapjin/p/10617702.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM