微服務1:微服務及其演進史
微服務2:微服務全景架構
微服務3:微服務拆分策略
微服務4:服務注冊與發現
微服務5:服務注冊與發現(實踐篇)
微服務6:通信之網關
微服務7:通信之RPC
微服務8:通信之RPC實踐篇(附源碼)
微服務9:服務治理來保證高可用
微服務10:系統服務熔斷、限流
1 介紹
前面的章節,我們學習了微服務中對熔斷降級的原理,參考這篇《服務治理:熔斷、降級、限流》。了解了固定窗口算法、滑動窗口算法、 漏桶原理和令牌桶原理,本文對Hystrix做進一步的分析。
Hystrix是Netflix開源的一款具備熔斷、限流、降級能力的容錯系統,設計目的是將應用中的系統訪問、多鏈路服務調用、第三方依賴服務的調用,通過流量資源控制的方式隔離開。
避免了在分布式系中的某個服務故障沿着調用鏈向上傳遞,出現整體的服務雪崩,並以此提升系統的穩定性和健壯性。
1.1 Hystrix是用來解決哪些問題的?
- 對所依賴的服務的延遲和故障進行容錯(防護+控制)
- 對服務的故障進行妥善的處理
- 對延遲過久的請求進行快速失敗並迅速恢復,避免隊列阻塞
- 返回默認值或者默認的處理(fallback),實現優雅的降級(如給用戶一個友好的提示)
- 近實時的數據監控與異常告警,及時發現問題並快速止損
1.2 Hystrix如何解決這些問題
- HystrixCommand、HystrixObservableCommand在單獨線程中執行,防止單線依賴,消耗整個服務的資源
- 服務過載的時候立即斷開並快速失敗,防止隊列阻塞,即線程池或信號量滿的時候直接拒絕請求
- 當超時或者失敗時,提供fallback能力避免用戶直接面對故障,提供優雅的反饋。
- 采用隔離技術(流量泳道和斷路器模式)來避免單個依賴導致這個鏈路的雪崩
- 近實時的數據監控與異常告警,及時發現問題並快速止損(提供監控和預警能力)
2 Hystrix 基礎模型
2.1 設計模式:命令模式(Command Pattern)
以往的訪問模式,是A鏈路 與 B鏈路(A -> B)的直接訪問。而命令模式(Command Pattern)的作用則是通過建立命令對象來解耦A、B鏈路。
在執行過程中,命令對象可以對請求進行排隊、記錄請求日志、執行故障注入、超時/故障 快速返回等操作,如 A -> Command Work -> B。
2.2 隔離模式:線程池和信號量隔離
在計算機中,線程是系統運行的基本單位,我們可以通過對線程池資源的管理,如異步請求,請求超時斷開,請求熔斷,來對系統資源進行隔離,當部分類型的資源有限,請求過載時,進行系統保護。
Java程序中,Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,通過協調各個線程以保證合理地使用公共資源。也保證了資源競爭的隔離性。
3 Hystrix 工作原理
如下圖所示(圖片源自官網),Hystrix的工作流程上大概會有如下9個步驟,下文將詳細介紹每個流程:
3.1 創建命令
創建HystrixCommand 或者 HystrixObservableCommand 命令
3.2 執行命令
執行命令,如圖中的,一共有四種方式來執行run()/construct()
- execute
- queue
- observer
- toObserver
單個實例只能執行一次這4個方法。HystrixObservableCommand沒有execute()和queue()。
執行方式 | 說明 | 可用對象 |
---|---|---|
execute() | 阻塞式同步執行,返回依賴服務的單一返回結果(或者拋出異常) | HystrixCommand |
queue() | 基於Future的異步方式執行,返回依賴服務的單一返回結果(或者拋出異常) | HystrixCommand |
observe() | 基於Rxjava的Observable方式,返回通過Observable表示的依賴服務返回結果,代調用代碼先執行(Hot Obserable) | HystrixObservableCommand |
toObvsevable() | 基於Rxjava的Observable方式,返回通過Observable表示的依賴服務返回結果,執行代碼等到真正訂閱的時候才會執行(cold observable) | HystrixObservableCommand |
-
execute()
以同步堵塞方式執行run(),調用execute()后,hystrix會先創建一個新線程運行run(),執行excute時一直出於堵塞狀態,直到run()運行完成。 -
queue()
以異步非堵塞方式執行run()。一調用queue()就直接返回一個Future對象,同時hystrix創建一個新線程運行run(),調用程序通過Future.get()拿到run()的返回結果,而Future.get()是堵塞執行的。 -
observe()
事件注冊前執行run()/construct()。- 事件注冊前,先調用observe()自動觸發執行run()/construct()(如果繼承的是HystrixCommand,hystrix將創建新線程非堵塞執行run();如果繼承的是HystrixObservableCommand,將以調用程序線程堵塞執行construct())
- observe()返回結果后,調用程序調用subscribe()完成事件注冊,如果run()/construct()執行成功則觸發onNext()和onCompleted() 方法,如果執行異常則觸發 onError() 方法
-
toObservable()
事件注冊后執行run()/construct()。- 事件注冊前,調用toObservable()就立即返回 Observable
對象 - 調用subscribe()完成事件注冊后自動觸發執行run()/construct(),如果run()/construct()執行成功則觸發onNext()和onCompleted()方法,如果執行異常則觸發onError() 方法。
- 事件注冊前,調用toObservable()就立即返回 Observable
3.3 是否從緩存獲取結果返回?
如果當前命令對象配置了允許從結果緩存中取返回結果,並且在結果緩存中已經緩存了請求結果,則立即通過Observable返回。
3.4 是否啟用了熔斷器?
判斷 circuit-breaker 是否打開。如果3.3步驟沒有緩存沒有命中,則判斷一下當前斷路器的斷路狀態是否打開。如果斷路器狀態為打開狀態,則Hystrix將不會執行此Command命令,直接執行步驟3.8 調用Fallback。
如果斷路器狀態是關閉,則執行 步驟3.5 檢查是否有足夠的資源運行 Command命令。
3.5 判斷資源(線程池/隊列/信號量)是否已滿?
如果當前要執行的Command命令 先關連的線程池 和隊列(或者信號量)資源已經滿了,Hystrix將不會運行 Command命令,直接執行步驟8的Fallback降級處理;如果未滿,表示有剩余的資源執行Command命令,則執行步驟 3.6。
3.6 執行 construct() 或者 run()
執行 HystrixObservableCommand.construct() 或者 HystrixCommand.run()。
當經過步驟 3.5 判斷,有足夠的資源執行Command命令時,本步驟將調用Command命令運行方法。調用HystrixCommand的run方法。按照一下兩個條件去判斷:
- 判斷請求邏輯是否調用成功,如果調用成功返回調用結果。調用出錯(5xx),進入步驟 3.8。
- 判斷調用的依賴邏輯調用是否超時,未超時則直接返回成功結果。超時斷開,進入步驟3.8。
3.7 計算熔斷器健康情況
對於熔斷器的信息會做健康的判斷。Hystrix 統計Command命令執行執行過程中的 success count、fail count、reject count 和 timeout count, 並將這些信息記錄到斷路器(Circuit Breaker)中。
斷路器會把上面的統計信息按照時間窗統計下來。並判斷什么時候可以將請求熔斷,在熔斷后和熔斷窗口期結束之前,請求都不會被Fallback。熔斷窗口期結束后會再次校驗,通過后熔斷開關會被關閉。
3.8 熔斷后的請求執行Fallback
Hystrix會在以下場景出現后,觸發Fallback操作:
- 異常場景:run()方法拋出非HystrixBadRequestException異常。
- 超時場景:run()方法調用超時。
- 直接熔斷:熔斷器開啟攔截調,所有的請求都會被攔截。
- 容量滿額:線程池/隊列/信號量滿量之后
3.9 返回成功結果
Hystrix命令對象執行成功,會直接返回結果或者以Observable形式返回結果。返回的Observable 會執行以下流程返回結果。
Hystrix獲取返回結果執行流程如下(圖片源自官網):
4 Hystrix 實現過程
4.1 引入依賴
pom.xml加上以下依賴。我們使用原生hystrix來做案例介紹。
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.8</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>1.4.10</version>
</dependency>
4.2 fallBack
fallBack是指當程序符合我們執行熔斷降級的條件時候,我們默認執行的路線,可以是一個方法或者一個對象。HystrixCommand中已有,我們只需重寫即可,類似
@Override
protected String getFallback() {
return "當熔斷、降級發生時,返回的默認信息";
}
在3.8 節 我們介紹了Hystrix 觸發 fallBack的四種條件,下面我們一個個來測試。
4.2.1 程序異常fallBack
除了HystrixBadRequestException,所有程序拋出的異常,都會觸發getFallback(),調用程序將獲得getFallback()的執行並返回。
/**
* @author brand
* @Description: 模擬異常/超時的場景
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:35
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixException extends HystrixCommand<String> {
/**
* 實現getFallback()后,執行命令時遇到以上4種情況將被fallback接管,不會拋出異常或其他
* 下面演示的是異常的情況
*/
private final String name;
public HystrixException(String name) {
super(HystrixCommandGroupKey.Factory.asKey("Command Group:fallbackGroup"));
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------以下三種情況觸發fallback-------------------*/
// 1.循環+等待,超時fallBack
// int i = 0;
// while (true) {
// i++;
// Thread.currentThread().sleep(1000);
// }
// 2.除零導致異常
// int i = 1/0;
// 3.主動拋出異常
// throw new Exception("command trigger fallback");
/*---------------直接拋出HystrixBadRequestException,不觸發fallback-----------------*/
// HystrixBadRequestException,這個是非法參數或非系統錯誤引起,不觸發fallback,也不被計入熔斷器
throw new HystrixBadRequestException("HystrixBadRequestException not trigger fallback");
// return "success";
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
}
編寫測試類:
/**
* @author brand
* @Description: 測試異常/超時 fallBack
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:35
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class ExceptionTimeOutFallBackTest {
@Test
public void testException() throws IOException {
try {
assertEquals("success", new HystrixException("Exception").execute());
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時,會被捕獲到這里" + e.getCause());
}
}
}
測試類執行直接拋出HystrixBadRequestException,測試類會走到catch函數段中。
測試類執行其他三種情況,會得到以下結果:
4.2.2 調用超時fallBack
同上 4.2.1 中的 循環+等待,超時fallBack 的場景
4.3 熔斷策略
4.3.1 熔斷實現的基本原理
圖片源自官網,這邊就不單獨畫了。
- 斷路器設置了生效閾值,並且在時間窗內的請求數超過閾值:circuitBreaker.requestVolumeThreshold。
超過之后觸發斷路器開啟,否則不開啟。比如熔斷閾值為100,哪怕你99個都fail,也不會觸發熔斷。 - 請求的錯誤率超過錯誤率閾值:errorThresholdPercentage,比如20%,10次有2次就達到要求。
- 1與2的條件都滿足的時候,原來關閉的斷路器將開啟。
- 斷路器開啟之后,后續請求過來的流量都會被斷開。
- 斷路的那段時間我們叫做休眠時間窗:sleepWindowInMilliseconds。 休眠時間窗過去之后,再發起請求,這時候斷路器半開。
- 請求失敗:斷路器狀態繼續保持未開啟,並更新休眠時間窗。
- 請求成功:則斷路器狀態改為關閉。
4.3.2 斷路器配置參數說明
key值 | 說明 | 默認值 |
---|---|---|
circuitBreaker.enabled | 是否開啟斷路器 | true |
circuitBreaker.requestVolumeThreshold | 斷路器啟用請求數閾值 | 10 |
circuitBreaker.sleepWindowInMilliseconds | 斷路器啟用后的睡眠時間窗 | 5000(ms) |
circuitBreaker.errorThresholdPercentage | 斷路器啟用失敗率閾值 | 50(%) |
circuitBreaker.forceOpen | 是否強制將斷路器設置成開啟狀態 | false |
circuitBreaker.forceClosed | 是否強制將斷路器設置成關閉狀態 | false |
4.3.3 測試案例
- 通過withCircuitBreakerRequestVolumeThreshold配置10s(默認時間窗)內請求數超過10個時熔斷器開始生效
- 通過withCircuitBreakerErrorThresholdPercentage配置錯誤比例>50%時開始熔斷
- 然后for循環執行execute()觸發run(),在run()里,如果name是小於30的偶數則正常返回,否則異常
- 通過多次循環后,異常請求占所有請求的比例將大於50%,就會看到后續請求都不進入run()而是進入getFallback(),因為不再打印"running run():" + name了。
- 除此之外,hystrix還支持多長時間從熔斷狀態自動恢復等功能,見下文附錄。
/**
* @author brand
* @Description: 熔斷
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午3:41
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixCircuitBreaker extends HystrixCommand<String> {
private final String name;
public HystrixCircuitBreaker(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Group:CircuitBreaker"))
.andCommandKey(HystrixCommandKey.Factory.asKey("Command:CircuitBreaker"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPool:CircuitBreakerTest"))
.andThreadPoolPropertiesDefaults( // 配置線程池
HystrixThreadPoolProperties.Setter()
.withCoreSize(200) // 配置線程池里的線程數,設置足夠多線程,以防未熔斷卻打滿threadpool
)
.andCommandPropertiesDefaults( // 配置熔斷器
HystrixCommandProperties.Setter()
.withCircuitBreakerEnabled(true)
.withCircuitBreakerRequestVolumeThreshold(10)
.withCircuitBreakerErrorThresholdPercentage(50)
// .withCircuitBreakerForceOpen(true) // true時強制將斷路器設置成開啟狀態,所有請求都將被拒絕,直接到fallback
// .withCircuitBreakerForceClosed(true) // true時強制將斷路器設置成關閉狀態,將忽略所有錯誤
// .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE) // 信號量隔離
// .withExecutionTimeoutInMilliseconds(5000)
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
System.out.println("running num :" + name);
int num = Integer.valueOf(name);
if (num % 2 == 0 && num < 30) { // 符合條件,直接返回
return name;
} else { // 模擬異常
int j = 0;
j = num / j;
}
return name;
}
@Override
protected String getFallback() {
return "CircuitBreaker fallback: " + name;
}
}
執行結果如下,偶數正常返回,奇數進入熔斷信息,並且超過30之后全部進入fallBack
4.4 線程池/信號量隔離策略
4.4.1 線程池隔離策略
線程池隔離:不同服務通過使用不同線程池,彼此間將不受影響,達到隔離效果。
我們通過andThreadPoolKey配置使用命名為ThreadPoolTest的線程池,實現與其他命名的線程池天然隔離,如果不配置andThreadPoolKey,也可以則使用withGroupKey配置來命名線程池。
4.4.1.1 線程池未隔離情況
/**
* @author brand
* @Description: 線程池隔離
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:58
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixThreadPool extends HystrixCommand<String> {
private final String name;
public HystrixThreadPool(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup")) // CommandGroup分組
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest")) // 線程池key
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置線程池里的線程數為3。超過3次進行熔斷
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------如果線程數超配,會觸發fallback的case,否則休眠1s,進行正常返回-------------------*/
TimeUnit.MILLISECONDS.sleep(1000);
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
}
測試一下,下面都是使用 HystrixThreadPoolKey 為 ThreadPoolTest的線程池命名,所以是公用,會返回fallBack的結果。
for(int i = 0; i < 3; i++) {
try {
Future<String> future = new HystrixThreadPool("thread pool"+i).queue(); // 以異步非堵塞方式執行run(),所以消耗了3個線程
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時,被捕獲到這里" + e.getCause());
}
}
for(int i = 0; i < 10; i++) {
try {
System.out.println("===========" + new HystrixThreadPool("thread pool").execute()); //上面消耗了所有線程,這邊會執行到fallBack中
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時,被捕獲到這里" + e.getCause());
}
}
4.4.1.2 線程池隔離情況
我們做一下調整,讓線程池的key(HystrixThreadPoolKey)不一致,再測試是否返回正常的執行結果。
/**
* @author brand
* @Description: 線程池隔離
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:58
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixThreadPool extends HystrixCommand<String> {
private final String name;
public HystrixThreadPool(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup")) // CommandGroup分組
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name)) // 線程池key,根據請求的入參來算
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置線程池里的線程數為3。超過3次進行熔斷
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------如果線程數超配,會觸發fallback的case,否則休眠1s,進行正常返回-------------------*/
TimeUnit.MILLISECONDS.sleep(1000);
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
}
測試一下,下面都是使用 HystrixThreadPoolKey 為 ThreadPoolTest的線程池命名,所以是公用,會返回fallBack的結果。
for(int i = 0; i < 3; i++) {
try {
Future<String> future = new HystrixThreadPool("thread pool"+i).queue(); // 會有三個線程池組 thread pool1、thread poo2、thread pool3,不互相影響,更不會影響下面excute()的執行
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時,被捕獲到這里" + e.getCause());
}
}
for(int i = 0; i < 10; i++) {
try {
System.out.println("===========" + new HystrixThreadPool("thread pool").execute()); //與上面隔離,所以這邊執行始終不會走到fallBack中
} catch(Exception e) {
System.out.println("run()拋出HystrixBadRequestException時,被捕獲到這里" + e.getCause());
}
}
4.5 代碼參考
https://github.com/WengZhiHua/Helenlyn.Grocery/tree/master/parent/HystrixDemo