Spring Cloud專題之三:Hystrix 斷路器


在微服務架構中,我們將系統拆分成很多個服務單元,各單位的應用間通過服務注冊與訂閱的方式相互依賴。由於每個單元都在不同的進程中運行,依賴通過遠程調用的方式執行,這樣就有可能因為網絡原因或是依賴服務自身問題出現調用故障或延遲,而這些問題會直接導致調用方的對外服務也出現延遲,若此時調用方的請求不斷增加,最后就會因等待出現故障的依賴方響應形成任務積壓,最終導致自身服務的不可用。這樣的架構相對於傳統架構更加的不穩定,為了解決這樣的問題,就產生了斷路器等一系列的服務保護機制。而Spring Cloud Hystrix就是這樣的實現了服務降級、服務熔斷、線程和信號量隔離、請求緩存、請求合並以及服務監控等一系列服務保護功能的組件。

本篇文章還是在前兩篇文章的基礎上所作的:

SpringCloud專題之一:Eureka

Spring Cloud專題之二:OpenFeign

歡迎大家查看!!!

先啟動需要的服務工程:

  • EUREKA-SERVER:注冊中心,端口為9001
  • HELLO-SERVER:提供服務的客戶端,端口為9002和9003
  • EUREKA-CUSTOMER:消費服務的消費者端,端口為9004

在未加入Hystrix(斷路器)之前,如果我關閉掉一個客戶端,那么使用消費者訪問的時候可以獲得如下的輸出:

因為feign的默認連接時間是1s,所以超過1s后就會報連接不上的錯。

Hystrix代碼

由於 openfeign 包 默認集成了 hystrix,所以只需要開啟開關即可

#開啟Hystrix降級處理
feign.hystrix.enabled=true

引入jar包

<!--hystrix-->
<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!--hystrix-javanica-->
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-javanica</artifactId>
</dependency>

1.服務降級

降級是指當請求超時,資源不足等情況發生時,進行的服務降級處理,不調用真實服務邏輯,而是使用快速失敗的方式直接返回一個托底數據,保證服務鏈路的完整,避免服務雪崩。

降級的實現是指在調用遠程服務的方法上增加@HystrixCommand的注解,通過指定fallbackMethod的值設置失敗的回調方法,也可以使用@FeignClient的方式指定fallback類。

1.在Customer1Feign類的@FeignClient注解上添加fallback的類

/**
 * @className: Customer1Feign
 * @description: 測試多個feign使用相同的name的問題
 * @author: charon
 * @create: 2021-06-06 09:42
 */
@FeignClient(value = "HELLO-SERVER",fallback = EurekaClientFallBack.class)
public interface Customer1Feign {
    /**
     * 要求:
     *    必須要指定RequestParam屬性的value值,同時RequestMethod的method也需要指定
     *    方法上添加SpringMVC的注解
     * @return
     */
    @RequestMapping(value = "/sayHello1",method = RequestMethod.GET)
    String sayHello1(@RequestParam("name") String name);
}

3.編寫fallback使用的類:EurekaClientFallBack

/**
 * @className: EurekaClientFallBack
 * @description: 客戶端的降級實現類
 * @author: charon
 * @create: 2021-06-20 22:06
 */
@Component
public class EurekaClientFallBack implements Customer1Feign {
    /**
     * 日志記錄類
     */
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * sayHello1接口的服務降級類
     * @param name 參數
     * @return
     */
    @Override
    public String sayHello1(String name) {
        logger.error("您訪問了EurekaClientFallBack#sayHello1(),傳入的參數為:{}" , name);
        return "您訪問了EurekaClientFallBack#sayHello1(),傳入的參數為:" + name;
    }
    
}

然后消費者端再次調用接口,會發現頁面展示為如下圖,而不是之前的Whitelabel Error Page了。

到這里,我們就實現了一個最簡單的斷路器功能了。

4.模擬實現提供服務的客戶端代碼執行超時的情況:

@RestController
public class Hello1Controller {
    /**
     * 日志記錄類
     */
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${server.port}")
    private String host;

    @Value("${spring.application.name}")
    private String instanceName;

    @RequestMapping("/sayHello1")
    public String sayHello1(@RequestParam("name") String name){
        try {
            int sleepTime = new Random().nextInt(3000);
            logger.error("讓線程阻塞 {} 毫秒",sleepTime);
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("你好,服務名:{},端口為:{},接收到的參數為:{}",instanceName,host,name);
        return "你好,服務名:"+instanceName+",端口為:"+host+",接收到的參數為:"+name;
    }
}

​ 在HELLO-SERVER的工程中讓線程隨機sleep幾秒,然后消費者端調用,可以發現,當調用HELLO-SERVER超過1000毫秒時就會因為服務超時從而觸發熔斷請求,並調用回調邏輯返回結果。

除了上面的方式外,還可以使用@HystrixCommand的注解來配置fallbackMethod方法(更靈活)。

@HystrixCommand(fallbackMethod = "sayHello1Fallback")
@Override
public String invokeSayHello1(String name) {
    long startTime = System.currentTimeMillis();
    String result = feign1.sayHello1(name);
    logger.error("您訪問了CustomerServiceImpl#sayHello1(),執行時間為:{} 毫秒",System.currentTimeMillis() - startTime );
    return result;
}

public String sayHello1Fallback(String name){
    logger.error("出錯了,您訪問了CustomerServiceImpl#sayHello1Fallback()" );
    return "出錯了,您訪問了CustomerServiceImpl#sayHello1Fallback()";
}

2.服務熔斷

熔斷就是當一定時間內,異常請求的比例(請求超時,網絡故障,服務異常等)達到閾值,啟動熔斷器,熔斷器一旦啟動,則會停止調用具體的服務邏輯,通過fallback快速返回托底數據,保證服務鏈的完整。熔斷又自動恢復的機制,如:當熔斷器啟動后,每隔5秒嘗試將新的請求發送給服務端,如果服務可正常執行並返回結果,則關閉熔斷器,服務恢復。如果仍然調用失敗,則繼續返回托底數據,熔斷器處於開啟狀態。

服務降級是指調用服務出錯了返回托底數據,而熔斷則是出錯后如果開啟了熔斷器將在一定的時間內不調用服務端。

熔斷的實現是指在調用遠程服務的方法上增加@HystrixCommand的注解。通過@HystrixProperty的name屬性指定需要配置的屬性(可以是字符串,也可以使用HystrixPropertiesManager常量類的常量),通過value設置屬性的值,也可以通過setter方式設置屬性值。

 /**
  * 注解的配置意思為:當時間在20s內的10個請求中,當出現了30%(3個)的失敗,則觸發熔斷
  * @param name 參數
  * @return 結果
  */
@HystrixCommand(fallbackMethod = "sayHello1Fallback",commandProperties = {
    @HystrixProperty(name="circuitBreaker.requestVolumeThreshold",value = "10"),
    @HystrixProperty(name= HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS,value = "20000"),
    @HystrixProperty(name= HystrixPropertiesManager.CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE,value = "30"),
})
@Override
public String invokeSayHello1(String name) {
    long startTime = System.currentTimeMillis();
    String result = feign1.sayHello1(name);
    logger.error("您訪問了CustomerServiceImpl#sayHello1(),執行時間為:{} 毫秒",System.currentTimeMillis() - startTime );
    return result;
}

Hystrix 常用屬性配置

3.請求緩存

請求緩存是保證在一次請求中多次調用同一個服務提供者接口,在cacheKey不變的情況下,后續調用結果都是第一次的緩存結果,而不是多次請求服務提供者,從而降低服務提供者處理重復請求的壓力。

設置請求緩存:

@Override
@CacheResult//用來標記請求命令返回的結果應該被緩存
@HystrixCommand
public User getUserById( String id) {
    return feign1.getUserById(id);
}

定義緩存key:

當使用注解來定義請求緩存時,若要為請求命令指定具體的緩存key的生成規則,可以使用@CacheResult和@CacheRemove注解的cacheKeyMethod方法指定具體的生成函數,也可以通過@CacheKey注解在方法參數中指定用於組裝緩存key的元素。

@Override
@CacheResult(cacheKeyMethod = "getUserByIdCacheKey")//用來標記請求命令返回的結果應該被緩存
@HystrixCommand
public User getUserById( String id) {
    return feign1.getUserById(id);
}

public String getUserByIdCacheKey(String id){
    return id;
}

/**
 * 第二種使用@CacheKey的方式,@CacheKey用來在請求命令的參數上標記,使其作為緩存的key值,如果沒有標記則會使用所有參數,如果 
 * 同事還使用了@CacheResult和@CacheRemove注解的cacheKeyMethod方法指定緩存Key的生成,那么該注解將不會起作用
 * 有些教程中說使用這個可以指定參數,比如:@CacheKey("id"),在我這邊會報錯:
 *  java.beans.IntrospectionException: Method not found: isId
 */
@Override
@CacheResult
@HystrixCommand
public User getUserById(@CacheKey String id) {
    return feign1.getUserById(id);
}

清理緩存:

@CacheRemove注解的commandKey屬性是必須要指定的,它用來指明需要使用請求緩存的請求命令,因為只有通過該屬性的配置,Hystrix才能找到正確的請求命令緩存位置。

@Override
@CacheRemove(commandKey = "getUserByIdCacheKey")//用來讓請求命令的緩存失效,失效的緩存根據定義的key決定
@HystrixCommand
public User removeUserById( String id) {
    return feign1.getUserById(id);
}

controller調用,注意定義是要在同一個請求中,如果是不同的請求,則沒有效果。

@RequestMapping("/getUserById")
public List<User> getUserById(String id){
    User user1 = serivce.getUserById(id);
    User user2 = serivce.getUserById(id);
    List<User> lsrUser = new ArrayList<>(2);
    lsrUser.add(user1);
    lsrUser.add(user2);
    return lsrUser;
}

4.請求合並

請求合並是指在一段時間內將所有請求合並為一個請求,以減少通信的消耗和線程數的占用,從而大大降低服務端的負載。

請求合並的缺點:

​ 在設置請求合並之后,本來一個請求可能5ms就搞定了,但是現在必須再等10ms等待其他的請求一起,這樣一個請求的耗時就從5ms增加到了15ms了。不過如果我們要發起的命令本身就是一個高延遲的命令,那么這個時候就可以使用請求合並了,因為這個時候,等待的時間消耗就顯得微不足道了。所以如果需要設置請求合並,千萬不能將等待時間設置的過大。

服務提供者的控制類:

@RestController
public class UserBatchController {

    /**
     * 請求合並的方法
     * @param ids
     * @return
     */
    @RequestMapping(value = "/getUserList", method = RequestMethod.GET)
    public List<User> getUserList(String ids) {
        System.out.println("ids===:" + ids);
        String[] split = ids.split(",");
        return Arrays.asList(split)
                .stream()
                .map(id -> new User(Integer.valueOf(id),"charon"+id,Integer.valueOf(id)*5))
                .collect(Collectors.toList());
    }

    /**
     * 請求單個user的方法
     * @param id
     * @return
     */
    @RequestMapping(value = "/getUser/{id}", method = RequestMethod.GET)
    public User getUser(@PathVariable("id") String id) {
        User user = new User(1, "Charon",15);
        return user;
    }
}

消費者feign的調用接口:

@RequestMapping(value = "/getUser",method = RequestMethod.GET)
Future<User> getUser(@RequestParam("id")Integer id);

@RequestMapping(value = "/getUserList",method = RequestMethod.GET)
List<User> getUserList (@RequestParam("ids") String ids);

消費者的service及實現類:

Future<User> getUser(Integer i);

/**
 * 表示在10s內的getUser請求將會合並到getUserList請求上,合並發出,最大的合並請求數為200
 * @param userId 用戶id
 * @return
 */
@HystrixCollapser(batchMethod = "getUserList",scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
                  collapserProperties = {
                      @HystrixProperty(name="timerDelayInMilliseconds",value="10"),
                      @HystrixProperty(name="maxRequestsInBatch",value="200")
                  }
                 )
@Override
public Future<User> getUser(Integer userId){
    Future<User> user = feign1.getUser(userId);
    return user;
}

@HystrixCommand
public List<User> getUserList(List<Integer> userIdList) {
    List<User> lstUser = feign1.getUserList(StringUtils.join(userIdList,","));
    return lstUser;
}

消費者控制類:

/**
 * 獲取單個用戶
 * @return User
 */
@RequestMapping("/getUser")
public User getUser() throws ExecutionException, InterruptedException {
    Future<User> user = serivce.getUser(1);
    System.out.println("返回的結果:"+user);
    return user.get();
}

/**
 * 獲取用戶list
 * @return list
 */
@RequestMapping("/getUserList")
public List<User> getUserList() throws ExecutionException, InterruptedException {
    Future<User> user1 = serivce.getUser(1);
    Future<User> user2= serivce.getUser(2);
    Future<User> user3= serivce.getUser(3);
    List<User> users = new ArrayList<>();
    users.add(user1.get());
    users.add(user2.get());
    users.add(user3.get());
    System.out.println("返回的結果:" + users);
    return users;
}

標注了HystrixCollapser這個注解的,這個方法永遠不會執行,當有請求來的時候,直接請求batchMethod所指定的方法。batchMethod的方法在指定延遲時間內會將所有的請求合並一起執行

5.線程池隔離

Hystrix使用艙壁模式實現線程池的隔離,它會為每一個依賴服務創建一個獨立的線程池,這樣就算某個依賴服務出現延遲過高的情況,也只是對該依賴服務的調用產生影響,而不會拖慢其他的依賴服務。

使用線程池隔離的優點:

  • 應用自身得到完全保護,不會受不可控的依賴服務影響,即便給依賴服務分配的線程池被填滿,也不會影響到其他的服務
  • 可以有效降低接入新服務的風險,如果新服務接入后運行不穩定或存在問題,完全不會影響原來的請求
  • 每個服務都是獨立的線程池,在一定程度上解決了高並發的問題
  • 由於線程池有個數限制,所以也解決了限流的問題

使用線程池隔離的缺點:

  • 增加了CPU的開銷,因為不僅有tomcat的線程池,還需要有Hystrix的線程池
  • 每個操作都是獨立的線程,就有排隊、調度和上下文切換等問題

不配置線程隔離:

@RequestMapping("/useThread")
public String useThread(){
    return serivce.useThread1() + "   " + serivce.useThread2();
}

@Override
public String useThread1() {
    String threadName = Thread.currentThread().getName();
    logger.error("使用的線程名稱為:{}",threadName);
    return "使用的線程名稱為:" + threadName;
}

@Override
public String useThread2() {
    String threadName = Thread.currentThread().getName();
    logger.error("使用的線程名稱為:{}",threadName);
    return "使用的線程名稱為:" + threadName;
}

如果不配置線程隔離,則使用的是同一個線程

如果我們給useThread1方法設置線程隔離:

 @HystrixCommand(groupKey = "useThread1",//分組,設置服務名,一個group使用一個線程
            commandKey = "useThread1",//命令名稱,默認值為當前執行的方法名稱
            threadPoolKey = "useThread1",//是配置線程池名稱,配置全局唯一標識接口線程池的名稱,相同名稱的線程池是同一個。默認值是分組名groupKey
            threadPoolProperties = {
                    @HystrixProperty(name = "coreSize", value = "30"),//線程池大小
                    @HystrixProperty(name = "maxQueueSize", value = "100"),//最大隊列長度
                    @HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),//線程存活時間
                    @HystrixProperty(name = "queueSizeRejectionThreshold", value = "15")//拒絕請求
            })

使用了線程池隔離之后,可以看到兩個請求使用的不通的線程池。

6.信號量隔離

信號量隔離是指在規定時間內只允許指定數量的信號量進行服務訪問,其他得不到信號量的線程進入fallback,訪問結束后,歸還信號量。說白了就是做了一個限流。

@RequestMapping("/semaphore")
public String semaphore(){
    for (int i = 0; i < 15; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                serivce.semaphore();
            }
        }).start();
    }
    return "OK";
}

@HystrixCommand(fallbackMethod = "semaphoreFallback",commandProperties = {
    @HystrixProperty(name="execution.isolation.strategy",value="SEMAPHORE"), //使用信號量隔離,默認為THREAD
    @HystrixProperty(name="execution.isolation.semaphore.maxConcurrentRequests",value="10"), // 信號量最大並發度
})
@Override
public void semaphore() {
    try {
        Thread.sleep(900);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    logger.error("正常執行方法");
}

public void semaphoreFallback(){
    logger.error("執行了fallback方法");
}

如下圖所示,有10個線程拿到了信號量,執行了正常的方法,有5個線程沒有拿到信號量,直接調用fallback方法。

原理分析

上一篇文章說過openFeign主要是通過jdk的動態代理構建對象,所以Hystrix集成到feign當中也是使用的jdk動態代理的invocationHandler上,那么來看看Hystrix實現的jdk的動態代理類--HystrixInvocationHandler吧!

invoke方法:

@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
    throws Throwable {
  // 如果調用的方法來自 java.lang.Object 則提前退出代碼與 ReflectiveFeign.FeignInvocationHandler 相同
  // ...

  HystrixCommand<Object> hystrixCommand =
      new HystrixCommand<Object>(setterMethodMap.get(method)) {
        @Override
        protected Object run() throws Exception {
          try {
            // 獲取並調用MethodHandler,MethodHandler封裝了Http請求,ribbon也在這里被集成
            return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
          } catch (Exception e) {
            throw e;
          } catch (Throwable t) {
            throw (Error) t;
          }
        }

        // fallback的降級方法
        @Override
        protected Object getFallback() {
          if (fallbackFactory == null) {
            return super.getFallback();
          }
          try {
            Object fallback = fallbackFactory.create(getExecutionException());
            Object result = fallbackMethodMap.get(method).invoke(fallback, args);
            if (isReturnsHystrixCommand(method)) {
              return ((HystrixCommand) result).execute();
            } else if (isReturnsObservable(method)) {
              // Create a cold Observable
              return ((Observable) result).toBlocking().first();
            } else if (isReturnsSingle(method)) {
              // Create a cold Observable as a Single
              return ((Single) result).toObservable().toBlocking().first();
            } else if (isReturnsCompletable(method)) {
              ((Completable) result).await();
              return null;
            } else if (isReturnsCompletableFuture(method)) {
              return ((Future) result).get();
            } else {
              return result;
            }
          } catch (IllegalAccessException e) {
            // shouldn't happen as method is public due to being an interface
            throw new AssertionError(e);
          } catch (InvocationTargetException | ExecutionException e) {
            // Exceptions on fallback are tossed by Hystrix
            throw new AssertionError(e.getCause());
          } catch (InterruptedException e) {
            // Exceptions on fallback are tossed by Hystrix
            Thread.currentThread().interrupt();
            throw new AssertionError(e.getCause());
          }
        }
      };

  if (Util.isDefault(method)) {
    return hystrixCommand.execute();
  } else if (isReturnsHystrixCommand(method)) {
    return hystrixCommand;
  } else if (isReturnsObservable(method)) {
    // Create a cold Observable
    return hystrixCommand.toObservable();
  } else if (isReturnsSingle(method)) {
    // Create a cold Observable as a Single
    return hystrixCommand.toObservable().toSingle();
  } else if (isReturnsCompletable(method)) {
    return hystrixCommand.toObservable().toCompletable();
  } else if (isReturnsCompletableFuture(method)) {
    return new ObservableCompletableFuture<>(hystrixCommand);
  }
  return hystrixCommand.execute();
}

首先創建一個HystrixCommand,用來表示對依賴服務的操作請求,同時傳遞所有需要的參數,從命名中可以知道才用了“命令模式”來實現對服務調用操作的封裝。

命令模式:是指將來自客戶端的請求封裝成一個對象,從而讓調用者使用不同的請求對服務提供者進行參數化。

上面的兩種命令模式一共有4種命令的執行方式,Hystrix在執行的時候會根據創建的Command對象以及具體的情況來選擇一個執行。

  • execute() 方法 :同步執行,從依賴的服務返回一個單一的結果對象,或是在發生錯誤時拋出異常
  • queue() 方法 :異步執行,直接返回一個Future對象,其中包含了服務執行結束時要返回的單一結果對象
  • observe()方法:返回Observable對象,它代表了操作的多個結果,是一個Hot observable
  • toObservable()方法:同樣返回一個Observable對象,也表示了操作的多個結果,但它返回的是一個Cold Observable

接下來首先來看看HystrixCommand#execute()方法:

public R execute() {
    try {
        // queue()返回一個Future,get()同步等待執行結束,然后獲取異步的結果。
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

跟進queue()方法:

public Future<R> queue() {
     // 通過toObservable()獲得一個Cold Observable,
     // 並且通過toBlocking()將該Observable轉換成BlockingObservable,可以把數據以阻塞的方式發射出來
     // toFuture()則是把BlockingObservable轉換成一個Future
     final Future<R> delegate = toObservable().toBlocking().toFuture();
  
     final Future<R> f = new Future<R>() {
		// future實現,調用delegate的對應實現
     }
     return f;
 }

在queue()中調用了核心方法--toObservable()方法,

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
    // ...
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };

    final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
        @Override
        public R call(R r) {
            R afterFirstApplication = r;

            try {
                afterFirstApplication = executionHook.onComplete(_cmd, r);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
            }

            try {
                return executionHook.onEmit(_cmd, afterFirstApplication);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                return afterFirstApplication;
            }
        }
    };

    final Action0 fireOnCompletedHook = new Action0() {
        @Override
        public void call() {
            try {
                executionHook.onSuccess(_cmd);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
            }
        }
    };

    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
             /* this is a stateful object so can only be used once */
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }

            commandStartTimestamp = System.currentTimeMillis();

            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }

            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            // 先從緩存中獲取如果有的話直接返回
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // 里面訂閱了,所以開始執行hystrixObservable
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

