一、hystrix基本介紹
Hystrix(https://github.com/Netflix/Hystrix)是Netflix(https://www.netflix.com/global)的一個開源項目,主要作用是通過控制那些訪問遠程系統、服務和第三方庫的節點,從而對延遲和故障提供更強大的容錯能力。 其可以看做是Netflix團隊對分布式系統運維的各種理念和實踐的總結。
二、基本用法
①pom.xml加上以下依賴
<dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-core</artifactId> <version>1.5.8</version> </dependency>
②基本使用
public class HelloWorldHystrixCommand extends HystrixCommand<String>{ private final String name; public HelloWorldHystrixCommand(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() throws Exception { //Thread.sleep(100); return "hello"+name; } }
public static void main(String[] args){ String result = new HelloWorldHystrixCommand("test").execute(); System.out.println(result); }
或者
public class HelloWorldHystrixObservableCommand extends HystrixObservableCommand<String>{ private final String name; protected HelloWorldHystrixObservableCommand(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected Observable<String> construct() { System.out.println("in construct! thread:" + Thread.currentThread().getName()); return (Observable<String>) Observable.create(new Observable.OnSubscribe<String>() { // @Override public void call(Subscriber<? super String> observer) { try { System.out.println("in call of construct! thread:" + Thread.currentThread().getName()); if (!observer.isUnsubscribed()) { // observer.onError(getExecutionException()); // 直接拋異常退出,不會往下執行 observer.onNext("Hello1" + " thread:" + Thread.currentThread().getName()); observer.onNext("Hello2" + " thread:" + Thread.currentThread().getName()); observer.onNext(name + " thread:" + Thread.currentThread().getName()); System.out.println("complete before------" + " thread:" + Thread.currentThread().getName()); observer.onCompleted(); // 不會往下執行observer的任何方法 System.out.println("complete after------" + " thread:" + Thread.currentThread().getName()); observer.onCompleted(); // 不會執行到 observer.onNext("abc"); // 不會執行到 } } catch (Exception e) { observer.onError(e); } } }); } }
public static void main(String[] args) { Observable<String> observable = new HelloWorldHystrixObservableCommand("test").observe(); observable.subscribe(new Subscriber<String>() { public void onCompleted() { System.out.println("completed"); } public void onError(Throwable throwable) { System.out.println("error-----------"+throwable); } public void onNext(String v) { System.out.println("next------------" + v); } }); }

③HystrixCommand 與 HystrixObservableCommand對比
要想使用hystrix,只需要繼承HystrixCommand
或HystrixObservableCommand
,簡單用法見上面例子。
兩者主要區別是:
(1)前者的命令邏輯寫在run()
;后者的命令邏輯寫在construct()
(2)前者的run()
是由新創建的線程執行;后者的construct()
是由調用程序線程執行
(3)前者一個實例只能向調用程序發送(emit)單條數據,比如上面例子中run()
只能返回一個String結果;后者一個實例可以順序發送多條數據,比如demo中順序調用多個onNext()
,便實現了向調用程序發送多條數據;
execute()
、
queue()
、
observe()
、
toObservable()
這4個方法用來觸發執行
run()/construct()
,一個實例只能執行一次這4個方法,特別說明的是
HystrixObservableCommand
沒有
execute()
和
queue()
。
(1)execute()
:以同步堵塞方式執行run()
。調用execute()
后,hystrix先創建一個新線程運行run()
,接着調用程序要在execute()
調用處一直堵塞着,直到run()
運行完成(2)queue()
:以異步非堵塞方式執行run()
。一調用queue()
就直接返回一個Future對象,同時hystrix創建一個新線程運行run()
,調用程序通過Future.get()
拿到run()
的返回結果,而Future.get()
是堵塞執行的。
測試代碼如下:
@Test public void testQueue() throws Exception { // queue()是異步非堵塞性執行:直接返回,同時創建一個線程運行HelloWorldHystrixCommand.run() // 一個對象只能queue()一次 // queue()事實上是toObservable().toBlocking().toFuture() Future<String> future = new HelloWorldHystrixCommand("Hlx").queue(); // 使用future時會堵塞,必須等待HelloWorldHystrixCommand.run()執行完返回 String queueResult = future.get(10000, TimeUnit.MILLISECONDS); // String queueResult = future.get(); System.out.println("queue異步結果:" + queueResult); assertEquals("hello", queueResult.substring(0, 5)); }
(3)observe()
:事件注冊前執行run()/construct()
。第一步是事件注冊前,先調用observe()
自動觸發執行run()/construct()
(如果繼承的是HystrixCommand
,hystrix將創建新線程非堵塞執行run()
;如果繼承的是HystrixObservableCommand
,將以調用程序線程堵塞執行construct()
),第二步是從observe()
返回后調用程序調用subscribe()
完成事件注冊,如果run()/construct()
執行成功則觸發onNext()
和onCompleted()
,如果執行異常則觸發onError()
測試代碼如下:
@Test public void testObservable() throws Exception { // observe()是異步非堵塞性執行,同queue Observable<String> hotObservable = new HelloWorldHystrixCommand("Hlx").observe(); // single()是堵塞的 //System.out.println("hotObservable single結果:" + hotObservable.toBlocking().single()); //System.out.println("------------------single"); // 注冊觀察者事件 // subscribe()是非堵塞的 hotObservable.subscribe(new Observer<String>() { // 先執行onNext再執行onCompleted // @Override public void onCompleted() { System.out.println("hotObservable completed"); } // @Override public void onError(Throwable e) { e.printStackTrace(); } // @Override public void onNext(String v) { System.out.println("hotObservable onNext: " + v); } }); // 非堵塞 // - also verbose anonymous inner-class // - ignore errors and onCompleted signal hotObservable.subscribe(new Action1<String>() { // 相當於上面的onNext() // @Override public void call(String v) { System.out.println("hotObservable call: " + v); } }); // 主線程不直接退出,在此一直等待其他線程執行 System.in.read(); }
(4)toObservable()
:事件注冊后執行run()/construct()
。第一步是事件注冊前,一調用toObservable()
就直接返回一個Observable<String>
對象,第二步調用subscribe()
完成事件注冊后自動觸發執行run()/construct()
(如果繼承的是HystrixCommand
,hystrix將創建新線程非堵塞執行run()
,調用程序不必等待run()
;如果繼承的是HystrixObservableCommand
,將以調用程序線程堵塞執行construct()
,調用程序等待construct()
執行完才能繼續往下走),如果run()/construct()
執行成功則觸發onNext()
和onCompleted()
,如果執行異常則觸發onError()
測試代碼如下:
@Test public void testToObservable() throws Exception { // toObservable()是異步非堵塞性執行,同queue Observable<String> coldObservable = new HelloWorldHystrixCommand("Hlx").toObservable(); // single()是堵塞的 //System.out.println("coldObservable single結果:" + coldObservable.toBlocking().single()); // 注冊觀察者事件 // subscribe()是非堵塞的 // - this is a verbose anonymous inner-class approach and doesn't do assertions coldObservable.subscribe(new Observer<String>() { // 先執行onNext再執行onCompleted // @Override public void onCompleted() { System.out.println("coldObservable completed"); } // @Override public void onError(Throwable e) { System.out.println("coldObservable error"); e.printStackTrace(); } // @Override public void onNext(String v) { System.out.println("coldObservable onNext: " + v); } }); // 非堵塞 // - also verbose anonymous inner-class // - ignore errors and onCompleted signal /*coldObservable.subscribe(new Action1<String>() { public void call(String v) { // 相當於上面的onNext() // @Override System.out.println("coldObservable call: " + v); } });*/ // 主線程不直接退出,在此一直等待其他線程執行 System.in.read(); }
參考文獻:
http://www.jianshu.com/p/b9af028efebb