服務容錯保護斷路器Hystrix之二:Hystrix工作流程解析


一、總運行流程

當你發出請求后,hystrix是這么運行的

 

紅圈 :Hystrix 命令執行失敗,執行回退邏輯。也就是大家經常在文章中看到的“服務降級”。
綠圈 :四種情況會觸發失敗回退邏輯( fallback )。
第一種 :short-circuit ,處理鏈路處於熔斷的回退邏輯,在 「3. #handleShortCircuitViaFallback()」 詳細解析。
第二種 :semaphore-rejection ,處理信號量獲得失敗的回退邏輯,在 「4. #handleShortCircuitViaFallback()」 詳細解析。
第三種 :thread-pool-rejection ,處理線程池提交任務拒絕的回退邏輯,在 「5. #handleThreadPoolRejectionViaFallback()」 詳細解析。
第四種 :execution-timeout ,處理命令執行超時的回退邏輯,在 「6. #handleTimeoutViaFallback()」 詳細解析。
第五種 :execution-failure ,處理命令執行異常的回退邏輯,在 「7. #handleFailureViaFallback()」 詳細解析。
第六種 :bad-request ,TODO 【2014】【HystrixBadRequestException】,和 hystrix-javanica 子項目相關。

另外,#handleXXXX() 方法,整體代碼比較類似,最終都是調用 #getFallbackOrThrowException() 方法,獲得【回退邏輯 Observable】或者【異常 Observable】,在 「8. #getFallbackOrThrowException(…)」 詳細解析。

 

 

 詳細解釋個步驟

1.創建  HystrixCommand or HystrixObservableCommand Object

  HystrixCommand

用於返回單一的響應
HystrixObservableCommand
用於返回多個可自定義的響應

命令模式,將來自客戶端的請求封裝成一個對象,從而讓你可以使用不同的請求對客戶端進行參數化。它可以被用於實現“行為請求者"與”行為實現者“的解耦,以便使兩者可以適應變化。
這一過程也包含了策略、資源的初始化,參看AbstractCommand的構造函數:
protected AbstractCommand(...) {
    // 初始化group,group主要是用來對不同的command key進行統一管理,比如統一監控、告警等
    this.commandGroup = initGroupKey(...);
    // 初始化command key,用來標識降級邏輯,可以理解成command的id
    this.commandKey = initCommandKey(...);
    // 初始化自定義的降級策略
    this.properties = initCommandProperties(...);
    // 初始化線程池key,相同的線程池key將共用線程池
    this.threadPoolKey = initThreadPoolKey(...);
    // 初始化監控器
    this.metrics = initMetrics(...);
    // 初始化斷路器
    this.circuitBreaker = initCircuitBreaker(...);
    // 初始化線程池
    this.threadPool = initThreadPool(...);
 
    // Hystrix通過SPI實現了插件機制,允許用戶對事件通知、處理和策略進行自定義
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);
 
    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
 
    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;
 
    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}

 

其實構造函數中的很多初始化工作只會集中在創建第一個Command時來做,后續創建的Command對象主要是從靜態Map中取對應的實例來賦值,比如監控器、斷路器和線程池的初始化,因為相同的Command的command key和線程池key都是一致的,在HystrixCommandMetricsHystrixCircuitBreaker.FactoryHystrixThreadPool中會分別有如下靜態屬性:

private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();
 
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
 
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

可見所有Command對象都可以在這里找到自己對應的資源實例

2. Execute the Command(命令執行)

對於HystrixCommand有4個執行方法
對於HystrixObservableCommand只有后兩個
//同步阻塞方法,其實就是調用了queue().get()
execute() — blocks, then returns the single response received from the dependency (or throws an exception in case of an error)
 
//異步非阻塞方法,直接返回Future,可以先做自己的事情,做完再.get()
queue() — returns a Future with which you can obtain the single response from the dependency
 
//熱觀察,可以被立即執行,如果訂閱了那么會重新通知,其實就是調用了toObservable()並內置ReplaySubject,詳細可以參考RxJava
observe() — subscribes to the Observable that represents the response(s) from the dependency and returns an Observable that replicates that source Observable
 
//冷觀察,返回一個Observable對象,當調用此接口,還需要自己加入訂閱者,才能接受到信息,詳細可以參考RxJava
toObservable() — returns an Observable that, when you subscribe to it, will execute the Hystrix command and emit its responses
 
注:由於Hystrix底層采用了RxJava框架開發,所以沒接觸過的可能會一臉懵逼,需要再去對RxJava有所了解。

工作流程的源碼說明:

工作流程圖中的第1,2步:HystrixCommand.java的execute()是入口,調用的是queue():
    public R execute() {
        try {
            return queue().get();    
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

在queue()中調用了toObservable()的方法,接着看源碼:

3. Is the Response Cached?(結果是否被緩存)

判斷是否使用緩存:是否實現了getCacheKey() 的方法

如果使用緩存,再判斷如果請求緩存可用fromCache != null,並且對於該請求的響應也在緩存中,那么命中的響應會以Observable直接返回。

工作流程的源碼說明:

工作流程圖中的第3步:AbstractCommand.java的toObservable()方法中的片段:
//....
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                /* try from cache first */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
//.... 

              protected boolean isRequestCachingEnabled() {
                 return properties.requestCacheEnabled().get() && getCacheKey() != null;
              }

 
下圖關於是請求緩存的整個生命周期

4. Is the Circuit Open?(斷路器是否打開)

在命令結果沒有緩存命中的時候,Hystrix在執行命令前需檢查斷路器是否為打開狀態:

  • 如果是打開的,那么Hystrix不會執行命令,而是轉接到fallback處理邏輯(對應下面的第8步)
  • 如果斷路器是關閉的,那么Hystrix調到第5步,檢查是否有可用資源來執行命令。

5. Is the Thread Pool/Queue/Semaphore Full?(線程池/請求隊列/信號量是否已經占滿)

線程池或者信號量是否已經滿負荷,如果已經滿負荷那么快速失敗

6. HystrixObservableCommand.construct() or HystrixCommand.run())

兩個斷路器的入口,如果是繼承HystrixObservableCommand,那么就調用construct()函數,如果是繼承HystrixCommand,那么就調用run()函數。

7. Calculate Circuit Health(計算斷路器的健康度)

Hystrix記錄了成功,失敗,拒絕,超時四種報告

這些報告用於決定哪些用於斷路,被斷路的點在恢復周期內無法被后來的請求訪問到。

8. Get the Fallback

快速失敗會在以下幾個場景觸發

1.由construct() or run()拋出了一個異常

2.斷路器已經打開的時候

3.沒有空閑的線程池和隊列或者信號量

4.一次命令執行超時

 

可以重寫快速失敗函數來自定義,

HystrixObservableCommand.resumeWithFallback()

HystrixCommand.getFallback()

9. 成功返回

整體的函數調用流程如下,其實這就是源碼的調用流程

 

 

源碼:

一、AbstractCommand 主要功能點

實現run、getFallback等方法,你就擁有了一個具有基本熔斷功能的類。從使用來看,所有的核心邏輯都由AbstractCommand(即HystrixCommand的父類,HystrixCommand只是對AbstractCommand進行了簡單包裝)抽象類串起來,從功能上來說,AbstractCommand必須將如下功能聯系起來:

策略配置:Hystrix有兩種降級模型,即信號量(同步)模型和線程池(異步)模型,這兩種模型所有可定制的部分都體現在了HystrixCommandProperties和HystrixThreadPoolProperties兩個類中。然而還是那句老話,Hystrix只提供了配置修改的入口,沒有將配置界面化,如果想在頁面上動態調整配置,還需要自己實現。

數據統計:Hystrix以命令模式的方式來控制業務邏輯以及熔斷邏輯的調用時機,所以說數據統計對它來說不算難事,但如何高效、精准的在內存中統計數據,還需要一定的技巧。

斷路器:斷路器可以說是Hystrix內部最重要的狀態機,是它決定着每個Command的執行過程。

監控露出:能通過某種可配置方式將統計數據展現在儀表盤上。

 二. Hystrix的斷路器設計

斷路器是Hystrix最核心的狀態機,只有了解它的變更條件,我們才能准確掌握Hystrix的內部行為。上面的內部流程圖中【斷路器狀態判斷】這個環節直接決定着這次請求(或者說這個Command對象)是嘗試去執行正常業務邏輯(即run())還是走降級后的邏輯(即getFallback()),斷路器HystrixCircuitBreaker有三個狀態,

為了能做到狀態能按照指定的順序來流轉,並且是線程安全的,斷路器的實現類HystrixCircuitBreakerImpl使用了AtomicReference:

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        enum Status {
            CLOSED, OPEN, HALF_OPEN;
        }

// 斷路器初始狀態肯定是關閉狀態
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);

 

斷路器在狀態變化時,使用了AtomicReference#compareAndSet來確保當條件滿足時,只有一筆請求能成功改變狀態。

那么,什么條件下斷路器會改變狀態?

1. CLOSED -> OPEN :

