在微服務架構中,我們將系統拆分為很多個服務,各個服務之間通過注冊與訂閱的方式相互依賴,由於各個服務都是在各自的進程中運行,就有可能由於網絡原因或者服務自身的問題導致調用故障或延遲,隨着服務的積壓,可能會導致服務崩潰。為了解決這一系列的問題,斷路器等一系列服務保護機制出現了。
斷路器本身是一種開關保護機制,用於在電路上保護線路過載,當線路中有電器發生短路時,斷路器能夠及時切斷故障電路,防止發生過載、發熱甚至起火等嚴重后果。
在分布式架構中,斷路器模式的作用也是類似的。
針對上述問題,Spring Cloud Hystrix 實現了斷路器、線路隔離等一系列服務保護功能。它也是基於 Netflix 的開源框架 Hystrix 實現的,該框架的目標在於通過控制那些訪問遠程系統、服務和第三方庫的節點,從而對延遲和故障提供更強大的容錯能力。Hystrix 具備服務降級、服務熔斷、線程和信號隔離、請求緩存、請求合並以及服務監控等強大功能。
快速入門
在開始實現斷路器之前,先用之前實現的一些內容作為基礎,構建一個如下圖所示的服務調用關系。

需要啟動的工程有如下一些:
- eureka-server 工程:服務注冊中心,端口為8082。
- hello-service 工程:HELLO-SERVICE 的服務單元,兩個實例啟動端口分別為 2221 和 2222.
- ribbon-consumer 工程:使用 Ribbon 實現的服務消費者,端口為 3333
在未加入斷路器之前,關閉8081的實例,發送 GET 請求到 http://localhost:3333/ribbon-consumer ,可以獲取下面的輸入。

下面引入 Spring Cloud Hystrix。
- 在 ribbon-consumer 工程的 pom.xml 的 dependency 節點中引入 spring-cloud-starter-hystrix 依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency>
- 在 ribbon-consumer 工程的主類上使用 @EnableCircuitBreaker 注解開啟斷路器功能:
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.context.annotation.Bean; import org.springframework.web.client.RestTemplate; @EnableCircuitBreaker @EnableDiscoveryClient @SpringBootApplication public class DemoApplication { @Bean @LoadBalanced RestTemplate restTemplate(){ return new RestTemplate(); } public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
注:此處還可以使用 Spring Cloud 應用中的 @SpringCloudApplication 注解來修飾主類,該注解的具體定義如下。可以看到,該注解中包含了上述所引用的三個注解,這意味着一個 Spring Cloud 標准應用應包含服務發現以及斷路器。
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.springframework.cloud.client; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @SpringBootApplication @EnableDiscoveryClient @EnableCircuitBreaker public @interface SpringCloudApplication { }
- 改造服務消費方式,新增 HelloService 類,注入 RestTemplate 實例。然后,將在 ConsumerController 中對 RestTemplate 的使用遷移到 helloService 函數中,最后,在 helloService 函數上增加 @HystrixCommand 注解來指定回調方法。
package com.example.demo.web; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.client.RestTemplate; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ @Service public class HelloService { @Autowired RestTemplate restTemplate; @HystrixCommand(fallbackMethod = "helloFallback") public String helloService(){ return restTemplate.getForEntity("http://hello-service/index", String.class).getBody(); } public String helloFallback(){ return "error"; } }
- 修改 ConsumerController 類, 注入上面實現的 HelloService 實例,並在 helloConsumer 中進行調用:
package com.example.demo.web; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; /** * @author lxx * @version V1.0.0 * @date 2017-8-9 */ @RestController public class ConsumerController { @Autowired HelloService helloService; @RequestMapping(value = "ribbon-consumer", method = RequestMethod.GET) public String helloConsumer(){ return helloService.helloService(); } }
下面,對斷路器實現的服務回調邏輯進行驗證,重新啟動之前關閉的 2221 端口的 hello-service,確保此時服務注冊中心、兩個 hello-service 和 ribbon-consumer 均已啟動,再次訪問 http://localhost:3333/ribbon-consumer 可以輪詢兩個 hello-serive 並返回一些文字信息。此時斷開其中任意一個端口的 hello-service,再次訪問,當輪詢到關閉的端口服務時,輸出內容為 error ,不再是之前的提示信息。

除了通過斷開具體的服務實例來模擬某個節點無法訪問的情況之外,還可以模擬一下服務阻塞(長時間未響應)的情況。下面對hello-serive 的 /index 接口做一些修改,具體如下:
package com.example.demo.web; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Random; /** * @author lxx * @version V1.0.0 * @date 2017-8-9 */ @RestController public class HelloController { private final Logger logger = Logger.getLogger(getClass()); @Autowired private DiscoveryClient client; @RequestMapping(value = "/index") public String index(){ ServiceInstance instance = client.getLocalServiceInstance(); // 讓處理線程等待幾秒鍾 int sleepTime = new Random().nextInt(3000); logger.info("sleepTime:"+sleepTime); try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("/hello:host:"+instance.getHost()+" port:"+instance.getPort() +" service_id:"+instance.getServiceId()); return "hello world!"; } }
通過Thread.sleep 函數可讓 /index 接口的處理線程不是馬上返回內容,而是在阻塞幾秒后才返回內容。由於 Hystrix 默認超時時間為 2000 毫秒,所以這里采用了 0 至 3000 的隨機數以讓處理過程有一定概率發生超時來觸發斷路器。為了更精確的觀察斷路器的觸發,在消費者調用函數中做一些時間記錄,具體如下:
package com.example.demo.web; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.client.RestTemplate; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ @Service public class HelloService { @Autowired RestTemplate restTemplate; @HystrixCommand(fallbackMethod = "helloFallback") public String helloService(){ long beginTime = System.currentTimeMillis(); String body = restTemplate.getForEntity("http://hello-service/index", String.class).getBody(); long endTime = System.currentTimeMillis(); System.out.println("Spend Time : "+ (endTime - beginTime)); return body; } public String helloFallback(){ return "error"; } }
原理分析
工作流程
- 創建 HystrixCommand 或 HystrixObservableCommand 對象
首先,創建一個 HystrixCommand 或 HystrixObservableCommand 對象,用來表示對依賴服務的操作請求,同時傳遞所有需要的參數。從其命名中我們就能知道它采用了“命令模式” 來實現服務調用操作的封裝。而這兩個 Command 對象分別針對不同的應用場景。
- HystrixCommand :用在依賴的服務返回單個操作結果的時候。
- HystrixObservableCommand :用在依賴的服務返回多個操作結果的時候。
命令模式,將來自客戶端的請求封裝成一個對象,從而讓你可以使用不同的請求對客戶端進行參數化。它可以被用於實現“行為請求者” 與 “行為實現者” 的解耦,以便使兩者可以適應變化。下面的示例是對命令模式的簡單實現:
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ // 接收者 public class Receiver { public void active(){ //真正的業務邏輯 System.out.println("測試命令模式"); } }
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ //抽象命令 public interface Command { void excute(); }
package com.example.demo.command; import org.springframework.beans.factory.annotation.Autowired; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ //具體命令實現 public class CommandImpl implements Command { private Receiver receiver; public CommandImpl(Receiver receiver) { this.receiver = receiver; } @Override public void excute() { this.receiver.active(); } }
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ //客戶端調用 public class Invoker { private Command command; public void setCommand(Command command) { this.command = command; } public void active (){ command.excute(); } }
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ public class Client { public static void main(String[] args) { Receiver receiver = new Receiver(); Command command = new CommandImpl(receiver); Invoker invoker = new Invoker(); invoker.setCommand(command); invoker.active(); //客戶端通過調用者來執行命令 } }
從代碼中,可以看到這樣幾個對象。
- Receiver:接收者,它知道如何處理具體的業務邏輯。
- Command:抽象命令,它定義了一個命令對象應具備的一系列命令操作,比如 execute 等。當命令操作被調用的時候就會觸發接收者去做具體命令對應的業務邏輯。
- CommandImpl:具體的命令實現,在這里它綁定了命令操作與接收者之間的關系,execute 命令的實現委托給了 Receiver 的 action 函數。
- Invoker:調用者,它持有一個命令對象,並且可以在需要的時候通過命令對象完成具體的業務邏輯。
從上面的示例中,我們可以看到,調用者 Invoker 與操作者 Receiver 通過 Command 命令接口實現了解耦。對於調用者來說,我們可以為其注入多個命令操作,調用者只需在需要的時候直接調用即可,而不需要知道這些操作命令實際是如何實現的。而在這里所提到的 HystrixCommand 和 HystrixObservableCommand 則是在 Hystrix 中對 Command 的進一步抽象定義。
2. 命令執行
命令執行方式一共有4種,而 Hystrix 在執行時會根據創建的Command對象以及具體的情況來選擇一種執行。其中 HystrixCommand 實現了下面兩個執行方式。
- execute():同步執行,從依賴的服務返回一個單一的結果對象,或是在發生錯誤的時候拋出異常。
- queue():異步執行,直接返回一個 Future 對象,其中包含了服務執行結束時要返回的單一結果對象。
R execute();
Future<R> queue();
而 HystrixObservableCommand 實現了另外兩種執行方式。
- observe():返回 Observable 對象,它代表了操作的多個結果,它是一個 HotObservable。
- toObservable():同樣返回 Observable 對象,也代表了操作的多個結果,但它返回的是一個 Cold Observable。
Observable<R> observe();
Observable<R> toObservable();
在 Hystrix 的底層實現中大量使用了 RxJava ,為了更容易的理解后續內容,在這里對 RxJava 的觀察者-訂閱者模式做一個簡單的入門介紹。
上面提到的 Observable 對象就是 RxJava 中的核心內容之一,可以理解為 “事件源” 或者 “被觀察者”,與其對應的 Subscriber 對象,可以理解為 “訂閱者” 或者 “觀察者”。這兩個對象是 RxJava 響應式編程的重要組成部分。
- Observable 用來向訂閱者 Subscriber 對象發布事件,Subscriber 對象則在接收到事件后對其進行處理,而在這里所指的事件通常就是對依賴服務的調用。
- 一個 Observable 可以發出多個事件,知道結束或者發生異常。
- Observable 對象每發出一個事件,就會調用對應觀察者 Subscriber 對象的 onNext() 方法。
- 每一個 Observable 的執行,最后一定會通過調用 Subscriber.onCompleted() 或者 Subscriber.onError() 來結束該事件的操作流。
下面通過一個簡單的例子來直觀理解一下 Observable 與 Subscribers:
package com.example.demo.Observable_Subsciber; import rx.Observable; import rx.Subscriber; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ public class Obs_Subs { public static void main(String[] args) { //創建事件源 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava "); subscriber.onNext("I'm XX"); subscriber.onCompleted(); } }); //創建訂閱者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(String s) { } }; observable.subscribe(subscriber); } }
在該示例中,創建了一個簡單的事件源 observable,一個對事件傳遞內容輸出的訂閱者 subscriber ,通過 observable.subscribe(subscriber) 來觸發事件的發布。
在這里我們對於事件源 observable 提到了兩個不同的概念:Hot Observable 和 Cold Observable ,分別對應了上面的 command.observe() 和 command.toObservable() 的返回對象。其中 HotObservable,不論 “事件源” 是否有 “訂閱者” ,都會在創建后對事件進行發布,所以對於 Hot Observable 的每一個 “訂閱者” 都有可能是從 “事件源” 的中途開始的,並可能只是看到了整個操作的局部過程。而 Cold Observable 在沒有 “訂閱者” 的時候並不會發布事件,而是進行等待,直到有 “訂閱者” 之后才發布事件,所以對於 Cold Observable 的訂閱者,它可以保證從一開始看到整個操作的全部過程。
3. 結果是否被緩存
若當前命令的請求緩存功能是被啟用的,並且該命令緩存命中,那么緩存的結果會立即以 Observable 對象的形式返回。
4. 斷路器是否打開
在命令結果沒有緩存命中的時候,Hystrix 在執行命令前需要檢查斷路器是否為打開狀態:
- 打開:Hystrix不執行命令,轉到 fallback 處理邏輯(對應下面第8步)。
- 關閉:Hystrix 跳到第5步,檢查是否有可用資源來執行命令。
5. 線程池 / 請求隊列 / 信息量是否占滿
如果與命令相關的線程池 / 請求隊列 / 信息量已經占滿,那么 Hystrix 不會執行命令,跳轉到 fallback 處理邏輯(對應下面第8步)。
注意:此處的線程池並非容器的線程池,而是每個依賴服務的專有線程池。Hystrix 為了保證不會因為某個依賴服務的問題影響到其他依賴服務而采用了 “艙壁模式” 來隔離每個依賴的服務。
6. HystrixObservableCommand.construct() 或 HystrixCommand.run()
Hystrix 會根據編寫的方法來決定采取什么樣的方式去請求依賴服務。
- HystrixCommand.run() :返回一個單一的結果,或者拋出異常。
- HystrixObservableCommand.construct():返回一個 Observable 對象來發射多個結果,或通過 onError 發送錯誤通知。
如果 run() 或 construct() 方法的執行時間超過了命令設置的超時閾值,當前處理線程會拋出 TimeoutException。這種情況下,也會跳轉到 fallback 處理邏輯(第8步)。
7. 計算斷路器的健康度
Hystrix 會將 “成功”、“失敗”、“拒絕”、“超時” 等信息報告給斷路器,而斷路器會維護一組計數器來統計這些數據。
斷路器會使用這些統計數據來決定是否要將斷路器打開,來對某個依賴服務的請求進行 “熔斷 / 短路”,直到恢復期結束。
8. fallback 處理
當命令執行失敗的時候,Hystrix 會進入 fallback 嘗試回退處理,我們通常也稱為 “服務降級”。下面就是能夠引發服務降級處理的幾種情況:
- 第4步,當前命令處於 “熔斷 / 短路” 狀態,斷路器是打開的時候。
- 第5步,當前命令的線程池、請求隊列或者信號量被占滿的時候。
- 第6步,HystrixObservableCommand.construct() 或者 HystrixCommand.run() 拋出異常的時候。
9、返回成功的響應
斷路器原理
HystrixCircuitBreaker 的定義:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package com.netflix.hystrix; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixCommandProperties; import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import rx.Subscriber; import rx.Subscription; public interface HystrixCircuitBreaker { boolean allowRequest(); boolean isOpen(); void markSuccess(); void markNonSuccess(); boolean attemptExecution(); public static class NoOpCircuitBreaker implements HystrixCircuitBreaker { public NoOpCircuitBreaker() { } public boolean allowRequest() { return true; } public boolean isOpen() { return false; } public void markSuccess() { } public void markNonSuccess() { } public boolean attemptExecution() { return true; } } public static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; private final AtomicReference<HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status> status; private final AtomicLong circuitOpened; private final AtomicReference<Subscription> activeSubscription; protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.status = new AtomicReference(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED); this.circuitOpened = new AtomicLong(-1L); this.activeSubscription = new AtomicReference((Object)null); this.properties = properties; this.metrics = metrics; Subscription s = this.subscribeToStream(); this.activeSubscription.set(s); } private Subscription subscribeToStream() { return this.metrics.getHealthCountsStream().observe().subscribe(new Subscriber() { public void onCompleted() { } public void onError(Throwable e) { } public void onNext(HealthCounts hc) { if(hc.getTotalRequests() >= (long)((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerRequestVolumeThreshold().get()).intValue() && hc.getErrorPercentage() >= ((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerErrorThresholdPercentage().get()).intValue() && HystrixCircuitBreakerImpl.this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) { HystrixCircuitBreakerImpl.this.circuitOpened.set(System.currentTimeMillis()); } } }); } public void markSuccess() { if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED)) { this.metrics.resetStream(); Subscription previousSubscription = (Subscription)this.activeSubscription.get(); if(previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = this.subscribeToStream(); this.activeSubscription.set(newSubscription); this.circuitOpened.set(-1L); } } public void markNonSuccess() { if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) { this.circuitOpened.set(System.currentTimeMillis()); } } public boolean isOpen() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L); } public boolean allowRequest() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow()))); } private boolean isAfterSleepWindow() { long circuitOpenTime = this.circuitOpened.get(); long currentTime = System.currentTimeMillis(); long sleepWindowTime = (long)((Integer)this.properties.circuitBreakerSleepWindowInMilliseconds().get()).intValue(); return currentTime > circuitOpenTime + sleepWindowTime; } public boolean attemptExecution() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(this.isAfterSleepWindow()?this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN):false))); } static enum Status { CLOSED, OPEN, HALF_OPEN; private Status() { } } } public static class Factory { private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap(); public Factory() { } public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { HystrixCircuitBreaker previouslyCached = (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()); if(previouslyCached != null) { return previouslyCached; } else { HystrixCircuitBreaker cbForCommand = (HystrixCircuitBreaker)circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreaker.HystrixCircuitBreakerImpl(key, group, properties, metrics)); return cbForCommand == null?(HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()):cbForCommand; } } public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) { return (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()); } static void reset() { circuitBreakersByCommand.clear(); } } }
主要定義了三個斷路器的抽象方法。
- allowRequest:Hystrix 命令的請求通過它判斷是否被執行。
- isOpen:返回當前斷路器是否打開。
- markSuccess:用來閉合斷路器。
另外還有三個靜態類。
- 靜態類 Factory 中維護了一個 Hystrix 命令與 HystrixCircuitBreaker 的關系集合:ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand,其中 String 類型的key 通過 HystrixCommandKey 定義,每一個 Hystrix 命令需要有一個 key 來標識,同時一個 Hystrix 命令也會在該集合中找到它對應的斷路器 HystrixCircuitBreaker 實例。
- 靜態類 NoOpCircuitBreaker 定義了一個什么都不做的斷路器實現,它允許所有請求,並且斷路器狀態始終閉合。
- 靜態類 HystrixCircuitBreakerImpl 是斷路器接口 HystrixCIrcuitBreaker 的實現類,在該類中定義斷路器的 4 個核心對象。
- HystrixCommandProperties properties :斷路器對應 HystrixCommand 實例的屬性對象。
- HystrixCommandMetrics metrics :用來讓 HystrixCommand 記錄各類度量指標的對象。
- AtomicLong circuitOpened :斷路器打開或是上一次測試的事件戳。
HystrixCircuitBreakerImpl 的各個實現方法如下:
- isOpen:判斷斷路器的打開 / 關閉狀態。
public boolean isOpen() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L); }
- allowRequest:判斷請求是否被允許:
public boolean allowRequest() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow()))); }
- markSuccess:“半開路” 狀態時使用。若Hystrix命令調用成功,通過該方法將打開的斷路器關閉,並重置度量指標對象。
public void markNonSuccess() { if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) { this.circuitOpened.set(System.currentTimeMillis()); } }
依賴隔離
Hystrix 使用 “艙壁模式” 實現線程池的隔離,它為每一個依賴服務創建一個獨立的線程池,就算某個依賴服務出現延遲過高的情況,也不會拖慢其他的依賴服務。
使用詳解
創建請求命令
Hystrix 命令就是我們之前所說的 HystrixCommand,它用來封裝具體的依賴服務調用邏輯。
可以通過繼承的方式來實現,比如:
