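Exploring the Thread Pool Implementation in ElasticSearch
Almost every operation in ElasticSearch is built on thread pools plus callbacks, so this article records how thread pools are implemented in java.util.concurrent and how ElasticSearch defines its own. We use thread pools all the time in our own code, but rarely get the chance to extend one ourselves. Below is the pattern I usually use, where SidSearchExceptionHandler and SidSearchRejectExecutionHandler do nothing more than write a log entry.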
// Task queue
private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1024);
// Handler for exceptions that escape while a thread is running a task
private SidSearchExceptionHandler exceptionHandler = new SidSearchExceptionHandler();
// Rejection policy applied when a task is submitted to the pool
private SidSearchRejectExecutionHandler rejectExecutionHandler = new SidSearchRejectExecutionHandler();
// Build the thread factory with Guava's ThreadFactoryBuilder (mainly to name the threads, which makes debugging clearer)
private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("audio-%d").setUncaughtExceptionHandler(exceptionHandler).build();
// Create the thread pool
private ThreadPoolExecutor executor = new ThreadPoolExecutor(nThreads, nThreads, 1, TimeUnit.DAYS, taskQueue, threadFactory, rejectExecutionHandler);
For example, the custom handler below deals with exceptions thrown while a thread is running a task; it simply logs them:
public class SidSearchExceptionHandler implements Thread.UncaughtExceptionHandler {
    public static final Logger logger = LoggerFactory.getLogger(SidSearchExceptionHandler.class);

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // Passing the Throwable as the last argument makes SLF4J log the full stack trace
        logger.error("sid search thread pool execution error, thread name: {}", t.getName(), e);
    }
}
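One detail of this pattern is easy to miss: an UncaughtExceptionHandler only sees exceptions from tasks handed over with execute(). With submit(), the exception is stored in the returned Future instead. The sketch below uses only the JDK (a hand-written factory stands in for Guava's ThreadFactoryBuilder, and UncaughtHandlerDemo is a made-up name) to show the difference.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class UncaughtHandlerDemo {
    // Returns how many times the uncaught-exception handler was invoked.
    static int run() throws Exception {
        AtomicInteger handlerCalls = new AtomicInteger();
        CountDownLatch handled = new CountDownLatch(1);
        AtomicInteger seq = new AtomicInteger();
        // Hand-written stand-in for Guava's ThreadFactoryBuilder
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "audio-" + seq.getAndIncrement());
            t.setUncaughtExceptionHandler((thread, e) -> {
                handlerCalls.incrementAndGet();
                handled.countDown();
            });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 1, TimeUnit.DAYS,
                new LinkedBlockingQueue<>(1024), factory, new ThreadPoolExecutor.AbortPolicy());
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        try {
            // execute(): the exception escapes the worker thread, so the handler fires
            pool.execute(failing);
            handled.await(5, TimeUnit.SECONDS);
            // submit(): the exception is captured in the Future, so the handler stays silent
            Future<?> future = pool.submit(failing);
            try {
                future.get();
            } catch (ExecutionException expected) {
                // the failure surfaces here instead
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return handlerCalls.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("handler invocations: " + run());
    }
}
```

Only the execute() call reaches the handler, so code that relies on a logging handler should either use execute() or inspect the Future.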
So let's read ES's custom thread pool implementation and see how its authors extend ThreadPoolExecutor and define their exception-handling strategy.
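Thread pool basics
1. Defining a task
A task, the carrier of the business logic you want to run, is defined by writing a class that implements the Runnable interface and overrides its run() method. The business logic (writing data to a database, sending a query to ElasticSearch, and so on) goes inside run().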
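2. Submitting a task
Calling the execute(Runnable runnable) method of java.util.concurrent.Executor submits a task, and one of the pool's threads will run it.
Should the pool be shut down once all tasks have finished? What if a task needs to return a result? ExecutorService extends the Executor interface (public interface ExecutorService extends Executor) to provide these capabilities.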
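As a small illustration of what ExecutorService adds over Executor, the sketch below submits two Callable tasks, reads their results through Future, and then shuts the pool down. The class name SubmitDemo and the pool size are arbitrary choices for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SubmitDemo {
    static int sumOnPool() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // submit() accepts a Callable and returns a Future holding its result
            Future<Integer> a = pool.submit(() -> 1 + 2);
            Future<Integer> b = pool.submit(() -> 3 + 4);
            // get() blocks until the corresponding task has finished
            return a.get() + b.get();
        } finally {
            // shutdown() stops accepting new tasks; already queued tasks still run
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOnPool());
    }
}
```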
3. Executing a task
On top of ExecutorService, ThreadPoolExecutor adds two hook methods that let you do some processing before and after each task runs.
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
ElasticSearch's EsThreadPoolExecutor.java makes use of these hooks.
The actual task execution happens in the run() method of Worker, an inner class of ThreadPoolExecutor:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    // Takes a Runnable task, then calls ThreadFactory.newThread() to create the thread that will run it
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }
}
Worker implements Runnable and calls ThreadPoolExecutor's final void runWorker(Worker w) to run the task.
Here is part of the runWorker method:
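final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    // ... (the loop that fetches further tasks and the locking are omitted)
    try {
        // Do some processing before the task runs
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
            // Run the task
            task.run();
        } catch (RuntimeException x) {
            thrown = x; throw x;
        } catch (Error x) {
            thrown = x; throw x;
        } catch (Throwable x) {
            thrown = x; throw new Error(x);
        } finally {
            // Do some processing after the task has finished
            afterExecute(task, thrown);
        }
    } finally {
        // ... (per-task bookkeeping omitted)
    }
    // ...
}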
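To see the two hooks in action, here is a minimal sketch of a ThreadPoolExecutor subclass that counts started and failed tasks. HookedExecutor and its counters are made up for this example and have nothing to do with the ElasticSearch classes; the quiet thread factory only keeps the demo from printing a stack trace.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedExecutor extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedExecutor(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), quietFactory());
    }

    // Threads whose uncaught exceptions are swallowed, so the demo output stays clean
    private static ThreadFactory quietFactory() {
        return r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, e) -> { });
            return t;
        };
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // t is non-null only when the task threw (and was passed in via execute())
        if (t != null) {
            failed.incrementAndGet();
        }
    }

    // Returns {started, failed} after running one good and one failing task.
    static int[] demo() throws InterruptedException {
        HookedExecutor pool = new HookedExecutor(1);
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { pool.started.get(), pool.failed.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = demo();
        System.out.println("started=" + counts[0] + ", failed=" + counts[1]);
    }
}
```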
Tasks are run by concrete threads, so we also need to look at how threads are created. ThreadFactory defines the newThread method for creating threads:
public interface ThreadFactory {
    /**
     * Constructs a new {@code Thread}. Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     * create a thread is rejected
     */
    Thread newThread(Runnable r);
}
The Executors utility class defines a concrete factory class that creates the threads:
/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
Thread pool implementation in the ElasticSearch source
1. EsThreadFactory creates the threads
EsThreadFactory is an inner class of EsExecutors:
static class EsThreadFactory implements ThreadFactory {
    final ThreadGroup group;
    final AtomicInteger threadNumber = new AtomicInteger(1);
    final String namePrefix;

    EsThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
                0);
        t.setDaemon(true);
        return t;
    }
}
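It keeps three things: the thread group, the thread counter, and the thread name prefix.
When creating threads, one usually assigns them a handler for uncaught exceptions. Surprisingly, EsThreadFactory does not explicitly define one (perhaps it is set elsewhere in the code, for example through an anonymous inner class). Instead it relies on the default handling defined in ThreadGroup: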
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
To customize how exceptions thrown during thread execution are handled, implement Thread.UncaughtExceptionHandler and override its uncaughtException(Thread t, Throwable e) method. If no handler is supplied, the default handling is used.
The Javadoc of the uncaughtException(Thread t, Throwable e) method in java.lang.ThreadGroup reads:
Called by the Java Virtual Machine when a thread in this thread group stops because of an uncaught exception, and the thread does not have a specific Thread.UncaughtExceptionHandler installed.
The uncaughtException method of ThreadGroup does the following:
If this thread group has a parent thread group, the uncaughtException method of that parent is called with the same two arguments.
Otherwise, this method checks to see if there is a Thread.getDefaultUncaughtExceptionHandler default uncaught exception handler installed, and if so, its uncaughtException method is called with the same two arguments.
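Note that a handler passed to the thread factory, as in the line below, is installed on every thread the factory creates and can be read back with that thread's getUncaughtExceptionHandler(), so the ThreadGroup fallback never runs for those threads. Thread.getDefaultUncaughtExceptionHandler, by contrast, returns the JVM-wide handler installed with Thread.setDefaultUncaughtExceptionHandler.
// Calling setUncaughtExceptionHandler while building the thread factory installs a custom UncaughtExceptionHandler.
// If an exception escapes while a thread is running, the uncaughtException(Thread t, Throwable e) method of exceptionHandler is called.
private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("audio-%d").setUncaughtExceptionHandler(exceptionHandler).build();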
Otherwise, this method determines if the Throwable argument is an instance of ThreadDeath. If so, nothing special is done. Otherwise, a message containing the thread's name, as returned from the thread's Thread.getName method, and a stack backtrace, using the Throwable's Throwable.printStackTrace method, is printed to System.err.
When no handler has been specified and the Throwable e argument is a ThreadDeath instance, nothing is done.
If the Throwable e argument is not a ThreadDeath instance, the exception is printed through Throwable.printStackTrace.
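The lookup order described in the Javadoc can be checked with plain threads. In the sketch below (HandlerOrderDemo and the thread names are invented for the example), one thread has its own handler and the other has none, so the second falls through ThreadGroup to the JVM-wide default handler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class HandlerOrderDemo {
    // Returns which handler dealt with each thread, in order.
    static List<String> run() throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        Thread.UncaughtExceptionHandler previous = Thread.getDefaultUncaughtExceptionHandler();
        // JVM-wide fallback, consulted by ThreadGroup.uncaughtException
        Thread.setDefaultUncaughtExceptionHandler((t, e) -> log.add("default:" + t.getName()));
        try {
            Thread a = new Thread(() -> { throw new IllegalStateException("a"); }, "with-own-handler");
            // A per-thread handler takes precedence over the group and the default handler
            a.setUncaughtExceptionHandler((t, e) -> log.add("own:" + t.getName()));
            Thread b = new Thread(() -> { throw new IllegalStateException("b"); }, "without-handler");
            a.start();
            a.join();
            b.start();
            b.join();
        } finally {
            // Restore whatever default handler was installed before the demo
            Thread.setDefaultUncaughtExceptionHandler(previous);
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```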
2. EsThreadPoolExecutor creates the thread pool
public class EsThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadContext contextHolder;
    private volatile ShutdownListener listener;
    // ...
}
A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with a thread. It allows to store and retrieve header information across method calls, network calls as well as threads spawned from a thread that has a ThreadContext associated with.
Its constructor shows the extra ThreadContext argument (which adds the ability to carry some per-thread execution context):
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
        ThreadContext contextHolder) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    this.name = name;
    this.contextHolder = contextHolder;
}
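The idea of carrying context across threads can be sketched without any ElasticSearch classes. The example below is only an approximation of the concept: a single ThreadLocal<String> (a hypothetical REQUEST_ID) stands in for ThreadContext, and execute() wraps each task so that the value seen by the submitting thread is restored on the worker thread.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ContextExecutor extends ThreadPoolExecutor {
    // Hypothetical stand-in for ThreadContext: one header value per thread
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    ContextExecutor(int nThreads) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public void execute(Runnable command) {
        // Captured on the submitting thread, at submission time
        String captured = REQUEST_ID.get();
        super.execute(() -> {
            String previous = REQUEST_ID.get();
            REQUEST_ID.set(captured);      // restore the submitter's context on the worker
            try {
                command.run();
            } finally {
                REQUEST_ID.set(previous);  // do not leak the context into the next task
            }
        });
    }

    // Returns the request id observed inside the pooled task.
    static String demo() throws InterruptedException {
        ContextExecutor pool = new ContextExecutor(1);
        AtomicReference<String> seen = new AtomicReference<>();
        REQUEST_ID.set("req-42");
        try {
            pool.execute(() -> seen.set(REQUEST_ID.get()));
        } finally {
            REQUEST_ID.remove();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

Because submit() delegates to execute() in AbstractExecutorService, tasks submitted either way pass through the same wrapper.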
Next, look at how EsThreadPoolExecutor overrides ThreadPoolExecutor's execute() method:
@Override
public void execute(final Runnable command) {
    doExecute(wrapRunnable(command));
}

protected void doExecute(final Runnable command) {
    try {
        super.execute(command);
    } catch (EsRejectedExecutionException ex) {
        if (command instanceof AbstractRunnable) {
            // If we are an abstract runnable we can handle the rejection
            // directly and don't need to rethrow it.
            try {
                ((AbstractRunnable) command).onRejection(ex);
            } finally {
                ((AbstractRunnable) command).onAfter();
            }
        } else {
            throw ex;
        }
    }
}
doExecute() first calls super.execute(command), which contains the rejection check. If the task is rejected, the rejectedExecution() method of EsAbortPolicy is called:
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        // Invoke the rejection policy
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}

/**
 * Invokes the rejected execution handler for the given command.
 * Package-protected for use by ScheduledThreadPoolExecutor.
 */
final void reject(Runnable command) {
    // handler is the RejectedExecutionHandler passed in when the ThreadPoolExecutor was constructed
    handler.rejectedExecution(command, this);
}
doExecute() can then do some extra handling through ((AbstractRunnable) command).onRejection(ex), such as sending a notification once the task has been rejected.
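The same "tell the task it was rejected" idea can be reproduced on a plain ThreadPoolExecutor. The sketch below is not the ElasticSearch code: RejectionAware is a hypothetical counterpart of AbstractRunnable, and NotifyingExecutor catches the RejectedExecutionException thrown by the stock AbortPolicy and routes it to the task.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counterpart of AbstractRunnable: a task that wants to hear about rejection
interface RejectionAware extends Runnable {
    void onRejection(RejectedExecutionException e);
}

class NotifyingExecutor extends ThreadPoolExecutor {
    NotifyingExecutor(int threads, int queueSize) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize), new ThreadPoolExecutor.AbortPolicy());
    }

    @Override
    public void execute(Runnable command) {
        try {
            super.execute(command);
        } catch (RejectedExecutionException e) {
            if (command instanceof RejectionAware) {
                // The task handles its own rejection, so nothing is rethrown
                ((RejectionAware) command).onRejection(e);
            } else {
                throw e;
            }
        }
    }

    // Returns how many tasks were told about their rejection.
    static int demo() throws InterruptedException {
        NotifyingExecutor pool = new NotifyingExecutor(1, 1);
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger rejected = new AtomicInteger();
        RejectionAware task = new RejectionAware() {
            @Override
            public void run() {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void onRejection(RejectedExecutionException e) {
                rejected.incrementAndGet();
            }
        };
        pool.execute(task); // taken by the single worker, which blocks on the gate
        pool.execute(task); // waits in the queue (capacity 1)
        pool.execute(task); // no room left: rejected, onRejection is called
        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected tasks notified: " + demo());
    }
}
```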
EsAbortPolicy, the rejection policy implementation in ElasticSearch:
public class EsAbortPolicy implements XRejectedExecutionHandler {
    private final CounterMetric rejected = new CounterMetric();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            // Check whether the task must be force-executed
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                // The task queue given to the ThreadPoolExecutor must be a SizeBlockingQueue
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    // Although the task was rejected, put it onto the task queue anyway so that it still gets a chance to run
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
    }

    @Override
    public long rejected() {
        return rejected.count();
    }
}

public interface XRejectedExecutionHandler extends RejectedExecutionHandler {
    /**
     * The number of rejected executions.
     */
    long rejected();
}
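XRejectedExecutionHandler tracks how many tasks have been rejected. The counter (CounterMetric) uses java.util.concurrent.atomic.LongAdder, a counter that was new to me; comparisons of LongAdder and AtomicLong are worth reading.
Having read this implementation, next time we can imitate it: when submitting a task to a thread pool is rejected, we too can retry ^~^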
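One possible way to imitate it with JDK classes only is sketched below. It is an approximation rather than the ElasticSearch implementation: ForceExecution is a hypothetical marker interface, and instead of SizeBlockingQueue.forcePut the handler calls put() on the pool's own bounded queue, which makes the submitting thread wait until a slot is free.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical marker for tasks that must not be dropped
interface ForceExecution extends Runnable { }

class CountingRetryPolicy implements RejectedExecutionHandler {
    private final LongAdder rejected = new LongAdder();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof ForceExecution && !executor.isShutdown()) {
            try {
                // Blocks the submitting thread until the queue has room again
                executor.getQueue().put(r);
                return;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while re-queueing " + r, e);
            }
        }
        rejected.increment();
        throw new RejectedExecutionException("rejected execution of " + r + " on " + executor);
    }

    long rejected() {
        return rejected.sum();
    }

    // Returns {tasks that ran, tasks that were rejected}.
    static long[] demo() throws InterruptedException {
        CountingRetryPolicy policy = new CountingRetryPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), policy);
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        pool.execute(() -> {                       // occupies the only worker
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ran.incrementAndGet();
        });
        pool.execute(() -> ran.incrementAndGet()); // fills the queue
        try {
            pool.execute(() -> ran.incrementAndGet()); // ordinary task: rejected and counted
        } catch (RejectedExecutionException expected) {
            // dropped on purpose
        }
        Thread opener = new Thread(() -> {         // frees the worker a little later
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            gate.countDown();
        });
        opener.start();
        ForceExecution forced = () -> ran.incrementAndGet();
        pool.execute(forced);                      // waits in put() instead of being dropped
        opener.join();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new long[] { ran.get(), policy.rejected() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] result = demo();
        System.out.println("ran=" + result[0] + ", rejected=" + result[1]);
    }
}
```

The trade-off is that a forced task can stall its submitter, and a task queued just as the pool shuts down may never run, so this fits background work better than latency-sensitive callers.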
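Everything so far has compared the mechanics behind ElasticSearch's thread pools with those of the JDK concurrency package. ElasticSearch's custom thread pools are built on top of the JDK ones.
Now let's walk through how the ElasticSearch source actually creates its thread pools.
During node startup, org.elasticsearch.node.Node.java begins creating the thread pools: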
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
Look at the ThreadPool source. It has many fields, including:
public class ThreadPool extends AbstractComponent implements Scheduler, Closeable {
    private Map<String, ExecutorHolder> executors = new HashMap<>();
    static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();
    private final Map<String, ExecutorBuilder> builders;
    private final ThreadContext threadContext;
    private final ScheduledThreadPoolExecutor scheduler;
    // ...
}
For example, ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService(); is itself a thread pool. Other pools are created through ExecutorBuilder (Map<String, ExecutorBuilder> builders).
Thread pool types are defined by ThreadPoolType, an enum nested in ThreadPool:
public enum ThreadPoolType {
    DIRECT("direct"),
    FIXED("fixed"),
    FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
    SCALING("scaling");
    // ...
}
A HashMap stores each thread pool name together with its type.
static {
    HashMap<String, ThreadPoolType> map = new HashMap<>();
    map.put(Names.SAME, ThreadPoolType.DIRECT);
    map.put(Names.GENERIC, ThreadPoolType.SCALING);
    map.put(Names.LISTENER, ThreadPoolType.FIXED);
    map.put(Names.GET, ThreadPoolType.FIXED);
    map.put(Names.ANALYZE, ThreadPoolType.FIXED);
    map.put(Names.INDEX, ThreadPoolType.FIXED);
    map.put(Names.WRITE, ThreadPoolType.FIXED);
    map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
    map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
    map.put(Names.FLUSH, ThreadPoolType.SCALING);
    map.put(Names.REFRESH, ThreadPoolType.SCALING);
    map.put(Names.WARMER, ThreadPoolType.SCALING);
    map.put(Names.SNAPSHOT, ThreadPoolType.SCALING);
    map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
    map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
    map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
    THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
}
The code that actually creates the pools is the build call inside the for loop of the ThreadPool constructor, in the statement final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
    final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
    final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
    if (executors.containsKey(executorHolder.info.getName())) {
        throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
    }
    logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
    executors.put(entry.getKey(), executorHolder);
}
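The loop boils down to "ask each builder for an executor and register it under a unique name". The sketch below mirrors that shape with invented types: PoolBuilder, FixedPoolBuilder, and PoolRegistry are hypothetical simplifications, not the ElasticSearch ExecutorBuilder or ThreadPool classes, and the pool sizes are arbitrary.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, much simplified counterpart of ExecutorBuilder
interface PoolBuilder {
    String name();
    ExecutorService build();
}

class FixedPoolBuilder implements PoolBuilder {
    private final String name;
    private final int size;
    private final int queueSize;

    FixedPoolBuilder(String name, int size, int queueSize) {
        this.name = name;
        this.size = size;
        this.queueSize = queueSize;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public ExecutorService build() {
        AtomicInteger seq = new AtomicInteger(1);
        // Daemon threads named after the pool, similar in spirit to EsThreadFactory
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, name + "[T#" + seq.getAndIncrement() + "]");
            t.setDaemon(true);
            return t;
        };
        // Fixed pool: core size equals maximum size, with a bounded queue
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize), factory);
    }
}

class PoolRegistry {
    private final Map<String, ExecutorService> executors = new HashMap<>();

    PoolRegistry(List<PoolBuilder> builders) {
        for (PoolBuilder builder : builders) {
            if (executors.containsKey(builder.name())) {
                throw new IllegalStateException("duplicate executors with name [" + builder.name() + "] registered");
            }
            executors.put(builder.name(), builder.build());
        }
    }

    ExecutorService executor(String name) {
        ExecutorService executor = executors.get(name);
        if (executor == null) {
            throw new IllegalArgumentException("no executor found for [" + name + "]");
        }
        return executor;
    }

    void shutdown() {
        executors.values().forEach(ExecutorService::shutdown);
    }

    // Returns the name of the thread that ran a task on the "search" pool.
    static String demo() throws Exception {
        List<PoolBuilder> builders = Arrays.asList(
                new FixedPoolBuilder("write", 2, 200),
                new FixedPoolBuilder("search", 2, 1000));
        PoolRegistry pools = new PoolRegistry(builders);
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pools.executor("search").submit(whoAmI).get();
        } finally {
            pools.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Services then only need a reference to the registry and a pool name, which is roughly how a single ThreadPool object can be shared across a whole node.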
The ThreadPoolType enum above lists four types of thread pool, which correspond to the three ExecutorBuilder classes shown in the figure above. Look at the build method of org.elasticsearch.threadpool.FixedExecutorBuilder: creating the pool requires a FixedExecutorSettings argument, and the ThreadContext that carries the thread context is passed along too.
@Override
ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) {
    int size = settings.size;
    int queueSize = settings.queueSize;
    final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
    final ExecutorService executor =
            EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
    final String name;
    if ("write".equals(name()) && Booleans.parseBoolean(System.getProperty("es.thread_pool.write.use_bulk_as_display_name", "false"))) {
        name = "bulk";
    } else {
        name = name();
    }
    final ThreadPool.Info info =
            new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
    return new ThreadPool.ExecutorHolder(executor, info);
}
Two statements in it matter most:
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
builds the thread factory, and
final ExecutorService executor =
        EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
builds the thread pool.
That is the whole flow by which ElasticSearch builds a thread pool.
The resulting pool is wrapped in a ThreadPool.ExecutorHolder via new ThreadPool.ExecutorHolder(executor, info);
final ThreadPool.Info info =
        new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
return new ThreadPool.ExecutorHolder(executor, info);
Once all the thread pools have been built, the services initialized during node startup each receive a ThreadPool argument when they are constructed, so every service can use the pools to run its tasks. The code in org.elasticsearch.node.Node.java:
// Build all the thread pools
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
// clusterService uses threadPool
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
        ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
// monitorService uses threadPool
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
// actionModule
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
        settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
        threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
// ... many other XXXService constructors also take a ThreadPool argument.
So it is fair to say that ThreadPool is everywhere in ElasticSearch's operations. Haha.
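Summary
This article is a little disorganized. It covers two things: an introduction to the native thread pool features of the JDK, and a comparison with how ElasticSearch implements its custom thread pools. It looked at the rejection policy ElasticSearch applies when a task is submitted and at how exceptions thrown during thread execution are handled, and then sketched the thread pool creation flow in ElasticSearch, starting from org.elasticsearch.node.Node.
The main classes involved are:
org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor
org.elasticsearch.threadpool.ExecutorBuilder and its three subclasses:
org.elasticsearch.threadpool.FixedExecutorBuilder
org.elasticsearch.threadpool.AutoQueueAdjustingExecutorBuilder
org.elasticsearch.threadpool.ScalingExecutorBuilder
org.elasticsearch.threadpool.ThreadPool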