特性
1.延遲和失敗容忍
防止級聯錯誤,錯誤回退,優雅降級。快速失敗和恢復
線程和信號量隔離
2.實時監控和配置更改
3.並發
並行執行,請求緩存,自動批處理失敗請求
總運行流程
當你發出請求后,hystrix是這么運行的

詳細解釋個步驟
1. Construct a HystrixCommand or HystrixObservableCommand Object
用於返回單一的響應
HystrixObservableCommand
用於返回多個可自定義的響應
2. Execute the Command
對於HystrixCommand有4個執行方法
對於HystrixObservableCommand只有后兩個
//阻塞方法,其實就是調用了queue().get() execute() — blocks, then returns the single response received from the dependency (or throws an exception in case of an error) //非阻塞方法,直接返回Future,可以先做自己的事情,做完再.get() queue() — returns a Future with which you can obtain the single response from the dependency //熱觀察,可以被立即執行,如果訂閱了那么會重新通知,其實就是調用了toObservable()並內置ReplaySubject,詳細可以參考RxJava observe() — subscribes to the Observable that represents the response(s) from the dependency and returns an Observable that replicates that source Observable //冷觀察,返回一個Observable對象,當調用此接口,還需要自己加入訂閱者,才能接受到信息,詳細可以參考RxJava toObservable() — returns an Observable that, when you subscribe to it, will execute the Hystrix command and emit its responses 注:由於Hystrix底層采用了RxJava框架開發,所以沒接觸過的可能會一臉懵逼,需要再去對RxJava有所了解。
3. Is the Response Cached?
如果請求緩存可用,並且對於該請求的響應也在緩存中,那么命中的響應會以Observable直接返回
下圖關於是請求緩存的整個生命周期

4. Is the Circuit Open?
執行command,hystrix會檢查circuit是否打開,如果是打開的(失敗率超過閾值)那么直接快速失敗,否則進入下一流程
5. Is the Thread Pool/Queue/Semaphore Full?
線程池或者信號量是否已經滿負荷,如果已經滿負荷那么快速失敗
6. HystrixObservableCommand.construct() or HystrixCommand.run()
兩個斷路器的入口,如果是繼承HystrixObservableCommand,那么就調用construct()函數,如果是繼承HystrixCommand,那么就調用run()函數。
7. Calculate Circuit Health
Hystrix記錄了成功,失敗,拒絕,超時四種報告
這些報告用於決定哪些用於斷路,被斷路的點在恢復周期內無法被后來的請求訪問到。
8. Get the Fallback
快速失敗會在以下幾個場景觸發
1.由construct() or run()拋出了一個異常
2.斷路器已經打開的時候
3.沒有空閑的線程池和隊列或者信號量
4.一次命令執行超時
可以重寫快速失敗函數來自定義,
HystrixObservableCommand.resumeWithFallback()
HystrixCommand.getFallback()
9. 成功返回
整體的函數調用流程如下,其實這就是源碼的調用流程