這個方法非常長,首先看看applyHystrixSemantics()方法:

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    executionHook.onStart(_cmd);

    // 判斷是否開啟斷路器
    if (circuitBreaker.allowRequest()) {
        // 斷路器是關閉的,則檢查識都有可用的資源來執行命令
        // 獲取信號量實例
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    // 釋放信號量
                    executionSemaphore.release();
                }
            }
        };

        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
		// 嘗試獲取信號量
        if (executionSemaphore.tryAcquire()) {
            try {
                // 執行業務
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // 信號量獲取失敗,走fallback
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // 斷路器是打開的,快速熔斷,走fallback
        return handleShortCircuitViaFallback();
    }
}

applyHystrixSemantics()通過熔斷器的allowRequest()方法判斷是否需要快速失敗走fallback,如果允許執行那么又會經過一層信號量的控制,都通過才會走execute。

所以,核心邏輯就落到了HystrixCircuitBreaker#allowRequest()方法上:

public boolean allowRequest() { 
    // 強制開啟熔斷 
    if (properties.circuitBreakerForceOpen().get()) { 
        return false; 
    }
    // 強制關閉熔斷 
    if (properties.circuitBreakerForceClosed().get()) { 
        isOpen(); 
        return true; 
    }
    // 判斷和計算當前斷路器是否打開 或者 允許單個測試 ,通過這兩個方法的配合,實現了斷路器的打開和關閉狀態的切換
    return !isOpen() || allowSingleTest();
}

Hystrix允許強制開啟或者關閉熔斷,如果不想有請求執行就開啟,如果覺得可以忽略所有錯誤就關閉。在沒有強制開關的情況下,主要就是判斷當前熔斷是否開啟。另外,在熔斷器開啟的情況下,會在一定時間后允許發出一個測試的請求,來判斷是否開啟熔斷器。

首先來看看isOpen()方法:

public boolean isOpen() {
    if (circuitOpen.get()) {
        // 開關是開啟的,直接返回
        return true;
    }

    // 開關未開啟,獲取健康統計
    HealthCounts health = metrics.getHealthCounts();

    // 總請求數太小的情況,不開啟熔斷
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        return false;
    }
    // 總請求數夠了,失敗率比較小的情況,不開啟熔斷
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
       // 總請求數和失敗率都比較大的時候,設置開關為開啟,進行熔斷
        if (circuitOpen.compareAndSet(false, true)) {
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            return true;
        }
    }
}

總體邏輯就是判斷一個失敗次數是否達到開啟熔斷的條件,如果達到那么設置開啟的開關。在熔斷一直開啟的情況下,偶爾會放過一個測試請求來判斷是否關閉。

下面看看allowSingleTest()方法:

public boolean allowSingleTest() {
    // 獲取熔斷開啟時間,或者上一次的測試時間
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // 如果熔斷處於開啟狀態,且當前時間距離熔斷開啟時間或者上一次執行測試請求時間已經到了
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // 使用cas機制控制熔斷的開啟
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            return true;
        }
    }
    return false;
}

回到applyHystrixSemantics()這個方法中,獲取到信號量之后,執行業務的方法,在executeCommandAndObserve()中進行了一些超時及失敗的邏輯處理之后,進入HystrixCommand#executeCommandWithSpecifiedIsolation()中:

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // ...
    Observable<R> execution;
    // 判斷是否開啟超時設置
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

在executeCommandWithSpecifiedIsolation(),先判斷是否進行線程隔離,及一些狀態變化之后,進入getUserExecutionObservable():

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // 線程隔離
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                // 狀態校驗
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
			  // 同級標記命令
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    // 該命令在包裝線程中超時,立即返回
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    /**
                     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                     */
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd); 
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}

在getUserExecutionObservable()和getExecutionObservable()中,主要是封裝用戶定義的run方法:

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
        // 獲取用戶定義邏輯的Observable
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }

    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

HystrixCommand#getExecutionObservable():

@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                // 包裝定義的run方法
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}

到這里,hystris分析就結束了。hystrix其實就是在feign的調用過程插了一腳,通過對請求的成功失敗的統計數據來開關是否進行熔斷。又在每個時間窗口內發送一個測試請求出去,來判斷是否關閉熔斷。總得來說還是很清晰實用的。

參考文章:

翟永超老師的《Spring Cloud微服務實戰》

https://zhuanlan.zhihu.com/p/114942145


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM