Hystrix最初是由Netflix的API team研發的,用於提高API的彈性和性能,2012年在公司內部廣受好評。
如果你的應用是一個單獨的應用,那幾乎不用在意斷路的問題。
但在分布式環境中,各個應用錯綜復雜的依賴關系,一個不穩定的服務會拖累依賴它的服務。
簡單來說,就是將服務之間的訪問隔離開來,在錯誤(包括超時)被傳播之前攔截下來,並提供相應的處理邏輯,讓這個分布式應用更有彈性。
Hystrix就是用來解決這一問題的lib,幫助開發者更方便的控制服務之間的通信。
在分布式系統中,你使用的第三方RPC API可能會提供服務通信攔截的功能,但通常不會涉及方方面面,更不能單獨拿出來給其他API使用。
而Hystrix會提供這些:
- 為服務通信提供保護、容錯
- 在復雜的依賴關系鏈中阻止錯誤傳播
- 快速失敗
- 根據具體事件進行毀掉
- 支持降級
- 近實時監控
假設你的分布式系統中存在幾十甚至上百個服務,即使每個服務都能保證99.99的可用性,但是在依賴關系錯綜復雜,單個服務依賴數量過多,隨着請求數量的上升,帶來的損失也是慘重的。
如果我為這個服務本身設置了超時時間,該服務對於不同的依賴方的權重不盡相同。
假設服務A和B都依賴服務C。對於A,它可能依賴很多服務,但C無法在1秒內響應時就放棄。而對於B,C是至關重要的服務,除非是業務數據異常,否則絕對不能中途停止。
如果C設置的超時時間為30s,那么A和B則同樣需要等待30s,這顯然是不合理的。而等待中的這些請求會耗費什么資源就看具體情況了,最壞的情況是拖垮了整個應用。
因此,延遲(lagency)和失敗(failure)都需要被隔離。
Hystrix如何做到這點?
- 通過HystrixCommand在獨立的線程調用服務。
- 超時時間由調用方掌握。
- 為每個依賴維護一個小線程池,線程池滿時可以拒絕請求,而不是將請求放入隊列。
- 區分事件,比如successe、failure, timeout、rejection,針對不同事件進行相應的回調。
- 當失敗占比超過指定閾值時啟動斷路器(circuit-breaker),一段時間內阻止對特定依賴訪問。
下面用幾個簡單的例子進行說明。
Getting Started
通過幾個簡單的例子,對Hystrix有個粗淺的認識。
首先,添加以下依賴
compile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.5.10'
參考如下main
package com.kavlez.lab.hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
/**
* @author Kavlez
*/
public class Hello {
public static void main(String[] args) {
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ExampleGroup");
HystrixCommand<String> hystrixCommand = new HystrixCommand<String>(groupKey) {
@Override
protected String run() throws Exception {
return "hi";
}
};
System.out.printf("exec command ... result = %s", hystrixCommand.execute());
}
}
被覆寫的run()為HystrixCommand中定義的抽象方法,調用依賴服務時也是在run中調用。
說明下上面的例子中出現的兩個類。
- HystrixCommand: 用command的包含任何潛在風險(延時、失敗)的代碼,進而對其進行處理,比如容錯、統計、斷路...
-
HystrixCommandGroupKey: 所謂command的group,用於對一系列command統一進行一些操作。
A group name for a {@link HystrixCommand}. This is used for grouping together commands such as for reporting, alerting, dashboards or team/library ownership.
和group一樣,command也是有名稱的。默認為類名
getClass().getSimpleName();
但並沒有提供相應的setter,只是提供了一個構造方法
protected HystrixCommand(Setter setter)
因此,如需指定command名稱,參考如下
final HystrixCommand.Setter setter =
HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("ExampleCommand"));
HystrixCommand<String> hystrixCommand = new HystrixCommand<String>(setter) {
//...
};
異步執行
如上面的例子中,我們可以通過execute()執行command,這是一種同步執行方式。
如果需要異步執行,只需要用queue()替代execute()即可。
Future<String> future = hystrixCommand.queue();
try {
System.out.printf("exec command ... result = %s\n", future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
事實上,execute()不過是queue().get()而已。
Observe
嘗試執行gradle dependencies
,打印如下
compile - Dependencies for source set 'main'.
+--- org.slf4j:slf4j-api:1.7.21
\--- com.netflix.hystrix:hystrix-core:1.5.10
+--- org.slf4j:slf4j-api:1.7.0 -> 1.7.21
+--- com.netflix.archaius:archaius-core:0.4.1
| +--- commons-configuration:commons-configuration:1.8
| | +--- commons-lang:commons-lang:2.6
| | \--- commons-logging:commons-logging:1.1.1
| \--- org.slf4j:slf4j-api:1.6.4 -> 1.7.21
+--- io.reactivex:rxjava:1.2.0
\--- org.hdrhistogram:HdrHistogram:2.1.9
我想說的是hystrix依賴RxJava。
其中observe是比較典型的用法,HystrixCommand提供了兩種方法observe和toObservable,官方對兩者描述如下。
- observe() — returns a “hot” Observable that executes the command immediately, though because the Observable is filtered through a ReplaySubject you are not in danger of losing any items that it emits before you have a chance to subscribe
- toObservable() — returns a “cold” Observable that won’t execute the command and begin emitting its results until you subscribe to the Observable
顯然,如果是通過toObservable,同一個command實例是無法被subscribe多次的。
盡管兩者都返回Observable對象,但行為上稍有區別。
但本質上observe()幾乎等同於toObservable().subscribe(subject)
下面是一段例子,由於是通過observe(),命令可以有多個subscriber:
package com.kavlez.lab.hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
import rx.Observer;
/**
* @author Kavlez
*/
public class HelloObservable {
public static void main(String[] args) {
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ExampleGroup");
HystrixCommand<String> hystrixCommand = new HystrixCommand<String>(groupKey) {
@Override
protected String run() throws Exception {
return "hi";
}
};
Observable<String> observe = hystrixCommand.observe();
observe.subscribe(s -> {
System.out.printf("from action1...%s\n", s);
});
observe.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("completed...");
}
@Override
public void onError(Throwable e) {
System.out.printf("error...%s\n", e.getMessage());
}
@Override
public void onNext(String s) {
System.out.printf("from next...%s\n", s);
}
});
}
}
Fallback
試試在run中寫一段Thread.sleep(1000),或者加個斷點讓程序暫停一段時間。
出現j.u.c.TimeoutException和HystrixRuntimeException,且后者提示
timed-out and no fallback available.
這是因為HystrixCommand的getFallback()默認為
protected R getFallback() { throw new UnsupportedOperationException("No fallback available."); }
既然如此,我們只需要覆寫該方法就可以實現降級(degradation)。
比如,把之前的例子改為:
HystrixCommand<String> hystrixCommand = new HystrixCommand<String>(groupKey) {
@Override
protected String run() throws Exception {
Thread.sleep(1000);
return "hi";
}
@Override
protected String getFallback() {
return "hi, sorry i am late...";
}
};
getFallback並沒有提供參數,這意味着fallback不止發生在timeout一種情況,failure、timeout、thread pool rejection都可以觸發fallback。
Circuit Breaker
接下來說說Command是如何和斷路器(circuit breaker)交互的。
HystrixCommand屬性及默認值可以參考抽象類HystrixCommandProperties,其中以circuitBreaker開頭為斷路器相關屬性。
這里先列出3個關於斷路器的屬性,分別為:
- HystrixCommandProperties.circuitBreakerRequestVolumeThreshold() : 請求容量閾值
- HystrixCommandProperties.circuitBreakerErrorThresholdPercentage(): 錯誤占比閾值
- HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds() : 狀態時長
工作流程大致如下:
- 假設達到了request volume threshold,也就是metrics.healthCounts.totalCount大於該項
- 並且失敗次數的占比也達到了error percentage,默認為50%
- 此時,斷路器的狀態從CLOSED變為OPEN
- 斷路器狀態變為OPEN后,接收到的請求將全部斷路
- 過了恢復時間后,也就是sleep window in milliseconds(默認為5s),斷路器從OPEN變為HALF-OPEN狀態。
- 如果變更為HALF-OPEN后的下一次請求失敗,則變回OPEN狀態,反之為CLOSED。
Request Cache
之前並沒有注意Hystrix也提供了這樣一個特性,command中可以通過覆寫getCacheKey
對請求進行緩存。
該方法默認返回null,也就是不緩存。
如果n個命令都在同一個request scope,則只有一個命令會被執行,其余n-1都是緩存。
代碼參考如下
package com.kavlez.lab.hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
/**
* @author Kavlez
*/
public class ReqCache {
static class HelloCommand extends HystrixCommand<String> {
private static final HystrixCommand.Setter setter =
HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("ExampleCommand"));
private String requestKey;
protected HelloCommand(String requestKey) {
super(setter);
this.requestKey = requestKey;
}
@Override
protected String run() throws Exception {
return null;
}
@Override
protected String getCacheKey() {
return this.requestKey;
}
}
public static void main(String[] args) {
HystrixRequestContext.initializeContext();
HelloCommand hello1 = new HelloCommand("Billy");
hello1.execute();
System.out.println(hello1.isResponseFromCache());
hello1 = new HelloCommand("Billy");
hello1.execute();
System.out.println(hello1.isResponseFromCache());
hello1 = new HelloCommand("Van");
hello1.execute();
System.out.println(hello1.isResponseFromCache());
}
}
注意這一行
HystrixRequestContext.initializeContext();
缺少HystrixRequestContext會提示illegal state。
Request Collapsing
Hystrix提供了一個叫collapse的特性,將多個請求進行合並,方便將多個請求限制在同一個time windows。
官方給出的例子是獲取收藏夾中的300部電影,類似的場景確實常見。
或者再復雜一點,比如我要獲取300部電影的工作人員信息,幾部不同電影很可能存在相同的工作人員。
也許我可以...首先獲取300部電影的列表,對其進行循環並get工作人員列表,然后再對其進行循環,依次請求工作人員的REST API...無論列表中是否有多個相同的電影和工作人員。
或者我可以僅僅為了這樣的應用場景而專門設計一套API,專門用於獲取電影列表中每一部電影的工作人員的信息。
但這顯然是個笨方法,默許這樣的方法會導致莫名其妙的API越來越多。
因此,為了應付這樣的場景而抽象出一層collapsing layer是值得的。
這樣一來,REST API和實體類可以依然保持單純,而開發者只需要使用HystrixCollapser即可。
假設多個命令同時進行相同的請求,collapser可以將請求進行合並,批量請求,並將結果分發給各個命令。
參考如下例子
package com.kavlez.lab.hystrix;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
* @author Kavlez
* @since 5/12/17.
*/
public class CollapserExample {
private static final String[] names = {"Protos", "Terran", "Hulk", "Anderson", "Uriah", "Gegard", "Velasquez", "Mcgregor", "Jose"};
static class User {
private int id;
private int code;
private String name;
public User(int id, int code, String name) {
this.id = id;
this.code = code;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
static class UserCollapser extends HystrixCollapser<Map<Integer, User>, User, Integer> {
private int userId;
private static final Setter setter = Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("UserCollapser"));
public UserCollapser(int userId) {
super(setter);
this.userId = userId;
}
@Override
public Integer getRequestArgument() {
return this.userId;
}
@Override
protected HystrixCommand<Map<Integer, User>> createCommand(
Collection<CollapsedRequest<User, Integer>> collapsedRequests) {
return new UserBatchCommand(collapsedRequests.stream().map(request -> {
System.out.println("arg mapped...");
return request.getArgument();
}).collect(Collectors.toList()));
}
@Override
protected void mapResponseToRequests(
Map<Integer, User> batchResponse, Collection<CollapsedRequest<User, Integer>> collapsedRequests) {
for (CollapsedRequest<User, Integer> request : collapsedRequests) {
Integer userId = request.getArgument();
request.setResponse(batchResponse.get(userId));
}
}
}
static class UserBatchCommand extends HystrixCommand<Map<Integer, User>> {
private List<Integer> ids;
private final static Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UserBatchGroup"));
public UserBatchCommand(List<Integer> ids) {
super(setter);
this.ids = ids;
}
@Override
protected Map<Integer, User> run() throws Exception {
return this.getUsers();
}
Map<Integer, User> getUsers() {
Map<Integer, User> users = Maps.newHashMap();
for (Integer id : ids) {
int randomCode = (int) (Math.random() * 100);
users.put(id, new User(id, randomCode, names[randomCode % names.length]));
}
return users;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
HystrixRequestContext.initializeContext();
List<Future<User>> futures = Lists.newArrayList(1, 1, 1, 1, 2, 2, 2, 3, 3, 4).stream()
.map(userId -> new UserCollapser(userId).queue()).collect(Collectors.toList());
for (Future<User> future : futures) {
future.get();
}
}
}
覆寫HystrixCollapser時指定的3個泛型類型,依次為
- batch command返回類型
- response類型
- request參數類型
繼承HystrixCollapser需要覆寫3個方法,分別為
-
protected abstract HystrixCommand createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests); 工廠方法,用於創建HystrixCommand對象,或者說是一個專門用於處理批量請求的command。
多數情況下,該方法創建的命令執行一次后就沒什么用了,所以通常返回一個新的實例。
由於是用於處理批量請求,所以通常會把CollapsedRequest集合整個傳給command。 -
public abstract RequestArgumentType getRequestArgument(); 通過該方法來提供傳遞給HystrixCommand的參數,如果你需要傳遞多個參數,則封裝到一個對象即可。
-
protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests); createCommand創建了對應的command,該command結束后會調用mapResponseToRequests,該方法將BatchReturnType映射為Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests。
Request Context Setup
上面提到的內容都涉及到了request context。
事實上,Hystrix的一些功能都需要request context,也就是request-scoped features。
比如,上面的例子中的main方法中都有這么一行
HystrixRequestContext context = HystrixRequestContext.initializeContext();
這就需要開發者按需管理HystrixRequestContext的生命周期。
而request多是在web應用中比較常見,比如實現一個servlet filter,在doFilter方法中進行管理。
public class HystrixRequestContextServletFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
chain.doFilter(request, response);
} finally {
context.shutdown();
}
}
}