Coding
原生模式
基於hystrix的原生接口,也就是繼承HystrixCommand或者HystirxObservableCommand。
public static class HelloHystrixCommand extends HystrixCommand<String>{ public HelloHystrixCommand() { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); } /** * 實際操作的函數 * @return * @throws Exception */ public String run() throws Exception { Thread.sleep(1500); return "hello hystrix"; } /** * 快速失敗后調用函數 * @return */ protected String getFallback(){ return "404 :)"; } }
注解模式
利用netflix的開源框架javanica,在需要使用斷路器的方法上加上注解,參考代碼如下
@Autowired private RestTemplate restTemplate; /** * 使用斷路器的方法 * 其中fallback是快速失敗的調用函數 * @return */ @HystrixCommand(fallbackMethod = "fallback") public String post2AnotherService(){ HttpHeaders headers = new HttpHeaders(); headers.set("Content-Type", "application/json;charset=UTF-8"); HttpEntity<Object> formEntity = new HttpEntity<Object>(headers); //發送post請求 String res = restTemplate.postForObject("http://template/template/hello_world?",formEntity,String.class); return res; } /** * 當post2AnotherService函數出現問題,則調用此函數 * @return */ public String fallback(){ return "404 :)"; }
當調用post2AnotherService()的時候,由於函數返回超過規定時間默認是1s,就會執行fallback()函數,並返回。值得注意的是,如果對於fallback()也想使用降級機制,那么也可以加上@HystrixCommand
關於斷路器
工作流程圖

開關條件
關於斷路器打開
·時間窗口內請求次數(限流)
如果在10s內,超過某個閾值的請求量,才會考慮斷路(小於這個次數不會被斷路)
配置是circuitBreaker.requestVolumeThreshold
默認10s 20次
·失敗率
默認失敗率超過50%就會被斷路
配置是circuitBreaker.errorThresholdPercentage
關於斷路器關閉
·重新嘗試
在一定時間之后,重新嘗試請求來決定是否繼續打開或者選擇關閉斷路器
配置是circuitBreaker.sleepWindowInMilliseconds
默認5000ms
關於隔離
bulkhead pattern模式
Htstrix使用了bulkhead pattern模式,典型的例子就是線程隔離。
簡單解釋一下bulkhead pattern模式。一般情況我們都用一個線程池來管理所有線程,容易造成一個問題,粒度太粗,無法對線程進行分類管理,會導致局部問題影響全局。bulkhead pattern模式在於,采用多個線程池來管理線程,這樣使得1個線程池資源出現問題時不會造成另一個線程池資源問題。盡量使問題最小化。
如圖所示,采用了bulkhead pattern模式的效果

說完原理說實現,如何針對不同依賴采用不同的線程池管理呢
Hystrix給了我們三種key來用於隔離。
·CommandKey,針對相同的接口一般CommandKey值相同,目的是把HystrixCommand,HystrixCircuitBreaker,HytrixCommandMerics以及其他相關對象關聯在一起,形成一個原子組。采用原生接口的話,默認值為類名;采用注解形式的話,默認值為方法名。
·CommandGroupKey,對CommandKey分組,用於真正的隔離。相同CommandGroupKey會使用同一個線程池或者信號量。一般情況相同業務功能會使用相同的CommandGroupKey。
·ThreadPoolKey,如果說CommandGroupKey只是邏輯隔離,那么ThreadPoolKey就是物理隔離,當沒有設置ThreadPoolKey的時候,線程池或者信號量的划分按照CommandGroupKey,當設置了ThreadPoolKey,那么線程池和信號量的划分就按照ThreadPoolKey來處理,相同ThreadPoolKey采用同一個線程池或者信號量。
Coding
原生模式
可以通過HystrixCommand.Setter來自定義配置
HystrixCommandGroupKey.Factory.asKey(""))
HystrixCommandKey.Factory.asKey("")
HystrixThreadPoolKey.Factory.asKey("")
注解模式
可以直接在方法名上添加
@HystrixCommand(groupKey = "", commandKey = "", threadPoolKey = "")
關於請求緩存
工作流程圖

優勢
·復用性
這里的復用性指的是代碼復用性
·一致性
也就是常說的冪等性,不管請求幾次,得到的結果應該都是一樣的
·減少重復工作
由於請求緩存是在HystrixCommand的construct()或run()運行之前運行,所有可以有效減少線程的使用
適用場景
請求緩存的優勢顯而易見,但是也不是銀彈。
在讀少寫多的場景就顯得不太合適,對於讀的請求,需要add緩存。對於增刪改的請求,需要把緩存remove。在增加系統資源開銷的同時,又很雞肋。
所以一般適合讀多寫少的場景。似乎所有緩存機制都有這個局限性吧
Coding
原生模式
繼承HystrixCommand后,重寫getCacheKey()方法,該方法默認返回的是null,也就是不使用請求緩存功能。相同key的請求會使用相同的緩存。
注解模式
在方法名上增加,並添加與cacheKeyMethod字符串相同的方法。兩者共用入參。
@CacheResult(cacheKeyMethod = "getCacheKey") public String post2AnotherService(String seed){ } public String getCacheKey(String seed){ return seed; }
初始化HystrixRequestContext
還有關鍵的一步,在調用HystrixCommand之前初始化HystrixRequestContext,其實就是創建一個ThreadLocal的副本,共享請求緩存就是通過ThreadLocal來實現的。
HystrixRequestContext context=HystrixRequestContext.initializeContext();
操作完成后context.shutdown();
一般情況可以在過濾器中控制是初始化和關閉整個生命周期
//啟動HystrixRequestContext HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { chain.doFilter(req, res); } finally { //關閉HystrixRequestContext context.shutdown(); }
關於請求合並(Requst Collapsing)
工作流程圖

上半部分是模擬請求,下半部分是該請求的依賴設置,時間窗口默認是10ms,在這個時間窗口內,所有對於該接口的請求都會被加入隊列,然后進行批處理。這樣的好處在於,如果短時間內對於某個接口有大量請求,那么可以只處理一次就完成所有響應。
優勢
全局線程合並
在tomcat容器中,所有請求共用一個進程,也就是一個JVM容器,在並發場景下會派生出許多線程,collapsing可以合並整個JVM中的請求線程,這樣可以解決不同使用者同時請求的大量並發問題。
局部線程合並
可以合並單個tomcat請求線程,比如在10ms內有10個請求被同一線程處理(這不是像往常一樣請求->處理,而是請求->加入請求隊列,所有可以快速收集請求),那這些請求可以被合並。
對象建模和代碼復雜度
在實際場景下,調用接口取數據的復雜度往往高於數據的復雜度,通俗來說就是取數據可以千變萬化的取,而數據就那么幾個接口。
collapsing可以幫助你更好的實現你的業務,比如多次請求合並結果后再廣播出去。
適用場景
·並發量大接口
當並發量小,一個時間窗口內只有幾個或沒有請求,那么就白白浪費了請求合並的資源。
·請求耗時接口
時間窗口是固定的,假如一個請求實際耗時10ms,加上固定的時間窗口,最大延遲達到20ms,延遲被提高了100%。若一個請求實際耗時有1s,那么時間窗口的延遲就可以被忽略不計。
Coding
原生模式
/** * 批量返回值類型 * 返回值類型 * 請求參數類型 */ public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> { private static Logger logger = LoggerFactory.getLogger(CommandCollapserGetValueForKey.class); private final Integer key; public CommandCollapserGetValueForKey(Integer key) { this.key = key; } /** *獲取請求參數 */ public Integer getRequestArgument() { return key; } /** *合並請求產生批量命令的具體實現 */ protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) { return new BatchCommand(requests); } /** *批量命令結果返回后的處理,需要實現將批量結果拆分並傳遞給合並前的各原子請求命令的邏輯中 */ protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) { int count = 0; //請求響應一一對應 for (CollapsedRequest<String, Integer> request : requests) { request.setResponse(batchResponse.get(count++)); } } private static final class BatchCommand extends HystrixCommand<List<String>> { private static Logger logger = LoggerFactory.getLogger(CommandCollapserGetValueForKey.BatchCommand.class); private final Collection<CollapsedRequest<String, Integer>> requests; private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey"))); this.requests = requests; } @Override protected List<String> run() { ArrayList<String> response = new ArrayList<String>(); // 處理每個請求,返回結果 for (CollapsedRequest<String, Integer> request : requests) { logger.info("request.getArgument()={}",request.getArgument()); // artificial response for each argument received in the batch response.add("ValueForKey: " + request.getArgument()); } return response; } } }
調用的時候只需要new CommandCollapserGetValueForKey(1).queue()
在同一個時間窗口內,批處理的函數調用順序為
getRequestArgument()->createCommand()->mapResponseToRequests()
關於配置
所有的配置在HystrixCommandProperties類中
每種配置都有4種優先級,以下為優先級從低到高的解釋
1.基於代碼的全局缺省值
2.基於properties配置表的全局配置
3.基於代碼對配置更改
4.基於代碼對配置動態更改
注:同一個配置,采用不同方法更改,那么配置的key會有不同
| 默認配置key |
代碼更改key |
默認值 |
解釋 |
| hystrix.command.default.execution.isolation.strategy |
hystrix.command.HystrixCommandKey.execution.isolation.strategy |
THREAD |
可選擇的參數有THREAD, SEMAPHORE,表示隔離類型 |
| hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds |
hystrix.command.HystrixCommandKey.execution.isolation.thread.timeoutInMilliseconds |
1000 |
降級的超時時間,單位ms |
| hystrix.command.default.execution.timeout.enabled |
hystrix.command.HystrixCommandKey.execution.timeout.enabled |
true |
針對HystrixCommand.run() 是否使用超時降級策略 |
| hystrix.command.default.execution.isolation.thread.interruptOnTimeout |
hystrix.command.HystrixCommandKey.execution.isolation.thread.interruptOnTimeout |
true |
HystrixCommand.run() 超時后是否應該中斷 |
| hystrix.command.default.execution.isolation.thread.interruptOnCancel |
hystrix.command.HystrixCommandKey.execution.isolation.thread.interruptOnCancel |
false |
HystrixCommand.run() 當發生cancel事件后是否應該取中斷 |
| hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests |
hystrix.command.HystrixCommandKey.execution.isolation.semaphore.maxConcurrentRequests |
10 |
當使用信號量隔離的時候,此配置有效。 官方給出5000請求只需要2個 |
| hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests |
hystrix.command.HystrixCommandKey.fallback.isolation.semaphore.maxConcurrentRequests |
10 |
HystrixCommand.getFallback() 最大並發數,超過此並發則拒絕請求 |
| hystrix.command.default.fallback.enabled |
hystrix.command.HystrixCommandKey.fallback.enabled |
true |
是否打開快速失敗 |
| hystrix.command.default.circuitBreaker.enabled |
hystrix.command.HystrixCommandKey.circuitBreaker.enabled |
true |
是否打開熔斷器 |
| hystrix.command.default.circuitBreaker.requestVolumeThreshold |
hystrix.command.HystrixCommandKey.circuitBreaker.requestVolumeThreshold |
20 |
時間窗口內最小請求數,當小於這個請求數,即使全部失敗也不會熔斷 |
| hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds |
hystrix.command.HystrixCommandKey.circuitBreaker.sleepWindowInMilliseconds |
5000 |
熔斷后,請求retry的時間間隔 |
| hystrix.command.default.circuitBreaker.errorThresholdPercentage |
hystrix.command.HystrixCommandKey.circuitBreaker.errorThresholdPercentage |
50 |
失敗率閾值,超過這個失敗率就會熔斷 |
| hystrix.command.default.circuitBreaker.forceOpen |
hystrix.command.HystrixCommandKey.circuitBreaker.forceOpen |
false |
是否強制開啟熔斷,這樣會導致拒絕所有請求 |
| hystrix.command.default.circuitBreaker.forceClosed |
hystrix.command.HystrixCommandKey.circuitBreaker.forceClosed |
false |
是否強制關閉熔斷,這樣任何原因都無法觸發熔斷 注:優先級小於強制開啟 |
| hystrix.command.default.requestCache.enabled |
hystrix.command.HystrixCommandKey.requestCache.enabled |
true |
是否打開請求緩存功能 |
|
|
|
true |
|
| hystrix.collapser.default.maxRequestsInBatch |
hystrix.collapser.HystrixCollapserKey.maxRequestsInBatch |
Integer.MAX_VALUE |
|
| hystrix.collapser.default.timerDelayInMilliseconds |
hystrix.collapser.HystrixCollapserKey.timerDelayInMilliseconds |
10 |
|
| hystrix.collapser.default.requestCache.enabled |
hystrix.collapser.HystrixCollapserKey.requestCache.enabled |
true |
|
| hystrix.threadpool.default.coreSize |
hystrix.threadpool.HystrixThreadPoolKey.coreSize |
10 |
requests per second at peak when healthy × 99th percentile latency in seconds + some breathing room 例如: 30rps * 0.2s + breathing room = 10 |
| hystrix.threadpool.default.maximumSize |
hystrix.threadpool.HystrixThreadPoolKey.maximumSize |
10 |
|
| hystrix.threadpool.default.maxQueueSize |
hystrix.threadpool.HystrixThreadPoolKey.maxQueueSize |
-1 |
|
| hystrix.threadpool.default.queueSizeRejectionThreshold |
hystrix.threadpool.HystrixThreadPoolKey.queueSizeRejectionThreshold |
5 |
|
| hystrix.threadpool.default.keepAliveTimeMinutes |
hystrix.threadpool.HystrixThreadPoolKey.keepAliveTimeMinutes |
1 |
|
| hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize |
hystrix.threadpool.HystrixThreadPoolKey.allowMaximumSizeToDivergeFromCoreSize |
false |
|
| hystrix.threadpool.default.metrics.rollingStats.timeInMilliseconds |
hystrix.threadpool.HystrixThreadPoolKey.metrics.rollingStats.timeInMilliseconds |
10000 |
|
| hystrix.threadpool.default.metrics.rollingStats.numBuckets |
hystrix.threadpool.HystrixThreadPoolProperties.metrics.rollingStats.numBuckets |
10 |
metrics.rollingStats.timeInMilliseconds % metrics.rollingStats.numBuckets == 0 |
//官方配置文檔
https://github.com/Netflix/Hystrix/wiki/Configuration#circuitBreaker.sleepWindowInMilliseconds
