【整理】互聯網服務端技術體系:熔斷機制的設計及Hystrix實現解析


引子

在大量微服務所構成的分布式系統中,某個基礎服務的不可用,可能導致服務雪崩效應,即:依賴該基礎服務的所有其它基礎服務及級聯的上游服務的級聯性不可用故障。

熔斷機制是防止服務雪崩的基本技術手段。通過檢查依賴服務的失敗狀況並封裝熔斷邏輯,阻止在依賴服務暫時出現故障期間的錯誤反復不斷地向上傳播。基本思路是快速失敗和 Fallback 機制。

熔斷的主要目的:防止服務雪崩效應;防止局部次要失敗影響整體可用性。

設計目標

熔斷機制的設計目標是:在未達到熔斷要求時,正常調用依賴服務;在達到熔斷要求時,調用指定的降級方法或拋出異常。這要求定義熔斷要求,做一個熔斷器設計。

熔斷機制的設計挑戰是:上一次服務調用的結果,會決定下一次調用的策略(降級、拋出異常或恢復服務調用)。原來相互獨立的服務調用被整合成有密切關聯的。若某個依賴服務配置了熔斷要求,則針對該依賴服務的所有調用必須整合成一個連續不中斷的處理流。若處理流因為超時或異常中斷,則無法正確決定下一次調用,也就起不到熔斷和恢復的作用。

針對這種需求,常用的一種處理方式是采用事件機制。將每一次依賴服務調用或者降級調用轉換成一次事件,建立事件監聽器進行事件統計,將事件統計結果傳給熔斷器,熔斷器來決定下一次調用的走向。


實現思路

熔斷機制的實現思路主要包括:

  • 熔斷配置:失敗事件的閾值、執行器配置等;失敗事件包括超時、異常事件;
  • 斷路器:在失敗事件達到指定閾值時,將依賴服務的調用熔斷,采取降級策略;采取某種規則從熔斷狀態恢復到正常狀態;
  • 事件統計:在特定時間窗口內,統計依賴服務調用成功事件、失敗事件(失敗次數、失敗比率等)、異常事件等;
  • 事件機制:連接事件統計、斷路器狀態機、服務調用與熔斷降級,串聯成完整的流程。

重點是斷路器和事件機制的設計實現。

斷路器

熔斷情形主要有:強制直接熔斷;斷路器開啟、調用次數達到總數閾值且失敗事件達到指定閾值(失敗百分比、失敗絕對次數等)。主要是斷路狀態機的設計實現。斷路器狀態機如圖所示:

斷路狀態機有三個狀態: CLOSE (關閉),HALF-OPEN (半開),OPEN (開啟)。斷路器默認是 CLOSE 狀態,此時,正常調用依賴服務。

當調用次數達到總數閾值且失敗事件的閾值達到指定值時,進入 OPEN 狀態,開啟降級邏輯;當斷路器位於 OPEN 狀態時,將進入一段斷路時間窗期,這個時間窗內的請求將不會轉發給依賴服務,而是轉發給指定的降級邏輯;當斷路器位於 OPEN 狀態,且過了斷路時間窗期,就會進入 HALF-OPEN 狀態。

斷路器使用稱為 HALF-OPEN 狀態的監視和反饋機制來了解依賴服務是否以及何時恢復。在 HALF-OPEN 狀態,根據規則將部分請求轉發給依賴服務(默認是只重試第一次請求),若調用成功則進入 CLOSE 狀態,恢復調用依賴服務,若對依賴服務的調用超時或失敗,則斷路器保持在 OPEN 狀態。

事件統計

可以划分為事件窗口和若干個數據桶分別統計,再進行數據聚合。

事件機制

將依賴服務的調用結果轉化為事件,觸發事件監聽器,對成功及失敗事件進行統計,判斷是否觸發斷路器的熔斷,以決定下一次調用是調用依賴服務還是降級策略或者直接拋出異常。

事件機制通常基於觀察者模式。


Hystrix

Hystrix 是業界采用的熔斷機制的主要實現之一。

Hystrix 使用起來比較簡單:使用 @EnableCircuitBreaker 開啟熔斷功能,使用 @HystrixCommand 為方法添加熔斷降級配置。下面先給出代碼示例,再講講原理的實現和部分源碼的理解。

使用

步驟一:引入 POM 依賴(SpringBoot 版本為 1.5.9.RELEASE )

        <!-- 引入 hystrix 熔斷 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
            <version>1.4.7.RELEASE</version>
        </dependency>

步驟二:在應用啟動類 Application.java 上開啟熔斷

package cc.lovesq;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;

@EnableCircuitBreaker  // 開啟熔斷機制
@SpringBootApplication
@Configuration
@ComponentScan(basePackages = {"cc.lovesq.*"})
@MapperScan(basePackages = {"cc.lovesq.dao"})
@ImportResource(locations={"classpath:spring.xml"})
public class Application {

  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

}

步驟三:為方法配置熔斷設置。通常是在 Service 層做熔斷。


package cc.lovesq.service.impl;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;

import java.util.Random;

/**
 * @Description 返回隨機值
 * @Date 2021/1/28 4:37 上午
 * @Created by qinshu
 */
@Component
public class RandomValueService {

    private static Log log = LogFactory.getLog(RandomValueService.class);

    Random rand = new Random(47);

    @HystrixCommand(commandKey = "randInt", fallbackMethod = "randIntDowngrade",
            commandProperties = {
                    @HystrixProperty(name="metrics.rollingStats.timeInMilliseconds", value="5000"),
                    @HystrixProperty(name="circuitBreaker.requestVolumeThreshold", value="10"),
                    @HystrixProperty(name="circuitBreaker.errorThresholdPercentage", value="50")
            })
    public Integer randInt() {

        int v = rand.nextInt(100);

        if (v == 0) {
            throw new RuntimeException("Invalid number");
        }

        return v;

    }

    public Integer randIntDowngrade() {
        log.info("randIntDowngrade");
        return 0;
    }
}

步驟四:測試熔斷。

package cc.lovesq.experiments;

import cc.lovesq.service.impl.RandomValueService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @Description 熔斷實現
 * @Date 2021/1/28 4:39 上午
 * @Created by qinshu
 */
@Component
public class HystrixExperiment implements IExperiment {

    private static Log log = LogFactory.getLog(HystrixExperiment.class);

    @Resource
    private RandomValueService randomValueService;

    @Override
    public void test() {
        log.info("----HystrixExperiment-start----");
        for (int i=0; i < 100000; i++) {
            try {
                randomValueService.randInt();
            } catch (Exception ex) {
                log.error("CatchedException: " + ex.getMessage());
            }

        }
        log.info("----HystrixExperiment-end----");
    }
}

原理及實現

Hystrix 將服務調用封裝成一個 HystrixCommand 來執行。HystrixCommand 的熔斷流程如下圖所示:

Hystrix 的整個流程如下圖所示:

Hystrix 的基本實現如下:

  • 最主要的 API 是 @HystrixCommand 注解。@HystrixCommand 注解的處理器是 hystrix-javanica 包里的 HystrixCommandAspect.methodsAnnotatedWithHystrixCommand 方法。這里使用了切面編程來封裝和隔離依賴服務調用:從切面 joinPoint 提取目標方法 targetMethod 並轉換為可執行的 HystrixInvokable 實例( 實現類是 GenericCommand );
  • GenericCommand 實例的創建使用了 Builder 模式;GenericCommand 實例的執行使用了 Command 模式;
  • 斷路器實現類是 HystrixCircuitBreakerImpl。circuitOpened 表示斷路器開啟的時間,默認值 -1 表示熔斷未開啟,大於 0 表示熔斷已開啟;circuitBreakerSleepWindowInMilliseconds 表示斷路器開啟到重試請求的“斷路-休眠時間窗口期”。斷路器的實現並不復雜。

源碼解析

Hystrix 借用了 RxJava 庫的響應式編程機制,代碼有點繞,需要細細多看幾遍。總的來說,就是 Observable 對象產生的事件導致的回調和轉換,以及回調和轉換的級聯處理。

要理解 Hystrix 源碼,需要有一定的響應式編程基礎,可參閱 “響應式編程庫RxJava初探”。為什么要采用響應式編程模型呢?因為 Hystrix 要處理的回調比較復雜:

  • 同步執行或異步執行服務調用的命令;
  • 執行服務調用的命令的回調處理:成功時、異常時、超時時、取消時、終止時的情形(回調的多種情形處理);
  • 當執行服務調用失敗后,采取降級策略,降級方法執行同樣有:成功時、失敗時的情形(回調的嵌套與級聯);
  • 連續執行多次服務調用的結果,要源源不斷送往事件統計模塊(回調的連續性);
  • 連續執行多次服務調用,若其中某些發生異常,整個執行流不能中斷,要繼續發送給事件統計模塊(回調的連續性)。

可以看到,這需要一個連續不斷的回調流,連續不斷地處理各種事件。響應式編程模型正好能夠對付這種編程需求。

GenericCommand 的執行入口代碼如下:

    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

其中 queue 方法的代碼如下:

public Future<R> queue() {
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();   // 這句是重點
        
        final Future<R> f = new Future<R>() {
             // 為 delegate 對象封裝線程中斷功能,暫時跳過
        }
        return f;

重點是 toObservable().toBlocking().toFuture(); 這句,主要干的事情是:首先將 GenericCommand 的執行轉換成一個 Observable 對象,從而能夠變成可監聽的事件,連接后面的事件統計、斷路器狀態機及熔斷功能,最后再轉換成一個 Future 對象,來獲取服務調用命令的結果。

先說說 toBlocking().toFuture() 這部分,toBlocking() 使用了裝飾器模式,將 Observable 對象裝飾成一個可阻塞的 BlockingObservable 對象,阻塞並等待被裝飾的 Observable 對象的執行完成事件或發生異常事件;toFuture 方法將 BlockingObservable 的執行轉換成一個 Future 對象,使用 CountDownLatch 鎖來實現阻塞功能(that 就是被裝飾的 Observable 對象):

public static <T> Future<T> toFuture(Observable<? extends T> that) {

        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicReference<T> value = new AtomicReference<T>();
        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

        @SuppressWarnings("unchecked")
        final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() {

            @Override
            public void onCompleted() {
                finished.countDown();
            }

            @Override
            public void onError(Throwable e) {
                error.compareAndSet(null, e);
                finished.countDown();
            }

            @Override
            public void onNext(T v) {
                // "single" guarantees there is only one "onNext"
                value.set(v);
            }
        });

        return new Future<T>() {
            // 根據 finished 的狀態、value 及 error 封裝一個 Future 的實現,等待 that 的訂閱執行完成之后,獲取結果;暫時跳過
        };

    }

接下來就是重點的 toObservable 方法了。 這個方法首先定義了一些回調函數:

  • terminateCommandCleanup : Command 執行終止后的清理函數;
  • unsubscribeCommandCleanup : Command 取消執行時的清理函數;
  • applyHystrixSemantics : 執行 Hystrix 熔斷語義的主要函數;
  • wrapWithAllOnNextHooks : 將一個 R 對象轉換成另一個 R 對象,這里 R 通常是 Observable 對象;
  • fireOnCompletedHook : 命令執行完成后的鈎子函數。

然后創建了一個裝配了這些回調函數的帶緩存功能的命令執行的 Observable 對象,在命令執行的不同階段或發生異常時,就會執行對應的方法。

return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                 // 命令執行狀態判斷
                // 命令執行日志記錄
                // 帶緩存的命令執行

                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // put in cache
                if (requestCacheEnabled && cacheKey != null) {
                    // 從緩存里取 afterCache
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                        .doOnCompleted(fireOnCompletedHook);
            }
        });

Observable.defer 將一個 Func0[Observable] 對象包裝成一個 Observable 對象。

接着看主要方法 applyHystrixSemantics ,這一段就是根據斷路器狀態及線程池許可來決定是否執行依賴服務調用。

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {    // 斷路器內部的狀態邏輯,斷路器允許請求服務調用的情形
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {   // 歸還執行調用服務的許可
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {  // 標記並通知異常事件,可通過插件來實現
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            if (executionSemaphore.tryAcquire()) {   // 線程池是否有許可來執行服務調用
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)   
                            .doOnError(markExceptionThrown)    // 設置異常回調
                            .doOnTerminate(singleSemaphoreRelease)  // 設置終止時回調,歸還執行調用服務的許可
                            .doOnUnsubscribe(singleSemaphoreRelease);  // 設置取消回調,歸還執行調用服務的許可
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {  // 沒有足夠線程來執行服務調用,采取降級策略
                return handleSemaphoreRejectionViaFallback();
            }
        } else {   // 斷路器不允許請求服務調用的情形,采取降級策略
            return handleShortCircuitViaFallback();
        }
    }

斷路器執行邏輯如下:


        public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {  // 強制開啟熔斷,始終不執行調用依賴服務
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) { // 強制關閉熔斷,始終執行調用依賴服務
                return true;
            }
            if (circuitOpened.get() == -1) {   // 默認值,熔斷未開啟過,執行依賴服務調用
                return true;
            } else {
                if (isAfterSleepWindow()) {    // 熔斷開啟過,且已經過了熔斷時間窗口期
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {   // 如果熔斷狀態可以從 OPEN 轉換為 HALF-OPEN,可以執行一次依賴服務調用
                        //only the first request after sleep window should execute
                        return true;
                    } else {  // 熔斷狀態已經為 HALF-OPEN, 已經執行過一次服務調用,后續請求暫時不能執行依賴服務調用
                        return false;
                    }
                } else {  // // 熔斷開啟過,仍然處於熔斷時間窗口期,不調用依賴服務
                    return false;
                }
            }
        }

最后看一下executeCommandAndObserve 方法。這個方法也是一樣的套路:先定義若干回調函數,然后創建一個裝配了這些回調函數的 Observable 對象,以便在適當的時候被觸發調用。這里面還封裝了事件的通知,比如 eventNotifier.markEvent(HystrixEventType.XXX, commandKey); 以及執行結果的處理,比如 executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); 。ExecutionResult 對象使用了 Immutable Variable 模式,簡化了對執行結果的並發處理。

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        final Action1<R> markEmits = new Action1<R>() {
            @Override
            public void call(R r) {
                if (shouldOutputOnNextEvents()) {
                    executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                    eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
                }
                if (commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };

        final Action0 markOnCompleted = new Action0() {
            @Override
            public void call() {
                if (!commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    circuitBreaker.markSuccess();
                }
            }
        };

        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
            @Override
            public void call(Notification<? super R> rNotification) {
                setRequestContextIfNeeded(currentRequestContext);
            }
        };

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

降級處理的主要邏輯如下:

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
        // record the executionResult
        // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
        executionResult = executionResult.addEvent((int) latency, eventType);

        if (isUnrecoverable(originalException)) {
            // 不可恢復的異常處理,調用 onError 方法;
        } else {
            if (isRecoverableError(originalException)) {
                logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
            }

            if (properties.fallbackEnabled().get()) {
                /* fallback behavior is permitted so attempt */

                final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
                    @Override
                    public void call(Notification<? super R> rNotification) {
                        setRequestContextIfNeeded(requestContext);
                    }
                };

                final Action1<R> markFallbackEmit = new Action1<R>() {   // 降級邏輯執行的回調處理
                    @Override
                    public void call(R r) {
                        if (shouldOutputOnNextEvents()) {
                            executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
                        }
                    }
                };

                final Action0 markFallbackCompleted = new Action0() {   // 降級邏輯正常完成的回調處理
                    @Override
                    public void call() {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
                    }
                };

                final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                    @Override
                    public Observable<R> call(Throwable t) {  // 降級邏輯發生異常時,需要發射異常事件
                        /* executionHook for all errors */
                        Exception e = wrapWithOnErrorHook(failureType, originalException);
                        Exception fe = getExceptionFromThrowable(t);

                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        Exception toEmit;

                        // 根據不同的異常,發射不同的異常事件
                        if (fe instanceof UnsupportedOperationException) {
                            logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);

                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
                        } else {
                            logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);

                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
                        }

                        // NOTE: we're suppressing fallback exception here
                        if (shouldNotBeWrapped(originalException)) {
                            return Observable.error(e);
                        }

                        return Observable.error(toEmit);
                    }
                };

                final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {   // 釋放調用降級方法的執行許可
                    @Override
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                            fallbackSemaphore.release();
                        }
                    }
                };

                Observable<R> fallbackExecutionChain;

                // acquire a permit
                if (fallbackSemaphore.tryAcquire()) {    // 若持有降級邏輯的執行許可,則生成用於降級邏輯處理的 Observable 對象
                    try {
                        if (isFallbackUserDefined()) {
                            executionHook.onFallbackStart(this);
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
                            //same logic as above without the hook invocation
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
                        //If hook or user-fallback throws, then use that as the result of the fallback lookup
                        fallbackExecutionChain = Observable.error(ex);
                    }

                    return fallbackExecutionChain   // 為降級邏輯處理裝配回調函數,處理降級邏輯的正常調用、完成、拋出異常等情形。
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)                    
                            .doOnCompleted(markFallbackCompleted)  // 降級邏輯執行的處理
                            .onErrorResumeNext(handleFallbackError)  // 降級發生異常時的處理
                            .doOnTerminate(singleSemaphoreRelease)  // 降級邏輯處理終止時歸還執行許可
                            .doOnUnsubscribe(singleSemaphoreRelease);  
                } else {
                   return handleFallbackRejectionByEmittingError();
                }
            } else {
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }
        }
    }

Hystrix 的代碼處理流程先分析到這里。整個代碼套路基本明了:

  • Observable 對象 + 各種回調函數 => 回調處理后,生成新的 Observable 對象,實現 Observable 對象的級聯和嵌套執行;
  • Observable 對象的行為產生各種事件,然后標記事件類型,進行事件統計,並生成不可變的執行結果對象 executionResult ;
  • 封裝了服務調用的 Command 對象或可執行的 Action0[Observable](無返回值) 或 Func0[Observable] (有返回值) 對象通過 Observable.defer 方法轉換為 Observable 對象;
  • Observable 對象的預先指定的各種回調函數的調用會返回 Observable 對象,比如 Observable.doOnComplete(onCompleted),先將回調函數對象 onCompleted 行為封裝成 ActionObserver ,再包裝成 Observable 對象,從而使整個回調鏈始終處於觀察者模式驅動的事件處理流中;這里使用了OnSubscribeDoOnEach 對象來保證觀察者訂閱調用的級聯;
  • Command 對象的執行需要熔斷狀態判斷及熔斷邏輯線程池的執行許可;同樣,降級邏輯的執行需要降級邏輯線程池的執行許可;
  • Command 對象執行后產生事件,通過預先指定的各種回調函數處理后產生新的 Observable 對象,新的 Observable 對象的方法執行進一步產生新事件,新事件又需要預先指定的各種回調函數處理,往復循環,不厭其煩;簡單點說,就是把各種非 Observable 的對象、行為和結果設法再轉化成 Observable 對象,以維持龐大的 Observable 系統的周而復始的運轉;
  • Command 對象執行失敗后的降級邏輯處理,也是遵循類似的套路。

具體回調流程的細節,恐怕需要再單步調試加仔細梳理了。在這里,我們只需要知道這些代碼要實現的目標,至於代碼怎么寫(某種代碼邏輯組織方式),倒是其次的事情。我覺得 Hystrix 的開發人員再過三個月也不認得自己都寫得些啥了。總的來說, Hystrix 的代碼實現還是比較優雅的,不過優雅的代碼並不一定易懂。這個也與熔斷機制的設計挑戰有關。

最后說下事件統計,主要有兩種主要的滑動窗口計數機制,其實現分別在對象 OperatorWindowWithTime(在指定時間范圍內聚合計數) 和 OperatorWindowWithSize(在指定數量內聚合計數) 里。比如 BucketedCounterStream 的計數,使用 OperatorWindowWithTime 來實現:

this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
            }
        });

小結

熔斷機制是分布式的微服務體系中必不可少的技術手段,用來防止服務雪崩。本文總結了熔斷機制的實現原理及 Hystrix 的使用和基本的源碼解析。

參考文獻


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM