【一起學源碼-微服務】Hystrix 源碼三:Hystrix核心流程:Hystix降級、熔斷等原理剖析


說明

原創不易,如若轉載 請標明來源!

歡迎關注本人微信公眾號:壹枝花算不算浪漫
更多內容也可查看本人博客:一枝花算不算浪漫

前言

前情回顧

上一講我們講解了Hystrix在配合feign的過程中,一個正常的請求邏輯該怎樣處理,這里涉及到線程池的創建、HystrixCommand的執行等邏輯。

如圖所示:
Hystrix線程池創建過程及線程調用原理.jpg

高清大圖:https://www.processon.com/view/link/5e1c128ce4b0169fb51ce77e

本講目錄

這一講開始講解Hystrix的看家本領:熔斷+降級。
熔斷功能是Hystrix最核心的組件,當然也是最復雜的一塊。
源碼中細節太多,本講我們主要還是專注於它的設計思想去學習。

目錄如下:

  1. HystrixCircuitBreaker初始化過程
  2. Hystrix熔斷機制(CLOSED/OPEN/HALF_OPEN)
  3. fallback降級機制

源碼分析

HystrixCircuitBreaker初始化過程

我們還是會以AbstractCommand為突破口,這里繼續看它的構造函數,其中里面有初始化熔斷器initCircuitBreaker()的過程,具體代碼如下:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

	private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        if (enabled) {
            if (fromConstructor == null) {
                // 構建默認的HystrixCircuitBreaker
                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
            } else {
                return fromConstructor;
            }
        } else {
            return new NoOpCircuitBreaker();
        }
    }
}


public interface HystrixCircuitBreaker {
	public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
	    // circuitBreakersByCommand是一個map,key為commandKey,也就是FeignClient中定義的方法名
	    // 類似於ServiceAFeignClient.sayHello(String)
	    HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
	    if (previouslyCached != null) {
	        return previouslyCached;
	    }

	    // 每個commandKey都對應着自己的熔斷器,如果沒有則會構造一個HystrixCircuitBreaker
	    HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
	    if (cbForCommand == null) {
	        return circuitBreakersByCommand.get(key.name());
	    } else {
	        return cbForCommand;
	    }
	}

	class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
		private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        private Subscription subscribeToStream() {
        	// 對HealthCounts進行訂閱
        	// HealthCounts中包含 總請求次數、總失敗次數、失敗率
        	// HealthCounts 統計數據有變化則會回調到這里來
            return metrics.getHealthCountsStream()
		            .observe()
		            .subscribe(new Subscriber<HealthCounts>() {
		                @Override
		                public void onCompleted() {

		                }

		                @Override
		                public void onError(Throwable e) {

		                }

		                // 判斷是否要降級的核心邏輯
		                @Override
		                public void onNext(HealthCounts hc) {
		                	// 一個時間窗口(默認10s鍾)總請求次數是否大於circuitBreakerRequestVolumeThreshold 默認為20s
		                    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
		                        
		                    } else {
		                    	// 錯誤率(總錯誤次數/總請求次數)小於circuitBreakerErrorThresholdPercentage(默認50%)
		                        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

		                        } else {
		                            // 反之,熔斷狀態將從CLOSED變為OPEN,且circuitOpened==>當前時間戳
		                            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
		                                circuitOpened.set(System.currentTimeMillis());
		                            }
		                        }
		                    }
		                }
		            });
        }
	}
}

上面就是熔斷器初始化過程,這里面做了幾件事:

  1. 每個commandKey都有自己的一個熔斷器
    commandKey表現形式為:ServiceAFeignClient#sayHello(String)

  2. 如果commandKey不存在熔斷器,則構建默認熔斷器
    默認熔斷器會對HealthCounts進行訂閱。HealthCounts中包含時間窗口內(默認10s鍾)請求的總次數、失敗次數、失敗率

  3. HealthCounts中統計數據有變化則會回調subscribe.onNext()方法進行熔斷開啟判斷

  4. 熔斷開啟條件:

  • 時間窗口內(默認10s鍾)總請求次數大於20次
  • 時間窗口內(默認10s鍾)失敗率大於50%
  • 滿足上述兩個條件后熔斷器狀態從CLOSED變成OPEN

熔斷器在第一次請求時會初始化AbtractCommand,同時也會創建對應commandKey的熔斷器 ,熔斷器默認都是關閉的(可配置為強制開啟),只有滿足觸發條件才會被開啟。下面就一起來看下熔斷、半開等狀態是如何觸發的吧。

Hystrix熔斷機制(CLOSED/OPEN/HALF_OPEN)

這里我們以AbstractCommand.applyHystrixSemantics() 為入口,一步步往下探究,這個方法在上一講已經提到過,一個正常的Feign請求都會調用此方法。

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // 如果熔斷了,這這里返回為false
        // 這里也包含HALF_OPEN邏輯
        if (circuitBreaker.attemptExecution()) {
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            if (executionSemaphore.tryAcquire()) {
                try {
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
}

circuitBreaker.attemptExecution() 這個邏輯就是判斷,如果熔斷了,那么返回false。而且這里還包含HALF_OPEN的邏輯,我們先看如何觸發熔斷的,這個后面再接着看。

接着往下跟進executeCommandAndObserve() 方法:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        // 省略部分代碼...

        // 運行過程中,出現異常等都會進入此回調函數
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            // 這里創建一個 HystrixObservableTimeoutOperator
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
}

當我們服務調用中出現異常都會進入handleFallback()中,里面的方法我們就不繼續跟入了,猜測里面會有更新HealthCounts中的屬性,然后觸發 HystrixCircuitBreaker中的onNext()方法,當滿足熔斷條件時 則會將熔斷狀態從CLOSED變成OPEN

這里我們會跟進下HystrixObservableTimeoutOperator 代碼,這個是對我們執行過程中判斷是否超時。
上面代碼中,執行executeCommandWithSpecifiedIsolation() 方法時也會創建一個超時監視器:

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

    final AbstractCommand<R> originalCommand;

    public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
        this.originalCommand = originalCommand;
    }

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        TimerListener listener = new TimerListener() {

            @Override
            public void tick() {
                // 判斷command的timeOut狀態,如果是未執行狀態,則更新為已超時
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    s.unsubscribe();

                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                        @Override
                        public void run() {
                            child.onError(new HystrixTimeoutException());
                        }
                    });


                    timeoutRunnable.run();
                }
            }

            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };

        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        originalCommand.timeoutTimer.set(tl);

        // 省略部分代碼...
        s.add(parent);

        return parent;
    }
}

public class HystrixTimer {
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    // 執行上面的tick方法,改變command timeout狀態
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        // 執行調度任務,延遲加載,延遲時間和調度時間默認都為1s鍾
        // 這里使用線程池,coreSize=cpu核心數 maxSize為Integer.Max
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }
}

這里面核心業務是起一個調度任務,默認每秒鍾執行一次,然后調用tick()方法,如果當前command狀態還是NOT_EXECUTED狀態,那么將command狀態改為TIMED_OUT 。此時會進入到之前的handleFallback回調函數中,這里又會更新HealthCounts中的數據,對應的觸發之前熔斷的判斷條件:

protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
  this.properties = properties;
  this.metrics = metrics;

  //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
  Subscription s = subscribeToStream();
  activeSubscription.set(s);
}

private Subscription subscribeToStream() {
  //這里會在每次執行onNext()事件的時候來評估是否需要打開或者關閉斷路器
  return metrics.getHealthCountsStream()
    .observe()
    .subscribe(new Subscriber<HealthCounts>() {
      @Override
      public void onCompleted() {

      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onNext(HealthCounts hc) {
        //首先校驗的時在時間窗范圍內的請求次數,如果低於閾值(默認是20),不做處理,如果高於閾值,則去判斷接口請求的錯誤率
        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {           // 如果沒有超過統計閾值的最低窗口值,就沒有必要去改變斷路器的狀態
          // 當前如果斷路器是關閉的,那么就保持關閉狀態無需更改;
          // 如果斷路器狀態為半開狀態,需要等待直到有成功的命令執行;
          // 如果斷路器是打開狀態,需要等待休眠窗口過期。
        } else {
          //判斷接口請求的錯誤率(閾值默認是50),如果高於這個值,則斷路器打開
          if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
    
            // 如果當前請求的錯誤率小於斷路器設置的容錯率百分比,也不會攔截請求
          } else {
            // 如果當前錯誤率太高則打開斷路器
            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
              circuitOpened.set(System.currentTimeMillis());
            }
          }
        }
      }
    });
}

如果符合熔斷條件,那么command熔斷狀態就會變為OPEN,此時熔斷器打開。

如果我們command執行成功,那么就會清理掉這個timeout timer schedule任務。

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
	private void handleCommandEnd(boolean commandExecutionStarted) {
	    Reference<TimerListener> tl = timeoutTimer.get();
	    // 如果timeOutTimer不為空,這里則clear一下
	    // clear會關閉啟動的調度任務
	    if (tl != null) {
	        tl.clear();
	    }

	    long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
	    executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
	    if (executionResultAtTimeOfCancellation == null) {
	    	// metrics統計數據
	        metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
	    } else {
	        metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
	    }

	    if (endCurrentThreadExecutingCommand != null) {
	        endCurrentThreadExecutingCommand.call();
	    }
	}
}

如上所屬,我們已經知道了熔斷開啟的觸發時機,那么如果一個commandKey開啟了熔斷,下次的請求是該如何直接降級呢?我們來看下代碼:

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
	private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

        // 這個if條件就代表是否開啟熔斷
        if (circuitBreaker.attemptExecution()) {
            // 執行業務邏輯代碼...
        } else {
            return handleShortCircuitViaFallback();
        }
    }
}

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
	public boolean attemptExecution() {
			// 如果熔斷配置的為強制開啟,那么直接返回false執行熔斷邏輯
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            // 如果熔斷配置為強制關閉,那么永遠不走熔斷邏輯
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            // 熔斷開啟時 circuitOpened設置為當前時間戳
            if (circuitOpened.get() == -1) {
                return true;
            } else {
            	// 如果當前時間距離熔斷小於5s鍾,那么將熔斷狀態從OPEN改為HALF_OPEN
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }
    }

    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        // circuitBreakerSleepWindowInMilliseconds 默認為5s鍾
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        // 當前熔斷距離熔斷是否超過5s鍾
        return currentTime > circuitOpenTime + sleepWindowTime;
    }
}

我們可以看到,在applyHystrixSemantics()這個核心的方法中,先判斷是否熔斷,如果熔斷則直接走fallback邏輯。

attemptExecution()判斷條件中還涉及到HALF_OPEN的邏輯,如果熔斷開啟,下一次請求的時候,會判斷當前時間距離上一次時間是否超過了5s鍾,如果沒有超過,則會將熔斷狀態從OPEN變為HALF_OPEN,此時會放一個請求按照正常邏輯去執行:

  1. 執行失敗,熔斷狀態又會從HALF_OPEN變成OPEN
  2. 執行成功,熔斷狀態從HALF_OPEN變成CLOSED,並清除熔斷相關設置

執行成功后代碼:

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
	public void markSuccess() {
	    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
	        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
	        metrics.resetStream();
	        Subscription previousSubscription = activeSubscription.get();
	        if (previousSubscription != null) {
	            previousSubscription.unsubscribe();
	        }
	        Subscription newSubscription = subscribeToStream();
	        activeSubscription.set(newSubscription);
	        circuitOpened.set(-1L);
	    }
	}
}

上面對整個熔斷的狀態:CLOSED、OPEN、HALF_OPEN梳理的已經很清楚了,下面看看降級是該如何處理的吧。

fallback降級機制

上面已經講解了Hystrix 熔斷開啟的機制等內容,這里主要是說如果一個請求失敗(線程池拒絕、超時、badRequest等),那么Hystrix是如何執行降級的呢?

還是回到我們最初的代碼 HystrixInvocationHandler類中,看看其invoke()方法中的getFallback回調函數:

protected Object getFallback() {
    if (fallbackFactory == null) {
      return super.getFallback();
    }
    try {
      // 通過我們配置好的fallbackFactory找到對應的FeignClient,這里是獲取ServiceAFeignClient
      Object fallback = fallbackFactory.create(getExecutionException());
      // fallbackMap中key為ServiceAFeignClient.sayHello(Integer)
      // 獲取具體的降級method方法
      Object result = fallbackMethodMap.get(method).invoke(fallback, args);
      if (isReturnsHystrixCommand(method)) {
        return ((HystrixCommand) result).execute();
      } else if (isReturnsObservable(method)) {
        // Create a cold Observable
        return ((Observable) result).toBlocking().first();
      } else if (isReturnsSingle(method)) {
        // Create a cold Observable as a Single
        return ((Single) result).toObservable().toBlocking().first();
      } else if (isReturnsCompletable(method)) {
        ((Completable) result).await();
        return null;
      } else {
        return result;
      }
    } catch (IllegalAccessException e) {
      // shouldn't happen as method is public due to being an interface
      throw new AssertionError(e);
    } catch (InvocationTargetException e) {
      // Exceptions on fallback are tossed by Hystrix
      throw new AssertionError(e.getCause());
    }
  }
};

這里很簡單,其實就是先獲取到我們自己在FallbackFactory中配置的的降級方法,然后執行降級邏輯。

總結

這一講核心邏輯主要是Hystrix熔斷狀態的變化,主要是CLOSED、OPEN、HALF_OPEN幾種狀態觸發的時間,互相轉變的流程,以及執行降級邏輯的原理。

我們仍然是用一個流程圖來總結一下:
Hystrix熔斷_降級機制原理.jpg

高清大圖鏈接:
https://www.processon.com/view/link/5e1ee0afe4b0c62462aae684

(點擊原文可以直接查看大圖哦😄)

申明

本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫

WechatIMG33.jpeg


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM