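Elasticsearch Thread Pool Types: ResizableBlockingQueue
At the end of the previous article, "Elasticsearch Thread Pool Types: ExecutorScalingQueue", I touched on some implementation details of the thread pool that handles ES search operations. This article analyzes the SEARCH thread pool around the following questions:
- How do you measure the queue wait time and execution time of tasks in a thread pool? Queue wait time is the interval between submitting a task to the pool and the moment it is scheduled to run; execution time is the interval from the start of execution to its completion.
- How do you design a task queue whose capacity (maximum length) can be adjusted dynamically?
- What are the implementation details of the thread pool that runs ES SEARCH tasks (called the SEARCH thread pool below)?
The SEARCH thread pool is created in the constructor of the ThreadPool class: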
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
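The core thread count of the SEARCH thread pool depends on the number of CPUs on the machine hosting the ES node. Its task queue capacity can be adjusted dynamically, and the initial capacity is 1000. The concrete implementation class is QueueResizingEsThreadPoolExecutor, the task queue is ResizableBlockingQueue, and the rejection policy is EsAbortPolicy. ResizableBlockingQueue extends SizeBlockingQueue and adds the ability to adjust the queue capacity at runtime; for SizeBlockingQueue, see the article "Elasticsearch Thread Pool Types: SizeBlockingQueue".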
org.elasticsearch.common.util.concurrent.EsExecutors.newAutoQueueFixed
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
new EsAbortPolicy(), contextHolder);
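Each submitted Runnable is wrapped in a TimedRunnable so that its timings can be recorded. When the TimedRunnable is constructed, this.creationTimeNanos = System.nanoTime(); records the task's creation time. finishTimeNanos - startTimeNanos is the task's execution time, and startTimeNanos - creationTimeNanos is its queue wait time. Together these give the queue time and execution time of every Runnable, which is a neat design.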
org.elasticsearch.common.util.concurrent.TimedRunnable
// Constructor of TimedRunnable
TimedRunnable(final Runnable original) {
this.original = original;
this.creationTimeNanos = System.nanoTime();
}
@Override
public void doRun() {
try {
// time at which the task starts executing
startTimeNanos = System.nanoTime();
// the task's actual work
original.run();
} finally {
// time at which the task finishes executing
finishTimeNanos = System.nanoTime();
}
}
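To make the timing idea concrete, here is a minimal, self-contained sketch using only the JDK. It is not the ES class: TimedTaskSketch, runOnce, and the accessor names are hypothetical, and only the three timestamps mirror what TimedRunnable records.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for TimedRunnable; all names are illustrative.
final class TimedTaskSketch implements Runnable {
    private final Runnable original;
    private final long creationTimeNanos;
    private volatile long startTimeNanos = -1;
    private volatile long finishTimeNanos = -1;

    TimedTaskSketch(Runnable original) {
        this.original = original;
        // The clock starts when the wrapper is created, i.e. at submission.
        this.creationTimeNanos = System.nanoTime();
    }

    @Override
    public void run() {
        try {
            startTimeNanos = System.nanoTime();
            original.run();
        } finally {
            // Recorded even if the wrapped task throws.
            finishTimeNanos = System.nanoTime();
        }
    }

    long queueNanos()     { return startTimeNanos - creationTimeNanos; }
    long executionNanos() { return finishTimeNanos - startTimeNanos; }
    long totalNanos()     { return finishTimeNanos - creationTimeNanos; }

    // Runs one task on a single-thread pool and returns the timed wrapper.
    static TimedTaskSketch runOnce(Runnable work) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TimedTaskSketch task = new TimedTaskSketch(work);
        try {
            pool.submit(task).get(); // wait so that all timestamps are set
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return task;
    }

    public static void main(String[] args) {
        TimedTaskSketch t = runOnce(() -> { });
        System.out.println("queued ns=" + t.queueNanos() + ", ran ns=" + t.executionNanos());
    }
}
```

The total time is by construction the sum of the queue time and the execution time, which is the quantity the pool later accumulates per task.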
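Next, let's look in detail at how the timings of Runnable tasks submitted to the thread pool are collected. Start with the constructor parameters of QueueResizingEsThreadPoolExecutor, in particular runnableWrapper, which I think of as a piece of "processing logic".
As the earlier snippet that calls new QueueResizingEsThreadPoolExecutor shows, TimedRunnable::new is passed as runnableWrapper. Because runnableWrapper is a java.util.function.Function, calling its apply method runs that processing logic, that is, it creates a new TimedRunnable. The TimedRunnable constructor shows that the task's creation time is recorded at that moment.
The reason for going into this much detail is that the ES source code makes heavy use of functional interfaces and lambda expressions. When I first read the source, I could not tell where the "processing logic" represented by a lambda was actually executed. Once the style becomes familiar, it is clear that it makes the code much more flexible.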
// runnableWrapper is declared as the functional interface Function: it takes a Runnable, applies the wrapping logic, and returns a Runnable
private final Function<Runnable, Runnable> runnableWrapper;
private final ResizableBlockingQueue<Runnable> workQueue;
QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ResizableBlockingQueue<Runnable> workQueue, int minQueueSize, int maxQueueSize,
Function<Runnable, Runnable> runnableWrapper, final int tasksPerFrame,
TimeValue targetedResponseTime, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
ThreadContext contextHolder) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler, contextHolder);
this.runnableWrapper = runnableWrapper;
this.workQueue = workQueue;
this.tasksPerFrame = tasksPerFrame;
this.startNs = System.nanoTime();
this.minQueueSize = minQueueSize;
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
logger.debug(
"thread pool [{}] will adjust queue by [{}] when determining automatic queue size", getName(), QUEUE_ADJUSTMENT_AMOUNT);
}
When a task is submitted, the doExecute() method of QueueResizingEsThreadPoolExecutor is invoked:
@Override
protected void doExecute(final Runnable command) {
// we are submitting a task, it has not yet started running (because super.excute() has not
// been called), but it could be immediately run, or run at a later time. We need the time
// this task entered the queue, which we get by creating a TimedRunnable, which starts the
// clock as soon as it is created.
super.doExecute(this.runnableWrapper.apply(command)); // apply() invokes TimedRunnable::new and creates the TimedRunnable
}
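The same wrapping pattern can be sketched on top of a plain JDK ThreadPoolExecutor. This is an illustration under assumptions, not the ES implementation: WrappingExecutorSketch and countWrapped are hypothetical names, and the sketch overrides execute directly, whereas the ES code above does the wrapping in doExecute.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical executor that applies a wrapper to every submitted task.
final class WrappingExecutorSketch extends ThreadPoolExecutor {
    private final Function<Runnable, Runnable> runnableWrapper;

    WrappingExecutorSketch(int threads, Function<Runnable, Runnable> runnableWrapper) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.runnableWrapper = runnableWrapper;
    }

    @Override
    public void execute(Runnable command) {
        // apply() runs here, on the submitting thread, before the task is queued.
        super.execute(runnableWrapper.apply(command));
    }

    // Submits n no-op tasks and returns how many times the wrapper was applied.
    static int countWrapped(int n) {
        AtomicInteger wrapped = new AtomicInteger();
        WrappingExecutorSketch pool = new WrappingExecutorSketch(2, r -> {
            wrapped.incrementAndGet();
            return r;
        });
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return wrapped.get();
    }

    public static void main(String[] args) {
        System.out.println("wrapper applied " + countWrapped(3) + " times");
    }
}
```

Because the wrapper is just a Function, swapping the counting lambda for a constructor reference such as a timing wrapper changes the behavior without touching the executor.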
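At this point the timings of every task can be recorded, but what capacity should the task queue have? ES derives it from Little's law in queueing theory, which is easy to look up if you need the background.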
/**
* Calculate Little's Law (L), which is the "optimal" queue size for a particular task rate (lambda) and targeted response time.
*
* @param lambda the arrival rate of tasks in nanoseconds
* @param targetedResponseTimeNanos nanoseconds for the average targeted response rate of requests
* @return the optimal queue size for the give task rate and targeted response time
*/
static int calculateL(final double lambda, final long targetedResponseTimeNanos) {
assert targetedResponseTimeNanos > 0 : "cannot calculate for instantaneous requests";
// L = λ * W
return Math.toIntExact((long)(lambda * targetedResponseTimeNanos));
}
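Little's law needs two parameters: lambda (λ) and W.
- lambda can be understood as the rate at which the thread pool processes tasks, that is, \(\text{rate} = \frac{\text{number of successfully executed tasks}}{\text{total time taken by those tasks}}\), where the total time is each task's queue wait time plus its execution time.
- W is the average response time of a request. A SEARCH request is ultimately turned into Runnable tasks that are submitted to the thread pool, so my understanding is that the average response time here means a Runnable's queue wait time plus its execution time. It is not the end-to-end time a client sees between sending a SEARCH request and receiving results from ES, because that also includes network latency.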
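A small worked example may help with the units. The sketch below mirrors the formula L = λ * W with nanoseconds throughout; the class name LittlesLawSketch is hypothetical, the body of calculateLambda is my assumption based on the rate definition above, and the input numbers are made up for illustration.

```java
// Hypothetical helper that reproduces the arithmetic of Little's law.
final class LittlesLawSketch {
    // lambda in tasks per nanosecond: completed tasks divided by their summed total time.
    static double calculateLambda(int totalNumberOfTasks, long totalFrameTaskNanos) {
        return (double) totalNumberOfTasks / totalFrameTaskNanos;
    }

    // L = lambda * W, truncated to an int; throws ArithmeticException on int overflow.
    static int calculateL(double lambda, long targetedResponseTimeNanos) {
        return Math.toIntExact((long) (lambda * targetedResponseTimeNanos));
    }

    public static void main(String[] args) {
        // Assumed frame: 2000 tasks whose queue + run times sum to 4,194,304,000 ns,
        // i.e. about 2.1 ms per task.
        double lambda = calculateLambda(2000, 4_194_304_000L);
        // Target response time W = 1 s = 1,000,000,000 ns.
        int desiredQueueSize = calculateL(lambda, 1_000_000_000L);
        System.out.println(desiredQueueSize); // prints 476
    }
}
```

With W fixed at one second, the desired queue size is simply the measured rate in tasks per second, rounded down.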
In ES this average response time can be set in the configuration file; if it is not set, it defaults to 1s. The constructor of AutoQueueAdjustingExecutorBuilder sets the default as follows:
final String targetedResponseTimeKey = settingsKey(prefix, "target_response_time");
this.targetedResponseTimeSetting = Setting.timeSetting(targetedResponseTimeKey, TimeValue.timeValueSeconds(1),
TimeValue.timeValueMillis(10), Setting.Property.NodeScope);
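The number of executed tasks and their total time are collected in afterExecute. The custom ES thread pool overrides ThreadPoolExecutor.afterExecute, which is called automatically each time a task finishes, to do this post-processing: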
@Override
protected void afterExecute(Runnable r, Throwable t) {
// when overriding afterExecute, call super.afterExecute first
super.afterExecute(r, t);
// A task has been completed, it has left the building. We should now be able to get the
// total time as a combination of the time in the queue and time spent running the task. We
// only want runnables that did not throw errors though, because they could be fast-failures
// that throw off our timings, so only check when t is null.
// only TimedRunnable tasks are expected here; their timings and count are recorded
assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
// total time of this task (queue wait time plus execution time)
final long taskNanos = ((TimedRunnable) r).getTotalNanos();
// accumulated total time of all tasks (sum of each task's total time)
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
// execution time of this task (its total time minus its queue wait time)
final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos();
assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
executionEWMA.addValue(taskExecutionNanos);
// tasksPerFrame is 2000 by default; the queue capacity is adjusted once per batch of tasksPerFrame completed tasks
if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
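// wall-clock time elapsed since startNs: the pool's creation time for the first frame, the end of the previous frame afterwards (thread pools have a lifecycle too, not only Runnable tasks)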
final long totalRuntime = endTimeNs - this.startNs;
// Reset the start time for all tasks. At first glance this appears to need to be
// volatile, since we are reading from a different thread when it is set, but it
// is protected by the taskCount memory barrier.
// See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
startNs = endTimeNs;
// Calculate the new desired queue size
try {
// compute lambda: tasksPerFrame tasks took totalNanos in total, so lambda can be read as the processing rate
final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L));
// ideal queue capacity (maximum allowed queue length) according to Little's law
final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
// current capacity of the task queue
final int oldCapacity = workQueue.capacity();
if (logger.isDebugEnabled()) {
final long avgTaskTime = totalNanos / tasksPerFrame;
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
"[{} tasks/s], optimal queue is [{}], current capacity [{}]",
getName(),
tasksPerFrame,
TimeValue.timeValueNanos(totalRuntime),
TimeValue.timeValueNanos(avgTaskTime),
TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
desiredQueueSize,
oldCapacity);
}
// Adjust the queue size towards the desired capacity using an adjust of
// QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
// values the queue size can have.
// move the capacity from oldCapacity to newCapacity; the queue is not resized straight to desiredQueueSize
final int newCapacity =
workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(),
newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
oldCapacity, newCapacity);
}
} catch (ArithmeticException e) {
// There was an integer overflow, so just log about it, rather than adjust the queue size
logger.warn(() -> new ParameterizedMessage(
"failed to calculate optimal queue size for [{}] thread pool, " +
"total frame time [{}ns], tasks [{}], task execution time [{}ns]",
getName(), totalRuntime, tasksPerFrame, totalNanos),
e);
} finally {
// Finally, decrement the task count and time back to their starting values. We
// do this at the end so there is no concurrent adjustments happening. We also
// decrement them instead of resetting them back to zero, as resetting them back
// to zero causes operations that came in during the adjustment to be uncounted
int tasks = taskCount.addAndGet(-this.tasksPerFrame);
assert tasks >= 0 : "tasks should never be negative, got: " + tasks;
if (tasks >= this.tasksPerFrame) {
// Start over, because we can potentially reach a "never adjusting" state,
//
// consider the following:
// - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
// - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
// - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
// - Since taskCount will now be incremented forever, it will never be 10 again,
// so there will be no further adjustments
logger.debug(
"[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName());
// too many tasks arrived during the adjustment: reset the total task time to 1 so the next round of measurement starts fresh
totalTaskNanos.getAndSet(1);
taskCount.getAndSet(0);
startNs = System.nanoTime();
} else {
// Do a regular adjustment
totalTaskNanos.addAndGet(-totalNanos);
}
}
}
}
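The comments in the code above roughly describe how the queue capacity is adjusted dynamically. A few more details help in understanding the whole process.
1 Thread pools have a lifecycle too
The pool states are described in the source comments of java.util.concurrent.ThreadPoolExecutor. In the RUNNING state the pool accepts newly submitted tasks and processes tasks already in the queue. In the SHUTDOWN state it no longer accepts new tasks but still processes queued tasks. In the STOP state it accepts no new tasks, does not process queued tasks, and interrupts tasks that are currently running. So to shut down a thread pool "properly", do it in steps. Here is how ES shuts down its thread pool for scheduled tasks: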
org.elasticsearch.threadpool.Scheduler.terminate
static boolean terminate(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, long timeout, TimeUnit timeUnit) {
// call shutdown() first so the pool stops accepting new tasks
scheduledThreadPoolExecutor.shutdown();
// wait with a timeout: returns true if all queued and running tasks finish within timeout, false otherwise
if (awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit)) {
return true;
}
// last resort: tasks are still running after awaitTermination timed out,
// so call shutdownNow to interrupt them and shut the pool down
scheduledThreadPoolExecutor.shutdownNow();
return awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit);
}
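The same three steps work for any java.util.concurrent.ExecutorService. The following is a minimal sketch, assuming a generic pool rather than the ES scheduler; ShutdownSketch and demo are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing shutdown -> awaitTermination -> shutdownNow.
final class ShutdownSketch {
    static boolean terminate(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // step 1: stop accepting new tasks
        try {
            if (pool.awaitTermination(timeout, unit)) { // step 2: give queued and running tasks time
                return true;
            }
            pool.shutdownNow(); // step 3: interrupt whatever is still running
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // The waiting thread itself was interrupted: stop the pool and keep the flag set.
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Submits one short task, then shuts the pool down in steps.
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> { });
        return terminate(pool, 5, TimeUnit.SECONDS) && pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println("terminated cleanly: " + demo());
    }
}
```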
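This three-step approach (shutdown, then awaitTermination, then shutdownNow) uses awaitTermination as a buffer, which minimizes the uncertainty in task results caused by closing the pool. The JDK source of java.util.concurrent.ScheduledThreadPoolExecutor.shutdownNow makes it clear that it is better not to call shutdownNow right away but to shut the pool down in steps.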
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution.
* Each element of this list is a {@link ScheduledFuture},
* including those tasks submitted using {@code execute},
* which are for scheduling purposes used as the basis of a
* zero-delay {@code ScheduledFuture}.
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
return super.shutdownNow();
}
shutdownNow attempts to stop all actively executing tasks (threads) and halts the processing of waiting tasks. The waiting tasks here are the tasks still sitting in the work queue that have not started running, not tasks run by threads in the WAITING state of java.lang.Thread.State. shutdownNow then returns the list of those tasks that were awaiting execution.
shutdownNow does not wait for running tasks to finish; it asks them to stop by interrupting their threads ("This method does not wait for actively executing tasks to terminate"). Because a task can ignore or even suppress an interrupt request, this is only a best-effort attempt. A task that fails to respond to the interrupt may never terminate ("so any task that fails to respond to interrupts may never terminate").
It follows that when implementing Runnable, the run method should respond to interruption rather than catching every exception, printing it, and never propagating it.
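As an illustration of a task that cooperates with shutdownNow, here is a minimal sketch. InterruptAwareSketch and demo are hypothetical names, and the sleep stands in for real work; the point is only that the interrupt is observed and the loop exits.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example of a Runnable that stops when its thread is interrupted.
final class InterruptAwareSketch {
    // Starts a looping task, calls shutdownNow, and reports whether the pool terminated.
    static boolean demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10); // stands in for a unit of work
                } catch (InterruptedException e) {
                    // Restore the flag instead of swallowing it, so the loop condition sees it.
                    Thread.currentThread().interrupt();
                }
            }
        });
        try {
            started.await();
            pool.shutdownNow(); // delivers the interrupt to the worker thread
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker stopped after interrupt: " + demo());
    }
}
```

If the catch block only logged the exception, the interrupt flag would be cleared and the loop would keep running, which is exactly the "may never terminate" case from the Javadoc.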
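2 ResizableBlockingQueue is not adjusted after every single task
That would clearly be too expensive. Instead, the adjustment happens after a batch of tasks has completed. The batch size is set by tasksPerFrame and is 2000 by default.
Each adjustment is bounded: the capacity changes by at most QUEUE_ADJUSTMENT_AMOUNT (50 by default)
The queue capacity is not moved straight to the ideal capacity computed by Little's law (desiredQueueSize). Each adjustment is limited, so the change never exceeds QUEUE_ADJUSTMENT_AMOUNT: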
if (optimalCapacity > capacity + adjustmentAmount) {
// adjust up
final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
this.capacity = newCapacity;
return newCapacity;
} else if (optimalCapacity < capacity - adjustmentAmount) {
// adjust down
final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
this.capacity = newCapacity;
return newCapacity;
} else {
return this.capacity;
}
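To see the stepping behavior with numbers, the branch logic above can be restated as a pure function. This is a sketch for illustration: AdjustCapacitySketch and adjust are hypothetical names, the function returns the new capacity instead of mutating a field, and the min and max bounds in the examples are made up.

```java
// Hypothetical, side-effect-free restatement of the capacity adjustment.
final class AdjustCapacitySketch {
    static int adjust(int capacity, int optimal, int step, int min, int max) {
        if (optimal > capacity + step) {
            return Math.min(max, capacity + step); // step up, capped at max
        } else if (optimal < capacity - step) {
            return Math.max(min, capacity - step); // step down, floored at min
        }
        return capacity; // within one step of the optimum: leave it alone
    }

    public static void main(String[] args) {
        System.out.println(adjust(1000, 1500, 50, 100, 2000)); // prints 1050
        System.out.println(adjust(1000, 476, 50, 100, 2000));  // prints 950
        System.out.println(adjust(1000, 1030, 50, 100, 2000)); // prints 1000
        System.out.println(adjust(1980, 5000, 50, 100, 2000)); // prints 2000
    }
}
```

Note the dead band: when the optimum is within one step of the current capacity, nothing changes, so the capacity does not oscillate around the target.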
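Summary
This article walked through the source code of the SEARCH thread pool in ES 6.3.2. A search request from a user is wrapped as a SEARCH operation, and SEARCH tasks are handled by QueueResizingEsThreadPoolExecutor. Its task queue is ResizableBlockingQueue, which wraps a LinkedTransferQueue but imposes a capacity limit.
As the stream of search requests is processed, the queue capacity can be adjusted dynamically. The SEARCH thread pool uses EsAbortPolicy as its rejection policy, so when search requests arrive faster than the pool can handle them, they are rejected.
Wrapping each Runnable in a TimedRunnable makes it possible to measure the execution time and queue time of every search task. These statistics are collected in the pool's afterExecute() method.
The article also covered how to shut down a thread pool correctly and the uncertainty in task results that an improper shutdown causes. Reading the ES thread pool module has deepened my understanding of thread pools considerably. A later article will look at how ES implements the thread pool for scheduled and periodic tasks, which can run periodic ping commands (heartbeats between nodes) and other communication between ES nodes. If anything above is wrong, corrections are welcome.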
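This concludes the source analysis of the ES thread pool module. Overall, ES manages its thread pools centrally. Consider a large system with many kinds of complex operations: should thread pools be scattered throughout the code, or created at startup and managed in one place?
Also, JDK's java.util.concurrent.Future#get() must block to obtain a task's result, and java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) blocks until a timeout. So with a plain Future, retrieving the result after submitting a task means blocking. Is there a way to get the result without blocking? That is what the Listener mechanism is for. A Listener is simply a piece of logic describing how to handle a result (the result of a completed Runnable or Callable). The idea is that when the Runnable (Callable) finishes and has a result, the Listener is called back to process it. There is then no need to block in get() until the result arrives and only afterwards run the logic that processes it.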
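The callback idea can be sketched with the JDK's CompletableFuture. This is not the ES listener API: ListenerSketch, submitWithListener, and the "hits=42" payload are hypothetical, and a java.util.function.Consumer stands in for the Listener.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration of result handling through a callback instead of Future#get().
final class ListenerSketch {
    // Runs the task on the pool and hands its result to the listener when it is ready.
    // The submitting thread returns immediately and never blocks on the result.
    static CompletableFuture<Void> submitWithListener(
            ExecutorService pool, Supplier<String> task, Consumer<String> listener) {
        return CompletableFuture.supplyAsync(task, pool).thenAccept(listener);
    }

    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        try {
            // join() is here only so the demo can return what the listener saw;
            // a real caller would carry on with other work instead of waiting.
            submitWithListener(pool, () -> "hits=42", seen::set).join();
        } finally {
            pool.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("listener received: " + demo());
    }
}
```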
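Speaking of asynchronous callbacks, ES has another kind of thread pool that can execute prioritized tasks. Its task queue is java.util.concurrent.PriorityBlockingQueue, and the implementation is org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor. This pool is mainly used to apply cluster state updates. Going a step further, org.elasticsearch.cluster.service.TaskBatcher wraps PrioritizedEsThreadPoolExecutor to process prioritized tasks in batches. When a new index is created or a shard is relocated, the cluster state is updated, and an org.elasticsearch.cluster.service.MasterService.Batcher.UpdateTask is created. UpdateTask wraps an org.elasticsearch.cluster.ClusterStateTaskListener instance, so that after the Runnable task runs, notifications are delivered through the Listener callback. Multiple UpdateTasks are submitted to the PrioritizedEsThreadPoolExecutor, which is how cluster state updates are carried out. In addition, both the core pool size and the max pool size of PrioritizedEsThreadPoolExecutor are set to 1, so the tasks submitted to it are executed sequentially by a single thread. This avoids the inconsistency that concurrent state updates would cause, and it does so without locks for synchronization, an approach well worth borrowing. The details of how org.elasticsearch.cluster.service.MasterService implements cluster state updates will have to wait for a future article.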
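The single-thread, priority-ordered idea can be approximated with JDK classes alone. The sketch below is an assumption-laden illustration, not PrioritizedEsThreadPoolExecutor: PrioritySingleThreadSketch and PrioritizedTask are hypothetical, and tasks must be submitted with execute so that the queue holds the comparable task objects themselves.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical single-thread pool that runs queued tasks in priority order.
final class PrioritySingleThreadSketch {
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority; // lower value runs first
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override public void run() { body.run(); }

        @Override public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    static List<String> demo() {
        // core = max = 1: every task runs on the same thread, one after another.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        // Occupy the only thread so that the next three tasks wait in the queue.
        pool.execute(new PrioritizedTask(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        pool.execute(new PrioritizedTask(3, () -> order.add("low")));
        pool.execute(new PrioritizedTask(1, () -> order.add("urgent")));
        pool.execute(new PrioritizedTask(2, () -> order.add("normal")));
        gate.countDown(); // release the thread; queued tasks now run by priority
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [urgent, normal, low]
    }
}
```

Because only one thread ever runs tasks, the shared list is updated sequentially and no lock is needed around the updates themselves.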
Thread pools created when ES starts:
[2019-08-15T18:30:38,829][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [force_merge], size [1], queue size [unbounded]
[2019-08-15T18:30:44,782][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [fetch_shard_started], core [1], max [8], keep alive [5m]
[2019-08-15T18:30:48,517][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [listener], size [2], queue size [unbounded]
[2019-08-15T18:30:48,535][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [index], size [4], queue size [200]
[2019-08-15T18:30:48,536][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [refresh], core [1], max [2], keep alive [5m]
[2019-08-15T18:30:48,537][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [generic], core [4], max [128], keep alive [30s]
[2019-08-15T18:30:48,537][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [rollup_indexing], size [4], queue size [4]
[2019-08-15T18:30:48,538][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [warmer], core [1], max [2], keep alive [5m]
[2019-08-15T18:30:48,551][DEBUG][o.e.c.u.c.QueueResizingEsThreadPoolExecutor] thread pool [debug_node/search] will adjust queue by [50] when determining automatic queue size
[2019-08-15T18:30:48,552][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [search], size [7], queue size [1k]
[2019-08-15T18:30:48,553][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [flush], core [1], max [2], keep alive [5m]
[2019-08-15T18:30:48,553][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [fetch_shard_store], core [1], max [8], keep alive [5m]
[2019-08-15T18:30:48,554][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [management], core [1], max [5], keep alive [5m]
[2019-08-15T18:30:48,554][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [get], size [4], queue size [1k]
[2019-08-15T18:30:48,555][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [analyze], size [1], queue size [16]
[2019-08-15T18:30:48,556][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [write], size [4], queue size [200]
[2019-08-15T18:30:48,556][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [snapshot], core [1], max [2], keep alive [5m]
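Finally, a few thoughts on reading source code. When you start reading a system's source, you have usually used it already, learned some of its features along the way, and now want to understand the principles behind them. Faced with a system of several hundred thousand lines of code, where do you begin? I think the following points are worth considering:
- Read the docs, the official documentation, and GitHub issues.
- Debug the startup flow: start from the main function and trace how the system boots.
- Read the basic modules first (such as the ES thread pool module). Basic modules usually have less code, do not involve specific user-facing features, and are generic, so they need no domain-specific knowledge. A newcomer who does not yet understand the flow of a feature and dives straight into a feature module can easily get lost in the details.
- Try reading the implementation of features you have used. You solved a problem with the feature, so you have curiosity and concrete questions to drive you, which makes reading the details easier.
- More specifically, when reading a complex Java class with many instance variables (and methods), I usually start with the basic classes. They are the system's "common services": they matter, they are used elsewhere, and understanding them makes other code easier to follow. A basic class typically appears as an instance variable of other classes, provides one small function, and is simple.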