一、hystrix如何集成在openfeign中使用


所有文章

https://www.cnblogs.com/lay2017/p/11908715.html

 

正文

HystrixInvocationHandler

hystrix是開源的一個熔斷組件,springcloud將其集成並默認與openfeign組合使用。而openfeign又是基於jdk動態代理生成接口的代理對象的,hystrix肯定是集成在feign的接口調用過程當中的。

所以,hystrix的熔斷集成到openfeign當中就落到了jdk動態代理的InvocationHandler上。那么hystrix對它的實現又是什么呢?

那么,我們接着跟進HystrixInvocationHandler的invoke方法

public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
    // ...

    HystrixCommand<Object> hystrixCommand =
        new HystrixCommand<Object>(setterMethodMap.get(method)) {
          @Override
          protected Object run() throws Exception {
            try {
              // 獲取並調用MethodHandler,MethodHandler封裝了Http請求,ribbon也在這里被集成
              return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
            } catch (Exception e) {
              throw e;
            } catch (Throwable t) {
              throw (Error) t;
            }
          }

          @Override
          protected Object getFallback() {
            // ... fallback降級的處理
          }
        };

    // ...

    return hystrixCommand.execute();
}

invoke的代碼很長,刪除后顯得清晰一點。hystrix由於采用了rxjava,代碼邏輯閱讀起來有點惡心。

核心正向調用流程依舊是通過Method來獲取對應的MethodHandler,MethodHandler負責執行http請求,並返回結果。

那么hystrix在這里干了啥呢?

比較顯而易見的是fallback部分,fallback含義很明顯了,達到一定的失敗條件就會觸發降級處理。所以,我們可以自定義fallback實現,保證高可用。

而熔斷的核心邏輯就落到了hystrixCommand的execute方法里面

 

hystrixCommand

接下來,跟進hystrixCommand的execute方法

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

queue返回一個future,get方法將獲取異步結果,跟進queue

public Future<R> queue() {
    final Future<R> delegate = toObservable().toBlocking().toFuture();
  
    final Future<R> f = new Future<R>() {
        // ... future實現,調用delegate的對應實現
    };

    // ...

    return f;
}

可以看到,核心邏輯落到了toObservable上,跟進它

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    // ...

    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };

    // ...
}

方法非常長,這里只關注一下applyHystrixSemantics方法

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // ...

    // 是否允許請求
    if (circuitBreaker.allowRequest()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
      
        // ...

        if (executionSemaphore.tryAcquire()) {
            try {
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                // 執行業務
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // 信號量獲取失敗,走fallback
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // 快速熔斷,走fallback
        return handleShortCircuitViaFallback();
    }
}

applyHystrixSemantics通過熔斷器的allowRequest方法判斷是否需要快速失敗走fallback,如果允許執行那么又會經過一層信號量的控制,都通過才會走execute。

所以,核心邏輯就落到了allowRequest上,跟進它

@Override
public boolean allowRequest() {
    // 強制開啟熔斷
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    // 強制關閉熔斷
    if (properties.circuitBreakerForceClosed().get()) {
        isOpen();
        return true;
    }
    // 未開啟熔斷 或者 允許單個測試
    return !isOpen() || allowSingleTest();
}

hystrix允許強制開啟或者關閉熔斷,如果不想有請求執行就開啟,如果覺得可以忽略所有錯誤就關閉。

在沒有強制開關的情況下,主要就是判斷當前熔斷是否開啟。另外,我們不免注意到這里還有一個allowSingleTest方法。在熔斷器開啟的情況下,會在一定時間后允許發出一個測試的請求,來判斷是否開啟熔斷器。

我們先進入isOpen方法,看看如果判斷當前熔斷器的開啟

public boolean isOpen() {
    // 開關是開啟的,直接返回
    if (circuitOpen.get()) {
        return true;
    }

    // 開關未開啟,獲取健康統計
    HealthCounts health = metrics.getHealthCounts();

    //  總請求數太小的情況,不開啟熔斷
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        return false;
    }

    // 總請求數夠了,失敗率比較小的情況,不開啟熔斷
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // 總請求數和失敗率都比較大的時候,設置開關為開啟,進行熔斷
        if (circuitOpen.compareAndSet(false, true)) {
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            return true;
        }
    }
}

總體邏輯就是判斷一個失敗次數是否達到開啟熔斷的條件,如果達到那么設置開啟的開關。

在熔斷一直開啟的情況下,偶爾會放過一個測試請求來判斷是否關閉。那么我們就跟進allowSingleTest看看

public boolean allowSingleTest() {
    // 獲取熔斷開啟時間,或者上一次的測試時間
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();

    // 如果熔斷處於開啟狀態,且當前時間距離熔斷開啟時間或者上一次執行測試請求時間已經到了
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // cas控制熔斷開啟
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            return true;
        }
    }
    // 否則不允許
    return false;
}

其實就是在一定時間窗口下釋放一個測試請求出去,這里還使用了cas機制來控制一定時間窗口只會釋放一個請求出去。

 

總結

到這里,本文就結束了。hystrix其實就是在feign的調用過程插了一腳,通過對請求的成功失敗的統計數據來開關是否進行熔斷。又在每個時間窗口內發送一個測試請求出去,來判斷是否關閉熔斷。總得來說還是很清晰實用的。不過最后還是要說一句,rxjava的代碼太惡心了,哈哈


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM