hystrix基本介紹和使用(1)


一、hystrix基本介紹

Hystrix(https://github.com/Netflix/Hystrix)是Netflix(https://www.netflix.com/global)的一個開源項目,主要作用是通過控制那些訪問遠程系統、服務和第三方庫的節點,從而對延遲和故障提供更強大的容錯能力。 其可以看做是Netflix團隊對分布式系統運維的各種理念和實踐的總結。

二、基本用法

①pom.xml加上以下依賴

<dependency>
  <groupId>com.netflix.hystrix</groupId>
  <artifactId>hystrix-core</artifactId>
  <version>1.5.8</version>
</dependency>

 ②基本使用

如果某程序或class要使用Hystrix,只需簡單繼承 HystrixCommand/HystrixObservableCommand並重寫run()/construct()
然后調用程序實例化此class並執行execute()/queue()/observe()/toObservable()
例如:
public class HelloWorldHystrixCommand extends HystrixCommand<String>{
	private final String name; 
	public HelloWorldHystrixCommand(String name) {
		super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
		this.name = name;
	}

	@Override
	protected String run() throws Exception {
		//Thread.sleep(100);
		return "hello"+name;
	}
}
public static void main(String[] args){	
    String result = new HelloWorldHystrixCommand("test").execute();
    System.out.println(result);
}

或者

public class HelloWorldHystrixObservableCommand extends HystrixObservableCommand<String>{

	private final String name;
	
	protected HelloWorldHystrixObservableCommand(String  name) {
		super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
		this.name = name;
	}
	
	@Override
	protected Observable<String> construct() {
		System.out.println("in construct! thread:" + Thread.currentThread().getName());
             return (Observable<String>) Observable.create(new Observable.OnSubscribe<String>() {
//            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                	System.out.println("in call of construct! thread:" + Thread.currentThread().getName());
                    if (!observer.isUnsubscribed()) {
//                      observer.onError(getExecutionException());	// 直接拋異常退出,不會往下執行
                        observer.onNext("Hello1" + " thread:" + Thread.currentThread().getName());
                        observer.onNext("Hello2" + " thread:" + Thread.currentThread().getName());
                        observer.onNext(name + " thread:" + Thread.currentThread().getName());
                        System.out.println("complete before------" + " thread:" + Thread.currentThread().getName());
                        observer.onCompleted();	// 不會往下執行observer的任何方法
                        System.out.println("complete after------" + " thread:" + Thread.currentThread().getName());
                        observer.onCompleted();	// 不會執行到
                        observer.onNext("abc");	// 不會執行到
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
         });
	}
}
public static void main(String[] args) {
    Observable<String> observable = new HelloWorldHystrixObservableCommand("test").observe();
    observable.subscribe(new Subscriber<String>() {
          public void onCompleted() {
	      System.out.println("completed");
	  }
	  public void onError(Throwable throwable) {
	        System.out.println("error-----------"+throwable);
	  }
	  public void onNext(String v) {
		System.out.println("next------------" + v);
       	  }
  });
}

③HystrixCommand 與 HystrixObservableCommand對比

要想使用hystrix,只需要繼承HystrixCommandHystrixObservableCommand,簡單用法見上面例子。

兩者主要區別是:

(1)前者的命令邏輯寫在run();后者的命令邏輯寫在construct()

(2)前者的run()是由新創建的線程執行;后者的construct()是由調用程序線程執行

(3)前者一個實例只能向調用程序發送(emit)單條數據,比如上面例子中run()只能返回一個String結果;后者一個實例可以順序發送多條數據,比如demo中順序調用多個onNext(),便實現了向調用程序發送多條數據;

④4個命令的執行方法對比
execute()queue()observe()toObservable()這4個方法用來觸發執行 run()/construct(),一個實例只能執行一次這4個方法,特別說明的是 HystrixObservableCommand沒有 execute()queue()
4個方法的主要區別是:

(1)execute():以同步堵塞方式執行run()。調用execute()后,hystrix先創建一個新線程運行run(),接着調用程序要在execute()調用處一直堵塞着,直到run()運行完成(2)queue():以異步非堵塞方式執行run()。一調用queue()就直接返回一個Future對象,同時hystrix創建一個新線程運行run(),調用程序通過Future.get()拿到run()的返回結果,而Future.get()是堵塞執行的。

測試代碼如下:

@Test
    public void testQueue() throws Exception {
        // queue()是異步非堵塞性執行:直接返回,同時創建一個線程運行HelloWorldHystrixCommand.run()
        // 一個對象只能queue()一次
        // queue()事實上是toObservable().toBlocking().toFuture()
        Future<String> future = new HelloWorldHystrixCommand("Hlx").queue();
        
        // 使用future時會堵塞,必須等待HelloWorldHystrixCommand.run()執行完返回
        String queueResult = future.get(10000, TimeUnit.MILLISECONDS);
        // String queueResult = future.get();
        System.out.println("queue異步結果:" + queueResult);
        assertEquals("hello", queueResult.substring(0, 5));
}

(3)observe():事件注冊前執行run()/construct()。第一步是事件注冊前,先調用observe()自動觸發執行run()/construct()(如果繼承的是HystrixCommand,hystrix將創建新線程非堵塞執行run();如果繼承的是HystrixObservableCommand,將以調用程序線程堵塞執行construct()),第二步是從observe()返回后調用程序調用subscribe()完成事件注冊,如果run()/construct()執行成功則觸發onNext()onCompleted(),如果執行異常則觸發onError()

測試代碼如下:

@Test
	public void testObservable() throws Exception {

		// observe()是異步非堵塞性執行,同queue
		Observable<String> hotObservable = new HelloWorldHystrixCommand("Hlx").observe();
		
		// single()是堵塞的
		//System.out.println("hotObservable single結果:" + hotObservable.toBlocking().single());
	    //System.out.println("------------------single");
		// 注冊觀察者事件
		// subscribe()是非堵塞的
		hotObservable.subscribe(new Observer<String>() {

			// 先執行onNext再執行onCompleted
			// @Override
			public void onCompleted() {
				System.out.println("hotObservable completed");
			}
			// @Override
			public void onError(Throwable e) {
				e.printStackTrace();
			}
			// @Override
			public void onNext(String v) {
				System.out.println("hotObservable onNext: " + v);
			}
		});
		// 非堵塞
		// - also verbose anonymous inner-class
		// - ignore errors and onCompleted signal
		hotObservable.subscribe(new Action1<String>() {

			// 相當於上面的onNext()
			// @Override
			public void call(String v) {
				System.out.println("hotObservable call: " + v);
			}
		});
		// 主線程不直接退出,在此一直等待其他線程執行
		System.in.read();
	}

(4)toObservable():事件注冊后執行run()/construct()。第一步是事件注冊前,一調用toObservable()就直接返回一個Observable<String>對象,第二步調用subscribe()完成事件注冊后自動觸發執行run()/construct()(如果繼承的是HystrixCommand,hystrix將創建新線程非堵塞執行run(),調用程序不必等待run();如果繼承的是HystrixObservableCommand,將以調用程序線程堵塞執行construct(),調用程序等待construct()執行完才能繼續往下走),如果run()/construct()執行成功則觸發onNext()onCompleted(),如果執行異常則觸發onError()

測試代碼如下:

@Test
	public void testToObservable() throws Exception {

		// toObservable()是異步非堵塞性執行,同queue
		Observable<String> coldObservable = new HelloWorldHystrixCommand("Hlx").toObservable();

		// single()是堵塞的
		//System.out.println("coldObservable single結果:" + coldObservable.toBlocking().single());
		
		// 注冊觀察者事件
		// subscribe()是非堵塞的
		// - this is a verbose anonymous inner-class approach and doesn't do assertions
		coldObservable.subscribe(new Observer<String>() {

			// 先執行onNext再執行onCompleted
			// @Override
			public void onCompleted() {
				System.out.println("coldObservable completed");
			}
			// @Override
			public void onError(Throwable e) {
				System.out.println("coldObservable error");
				e.printStackTrace();
			}
			// @Override
			public void onNext(String v) {
				System.out.println("coldObservable onNext: " + v);
			}
		});
		// 非堵塞
		// - also verbose anonymous inner-class
		// - ignore errors and onCompleted signal
		/*coldObservable.subscribe(new Action1<String>() {

			public void call(String v) {
				// 相當於上面的onNext()
				// @Override				
				System.out.println("coldObservable call: " + v);
				}
		});*/
		// 主線程不直接退出,在此一直等待其他線程執行
		System.in.read();

}

 參考文獻:

http://www.jianshu.com/p/b9af028efebb


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM