說明
原創不易,如若轉載 請標明來源!
歡迎關注本人微信公眾號:壹枝花算不算浪漫
更多內容也可查看本人博客:一枝花算不算浪漫
前言
前情回顧
上一講我們講解了Hystrix在配合feign的過程中,一個正常的請求邏輯該怎樣處理,這里涉及到線程池的創建、HystrixCommand的執行等邏輯。
如圖所示:
高清大圖:https://www.processon.com/view/link/5e1c128ce4b0169fb51ce77e
本講目錄
這一講開始講解Hystrix的看家本領:熔斷+降級。
熔斷功能是Hystrix最核心的組件,當然也是最復雜的一塊。
源碼中細節太多,本講我們主要還是專注於它的設計思想去學習。
目錄如下:
- HystrixCircuitBreaker初始化過程
- Hystrix熔斷機制(CLOSED/OPEN/HALF_OPEN)
- fallback降級機制
源碼分析
HystrixCircuitBreaker初始化過程
我們還是會以AbstractCommand
為突破口,這里繼續看它的構造函數,其中里面有初始化熔斷器initCircuitBreaker()
的過程,具體代碼如下:
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// 構建默認的HystrixCircuitBreaker
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
return new NoOpCircuitBreaker();
}
}
}
public interface HystrixCircuitBreaker {
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
// circuitBreakersByCommand是一個map,key為commandKey,也就是FeignClient中定義的方法名
// 類似於ServiceAFeignClient.sayHello(String)
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if (previouslyCached != null) {
return previouslyCached;
}
// 每個commandKey都對應着自己的熔斷器,如果沒有則會構造一個HystrixCircuitBreaker
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
return circuitBreakersByCommand.get(key.name());
} else {
return cbForCommand;
}
}
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
private Subscription subscribeToStream() {
// 對HealthCounts進行訂閱
// HealthCounts中包含 總請求次數、總失敗次數、失敗率
// HealthCounts 統計數據有變化則會回調到這里來
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
// 判斷是否要降級的核心邏輯
@Override
public void onNext(HealthCounts hc) {
// 一個時間窗口(默認10s鍾)總請求次數是否大於circuitBreakerRequestVolumeThreshold 默認為20s
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
} else {
// 錯誤率(總錯誤次數/總請求次數)小於circuitBreakerErrorThresholdPercentage(默認50%)
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
} else {
// 反之,熔斷狀態將從CLOSED變為OPEN,且circuitOpened==>當前時間戳
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
}
}
上面就是熔斷器初始化過程,這里面做了幾件事:
-
每個commandKey都有自己的一個熔斷器
commandKey表現形式為:ServiceAFeignClient#sayHello(String) -
如果commandKey不存在熔斷器,則構建默認熔斷器
默認熔斷器會對HealthCounts進行訂閱。HealthCounts中包含時間窗口內(默認10s鍾)請求的總次數、失敗次數、失敗率 -
HealthCounts中統計數據有變化則會回調subscribe.onNext()方法進行熔斷開啟判斷
-
熔斷開啟條件:
- 時間窗口內(默認10s鍾)總請求次數大於20次
- 時間窗口內(默認10s鍾)失敗率大於50%
- 滿足上述兩個條件后熔斷器狀態從CLOSED變成OPEN
熔斷器在第一次請求時會初始化AbtractCommand
,同時也會創建對應commandKey的熔斷器 ,熔斷器默認都是關閉的(可配置為強制開啟),只有滿足觸發條件才會被開啟。下面就一起來看下熔斷、半開等狀態是如何觸發的吧。
Hystrix熔斷機制(CLOSED/OPEN/HALF_OPEN)
這里我們以AbstractCommand.applyHystrixSemantics()
為入口,一步步往下探究,這個方法在上一講已經提到過,一個正常的Feign請求都會調用此方法。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 如果熔斷了,這這里返回為false
// 這里也包含HALF_OPEN邏輯
if (circuitBreaker.attemptExecution()) {
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
}
circuitBreaker.attemptExecution()
這個邏輯就是判斷,如果熔斷了,那么返回false。而且這里還包含HALF_OPEN
的邏輯,我們先看如何觸發熔斷的,這個后面再接着看。
接着往下跟進executeCommandAndObserve()
方法:
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 省略部分代碼...
// 運行過程中,出現異常等都會進入此回調函數
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
// 這里創建一個 HystrixObservableTimeoutOperator
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
}
當我們服務調用中出現異常都會進入handleFallback()
中,里面的方法我們就不繼續跟入了,猜測里面會有更新HealthCounts
中的屬性,然后觸發 HystrixCircuitBreaker
中的onNext()
方法,當滿足熔斷條件時 則會將熔斷狀態從CLOSED
變成OPEN
。
這里我們會跟進下HystrixObservableTimeoutOperator
代碼,這個是對我們執行過程中判斷是否超時。
上面代碼中,執行executeCommandWithSpecifiedIsolation()
方法時也會創建一個超時監視器:
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
final AbstractCommand<R> originalCommand;
public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
this.originalCommand = originalCommand;
}
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
TimerListener listener = new TimerListener() {
@Override
public void tick() {
// 判斷command的timeOut狀態,如果是未執行狀態,則更新為已超時
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
s.unsubscribe();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
timeoutRunnable.run();
}
}
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
originalCommand.timeoutTimer.set(tl);
// 省略部分代碼...
s.add(parent);
return parent;
}
}
public class HystrixTimer {
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
startThreadIfNeeded();
Runnable r = new Runnable() {
@Override
public void run() {
try {
// 執行上面的tick方法,改變command timeout狀態
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
// 執行調度任務,延遲加載,延遲時間和調度時間默認都為1s鍾
// 這里使用線程池,coreSize=cpu核心數 maxSize為Integer.Max
ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
return new TimerReference(listener, f);
}
}
這里面核心業務是起一個調度任務,默認每秒鍾執行一次,然后調用tick()
方法,如果當前command狀態還是NOT_EXECUTED
狀態,那么將command狀態改為TIMED_OUT
。此時會進入到之前的handleFallback
回調函數中,這里又會更新HealthCounts
中的數據,對應的觸發之前熔斷的判斷條件:
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
private Subscription subscribeToStream() {
//這里會在每次執行onNext()事件的時候來評估是否需要打開或者關閉斷路器
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(HealthCounts hc) {
//首先校驗的時在時間窗范圍內的請求次數,如果低於閾值(默認是20),不做處理,如果高於閾值,則去判斷接口請求的錯誤率
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // 如果沒有超過統計閾值的最低窗口值,就沒有必要去改變斷路器的狀態
// 當前如果斷路器是關閉的,那么就保持關閉狀態無需更改;
// 如果斷路器狀態為半開狀態,需要等待直到有成功的命令執行;
// 如果斷路器是打開狀態,需要等待休眠窗口過期。
} else {
//判斷接口請求的錯誤率(閾值默認是50),如果高於這個值,則斷路器打開
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
// 如果當前請求的錯誤率小於斷路器設置的容錯率百分比,也不會攔截請求
} else {
// 如果當前錯誤率太高則打開斷路器
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
如果符合熔斷條件,那么command熔斷狀態就會變為OPEN
,此時熔斷器打開。
如果我們command執行成功,那么就會清理掉這個timeout timer schedule任務。
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private void handleCommandEnd(boolean commandExecutionStarted) {
Reference<TimerListener> tl = timeoutTimer.get();
// 如果timeOutTimer不為空,這里則clear一下
// clear會關閉啟動的調度任務
if (tl != null) {
tl.clear();
}
long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
if (executionResultAtTimeOfCancellation == null) {
// metrics統計數據
metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
} else {
metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
}
if (endCurrentThreadExecutingCommand != null) {
endCurrentThreadExecutingCommand.call();
}
}
}
如上所屬,我們已經知道了熔斷開啟的觸發時機,那么如果一個commandKey開啟了熔斷,下次的請求是該如何直接降級呢?我們來看下代碼:
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 這個if條件就代表是否開啟熔斷
if (circuitBreaker.attemptExecution()) {
// 執行業務邏輯代碼...
} else {
return handleShortCircuitViaFallback();
}
}
}
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
public boolean attemptExecution() {
// 如果熔斷配置的為強制開啟,那么直接返回false執行熔斷邏輯
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
// 如果熔斷配置為強制關閉,那么永遠不走熔斷邏輯
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
// 熔斷開啟時 circuitOpened設置為當前時間戳
if (circuitOpened.get() == -1) {
return true;
} else {
// 如果當前時間距離熔斷小於5s鍾,那么將熔斷狀態從OPEN改為HALF_OPEN
if (isAfterSleepWindow()) {
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
//only the first request after sleep window should execute
return true;
} else {
return false;
}
} else {
return false;
}
}
}
}
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
// circuitBreakerSleepWindowInMilliseconds 默認為5s鍾
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
// 當前熔斷距離熔斷是否超過5s鍾
return currentTime > circuitOpenTime + sleepWindowTime;
}
}
我們可以看到,在applyHystrixSemantics()
這個核心的方法中,先判斷是否熔斷,如果熔斷則直接走fallback邏輯。
attemptExecution()
判斷條件中還涉及到HALF_OPEN
的邏輯,如果熔斷開啟,下一次請求的時候,會判斷當前時間距離上一次時間是否超過了5s鍾,如果沒有超過,則會將熔斷狀態從OPEN
變為HALF_OPEN
,此時會放一個請求按照正常邏輯去執行:
- 執行失敗,熔斷狀態又會從
HALF_OPEN
變成OPEN
。 - 執行成功,熔斷狀態從
HALF_OPEN
變成CLOSED
,並清除熔斷相關設置
執行成功后代碼:
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
public void markSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//This thread wins the race to close the circuit - it resets the stream to start it over from 0
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
circuitOpened.set(-1L);
}
}
}
上面對整個熔斷的狀態:CLOSED、OPEN、HALF_OPEN梳理的已經很清楚了,下面看看降級是該如何處理的吧。
fallback降級機制
上面已經講解了Hystrix 熔斷開啟的機制等內容,這里主要是說如果一個請求失敗(線程池拒絕、超時、badRequest等),那么Hystrix是如何執行降級的呢?
還是回到我們最初的代碼 HystrixInvocationHandler
類中,看看其invoke()
方法中的getFallback
回調函數:
protected Object getFallback() {
if (fallbackFactory == null) {
return super.getFallback();
}
try {
// 通過我們配置好的fallbackFactory找到對應的FeignClient,這里是獲取ServiceAFeignClient
Object fallback = fallbackFactory.create(getExecutionException());
// fallbackMap中key為ServiceAFeignClient.sayHello(Integer)
// 獲取具體的降級method方法
Object result = fallbackMethodMap.get(method).invoke(fallback, args);
if (isReturnsHystrixCommand(method)) {
return ((HystrixCommand) result).execute();
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return ((Observable) result).toBlocking().first();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else {
return result;
}
} catch (IllegalAccessException e) {
// shouldn't happen as method is public due to being an interface
throw new AssertionError(e);
} catch (InvocationTargetException e) {
// Exceptions on fallback are tossed by Hystrix
throw new AssertionError(e.getCause());
}
}
};
這里很簡單,其實就是先獲取到我們自己在FallbackFactory中配置的的降級方法,然后執行降級邏輯。
總結
這一講核心邏輯主要是Hystrix熔斷狀態的變化,主要是CLOSED、OPEN、HALF_OPEN幾種狀態觸發的時間,互相轉變的流程,以及執行降級邏輯的原理。
我們仍然是用一個流程圖來總結一下:
高清大圖鏈接:
https://www.processon.com/view/link/5e1ee0afe4b0c62462aae684
(點擊原文可以直接查看大圖哦😄)
申明
本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