1. What is Hystrix
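Hystrix is an open-source framework from Netflix, hosted at https://github.com/Netflix/Hystrix
The name means "porcupine": an animal that is normally docile, raises its quills to protect itself when it senses danger, and goes back to being a harmless ball once the danger has passed.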
So the whole framework comes down to two questions:
- When is protection needed?
- How do we protect?
2. When is protection needed
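A system usually plays two roles at once: service provider and service consumer. For a consumer, the hardest part is protecting itself from its dependencies. Anyone who has built an API gateway will know this firsthand.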
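The figure above shows a typical system dependency graph. The underlying dependencies are numerous, and they communicate over sockets, HTTP, Dubbo, WebService and more. When the network jitters or a dependency starts responding abnormally, the service we provide suffers directly.
As this spreads, it can easily trigger an avalanche (a cascading failure) in which the whole business call chain breaks down. The entire flow may time out, or order data may become inconsistent.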
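That raises the core question: how do we detect that the business is in an abnormal state?
Success rate! The success rate directly reflects how data is flowing through the business, and it is the most direct business signal.
You can also decide based on response time, as Sentinel does. Conceptually this is the same idea in a different form. Use time for timeout control and treat a timeout as a failure, and you are back to a success rate.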
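Here is a minimal sketch of "timeout = failure". It assumes a bucketed rolling window, similar in spirit to the rolling statistics Hystrix keeps. Each call is recorded as a success or a failure, and a call slower than the timeout counts as a failure. The class `RollingSuccessRate`, its bucket layout and the explicit `nowMillis` clock parameter are hypothetical, chosen for illustration and testability. This is not Hystrix's API.

```java
import java.util.Arrays;

// Hypothetical helper (not a Hystrix class): success rate over a rolling window
// made of fixed-size buckets. A call slower than the timeout is recorded as a
// failure, so "timeout = failure" feeds the same success-rate signal.
final class RollingSuccessRate {
    private final long bucketMillis;
    private final long[] bucketStart;  // start time of the period each bucket holds, -1 if unused
    private final int[] successes;
    private final int[] failures;

    /** windowMillis is assumed to be a multiple of buckets. */
    RollingSuccessRate(long windowMillis, int buckets) {
        this.bucketMillis = windowMillis / buckets;
        this.bucketStart = new long[buckets];
        Arrays.fill(bucketStart, -1L);
        this.successes = new int[buckets];
        this.failures = new int[buckets];
    }

    /** Records one call; an error or a call slower than timeoutMillis counts as a failure. */
    synchronized void record(long nowMillis, long latencyMillis, boolean ok, long timeoutMillis) {
        int i = bucketFor(nowMillis);
        if (ok && latencyMillis <= timeoutMillis) {
            successes[i]++;
        } else {
            failures[i]++;
        }
    }

    /** Success rate in [0, 1] over the buckets still inside the window; 1.0 if there were no calls. */
    synchronized double successRate(long nowMillis) {
        long oldestAllowed = (nowMillis / bucketMillis - (bucketStart.length - 1)) * bucketMillis;
        long ok = 0;
        long total = 0;
        for (int i = 0; i < bucketStart.length; i++) {
            if (bucketStart[i] >= 0 && bucketStart[i] >= oldestAllowed) {
                ok += successes[i];
                total += successes[i] + failures[i];
            }
        }
        return total == 0 ? 1.0 : (double) ok / total;
    }

    // Maps a timestamp to its bucket, clearing the bucket if it still holds an older period.
    private int bucketFor(long nowMillis) {
        long period = nowMillis / bucketMillis;
        int i = (int) (period % bucketStart.length);
        long start = period * bucketMillis;
        if (bucketStart[i] != start) {
            bucketStart[i] = start;
            successes[i] = 0;
            failures[i] = 0;
        }
        return i;
    }
}
```

Take a 10 s window split into ten 1 s buckets. One fast success, one call that took 2 s against a 1 s timeout, and one error give a success rate of 1/3. Once the window has moved past those buckets, they no longer count.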
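3. How to protect
As with the porcupine, the quills are the protective tool: every attack gets bluntly pushed back.
Hystrix implements this as the "circuit breaker", which decides whether the system is currently in a state that needs protection.
When the breaker is open, no request runs the normal business logic. Instead, an agreed-upon response, the fallback, is returned immediately. This fail-fast principle protects the system.
A system should not stay "spiky" forever, though. It has to return to normal once the danger has passed.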
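So the core operations on the circuit breaker are:
- If the success rate is too low, open the breaker and block normal traffic
- Once some time has passed, move the breaker to half-open and let a single trial request through
- If the trial request succeeds, close the breaker; if it fails, open it again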
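These transitions form a small state machine. The sketch below is a hypothetical `SimpleCircuitBreaker`, not Hystrix's implementation. The caller reports the observed success rate through `checkHealth`, and the clock is passed in explicitly. The method names `attemptExecution`, `markSuccess` and `markNonSuccess` are borrowed from the Hystrix source in section 5, although Hystrix's versions take no clock argument. Using `openedAt == -1` to mean "closed" mirrors Hystrix's `circuitOpened` field.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical circuit breaker state machine:
// CLOSED -> OPEN when the reported success rate is too low,
// OPEN -> HALF_OPEN once the sleep window has elapsed (one trial request),
// HALF_OPEN -> CLOSED on success, HALF_OPEN -> OPEN on failure.
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private final AtomicLong openedAt = new AtomicLong(-1);  // -1 means "closed"
    private final long sleepWindowMillis;
    private final double minSuccessRate;

    SimpleCircuitBreaker(long sleepWindowMillis, double minSuccessRate) {
        this.sleepWindowMillis = sleepWindowMillis;
        this.minSuccessRate = minSuccessRate;
    }

    /** Trips the breaker if the observed success rate is below the threshold. */
    void checkHealth(double successRate, long nowMillis) {
        if (successRate < minSuccessRate && state.compareAndSet(State.CLOSED, State.OPEN)) {
            openedAt.set(nowMillis);
        }
    }

    /** true: run the real logic; false: go straight to the fallback. */
    boolean attemptExecution(long nowMillis) {
        long opened = openedAt.get();
        if (opened == -1) {
            return true;  // closed
        }
        if (nowMillis > opened + sleepWindowMillis) {
            // Only the thread that wins this CAS gets the trial request.
            return state.compareAndSet(State.OPEN, State.HALF_OPEN);
        }
        return false;  // still inside the sleep window
    }

    void markSuccess() {
        if (state.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
            openedAt.set(-1);
        }
    }

    void markNonSuccess(long nowMillis) {
        if (state.compareAndSet(State.HALF_OPEN, State.OPEN)) {
            openedAt.set(nowMillis);  // restart the sleep window
        }
    }

    State state() {
        return state.get();
    }
}
```

With a 5 s sleep window, a breaker tripped at t = 1 s rejects until t = 6 s. After that, exactly one trial goes through. A failed trial restarts the window, and a successful one closes the breaker.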
The core API of the circuit breaker is shown in the figure below:
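4. Rate limiting, circuit breaking, isolation, and degradation
These four concepts come up constantly whenever microservices are discussed. Here we look at how Hystrix implements them.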
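Rate limiting
- Rate limiting here is quite different from Guava's RateLimiter: one exists to "protect myself", the other to "protect downstream"
- When a service is rate-limited, excess traffic goes straight to the fallback, i.e. it is short-circuited. RateLimiter is really about "traffic shaping": smoothing irregular traffic into a steady rate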
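A JDK-only sketch makes the contrast concrete. `limitOrFallback` rejects immediately and falls back when no permit is free, which is the Hystrix way. The hypothetical `Pacer` never rejects. It tells each caller how long to wait so that calls are evenly spaced. `Pacer` is a deliberately simplified stand-in for traffic shaping. It is not Guava's `RateLimiter` algorithm, which also handles bursts and warm-up.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class LimitVsShape {
    private LimitVsShape() {}

    // Hystrix-style limiting: if no permit is free right now, do not wait;
    // return the fallback immediately (fail fast).
    static <T> T limitOrFallback(Semaphore permits, Supplier<T> work, Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        try {
            return work.get();
        } finally {
            permits.release();
        }
    }

    // Shaping in the spirit of RateLimiter (simplified, hypothetical): nothing is
    // rejected; each caller learns how long to wait so calls are spaced evenly.
    static final class Pacer {
        private final long intervalNanos;
        private long nextFreeNanos;

        Pacer(double permitsPerSecond, long startNanos) {
            this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
            this.nextFreeNanos = startNanos;
        }

        /** Reserves the next slot and returns the wait in nanoseconds (0 = go now). */
        synchronized long reserve(long nowNanos) {
            long slot = Math.max(nowNanos, nextFreeNanos);
            nextFreeNanos = slot + intervalNanos;
            return slot - nowNanos;
        }
    }
}
```

At 10 permits per second, three back-to-back reservations at time 0 are told to wait 0 ms, 100 ms and 200 ms. With zero free permits, `limitOrFallback` returns the fallback without waiting at all.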
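Circuit breaking
- When my application cannot serve requests, I short-circuit requests from upstream so they do not crush me
- When my downstream dependency's success rate is too low, I short-circuit my calls to it so it does not drag me down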
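Degradation
- Degradation is closely tied to circuit breaking. It defines how the business behaves once the breaker trips: the agreed fail-fast fallback is the service degradation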
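Isolation
- Businesses must not affect one another; each needs its own execution space
- The most thorough option is physical isolation: deploy to different machines
- Next best is process isolation: several Tomcat instances on one machine
- Next comes request isolation
- Because Hystrix operates at the code level, it implements request isolation, using either a thread pool or a semaphore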
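Thread pool isolation is essentially the bulkhead pattern. Below is a minimal JDK sketch of it. It assumes one small pool per dependency with no waiting queue (`SynchronousQueue` plus `AbortPolicy`). Once every thread is busy, submission throws `RejectedExecutionException` and the caller falls back at once. `ThreadPoolBulkhead` is a hypothetical name. Hystrix manages its own pools (`HystrixThreadPool`), with configurable core size and queue.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical per-dependency bulkhead: a fixed-size pool with no waiting queue.
// When every thread is busy, submit() throws RejectedExecutionException and the
// caller falls back immediately, so a slow dependency can only tie up its own threads.
final class ThreadPoolBulkhead {
    private final ThreadPoolExecutor pool;

    ThreadPoolBulkhead(int threads) {
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());
    }

    /** Submits work without waiting for it; throws RejectedExecutionException when saturated. */
    <T> Future<T> submit(Callable<T> work) {
        return pool.submit(work);
    }

    /** Runs work in the pool and waits for it; returns the fallback on rejection or failure. */
    <T> T call(Callable<T> work, T fallback) {
        try {
            return pool.submit(work).get();
        } catch (RejectedExecutionException | ExecutionException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

Suppose the pool has one thread and that thread is blocked on a slow call. A second call is then rejected and returns its fallback immediately instead of queueing behind the slow one.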
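5. Source code analysis
Let's start with the Hystrix request flow diagram.
As the diagram shows, every Hystrix request is wrapped in a HystrixCommand, and the core logic lives in AbstractCommand.java.
The source below is built on RxJava. It helps to know RxJava's common operators and semantics first; otherwise the code can be confusing.
Put simply, RxJava is callback-based functional programming. Informally, it amounts to the strategy pattern implemented with anonymous inner classes.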
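If RxJava is unfamiliar, the JDK's `CompletableFuture` shows the same style of registering callbacks on a pipeline. This is only an analogy, not how Hystrix works. Here `thenApply`, `exceptionally` and `whenComplete` play roughly the roles that `doOnNext`, `onErrorResumeNext` and `doOnTerminate` play in the Hystrix code. `CallbackPipeline` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Analogy only: callbacks are registered up front and fire as the work
// succeeds, fails, or finishes, loosely mirroring RxJava's doOnNext /
// onErrorResumeNext / doOnTerminate.
final class CallbackPipeline {
    private CallbackPipeline() {}

    static String run(Supplier<String> work, List<String> events) {
        return CompletableFuture.supplyAsync(work)
                .thenApply(v -> { events.add("emit " + v); return v; })             // ~ doOnNext
                .exceptionally(t -> { events.add("fallback"); return "fallback"; }) // ~ onErrorResumeNext
                .whenComplete((v, t) -> events.add("terminate"))                    // ~ doOnTerminate
                .join();
    }
}
```

On success the events are `emit value` then `terminate`. If the work throws, the emit step is skipped, the fallback value is returned, and the events are `fallback` then `terminate`.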
5.1 Circuit breaker
First, let's see how the circuit breaker and the execution semaphore decide whether a request runs:
```java
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // custom extension point
    executionHook.onStart(_cmd);
    // ask the circuit breaker whether this request may go through
    if (circuitBreaker.attemptExecution()) {
        // get the execution semaphore; without semaphore isolation this is a no-op semaphore that always grants
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // callback on termination/unsubscription: release the semaphore at most once
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        // callback when an exception is thrown
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        // try to acquire a permit from the semaphore
        if (executionSemaphore.tryAcquire()) {
            try {
                // acquired: record the invocation start time, then run the command
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // no permit available: go to the fallback
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // the circuit breaker is open: go to the fallback
        return handleShortCircuitViaFallback();
    }
}
```
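Why the `AtomicBoolean` around `executionSemaphore.release()`? The same `singleSemaphoreRelease` action is registered on both `doOnTerminate` and `doOnUnsubscribe`, so it may run more than once for a single execution. The sketch below reproduces just that guard. The JDK's `java.util.concurrent.Semaphore` stands in for Hystrix's internal `TryableSemaphore`, and `ReleaseOnce` is a hypothetical name. Released twice, a JDK semaphore would climb above its initial permit count, and that permit leak is what the guard prevents.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical "release exactly once" action: safe to register on several
// terminal callbacks, because only the first call wins the CAS and returns the permit.
final class ReleaseOnce implements Runnable {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}
```

Calling `run()` once from a "terminate" path and again from an "unsubscribe" path leaves a one-permit semaphore at exactly one available permit.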
When does the circuit breaker let a request in?
```java
@Override
public boolean attemptExecution() {
    // dynamic property: if the breaker is forced open, reject every request
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    // if it is forced closed, allow every request
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    // circuitOpened == -1 means the breaker is closed: allow the request
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        // the breaker is open: allow a request only once the sleep window has elapsed
        if (isAfterSleepWindow()) {
            //only the first request after sleep window should execute
            //if the executing command succeeds, the status will transition to CLOSED
            //if the executing command fails, the status will transition to OPEN
            //if the executing command gets unsubscribed, the status will transition to OPEN
            // the CAS lets exactly one request through: this is the "half-open" state
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}
```
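The guarantee that only one request passes the half-open gate rests entirely on `compareAndSet`. Here is a small hypothetical experiment, `HalfOpenRace`, that uses an `AtomicReference<Status>` to mirror the `status` field above. It lines up several threads on an open breaker and counts how many win the `OPEN -> HALF_OPEN` transition.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical experiment: N threads race on an open breaker's OPEN -> HALF_OPEN
// transition. compareAndSet succeeds for exactly one of them, which becomes the
// single trial request; all the others are rejected.
final class HalfOpenRace {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private HalfOpenRace() {}

    static int countAdmitted(int threads) {
        AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);
        AtomicInteger admitted = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    start.await();  // hold every thread until all have been started
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        admitted.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();  // release all threads at once
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return admitted.get();
    }
}
```

However many threads race, `countAdmitted` returns 1. This is the "one trial request" behavior that the Hystrix comments above describe.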
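The key concept here is the "sleep window": how long the breaker stays open before a trial request is allowed. It is not the same as the rolling statistical window Hystrix uses to compute error rates.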
```java
private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    // the sleep window has elapsed once more than sleepWindowTime ms have passed since the breaker opened
    return currentTime > circuitOpenTime + sleepWindowTime;
}
```
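Note that the comparison is strict. Take Hystrix's default sleep window of 5000 ms (`circuitBreaker.sleepWindowInMilliseconds`). A breaker opened at t = 10000 ms still rejects at exactly t = 15000 ms and admits its trial request from t = 15001 ms. The static helper below is hypothetical. It restates the same expression with the clock passed in, so the boundary can be checked.

```java
// Hypothetical restatement of isAfterSleepWindow with an explicit clock.
final class SleepWindow {
    private SleepWindow() {}

    // Strictly greater than: exactly at openTime + window, the breaker is still closed to traffic.
    static boolean isAfterSleepWindow(long circuitOpenTime, long currentTime, long sleepWindowTime) {
        return currentTime > circuitOpenTime + sleepWindowTime;
    }
}
```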
5.2 Isolation
How are business requests isolated?
```java
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // which isolation strategy: thread pool or semaphore?
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        // thread pool isolation
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                // record the command start in the metrics
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    // the command timed out in the wrapping thread so we will return immediately
                    // and not increment any of the counters below or other such logic
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    /**
                     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                     */
                    try {
                        // run the extension hooks, then the user code
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.empty();
                }
            }
        // register callbacks for termination and unsubscription
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        // subscribe on the thread pool's scheduler, i.e. run everything above inside the thread pool
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        // semaphore isolation: runs on the current thread, no scheduler involved
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}
```
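In practice, the two branches differ in which thread runs the user code. The THREAD branch is moved onto the pool via `subscribeOn(threadPool.getScheduler(...))`. The SEMAPHORE branch has no scheduler, so it runs on the caller's thread. The hypothetical JDK-only sketch below makes this visible by returning the name of the thread that did the work. Here an `ExecutorService` stands in for Hystrix's thread pool scheduler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: where does the user code run under each isolation strategy?
final class IsolationThreads {
    private IsolationThreads() {}

    // THREAD-style: hand the work to a separate pool and wait for its result.
    static String runWithThreadIsolation(ExecutorService pool) {
        Callable<String> work = () -> Thread.currentThread().getName();
        try {
            return pool.submit(work).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "fallback";
        } catch (ExecutionException e) {
            return "fallback";
        }
    }

    // SEMAPHORE-style: only gate admission, then run on the caller's own thread.
    static String runWithSemaphoreIsolation(Semaphore permits) {
        if (!permits.tryAcquire()) {
            return "fallback";
        }
        try {
            return Thread.currentThread().getName();
        } finally {
            permits.release();
        }
    }
}
```

This is also why semaphore isolation is lighter, since no thread hand-off is involved. The trade-off is that a slow call blocks the caller's own thread.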
5.3 Core execution flow
```java
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // callback for each emitted value; for scalar commands, success is also recorded here
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                circuitBreaker.markSuccess();
            }
        }
    };
    // callback on successful completion: record the success, which the circuit breaker uses to maintain its state
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                circuitBreaker.markSuccess();
            }
        }
    };
    // callback on failure
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            // choose the fallback path that matches the kind of failure
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    // register all the callbacks
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
```
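`handleFallback` is essentially a dispatch on the failure type. The hypothetical `FallbackDispatch` below restates it as a pure function. JDK `TimeoutException` stands in for `HystrixTimeoutException`, and the nested `BadRequest` class stands in for `HystrixBadRequestException`; neither stand-in is a Hystrix type. A bad request is the one case that does not fall back. The error is emitted to the caller, because an invalid argument is the caller's fault rather than the dependency's.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical pure-function restatement of the handleFallback branching.
final class FallbackDispatch {
    enum Outcome { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST_NO_FALLBACK, FAILURE }

    // Stand-in for HystrixBadRequestException (not the real class).
    static final class BadRequest extends RuntimeException {
        BadRequest(String message) {
            super(message);
        }
    }

    private FallbackDispatch() {}

    static Outcome classify(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Outcome.THREAD_POOL_REJECTED;     // pool/queue full -> rejection fallback
        }
        if (t instanceof TimeoutException) {
            return Outcome.TIMEOUT;                  // timed out -> timeout fallback
        }
        if (t instanceof BadRequest) {
            return Outcome.BAD_REQUEST_NO_FALLBACK;  // error is propagated to the caller
        }
        return Outcome.FAILURE;                      // anything else -> failure fallback
    }
}
```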
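6. Summary
- Hystrix is a circuit-breaking and rate-limiting framework that works within a single application instance
- While the breaker is open, its sleep window decides whether a request may execute
- Threads race via CAS for the "half-open" state, and a single trial request decides whether the breaker can close
- Thread pool isolation runs requests in a thread pool, and limiting relies on the pool rejecting work
- Semaphore isolation runs on the current thread, and limiting relies on the number of concurrent requests
- When the semaphore cannot be acquired or the thread pool and its queue are full, the request is rate-limited and the fallback runs
- When the circuit breaker is open, the request is short-circuited and the fallback runs
- The whole framework follows RxJava's programming model, so callbacks are everywhere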