時間窗口內(默認10秒請求量大於請求量閾值(即circuitBreakerRequestVolumeThreshold,默認值是20),並且該時間窗口內錯誤率大於錯誤率閾值(即circuitBreakerErrorThresholdPercentage,默認值為50,表示50%),那么斷路器的狀態將由默認的CLOSED狀態變為OPEN狀態。看代碼可能更直接

// 檢查是否超過了我們設置的斷路器請求量閾值
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
    // 如果沒有超過統計窗口的請求量閾值,則不改變斷路器狀態,
    // 如果它是CLOSED狀態,那么仍然是CLOSED.
    // 如果它是HALF-OPEN狀態,我們需要等待請求被成功執行,
    // 如果它是OPEN狀態, 我們需要等待睡眠窗口過去。
} else {
    if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        //如果沒有超過統計窗口的錯誤率閾值,則不改變斷路器狀態,,
        // 如果它是CLOSED狀態,那么仍然是CLOSED.
        // 如果它是HALF-OPEN狀態,我們需要等待請求被成功執行,
        // 如果它是OPEN狀態, 我們需要等待【睡眠窗口】過去。
    } else {
        // 如果錯誤率太高,那么將變為OPEN狀態
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            // 因為斷路器處於打開狀態會有一個時間范圍,所以這里記錄了變成OPEN的時間
            circuitOpened.set(System.currentTimeMillis());
        }
    }
}

 

這里的錯誤率是個整數,即errorPercentage= (int) ((doubleerrorCount totalCount 100);,至於睡眠窗口,下面會提到。

2. OPEN ->HALF_OPEN: 

前面說過,當進入OPEN狀態后,會進入一段睡眠窗口,即只會OPEN一段時間,所以這個睡眠窗口過去,就會“自動”從OPEN狀態變成HALF_OPEN狀態,這種設計是為了能做到彈性恢復,這種狀態的變更,並不是由調度線程來做,而是由請求來觸發,每次請求都會進行如下檢查:

@Override
public boolean attemptExecution() {
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    // circuitOpened值等於1說明斷路器狀態為CLOSED
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        if (isAfterSleepWindow()) {
            // 睡眠窗口過去后只有第一個請求能被執行
            // 如果執行成功,那么狀態將會變成CLOSED
            // 如果執行失敗,狀態仍變成OPEN
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}
 
// 睡眠窗口是否過去
private boolean isAfterSleepWindow() {
    // 還記得上面CLOSED->OPEN時記錄的時間嗎?
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}

 

 

3. HALF_OPEN ->CLOSED :

變為半開狀態后,會放第一筆請求去執行,並跟蹤它的執行結果,如果是成功,那么將由HALF_OPEN狀態變成CLOSED狀態

@Override
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        // 已經進入了CLOSED階段,所以將OPEN的修改時間設置成-1
        circuitOpened.set(-1L);
    }
}

4. HALF_OPEN ->OPEN :

 變為半開狀態時如果第一筆被放去執行的請求執行失敗(資源獲取失敗、異常、超時等),就會由HALP_OPEN狀態再變為OPEN狀態

@Override
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        // This thread wins the race to re-open the circuit - it resets the start time for the sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}

三. 滑動窗口(滾動窗口)

上面提到的斷路器需要的時間窗口請求量和錯誤率這兩個統計數據,都是指固定時間長度內的統計數據,斷路器的目標,就是根據這些統計數據來預判並決定系統下一步的行為,Hystrix通過滑動窗口來對數據進行“平滑”統計,默認情況下,一個滑動窗口包含10個桶Bucket),每個桶時間寬度是1秒,負責1秒的數據統計。滑動窗口包含的總時間以及其中的桶數量都是可以配置的,來張官方的截圖認識下滑動窗口:

上圖的每個小矩形代表一個桶,可以看到,每個桶都記錄着1秒內的四個指標數據:成功量、失敗量、超時量和拒絕量,這里的拒絕量指的就是上面流程圖中【信號量/線程池資源檢查】中被拒絕的流量。10個桶合起來是一個完整的滑動窗口,所以計算一個滑動窗口的總數據需要將10個桶的數據加起來

 

我們現在來具體看看滑動窗口和桶的設計,如果將滑動窗口設計成對一個長度為10的整形數組的操作,第一個想到的應該是AtomicLongArray,AtomicLongArray中每個位置的數據都能線程安全的操作,提供了譬如incrementAndGet、getAndSet、compareAndSet等常用方法。但由於一個桶需要維護四個指標,如果用四個AtomicLongArray來實現,做法不夠高級,於是我們想到了AtomicReferenceArray<Bucket>Bucket對象內部可以用AtomicLong來維護着這四個指標。滑動窗口和桶的設計特別講究技巧,需要盡可能做到性能、數據准確性兩方面的極致,我們來看Hystrix是如何做到的。

 

桶的數據統計簡單來說可以分為兩類,一類是簡單自增計數器,比如請求量、錯誤量等,另一類是並發最大值,比如一段時間內的最大並發量(或者說線程池的最大任務數),下面是桶類Bucket的定義:

class Bucket {
    // 標識是哪一秒的桶數據
    final long windowStart;
    // 如果是簡單自增統計數據,那么將使用adderForCounterType
    final LongAdder[] adderForCounterType;
    // 如果是最大並發類的統計數據,那么將使用updaterForCounterType
    final LongMaxUpdater[] updaterForCounterType;
 
    Bucket(long startTime) {
        this.windowStart = startTime;
 
        // 預分配內存,提高效率,不同事件對應不同的數組index
        adderForCounterType = new LongAdder[HystrixRollingNumberEvent.values().length];
        for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
            if (type.isCounter()) {
                adderForCounterType[type.ordinal()] = new LongAdder();
            }
        }
 
        // 預分配內存,提高效率,不同事件對應不同的數組index
        updaterForCounterType = new LongMaxUpdater[HystrixRollingNumberEvent.values().length];
        for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
            if (type.isMaxUpdater()) {
                updaterForCounterType[type.ordinal()] = new LongMaxUpdater();
                // initialize to 0 otherwise it is Long.MIN_VALUE
                updaterForCounterType[type.ordinal()].update(0);
            }
        }
    }
    //...略...
}

我們可以看到,並沒有用所謂的AtomicLong,為了方便的管理各種事件(參見com.netflix.hystrix.HystrixEventType)的數據統計,Hystrix對不同的事件使用不同的數組index(即枚舉的順序),這樣對於某個桶(即某一秒)的指定類型的數據,總能從數組中找到對應的LongAdder(用於統計前面說的簡單自增)或LongMaxUpdater(用於統計前面說的最大並發值)對象來進行自增或更新操作。對於性能有要求的中間件或庫類都避不開要CPUCache優化的問題,比如cache line,以及cache line帶來的false sharing問題。Bucket的內部並沒有使用AtomicLong,而是使用了JDK8新提供的LongAdder,在高並發的單調自增場景,LongAdder提供了比AtomicLong更好的性能,至於LongAdder的設計思想,本文不展開,感興趣的朋友可以去拜讀Doug Lea大神的代碼(有意思的是Hystrix沒有直接使用JDK中的LongAdder,而是copy過來改了改)。LongMaxUpdater也是類似的,它和LongAddr一樣都派生於Striped64,這里不再展開。

滑動窗口由多個桶組成,業界一般的做法是將數組做成環,Hystrix中也類似,多個桶是放在AtomicReferenceArray<Bucket>來維護的,為了將其做成環,需要保存頭尾的引用,於是有了ListState類: 

class ListState {
    /*
     * 這里的data之所以用AtomicReferenceArray而不是普通數組,是因為data需要
     * 在不同的ListState對象中跨線程來引用,需要可見性和並發性的保證。
     */
    private final AtomicReferenceArray<Bucket> data;
    private final int size;
    private final int tail;
    private final int head;
 
    private ListState(AtomicReferenceArray<Bucket> data, int head, int tail) {
        this.head = head;
        this.tail = tail;
        if (head == 0 && tail == 0) {
            size = 0;
        } else {
            this.size = (tail + dataLength - head) % dataLength;
        }
        this.data = data;
    }
    //...略...
}

  我們可以發現,真正的數據是data,而ListState只是一個時間段的數據快照而已,所以tail和head都是final,這樣做的好處是我們不需要去為head、tail的原子操作而苦惱,轉而變成對ListState的持有操作,所以滑動窗口看起來如下:

我們可以看到,由於默認一個滑動窗口包含10個桶,所以AtomicReferenceArray<Bucket>的size得達到10+1=11才能“滑動/滾動”起來,在確定的某一秒內,只有一個桶被更新,其他的桶數據都沒有變化。既然通過ListState可以拿到所有的數據,那么我們只需要持有最新的ListState對象即可,為了能做到可見性和原子操作,於是有了環形桶類BucketCircularArray

class BucketCircularArray implements Iterable<Bucket> {
    // 持有最新的ListState
    private final AtomicReference<ListState> state;
     //...略...
}

注意到BucketCircularArray實現了迭代器接口,這是因為我們輸出給斷路器的數據需要計算滑動窗口中的所有桶,於是你可以看到真正的滑動窗口類HystrixRollingNumber有如下屬性和方法:

public class HystrixRollingNumber {
    // 環形桶數組
    final BucketCircularArray buckets;
 
    // 獲取該事件類型當前滑動窗口的統計值
    public long getRollingSum(HystrixRollingNumberEvent type) {
        Bucket lastBucket = getCurrentBucket();
        if (lastBucket == null)
            return 0;
    
        long sum = 0;
        // BucketCircularArray實現了迭代器接口環形桶數組
        for (Bucket b : buckets) {
            sum += b.getAdder(type).sum();
        }
        return sum;
    }
    //...略...
}

斷路器就是通過監控來從HystrixRollingNumber的getRollingSum方法來獲取統計值的

到這里斷路器和滑動窗口的核心部分已經分析完了,當然里面還有不少細節沒有提到,感興趣的朋友可以去看一下源碼。Hystrix中通過RxJava來實現了事件的發布和訂閱,所以如果想深入了解Hystrix,需要熟悉RxJava,而RxJava在服務端的應用沒有像客戶端那么廣,一個原因是場景的限制,還一個原因是大多數開發者認為RxJava設計的過於復雜,加上響應式編程模型,有一定的入門門檻。

四、線程池隔離

     不同的業務線之間選擇用線程池隔離,降低互相影響的概率。設置隔離策略為線程池隔離:
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD));
在Hystrix內部,是根據 properties.executionIsolationStrategy().get()這個字段判斷隔離級別。如在 getRunObservableDecoratedForMetricsAndErrorHandling這個方法中會先判斷是不是線程池隔離,如果是就獲取線程池,如果不是則進行信號量隔離的操作。
     如果是線程池隔離,還需要設置線程池的相關參數如:線程池名字andThreadPoolKey , coreSize(核心線程池大小) , KeepAliveTimeMinutes(線程存存活時間),MaxQueueSize(最大隊列長度),QueueSizeRejectionThreshold(拒絕執行的閥值)等等。
. andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getCoreSize())
                                    .withKeepAliveTimeMinutes(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getKeepAliveSeconds())
                                    .withMaxQueueSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getMaxQueueSize())
                                    .withQueueSizeRejectionThreshold(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getQueueSizeRejectionThreshold()))
threadPoolKey 也是線程池的名字的前綴,默認前綴是 hystrix 。在Hystrix中,核心線程數和最大線程數是一致的,減少線程臨時創建和銷毀帶來的性能開銷。線程池的默認參數都在HystrixThreadPoolProperties中,重點講解一下參數queueSizeRejectionThreshold 和maxQueueSize 。queueSizeRejectionThreshold默認值是5,允許在隊列中的等待的任務數量。maxQueueSize默認值是-1,隊列大小。如果是Fast Fail 應用,建議使用默認值。線程池飽滿后直接拒絕后續的任務,不再進行等待。代碼如下HystrixThreadPool類中:
        @Override
        public boolean isQueueSpaceAvailable() {
            if (queueSize <= 0) {
                // we don't have a queue so we won't look for space but instead
                // let the thread-pool reject or not
                return true;
            } else {
                return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
            }
        }
線程池一旦創建完成,相關參數就不會更改,存放在靜態的ConcurrentHashMap中,key是對應的commandKey 。而queueSizeRejectionThreshold是每個命令都是設置的。
     
     線程池的相關參數都保存在HystrixThreadPool這個類文件中,線程池的創建方法getThreadPool則在HystrixConcurrencyStrategy類文件中。從getThreadPool方法可以看出線程池的名字就是hystrix-threadPoolKey-threadNumber.
@Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
     
     在HystrixThreadPool實現類的構造方法中,並發HystrixConcurrencyStrategy實例是通過HystrixPlugins獲取的,所以可以通過HystrixPlugins設置自定義插件。具體的HystrixPlugins如何使用,會在后面章節中講解。
 
線程池的創建     
     前面說了,在Hystrix內部大部分類都是單實例,同樣ThreadPool也不例外,也是單實例。並且相同commandKey的依賴還必須是使用同一個線程池。這就需要把ThreadPool保存在一個靜態的map中,key是commandKey,同時要保證線程安全,Hytstrix使用了ConcurrentHashMap。關於為什么不適用HashTable保證線程安全問題的疑問請自行Google。線程池的創建在HystrixThreadPool這個類文件中的內部類Factory中的getInstance方法。
 
/* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
     String key = threadPoolKey.name();

            // this should find it for all but the first time
            HystrixThreadPool previouslyCached = threadPools.get(key);
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize
            synchronized (HystrixThreadPool.class) {
                if (!threadPools.containsKey(key)) {
                    threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                }
            }
            return threadPools.get(key);
     
線程池的使用
     HystrixCommand類的execute()內部調用了queue() ,queue又調用了父類AbstractCommand的toObservable方法,toObservable方法處理了是否可緩存問題后,交給了getRunObservableDecoratedForMetricsAndErrorHandling方法,這個方法設置了一系列的executionHook之后,交給了getExecutionObservableWithLifecycle,這個方法通過getExecutionObservable()獲取了執行器。getExecutionObservable()是個抽象方法,具體實現放在了子類:HystrixCommand和HystrixObservableCommand類中。下面是HystrixCommand類中的getExecutionObservable方法實現:
final protected Observable<R> getExecutionObservable() {
        return Observable.create(new OnSubscribe<R>() {

            @Override
            public void call(Subscriber<? super R> s) {
                try {
                    s.onNext(run());
                    s.onCompleted();
                } catch (Throwable e) {
                    s.onError(e);
                }
            }

        });
    }
在這個Call方法中執行了具體的業務邏輯run() ;

 

 

 

二、模塊詳解

2.1、創建請求命令

2.1.1、有4種方式

1、同步阻塞方法,其實就是調用了queue().get()
2、異步非阻塞方法,直接返回Future,可以先做自己的事情,做完再.get()
3、熱觀察,可以被立即執行,如果訂閱了那么會重新通知,其實就是調用了toObservable()並內置ReplaySubject,詳細可以參考RxJava
4、冷觀察,返回一個Observable對象,當調用此接口,還需要自己加入訂閱者,才能接受到信息,詳細可以參考RxJava

2.1.2、原生模式

基於hystrix的原生接口,也就是繼承HystrixCommand或者HystirxObservableCommand。

在《服務容錯保護斷路器Hystrix之一:入門介紹》中的示例基礎上修改如下,

同步方式/異步方式:

package com.dxz.ribbon;

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class ComputeCommand extends HystrixCommand<String> {
    RestTemplate restTemplate;
    String a;
    String b;
    
    protected ComputeCommand(RestTemplate restTemplate, String a, String b) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.restTemplate = restTemplate;
        this.a = a;
        this.b = b;
    }
    
    @Override
    protected String run() throws Exception {
        return restTemplate.getForEntity("http://compute-service/add?a="+a +"&b="+b+"&sn=1", String.class).getBody();
    }
     /**
     * 快速失敗后調用函數
     * @return
     */
    @Override
    protected String getFallback(){
        return "404 :)";
    }
}

觀察方式:

package com.dxz.ribbon;

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;

import rx.Observable;
import rx.Subscriber;

public class ComputeObservableCommand extends HystrixObservableCommand<String> {
    RestTemplate restTemplate;
    String a;
    String b;
    
    protected ComputeObservableCommand(RestTemplate restTemplate, String a, String b) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.restTemplate = restTemplate;
        this.a = a;
        this.b = b;
    }
    
    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> observer) {
                if(!observer.isUnsubscribed()) {
                    String result = restTemplate.getForEntity("http://compute-service/add?a="+a +"&b="+b+"&sn=1", String.class).getBody();
                    observer.onNext(result);
                    observer.onCompleted();
                }
            }
            
        });
    }

}

調用方法:

package com.dxz.ribbon;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import rx.functions.Action1;

@RestController
public class ConsumerController2 {

    @Autowired
    RestTemplate restTemplate;

    @RequestMapping(value = "/add2", method = RequestMethod.GET)
    public String add(@RequestParam String m) throws InterruptedException, ExecutionException {
        if("s".equals(m)) {
            String result = new ComputeCommand(restTemplate, "1", "2").execute();
            System.out.println("result:="+result);
            return result;    
        } else if("a".equals(m)) {
            Future<String> result = new ComputeCommand(restTemplate, "1", "2").queue();
            System.out.println("result:="+result.get());
            return result.get();    
        } else {
            ComputeObservableCommand command1 = new ComputeObservableCommand(restTemplate, "1","2");
            rx.Observable<String> result = command1.observe();  
            result.subscribe(new Action1<String>() {  
                @Override  
                public void call(String s) {  
                    System.out.println("Command called. Result is:" + s);  
                }
            });  
            return null;
        }
        
    }

}

結果:

 2.1.3、注解模式

在《服務容錯保護斷路器Hystrix之一:入門介紹》已經展示過。

 

2.2、定義服務降級

有些情況不去實現降級邏輯,如下所示。
執行寫操作的命令:當Hystrix命令是用來執行寫操作而不是返回一些信息的時候,通常情況下這類操作的返回類型時void或是為空的Observable,實現服務降級的意義不是很大。當寫入操作失敗的時候,我們通常只需要通知調用者即可。
執行批處理或離線計算的命令:當Hystrix命令是用來執行批處理程序生成一份報告或是進行任何類型的離線計算時,那么通常這些操作只需要將錯誤傳播給調用者,然后讓調用者稍后重試而不是發送給調用者一個靜默的降級處理響應。

 
        

2.3、工作流程圖

 

2.4、開關條件

關於斷路器打開

·時間窗口內請求次數(限流)

如果在10s內,超過某個閾值的請求量,才會考慮斷路(小於這個次數不會被斷路)

配置是circuitBreaker.requestVolumeThreshold

默認10s 20次

·失敗率

默認失敗率超過50%就會被斷路

配置是circuitBreaker.errorThresholdPercentage

 

關於斷路器關閉

·重新嘗試

在一定時間之后,重新嘗試請求來決定是否繼續打開或者選擇關閉斷路器

配置是circuitBreaker.sleepWindowInMilliseconds

默認5000ms

 

2.5、關於隔離

bulkhead pattern模式(艙壁模式)

Htstrix使用了bulkhead pattern模式,典型的例子就是線程隔離。

簡單解釋一下bulkhead pattern模式。一般情況我們都用一個線程池來管理所有線程,容易造成一個問題,粒度太粗,無法對線程進行分類管理,會導致局部問題影響全局。bulkhead pattern模式在於,采用多個線程池來管理線程,這樣使得1個線程池資源出現問題時不會造成另一個線程池資源問題。盡量使問題最小化。

如圖所示,采用了bulkhead pattern模式的效果

 

 

說完原理說實現,如何針對不同依賴采用不同的線程池管理呢

Hystrix給了我們三種key來用於隔離:

·CommandKey,針對相同的接口一般CommandKey值相同,目的是把HystrixCommand,HystrixCircuitBreaker,HytrixCommandMerics以及其他相關對象關聯在一起,形成一個原子組。采用原生接口的話,默認值為類名;采用注解形式的話,默認值為方法名。

·CommandGroupKey,對CommandKey分組,用於真正的隔離。相同CommandGroupKey會使用同一個線程池或者信號量。一般情況相同業務功能會使用相同的CommandGroupKey。

·ThreadPoolKey,如果說CommandGroupKey只是邏輯隔離,那么ThreadPoolKey就是物理隔離,當沒有設置ThreadPoolKey的時候,線程池或者信號量的划分按照CommandGroupKey,當設置了ThreadPoolKey,那么線程池和信號量的划分就按照ThreadPoolKey來處理,相同ThreadPoolKey采用同一個線程池或者信號量。

 

Coding

原生模式

可以通過HystrixCommand.Setter來自定義配置
HystrixCommandGroupKey.Factory.asKey(""))
HystrixCommandKey.Factory.asKey("")
HystrixThreadPoolKey.Factory.asKey("")

注解模式

可以直接在方法名上添加

@HystrixCommand(groupKey = "", commandKey = "", threadPoolKey = "")

2.6、關於請求緩存

工作流程圖

 

優勢

·復用性

  這里的復用性指的是代碼復用性

·一致性

  也就是常說的冪等性,不管請求幾次,得到的結果應該都是一樣的

·減少重復工作

  由於請求緩存是在HystrixCommand的construct()或run()運行之前運行,所有可以有效減少線程的使用

適用場景

請求緩存的優勢顯而易見,但是也不是銀彈。

在讀少寫多的場景就顯得不太合適,對於讀的請求,需要add緩存。對於增刪改的請求,需要把緩存remove。在增加系統資源開銷的同時,又很雞肋。

所以一般適合讀多寫少的場景。似乎所有緩存機制都有這個局限性吧。

Coding

原生模式

繼承HystrixCommand后,重寫getCacheKey()方法,該方法默認返回的是null,也就是不使用請求緩存功能。相同key的請求會使用相同的緩存。
package com.dxz.ribbon;

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class ComputeCommandCache extends HystrixCommand<String> {
    RestTemplate restTemplate;
    String a;
    String b;
    
    protected ComputeCommandCache(RestTemplate restTemplate, String a, String b) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.restTemplate = restTemplate;
        this.a = a;
        this.b = b;
    }
    
    @Override
    protected String run() throws Exception {
        return restTemplate.getForEntity("http://compute-service/add?a="+a +"&b="+b+"&sn=1", String.class).getBody();
    }
    
 @Override protected String getCacheKey() {
        System.out.println("調用getCacheKey");//打印一下什么時候會觸發
        return a + b;
    }
    
     /**
     * 快速失敗后調用函數
     * @return
     */
    @Override
    protected String getFallback(){
        return "404 :)";
    }
}

調用類,如果不加HystrixRequestContext.initializeContext();//初始化請求上下文,會報錯如下:

報錯了:java.util.concurrent.ExecutionException: Observable onError
Caused by: java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext?

 

package com.dxz.ribbon;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

import rx.functions.Action1;

@RestController
public class ConsumerControllerCache {

    @Autowired
    RestTemplate restTemplate;

    @RequestMapping(value = "/add3", method = RequestMethod.GET)
    public String add(@RequestParam String m) throws InterruptedException, ExecutionException {
        HystrixRequestContext.initializeContext();//初始化請求上下文
        if("s".equals(m)) {
            String result = new ComputeCommandCache(restTemplate, "1", "2").execute();
            System.out.println("result:="+result);
            return result;    
        } else if("a".equals(m)) {
            Future<String> result = new ComputeCommandCache(restTemplate, "1", "2").queue();
            System.out.println("result:="+result.get());
            return result.get();    
        } else {
            ComputeObservableCommand command1 = new ComputeObservableCommand(restTemplate, "1","2");
            rx.Observable<String> result = command1.observe();  
            result.subscribe(new Action1<String>() {  
                @Override  
                public void call(String s) {  
                    System.out.println("Command called. Result is:" + s);  
                }
            });  
            return null;
        }
        
    }

}

注解模式

在方法名上增加,並添加與cacheKeyMethod字符串相同的方法。兩者共用入參。
復制代碼
@CacheResult(cacheKeyMethod = "getCacheKey")
public String post2AnotherService(String seed){
}
public String getCacheKey(String seed){

    return seed;

}
復制代碼

 

初始化HystrixRequestContext

還有關鍵的一步,在調用HystrixCommand之前初始化HystrixRequestContext,其實就是創建一個ThreadLocal的副本,共享請求緩存就是通過ThreadLocal來實現的。
HystrixRequestContext context=HystrixRequestContext.initializeContext();
操作完成后context.shutdown();
一般情況可以在過濾器中控制是初始化和關閉整個生命周期
復制代碼
//啟動HystrixRequestContext
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
    chain.doFilter(req, res);
} finally {
    //關閉HystrixRequestContext
    context.shutdown();
}
復制代碼

 

2.7、關於請求合並(Requst Collapsing)

工作流程圖

 

上半部分是模擬請求,下半部分是該請求的依賴設置,時間窗口默認是10ms,在這個時間窗口內,所有對於該接口的請求都會被加入隊列,然后進行批處理。這樣的好處在於,如果短時間內對於某個接口有大量請求,那么可以只處理一次就完成所有響應。

 

優勢

全局線程合並

在tomcat容器中,所有請求共用一個進程,也就是一個JVM容器,在並發場景下會派生出許多線程,collapsing可以合並整個JVM中的請求線程,這樣可以解決不同使用者同時請求的大量並發問題。

 

局部線程合並

可以合並單個tomcat請求線程,比如在10ms內有10個請求被同一線程處理(這不是像往常一樣請求->處理,而是請求->加入請求隊列,所有可以快速收集請求),那這些請求可以被合並。

對象建模和代碼復雜度

在實際場景下,調用接口取數據的復雜度往往高於數據的復雜度,通俗來說就是取數據可以千變萬化的取,而數據就那么幾個接口。

collapsing可以幫助你更好的實現你的業務,比如多次請求合並結果后再廣播出去。

 

適用場景

·並發量大接口

當並發量小,一個時間窗口內只有幾個或沒有請求,那么就白白浪費了請求合並的資源。

·請求耗時接口

時間窗口是固定的,假如一個請求實際耗時10ms,加上固定的時間窗口,最大延遲達到20ms,延遲被提高了100%。若一個請求實際耗時有1s,那么時間窗口的延遲就可以被忽略不計。

 

Coding

原生模式

復制代碼
/**
 * 批量返回值類型
 * 返回值類型
 * 請求參數類型
 */
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    private static Logger logger = LoggerFactory.getLogger(CommandCollapserGetValueForKey.class);

    private final Integer key;

    public CommandCollapserGetValueForKey(Integer key) {
        this.key = key;
    }

    /**
     *獲取請求參數
     */
    public Integer getRequestArgument() {
        return key;
    }

    /**
     *合並請求產生批量命令的具體實現
     */
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        return new BatchCommand(requests);
    }

    /**
     *批量命令結果返回后的處理,需要實現將批量結果拆分並傳遞給合並前的各原子請求命令的邏輯中
     */
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        int count = 0;
        //請求響應一一對應
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private static Logger logger = LoggerFactory.getLogger(CommandCollapserGetValueForKey.BatchCommand.class);

        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
                super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            this.requests = requests;
        }

        @Override
        protected List<String> run() {
            ArrayList<String> response = new ArrayList<String>();
            // 處理每個請求,返回結果
            for (CollapsedRequest<String, Integer> request : requests) {
                logger.info("request.getArgument()={}",request.getArgument());
                // artificial response for each argument received in the batch
                response.add("ValueForKey: " + request.getArgument());
            }
            return response;
        }
    }
}
復制代碼
調用的時候只需要new CommandCollapserGetValueForKey(1).queue()
在同一個時間窗口內,批處理的函數調用順序為
getRequestArgument()->createCommand()->mapResponseToRequests()

//官方配置文檔

https://github.com/Netflix/Hystrix/wiki/Configuration#circuitBreaker.sleepWindowInMilliseconds


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM