本文介紹Hystrix線程池的工作原理和參數配置,指出存在的問題並提供規避方案,閱讀本文需要對Hystrix有一定的了解。
文本討論的內容,基於hystrix 1.5.18:
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.18</version>
</dependency>
線程池和Hystrix Command之間的關系
當hystrix command的隔離策略配置為線程,也就是execution.isolation.strategy設置為THREAD時,command中的代碼會放到線程池里執行,跟發起command調用的線程隔離開。摘要官方wiki如下:
execution.isolation.strategy
This property indicates which isolation strategy HystrixCommand.run() executes with, one of the following two choices:
THREAD — it executes on a separate thread and concurrent requests are limited by the number of threads in the thread-pool
SEMAPHORE — it executes on the calling thread and concurrent requests are limited by the semaphore count
一個線上的服務,往往會有很多hystrix command分別用來管理不同的外部依賴。 但會有幾個hystrix線程池存在呢,這些command跟線程池的對應關系又是怎樣的呢,是一對一嗎?
答案是不一定,command跟線程池可以做到一對一,但通常不是,受到HystrixThreadPoolKey和HystrixCommandGroupKey這兩項配置的影響。
優先采用HystrixThreadPoolKey來標識線程池,如果沒有配置HystrixThreadPoolKey那么就使用HystrixCommandGroupKey來標識。command跟線程池的對應關系,就看HystrixCommandKey、HystrixThreadPoolKey、HystrixCommandGroupKey這三個參數的配置。
獲取線程池標識的代碼如下,可以看到跟我的描述是一致的:
/*
* ThreadPoolKey
*
* This defines which thread-pool this command should run on.
*
* It uses the HystrixThreadPoolKey if provided, then defaults to use HystrixCommandGroup.
*
* It can then be overridden by a property if defined so it can be changed at runtime.
*/
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
if (threadPoolKeyOverride == null) {
// we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
if (threadPoolKey == null) {
/* use HystrixCommandGroup if HystrixThreadPoolKey is null */
return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
} else {
return threadPoolKey;
}
} else {
// we have a property defining the thread-pool so use it instead
return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
}
}
Hystrix會保證同一個線程池標識只會創建一個線程池:
/*
* Use the String from HystrixThreadPoolKey.name() instead of the HystrixThreadPoolKey instance as it's just an interface and we can't ensure the object
* we receive implements hashcode/equals correctly and do not want the default hashcode/equals which would create a new threadpool for every object we get even if the name is the same
*/
/* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
/**
* Get the {@link HystrixThreadPool} instance for a given {@link HystrixThreadPoolKey}.
* <p>
* This is thread-safe and ensures only 1 {@link HystrixThreadPool} per {@link HystrixThreadPoolKey}.
*
* @return {@link HystrixThreadPool} instance
*/
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
Hystrix線程池參數一覽
- coreSize 核心線程數量
- maximumSize 最大線程數量
- allowMaximumSizeToDivergeFromCoreSize 允許maximumSize大於coreSize,只有配了這個值coreSize才有意義
- keepAliveTimeMinutes 超過這個時間多於coreSize數量的線程會被回收,只有maximumsize大於coreSize,這個值才有意義
- maxQueueSize 任務隊列的最大大小,當線程池的線程線程都在工作,也不能創建新的線程的時候,新的任務會進到隊列里等待
- queueSizeRejectionThreshold 任務隊列中存儲的任務數量超過這個值,線程池拒絕新的任務。這跟maxQueueSize本來是一回事,只是受限於hystrix的實現方式maxQueueSize不能動態配置,所以有了這個配置。
根據給定的線程池參數猜測線程池表現
可以看到hystrix的線程池參數跟JDK線程池ThreadPoolExecutor參數很像但又不一樣,即便是完整地看了文檔,仍然讓人迷惑。不過無妨,先來猜猜幾種配置下的表現。
coreSize = 2; maxQueueSize = 10
線程池中常駐2個線程。新任務提交到線程池,有空閑線程則直接執行,否則入隊等候。等待隊列中的任務數=10時,拒絕接受新任務。
coreSize = 2; maximumSize = 5; maxQueueSize = -1
線程池中常駐2個線程。新任務提交到線程池,有空閑線程則直接執行,沒有空閑線程時,如果當前線程數小於5則創建1個新的線程用來執行任務,否則拒絕任務。
coreSize = 2; maximumSize = 5; maxQueueSize = 10
這種配置下從官方文檔中已經看不出來實際表現會是怎樣的。猜測有如下兩種可能:
-
可能一。線程池中常駐2個線程。新任務提交到線程池,2個線程中有空閑則直接執行,否則入隊等候。當2個線程都在工作且等待隊列中的任務數=10時,開始為新任務創建線程,直到線程數量為5,此時開始拒絕新任務。這樣的話,對資源敏感型的任務比較友好,這也是JDK線程池ThreadPoolExecutor的行為。
-
可能二。線程池中常駐2個線程。新任務提交到線程池,有空閑線程則直接執行,沒有空閑線程時,如果當前線程數小於5則創建1個新的線程用來執行任務。當線程數量達到5個且都在工作時,任務入隊等候。等待隊列中的任務數=10時,拒絕接受新任務。這樣的話,對延遲敏感型的任務比較友好。
兩種情況都有可能,從文檔中無法確定究竟如何。
並發情況下Hystrix線程池的真正表現
本節中,通過測試來看看線程池的行為究竟會怎樣。
還是這個配置:
coreSize = 2; maximumSize = 5; maxQueueSize = 10
我們通過不斷提交任務到hystrix線程池,並且在任務的執行代碼中使用CountDownLatch占住線程來模擬測試,代碼如下:
public class HystrixThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
final int coreSize = 2, maximumSize = 5, maxQueueSize = 10;
final String commandName = "TestThreadPoolCommand";
final HystrixCommand.Setter commandConfig = HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandName))
.andCommandKey(HystrixCommandKey.Factory.asKey(commandName))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutEnabled(false))
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(coreSize)
.withMaximumSize(maximumSize)
.withAllowMaximumSizeToDivergeFromCoreSize(true)
.withMaxQueueSize(maxQueueSize)
.withQueueSizeRejectionThreshold(maxQueueSize));
// Run command once, so we can get metrics.
HystrixCommand<Void> command = new HystrixCommand<Void>(commandConfig) {
@Override protected Void run() throws Exception {
return null;
}
};
command.execute();
Thread.sleep(100);
final CountDownLatch stopLatch = new CountDownLatch(1);
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < coreSize + maximumSize + maxQueueSize; i++) {
final int fi = i + 1;
Thread thread = new Thread(new Runnable() {
public void run() {
try {
HystrixCommand<Void> command = new HystrixCommand<Void>(commandConfig) {
@Override protected Void run() throws Exception {
stopLatch.await();
return null;
}
};
command.execute();
} catch (HystrixRuntimeException e) {
System.out.println("Started Jobs: " + fi);
System.out.println("Job:" + fi + " got rejected.");
printThreadPoolStatus();
System.out.println();
}
}
});
threads.add(thread);
thread.start();
Thread.sleep(200);
if(fi == coreSize || fi == coreSize + maximumSize || fi == coreSize + maxQueueSize ) {
System.out.println("Started Jobs: " + fi);
printThreadPoolStatus();
System.out.println();
}
}
stopLatch.countDown();
for (Thread thread : threads) {
thread.join();
}
}
static void printThreadPoolStatus() {
for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) {
String name = threadPoolMetrics.getThreadPoolKey().name();
Number poolSize = threadPoolMetrics.getCurrentPoolSize();
Number queueSize = threadPoolMetrics.getCurrentQueueSize();
System.out.println("ThreadPoolKey: " + name + ", PoolSize: " + poolSize + ", QueueSize: " + queueSize);
}
}
}
執行代碼得到如下輸出:
// 任務數 = coreSize。此時coreSize個線程在工作
Started Jobs: 2
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 0
// 任務數 > coreSize。此時仍然只有coreSize個線程,多於coreSize的任務進入等候隊列,沒有創建新的線程
Started Jobs: 7
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 5
// 任務數 = coreSize + maxQueueSize。此時仍然只有coreSize個線程,多於coreSize的任務進入等候隊列,沒有創建新的線程
Started Jobs: 12
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10
// 任務數 > coreSize + maxQueueSize。此時仍然只有coreSize個線程,等候隊列已滿,新增任務被拒絕
Started Jobs: 13
Job:13 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10
Started Jobs: 14
Job:14 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10
Started Jobs: 15
Job:15 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10
Started Jobs: 16
Job:16 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10
Started Jobs: 17
Job:17 got rejected.
ThreadPoolKey: TestThreadPoolCommand, PoolSize: 2, QueueSize: 10
完整的測試代碼,參見這里
可以看到Hystrix線程池的實際表現,跟之前的兩種猜測都不同,跟JDK線程池的表現不同,跟另一種合理猜測也不通。當maxSize > coreSize && maxQueueSize != -1的時候,maxSize這個參數根本就不起作用,線程數量永遠不會超過coreSize,對於的任務入隊等候,隊列滿了,就直接拒絕新任務。
不得不說,這是一種讓人疑惑的,非常危險的,容易配置錯誤的線程池表現。
JDK線程池ThreadPoolExecutor
繼續分析Hystrix線程池的原理之前,先來復習一下JDK中的線程池。
只說跟本文討論的內容相關的參數:
- corePoolSize核心線程數,maximumPoolSize最大線程數。這個兩個參數跟hystrix線程池的coreSize和maximumSize含義是一致的。
- workQueue任務等候隊列。跟hystrix不同,jdk線程池的等候隊列不是指定大小,而是需要使用方提供一個BlockingQueue。
- handler當線程池無法接受任務時的處理器。hystrix是直接拒絕,jdk線程池可以定制。
可以看到,jdk的線程池使用起來更加靈活。配置參數的含義也十分清晰,沒有hystrx線程池里面allowMaximumSizeToDivergeFromCoreSize、queueSizeRejectionThreshold這種奇奇怪怪讓人疑惑的參數。
關於jdk線程池的參數配置,參加如下jdk源碼:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
那么在跟hystrix線程池對應的參數配置下,jdk線程池的表現會怎樣呢?
corePoolSize = 2; maximumPoolSize = 5; workQueue = new ArrayBlockingQueue(10); handler = new ThreadPoolExecutor.DiscardPolicy()
這里不再測試了,直接給出答案。線程池中常駐2個線程。新任務提交到線程池,2個線程中有空閑則直接執行,否則入隊等候。當2個線程都在工作且等待隊列中的任務數=10時,開始為新任務創建線程,直到線程數量為5,此時開始拒絕新任務。
相關邏輯涉及的源碼貼在下面。值得一提的是,jdk線程池並不根據等候任務的數量來判斷等候隊列是否已滿,而是直接調用workQueue的offer方法,如果workQueue接受了那就入隊等候,否則執行拒絕策略。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到hystrix線程池的配置參數跟jdk線程池是非常像的,從名字到含義,都基本一致。
為什么
事實上hystrix的線程池,就是在jdk線程池的基礎上實現的。相關代碼如下:
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
/*
* We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
* <p>
* SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
* <p>
* Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
* and rejecting is the preferred solution.
*/
if (maxQueueSize <= 0) {
return new SynchronousQueue<Runnable>();
} else {
return new LinkedBlockingQueue<Runnable>(maxQueueSize);
}
}
既然hystrix線程池基於jdk線程池實現,為什么在如下兩個基本一致的配置上,行為卻不一樣呢?
//hystrix
coreSize = 2; maximumSize = 5; maxQueueSize = 10
//jdk
corePoolSize = 2; maximumPoolSize = 5; workQueue = new ArrayBlockingQueue(10); handler = new ThreadPoolExecutor.DiscardPolicy()
jdk在隊列滿了之后會創建線程執行新任務直到線程數量達到maximumPoolSize,而hystrix在隊列滿了之后直接拒絕新任務,maximumSize這項配置成了擺設。
原因就在於hystrix判斷隊列是否滿是否要拒絕新任務,沒有通過jdk線程池在判斷,而是自己判斷的。參見如下hystrix源碼:
public boolean isQueueSpaceAvailable() {
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
可以看到hystrix在隊列大小達到maxQueueSize時,根本不會往底層的ThreadPoolExecutor提交任務。ThreadPoolExecutor也就沒有機會判斷workQueue能不能offer,更不能創建新的線程了。
怎么辦
對用慣了jdk的ThreadPoolExecutor的人來說,再用hystrix的確容易出錯,筆者就曾在多個重要線上服務的代碼里看到過錯誤的配置,稱一聲危險的hystrix線程池不為過。
那怎么辦呢?
配置的時候規避問題
同時配置maximumSize > coreSize,maxQueueSize > 0,像下面這樣,是不行了。
coreSize = 2; maximumSize = 5; maxQueueSize = 10
妥協一下,如果對延遲比較看重,配置maximumSize > coreSize,maxQueueSize = -1。這樣在任務多的時候,不會有等候隊列,直接創建新線程執行任務。
coreSize = 2; maximumSize = 5; maxQueueSize = -1
如果對資源比較看重, 不希望創建過多線程,配置maximumSize = coreSize,maxQueueSize > 0。這樣在任務多的時候,會進等候隊列,直到有線程空閑或者超時。
coreSize = 2; maximumSize = 2; maxQueueSize = 10
在hystrix上修復這個問題
技術上是可行的,有很多方案可以做到。但Netflix已經宣布不再維護hystrix了,這條路也就不通了,除非維護自己的hystrix分支版本。
Reference
https://github.com/Netflix/Hystrix/wiki/Configuration
https://github.com/Netflix/Hystrix/issues/1589
https://github.com/Netflix/Hystrix/pull/1670