ElasticSearch 線程池類型分析之SizeBlockingQueue


ElasticSearch 線程池類型分析之SizeBlockingQueue

盡管前面寫好幾篇ES線程池分析的文章(見文末參考鏈接),但都不太滿意。但從ES的線程池中了解到了不少JAVA線程池的使用技巧,於是忍不住再寫一篇(ES6.3.2版本的源碼)。文中給出的每個代碼片斷,都標明了這些代碼是來自哪個類的哪個方法。
ElasticSearch里面一共有四種類型的線程池,源碼:ThreadPool.ThreadPoolType

        DIRECT("direct"),
        FIXED("fixed"),
        FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
        SCALING("scaling");

GET、SEARCH、WRITE、INDEX、FLUSH...等各種操作是交由這些線程池實現的。為什么定義不同類型的線程池呢?舉個最簡單的例子:程序里面有IO密集型任務,也有CPU密集型任務,這些任務都提交到一個線程池中執行?還是根據任務的執行特點將CPU密集型的任務都提交到一個線程池,IO密集型任務都提交到另一個線程池執行?

不同種類的操作(INDEX、SEARCH...)交由不同類型的線程池執行是有很多好處的:

  1. 互不影響:INDEX操作頻繁時,並不會影響SEARCH操作的執行。
  2. 資源合理利用(提升性能):如果只有一個線程池來處理所有的操作,線程池隊列長度配置為多大合適?線程的數量配置多少合適?這些操作難道都要共用一個拒絕策略嗎?線程執行過程中出現異常了,針對不同類型的操作,異常處理方案也是不一樣的,顯然:只有一個線程池(或者說只有一種類型的線程池),是無法滿足這些需求的。

再來說一下ES中的線程池都是如何創建的?
ES節點啟動時,執行Node類的構造方法 :org.elasticsearch.node.Node.Node(org.elasticsearch.env.Environment, java.util.Collection<java.lang.Class<? extends org.elasticsearch.plugins.Plugin>>)
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
new ThreadPool對象,從這里開始創建線程池。看懂了ThreadPool類,就理解了ES線程池的一半。

每個操作都有一個線程池,每個線程池都有一個相應的 ExecutorBuilder 對象,線程池都是通過ExecutorBuilder類的build()方法創建的。
在org.elasticsearch.threadpool.ThreadPool.ThreadPool的構建函數里面創建各種ExecutorBuilder對象。可以看出:INDEX操作的線程池的 ExecutorBuilder對象實際類型是FixedExecutorBuilder

 builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
        builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
        builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
        builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
        builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
        builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
                        Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));

如上代碼所示,雖然ES為我們內置好了許多線程池(GENERIC、INDEX、WRITE、GET...),但還可以自定義 ExecutorBuilder對象,創建自定義的線程池。所有的ExecutorBuilder對象創建完畢后,保存到一個HashMap里面。

        for (final ExecutorBuilder<?> builder : customBuilders) {
            if (builders.containsKey(builder.name())) {
                throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
            }
            builders.put(builder.name(), builder);
        }

最后,遍歷builders 這個HashMap 取出 ExecutorBuilder對象,調用它的build()方法創建線程池

        for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
            final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
            //這里執行build方法創建線程池
            final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
            if (executors.containsKey(executorHolder.info.getName())) {
                throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
            }
            logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
            executors.put(entry.getKey(), executorHolder);
        }

創建INDEX操作的線程池需要指定任務隊列,這個任務隊列就是:SizeBlockingQueue。當然了,也有一些其他操作(比如GET操作)的線程池的任務隊列也是SizeBlockingQueue。
下面參數可看出:該任務隊列的長度為200,org.elasticsearch.threadpool.ThreadPool.ThreadPool的構造方法:

    builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));

前面已經提到了,每個線程池都由ExecutorBuilder的build方法創建的。具體到INDEX操作的線程池,它的ExecutorBuilder實例對象是: FixedExecutorBuilder對象,在ExecutorBuilder 保存一些線程池參數信息:(core pool size、max pool size、queue size...)

final ExecutorService executor =
                EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);

如果queue_size配置為 -1,那就是一個無界隊列(LinkedTransferQueue)。我們是可以修改線程池配置參數的:關於線程池隊列長度的配置信息參考:官方文檔threadpool
而INDEX操作對應的線程池的任務隊列長度為200,因此下面代碼創建了一個長度為200的 SizeBlockingQueue,在代碼最后一行,為該線程池指定的拒絕策略是 EsAbortPolicy

    public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) {
        BlockingQueue<Runnable> queue;
        if (queueCapacity < 0) {
            queue = ConcurrentCollections.newBlockingQueue();
        } else {
            queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
        }
        return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

下面開始分析SizeBlockingQueue的源碼:

一般在自定義線程池時,要么是直接 new ThreadPoolExecutor,要么是繼承ThreadPoolExecutor,在創建ThreadPoolExecutor對象時需要指定線程池的配置參數。比如,線程池的核心線程數(core pool size),最大線程數,任務隊列,拒絕策略。這里我想提一下拒絕策略,因為某些ES的操作具有"強制"執行的特點:如果某個任務被標記為強制執行,那么向線程池提交該任務時,就不能拒絕它。是不是很厲害?想想,線程池是如何做到的?
下面舉個例子:

//創建任務隊列,這里沒有指定任務隊列的長度,那么這就是一個無界隊列
private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
//創建線程工廠,由它來創建線程
private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-%d").setUncaughtExceptionHandler(exceptionHandler).build();

//創建線程池,核心線程數為4,最大線程數為16
private ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 16, 1, TimeUnit.DAYS, taskQueue, threadFactory, rejectExecutionHandler);

這里創建的線程池,它的線程數量永遠不可能達到最大線程數量16,為什么?因為我們的任務隊列是一個無界隊列,當向線程池中提交任務時,LinkedBlockingQueue.offer方法不會返回false。而在JDK源碼java.util.concurrent.ThreadPoolExecutor.execute中,當任務入隊列失敗返回false時,才有可能觸發addWork創建新線程。這個時候,你可能會說:在 new LinkedBlockingQueue的時候指定隊列長度不就完了?比如這樣指定隊列長度為1024

private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1024);

但是,有沒有一種方法,能夠做到:當core pool size 個核心線程數處理不過來時,先讓線程池的線程數量創建到最大值(max pool size),然后,若還有任務提交到線程池,則讓任務排隊等待處理?SizeBlockingQueue 重寫了BlockingQueue的offer方法,實現了這個功能。
另外,我再反問一下?如何確定1024就是一個合適的隊列容量?萬一提交任務速度很快,一下子任務隊列就滿了,長度1024就會導致大量的任務被拒絕。
ES中的 ResizableBlockingQueue 實現了一種可動態調整隊列長度的任務隊列,有興趣的可以去研究一下。

SizeBlockingQueue 封裝了 LinkedTransferQueue,而 LinkedTransferQueue 是一個無界隊列,與LinkedBlockingQueue不同的是,LinkedTransferQueue的構造方法是不能指定任務隊列的長度(capacity)的。因此,SizeBlockingQueue定義一個capacity屬性提供了隊列有界的功能。

好,來看看SizeBlockingQueue是如何重寫offer方法的:org.elasticsearch.common.util.concurrent.SizeBlockingQueue.offer(E)

    @Override
    public boolean offer(E e) {
        while (true) {
            //獲取當前任務隊列的長度,即:當前任務隊列里面有多少個任務正在排隊等待執行
            final int current = size.get();
            //如果正在等待排隊的任務數量大於等於任務隊列長度的最大值(容量),
            //返回false 就有可能 觸發 java.util.concurrent.ThreadPoolExecutor.addWorker 調用創建新線程
            if (current >= capacity()) {
                return false;
            }
            
            //當前正在排隊的任務數量尚未超過隊列的最大長度,使用CAS 先將任務隊列長度加1,[CAS的經典用法]
            if (size.compareAndSet(current, 1 + current)) {
                break;
            }
        }
        //將任務添加到隊列
        boolean offered = queue.offer(e);
        if (!offered) {
        //如果未添加成功,再把數量減回去即可
            size.decrementAndGet();
        }
        return offered;
    }

上面,就是通過先判斷當前排隊的任務是否小於任務隊列的最大長度(容量) 來實現:優先創建線程數量到 max pool size。下面來模擬一下使用 SizeBlockingQueue 時處理任務的步驟:
根據前面的介紹:線程池 core pool size=4,max pool size=16,taskQueue 是 SizeBlockingQueue,任務隊列的最大長度是200
1,提交1-4號 四個任務給線程池,線程池創建4個線程處理這些任務
2,1-4號 四個任務正在執行中...此時又提交了8個任務到線程池
3,這時,線程池是再繼續創建8個線程,處理 5-12號任務。此時,線程池中一共有4+8=12個線程,小於max pool size
4,假設 1-12號任務都正在處理中,此時又提交了8個任務到線程池
5,這時,線程池會再創建4個新線程處理其中的13-16號 這4個任務,線程數量已經達到max pool size,不能再創建新線程了,還有4個任務(17-20號)入隊列排隊等待。

有沒有興趣模擬一下使用LinkedBlockingQueue作為任務隊列時,線程池又是如何處理這一共提交的20個任務的?

最后來分析一下 SizeBlockingQueue 如何支持:當向線程池提交任務時,如果任務被某種拒絕策略拒絕了,如果這種任務又很重要,那能不能強制將該任務提交到線程池的任務隊列中呢?
這里就涉及到:在創建線程池時,為線程池配置了何種拒絕策略了。下面以INDEX操作的線程池為例說明:
在org.elasticsearch.common.util.concurrent.EsExecutors.newFixed 中:可知該線程池所使用的拒絕策略是:EsAbortPolicy

return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);

看 EsAbortPolicy 的源碼:org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution

if (r instanceof AbstractRunnable) {
            //判斷該任務是不是一個 可強制提交的任務
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                //是一個可強制提交的任務,並且 線程池的任務隊列是 SizeBlockingQueue時,強制提交任務
                try {
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
         rejected.inc();
        //任務被拒絕且未能強制執行, 拋出EsRejectedExecutionException異常后,會被 EsThreadPoolExecutor.doExecute catch, 進行相應的處理
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());

AbstractRunnable 是提交的Runnable任務,只要Runnable任務的 isForceExecution()返回true,就表明這個任務需要“強制提交”。關於AbstractRunnable,可參考:Elasticsearch中各種線程池分析

那為什么只有當任務隊列是 SizeBlockingQueue 時,才可以強制提交呢?這很好理解:首先SizeBlockingQueue它封裝了LinkedTransferQueue,LinkedTransferQueue本質上是一個無界隊列,實際上可以添加無窮多個任務(不考慮OOM),只不過是用 capacity 屬性限制了隊列的長度而已。
如果,任務隊列是 new LinkedBlockingQueue<>(1024),肯定是不能支持強制提交的,因為當LinkedBlockingQueue長度達到1024后,再提交任務,直接返回false了。從這里也可以借鑒ES線程池任務隊列的設計方式,應用到項目中去。
綜上:只有Runnable任務 isForceExecution返回true,並且線程池的任務隊列是SizeBlockingQueue時,向線程池提交任務時,總是能提交成功(強制執行機制保證)。其他情況下,任務被拒絕時,會拋出EsRejectedExecutionException異常。

強制提交,把任務添加到任務隊列 SizeBlockingQueue 中,源碼如下:
org.elasticsearch.common.util.concurrent.SizeBlockingQueue.forcePut

    /**
     * Forces adding an element to the queue, without doing size checks.
     */
    public void forcePut(E e) throws InterruptedException {
        size.incrementAndGet();
        try {
            queue.put(e);
        } catch (InterruptedException ie) {
            size.decrementAndGet();
            throw ie;
        }
    }

總結:

ES會為每種操作創建一個線程池,本文基於INDEX操作分析了ES中線程池的任務隊列SizeBlockingQueue。對於 INDEX 操作而言,它的線程池是由org.elasticsearch.threadpool.FixedExecutorBuilder 的build方法創建的,線程池的最大核心線程數和最大線程數相同,使用的任務隊列是 SizeBlockingQueue,長度為200,拒絕策略是:org.elasticsearch.common.util.concurrent.EsAbortPolicy。

為什么要為不同的操作分配不同的線程池呢?

假設 index 操作 和 snapshot 操作使用同一個線程池,如果某節點發生故障,index操作被阻塞了,而 Client發起的索引文檔操作的 QPS又很高,就很容易影響 snapshot 服務了。

SizeBlockingQueue 本質上是一個 LinkedTransferQueue,其實ES中所有的任務隊列都是封裝LinkedTransferQueue實現的,並沒有使用LinkedBlockingQueue。

ES中的所有任務(Runnable)都是基於org.elasticsearch.common.util.concurrent.AbstractRunnable這個抽象類封裝的,當然有一些任務是通過Lambda表達式的形式提交的。任務的具體處理邏輯在 org.elasticsearch.common.util.concurrent.AbstractRunnable#doRun 方法中,任務執行完成由onAfter()處理,執行出現異常由onFailure()處理。線程池的 org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor#doExecute 方法 里面就是整個任務的處理流程:

    protected void doExecute(final Runnable command) {
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                // If we are an abstract runnable we can handle the rejection
                // directly and don't need to rethrow it.
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();

                }
            } else {
                throw ex;
            }
        }
    }

最后,ES的線程池模塊代碼主要在 org.elasticsearch.threadpool 和 org.elasticsearch.common.util.concurrent 包下。總體來說,threadpool模塊相比於ES的其他模塊,是一個小模塊,代碼不算復雜。但是threadpool又很重要,因為它是其他模塊執行邏輯的基礎,threadpool 再配上異步執行機制,是ES源碼中其他操作的源碼實現思路。

參考:
探究ElasticSearch中的線程池實現
Elasticsearch中各種線程池分析


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM