Hystrix請求熔斷與服務降級


Hystrix請求熔斷與服務降級

https://www.cnblogs.com/huangjuncong/p/9026949.html

SpringCloud實戰-Hystrix請求熔斷與服務降級

我們知道大量請求會阻塞在Tomcat服務器上,影響其它整個服務.在復雜的分布式架構的應用程序有很多的依賴,都會不可避免地在某些時候失敗.高並發的依賴失敗時如果沒有隔離措施,當前應用服務就有被拖垮的風險.
Spring Cloud Netflix Hystrix就是隔離措施的一種實現,可以設置在某種超時或者失敗情形下斷開依賴調用或者返回指定邏輯,從而提高分布式系統的穩定性.

生活中舉個例子,如電力過載保護器,當電流過大的的時候,出問題,過載器會自動斷開,從而保護電器不受燒壞。因此Hystrix請求熔斷的機制跟電力過載保護器的原理很類似。

比如:訂單系統請求庫存系統,結果一個請求過去,因為各種原因,網絡超時,在規定幾秒內沒反應,或者服務本身就掛了,這時候更多的請求來了,不斷的請求庫存服務,不斷的創建線程,因為沒有返回,也就資源沒有釋放,

這也導致了系統資源被耗盡,你的服務奔潰了,這訂單系統好好的,你訪問了一個可能有問題的庫存系統,結果導致你的訂單系統也奔潰了,你再繼續調用更多的依賴服務,可會會導致更多的系統奔潰,這時候Hystrix可以實現快速失敗,

如果它在一段時間內偵測到許多類似的錯誤,會強迫其以后的多個調用快速失敗,不再訪問遠程服務器,從而防止應用程序不斷地嘗試執行可能會失敗的操作進而導致資源耗盡。這時候Hystrix進行FallBack操作來服務降級,

Fallback相當於是降級操作. 對於查詢操作, 我們可以實現一個fallback方法, 當請求后端服務出現異常的時候, 可以使用fallback方法返回的值. fallback方法的返回值一般是設置的默認值或者來自緩存.通知后面的請求告知這服務暫時不可用了。

使得應用程序繼續執行而不用等待修正錯誤,或者浪費CPU時間去等到長時間的超時產生。Hystrix熔斷器也可以使應用程序能夠診斷錯誤是否已經修正,如果已經修正,應用程序會再次嘗試調用操作。

如下圖所示:

 

 

 

Hystrix設計原則

  1.防止單個服務的故障,耗盡整個系統服務的容器(比如tomcat)的線程資源,避免分布式環境里大量級聯失敗。通過第三方客戶端訪問(通常是通過網絡)依賴服務出現失敗、拒絕、超時或短路時執行回退邏輯

       2.用快速失敗代替排隊(每個依賴服務維護一個小的線程池或信號量,當線程池滿或信號量滿,會立即拒絕服務而不會排隊等待)和優雅的服務降級;當依賴服務失效后又恢復正常,快速恢復

       3.提供接近實時的監控和警報,從而能夠快速發現故障和修復。監控信息包括請求成功,失敗(客戶端拋出的異常),超時和線程拒絕。如果訪問依賴服務的錯誤百分比超過閾值,斷路器會跳閘,此時服務會在一段時間內停止對特定服務的所有請求

       4.將所有請求外部系統(或請求依賴服務)封裝到HystrixCommand或HystrixObservableCommand對象中,然后這些請求在一個獨立的線程中執行。使用隔離技術來限制任何一個依賴的失敗對系統的影響。每個依賴服務維護一個小的線程池(或信號量),當線程池滿或信號量滿,會立即拒絕服務而不會排隊等待

 

Hystrix特性

  1.請求熔斷: 當Hystrix Command請求后端服務失敗數量超過一定比例(默認50%), 斷路器會切換到開路狀態(Open). 這時所有請求會直接失敗而不會發送到后端服務. 斷路器保持在開路狀態一段時間后(默認5秒), 自動切換到半開路狀態(HALF-OPEN).

    這時會判斷下一次請求的返回情況, 如果請求成功, 斷路器切回閉路狀態(CLOSED), 否則重新切換到開路狀態(OPEN). Hystrix的斷路器就像我們家庭電路中的保險絲, 一旦后端服務不可用, 斷路器會直接切斷請求鏈, 避免發送大量無效請求影響系統吞吐量, 並且斷路器有自我檢測並恢復的能力.

  2.服務降級:Fallback相當於是降級操作. 對於查詢操作, 我們可以實現一個fallback方法, 當請求后端服務出現異常的時候, 可以使用fallback方法返回的值. fallback方法的返回值一般是設置的默認值或者來自緩存.告知后面的請求服務不可用了,不要再來了。

  3.依賴隔離(采用艙壁模式,Docker就是艙壁模式的一種):在Hystrix中, 主要通過線程池來實現資源隔離. 通常在使用的時候我們會根據調用的遠程服務划分出多個線程池.比如說,一個服務調用兩外兩個服務,你如果調用兩個服務都用一個線程池,那么如果一個服務卡在哪里,資源沒被釋放

   后面的請求又來了,導致后面的請求都卡在哪里等待,導致你依賴的A服務把你卡在哪里,耗盡了資源,也導致了你另外一個B服務也不可用了。這時如果依賴隔離,某一個服務調用A B兩個服務,如果這時我有100個線程可用,我給A服務分配50個,給B服務分配50個,這樣就算A服務掛了,

   我的B服務依然可以用。

  4.請求緩存:比如一個請求過來請求我userId=1的數據,你后面的請求也過來請求同樣的數據,這時我不會繼續走原來的那條請求鏈路了,而是把第一次請求緩存過了,把第一次的請求結果返回給后面的請求。

  5.請求合並:我依賴於某一個服務,我要調用N次,比如說查數據庫的時候,我發了N條請求發了N條SQL然后拿到一堆結果,這時候我們可以把多個請求合並成一個請求,發送一個查詢多條數據的SQL的請求,這樣我們只需查詢一次數據庫,提升了效率。

Hystrixl流程圖如下:

 

Hystrix流程說明:

 

    1:每次調用創建一個新的HystrixCommand,把依賴調用封裝在run()方法中.
  2:執行execute()/queue做同步或異步調用.
  3:判斷熔斷器(circuit-breaker)是否打開,如果打開跳到步驟8,進行降級策略,如果關閉進入步驟.
  4:判斷線程池/隊列/信號量是否跑滿,如果跑滿進入降級步驟8,否則繼續后續步驟.
  5:調用HystrixCommand的run方法.運行依賴邏輯
  5a:依賴邏輯調用超時,進入步驟8.
  6:判斷邏輯是否調用成功
  6a:返回成功調用結果
  6b:調用出錯,進入步驟8.
  7:計算熔斷器狀態,所有的運行狀態(成功, 失敗, 拒絕,超時)上報給熔斷器,用於統計從而判斷熔斷器狀態.
  8:getFallback()降級邏輯.以下四種情況將觸發getFallback調用:
    (1):run()方法拋出非HystrixBadRequestException異常。
    (2):run()方法調用超時
    (3):熔斷器開啟攔截調用
    (4):線程池/隊列/信號量是否跑滿
  8a:沒有實現getFallback的Command將直接拋出異常
  8b:fallback降級邏輯調用成功直接返回
  8c:降級邏輯調用失敗拋出異常
  9:返回執行成功結果
 
這里接着前面的Ribbon進行Hystrix集成。說白了你想對一個請求進行熔斷,必然不能讓客戶直接去調用那個請求,你必然要要對別人的請求進行包裝一層和攔截,才能做點手腳,比如進行熔斷,所以說要在Ribbon上動手腳。因為它是請求發起的地方。
我們剛開始請求一個服務,為了負載均衡進行了攔截一次,現在我們要進行熔斷,所以必須跟Ribbon集成一次,再進行請求攔截來熔斷。
 
下面開始進行實戰:
1.引入Hystrix相關的依賴如下依賴所示:
復制代碼
    <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix</artifactId>
            <version>1.4.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix-dashboard</artifactId>
            <version>1.4.0.RELEASE</version>
        </dependency>
復制代碼

 

2.在啟動類中加入@EnableCircuitBreaker注解,表示允許斷路器。如下代碼所示:
復制代碼
package hjc;

import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RandomRule;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
@EnableDiscoveryClient
//允許斷路器
@EnableCircuitBreaker
public class RibbonApplication {

</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> main(String[] args) {
    SpringApplication.run(RibbonApplication.</span><span style="color: #0000ff;">class</span><span style="color: #000000;">, args);
}

@Bean
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> IRule ribbonRule(){
    </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">new</span><span style="color: #000000;"> RandomRule();
}

@Bean
@LoadBalanced
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> RestTemplate restTemplate(){
    </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">new</span><span style="color: #000000;"> RestTemplate();
}

}

復制代碼

 

2.現在為了代碼比較清晰一點,我們需要在先前的Ribbon模塊進行新建一個service
 
復制代碼
/**
 * Created by cong on 2018/5/9.
 */
@Service
public class HelloService {
@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> RestTemplate restTemplate;


  //請求熔斷注解,當服務出現問題時候會執行fallbackMetho屬性的名為helloFallBack的方法
@HystrixCommand(fallbackMethod = "helloFallBack")
public String helloService() throws ExecutionException, InterruptedException {
return restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();
  }

  

  public String helloFallBack(){
  return "error";
  }

}
復制代碼

 

Controller端代碼修改為:

復制代碼
@RestController
public class ConsumerController {

  
  @Autowired
  private HelloService helloService;

  @RequestMapping("/consumer")
  public String helloConsumer() throws ExecutionException, InterruptedException {
    return helloService.helloService();
  }
}
復制代碼

 

先把前面的兩個Eureka注冊中心,和前面的provider1,和provider2模塊啟動起來。

接着再把Ribbon模塊啟動起來,在瀏覽器上輸入localhost:8082/consumer,運行結果如下:

 

 

 不管敲幾遍,還是出現hello1,hello2,因為有前面的輪詢算法。

現在如果我們突然將provider2模塊斷開,即停止下來,再來在瀏覽器上輸入localhost:8082/consumer,運行結果如下:

 再進行一次localhost:8082/consumer,運行結果,就變成如下:

我們看到了當輪詢到第二個服務提供者的時候,即provider2,由於provider2被我們停止了,導致服務不可訪問了,返回我們原先在代碼中定義的服務降級后的結果error回來,當后面還有請求再也不會輪詢到provider2了,

網頁上永遠出現hello1。

 

到這里簡單演示了用Hystrix的注解@HystrixCommand(fallbackMethod = "helloFallBack"),來實現熔斷和服務降級。這只是表面的東西而已,根本不清楚他背后的原理,

因此這里進入注解@HystrixCommand(fallbackMethod = "helloFallBack")的背后原理來實現熔斷和服務降級。用我們自己手寫的代碼去實現熔斷和服務降級。那么Hystrix給我們留下了什么樣的接口呢?可以讓我們自己手動更靈活的去實現熔斷和服務降級。

Hystrix給我們提供了HystrixCommand類,讓我們去繼承它,去實現靈活的熔斷和服務降級。

如下代碼:

復制代碼
public class HelloServiceCommand extends HystrixCommand<String> {
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> RestTemplate restTemplate;

</span><span style="color: #0000ff;">protected</span><span style="color: #000000;"> HelloServiceCommand(HystrixCommandGroupKey group) {
    <br>      super(group);
    </span><span style="color: #000000;">
}

  //服務調用
@Override
protected String run() throws Exception {
System.
out.println(Thread.currentThread().getName());
return restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();
}
  //服務降級時所調用的Fallback()
@Override
protected String getFallback() {
return "error";
}
}

復制代碼

看到上面的代碼,問題又來了,我們知道我們繼承HystrixCommand類的HelloServiceCommand 是沒有交由Spring進行管理的,那么也就沒法進行RestTemplate注入了。

那么我們怎么做的呢?這時候讀者要轉過彎來了,我們為什么不通過Controller先注入,然后調用Service層的時候,通過HelloServiceCommand的構造方法注入呢?因此問題就迎刃而解了。

修改后的代碼如下:

復制代碼
package hjc.consumer;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.springframework.web.client.RestTemplate;

/

  • Created by cong on 2018/5/9.
    */
    public class HelloServiceCommand extends HystrixCommand<String> {

    private RestTemplate restTemplate;

    protected HelloServiceCommand(String commandGroupKey,RestTemplate restTemplate) {
    super(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
    this.restTemplate = restTemplate;
    }

    @Override
    protected String run() throws Exception {
    System.
    out.println(Thread.currentThread().getName());
    return restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();
    }

    @Override
    protected String getFallback() {
    return "error";
    }
    }

復制代碼

Controller層的代碼如下:

復制代碼
/**
 * Created by cong on 2018/5/8.
 */
@RestController
public class ConsumerController {
@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> HelloService helloService;

@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;">  RestTemplate restTemplate;

@RequestMapping(</span><span style="color: #800000;">"</span><span style="color: #800000;">/consumer</span><span style="color: #800000;">"</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String helloConsumer() throws ExecutionException, InterruptedException {

    HelloServiceCommand command </span>= <span style="color: #0000ff;">new</span> HelloServiceCommand(<span style="color: #800000;">"</span><span style="color: #800000;">hello</span><span style="color: #800000;">"</span><span style="color: #000000;">,restTemplate);
    String result </span>=<span style="color: #000000;"> command.execute();
    </span><span style="color: #0000ff;">return</span> result;<br>  }<br>}</pre>
復制代碼

在這里我們要注意一下,雖然我們在這里new了個HelloServiceCommand,但是並沒有調用HelloServiceCommand的方法,而是用command.execute();方法來手工執行的。

接着再把Ribbon模塊啟動起來,在瀏覽器上輸入localhost:8082/consumer,運行結果如下:

 

 

 不管敲幾遍,還是出現hello1,hello2,因為有前面的輪詢算法。

現在如果我們突然將provider2模塊斷開,即停止下來,再來在瀏覽器上輸入localhost:8082/consumer,運行結果如下:

 再進行一次localhost:8082/consumer,運行結果,就變成如下:

我們看到了當輪詢到第二個服務提供者的時候,即provider2,由於provider2被我們停止了,導致服務不可訪問了,返回我們原先在代碼中定義的服務降級后的結果error回來,當后面還有請求再也不會輪詢到provider2了,

網頁上永遠出現hello1。

 

那么問題又來了,restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();這是阻塞式的,因為這是阻塞式的,如果后面還有代碼,必須等到網絡請求restTemplate.getForEntity("http://HELLO-SERVICE/hello",String.class).getBody();返回結果后,你后面的代碼才會執行。

如果此刻,有一個請求過來,通過Ribbon客戶端進來了,Ribbon客戶端調用了三個服務,每一服務執行的時間都是2秒鍾,那么這三個服務都是用阻塞IO來執行的話,那么耗時是2+2+2=6,一共就花了6秒鍾。那么如果我們使用異步來執行的話,花費的時間就是這三個服務中

哪一個耗時長就是總耗時時間,比如,此時耗時最多的一個服務是3秒鍾,那么總共耗時就花了3秒鍾。那么異步IO是什么意思呢?就是請求發出去以后,主線程不會在原地等着,會繼續往下執行我的主線程,什么時候返回結果,我就什么時候過去取出來。等着三個服務執行完了我就一次性把結果取

出來。

非阻塞式IO有兩個分別是:Future將來式,Callable回調式

1.Future將來式:就是說你用Future將來式去請求一個網絡IO之類的任務,它會一多線程的形式去實現,主線程不必卡死在哪里等待,等什么時候需要結果就通過Future的get()方法去取,不用阻塞。

2.Callable回調式:預定義一個回調任務,Callable發出去的請求,主線程繼續往下執行,等你請求返回結果執行完了,會自動調用你哪個回調任務。

 

好了,那么代碼如何修改呢?其實HelloServiceCommand類幾面不用變,只需要改變一下在Controller層的command的調用方式即可,command的叫用方式如下:

Future<String> queue = command.queue();
return queue.get();

然后重啟Ribbon模塊,結果跟上面一樣。

 

那么Future的注解方式如何調用呢?代碼如下所示:

復制代碼
/**
 * Created by cong on 2018/5/9.
 */
@Service
public class HelloService {
@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> RestTemplate restTemplate;

@HystrixCommand(fallbackMethod </span>= <span style="color: #800000;">"</span><span style="color: #800000;">helloFallBack</span><span style="color: #800000;">"</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String helloService() throws ExecutionException, InterruptedException {

    Future</span>&lt;String&gt; future = <span style="color: #0000ff;">new</span> AsyncResult&lt;String&gt;<span style="color: #000000;">() {
        @Override
        </span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String invoke() {
            </span><span style="color: #0000ff;">return</span> restTemplate.getForEntity(<span style="color: #800000;">"</span><span style="color: #800000;">http://HELLO-SERVICE/hello</span><span style="color: #800000;">"</span>,String.<span style="color: #0000ff;">class</span><span style="color: #000000;">).getBody();
        }
    };
    </span><span style="color: #0000ff;">return</span> future.<span style="color: #0000ff;">get</span><span style="color: #000000;">();
}<br><br></span></pre>
  public String helloFallBack(){
  return "error";
  }


}
復制代碼

 

運行結果跟上面的一樣。

 

那么接下來我們又有另外一個需求就是,我發多個請求出去請求多個服務,我需要把請求結果匯總起來,一起返回給我,上面的例子,什么同步異步都不太好辦。很麻煩,要寫N個Future。

 

這時候Hystrix又給我們提供了另外一種模式HystrixObservableCommand來讓我們繼承這個類,其實這種模式就運用了Java的RX編程中的觀察者模式,如下:

 

 接下來我們新建一個名為HelloServiceObserveCommand的類,來繼承Hystrix給我們提供的HystrixObservableCommand類,同樣HelloServiceObserveCommand類也不是交由Spring管理的,需要我們通過Controller層注入RestTemplate,放在構造方法來注入,代碼如下所示:

復制代碼
package hjc.consumer;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import rx.Observable;
import rx.Subscriber;

/

  • Created by cong on 2018/5/10.
    */
    public class HelloServiceObserveCommand extends HystrixObservableCommand<String>{

    private RestTemplate restTemplate;

    protected HelloServiceObserveCommand(String commandGroupKey, RestTemplate restTemplate) {
    super(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
    this.restTemplate = restTemplate;
    }

    @Override
    protected Observable<String> construct() {
          //觀察者訂閱網絡請求事件
    return Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    try {
    if (!subscriber.isUnsubscribed()){
    System.
    out.println("方法執行....");
    String result
    = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();
                  //這個方法監聽方法,是傳遞結果的,請求多次的結果通過它返回去匯總起來。
    subscriber.onNext(result);
    String result1
    = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();
                  //這個方法是監聽方法,傳遞結果的
    subscriber.onNext(result1);
    subscriber.onCompleted();
    }
    }
    catch (Exception e) {
    subscriber.onError(e);
    }
    }
    });
    }
      //服務降級Fallback
    @Override
    protected Observable<String> resumeWithFallback() {
    return Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    try {
    if (!subscriber.isUnsubscribed()) {
    subscriber.onNext(
    "error");
    subscriber.onCompleted();
    }
    }
    catch (Exception e) {
    subscriber.onError(e);
    }
    }
    });
    }
    }

復制代碼

Controller層調用如下代碼所示:

復制代碼
/**
 * Created by cong on 2018/5/8.
 */
@RestController
public class ConsumerController {
@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;">  RestTemplate restTemplate;

@RequestMapping(</span><span style="color: #800000;">"</span><span style="color: #800000;">/consumer</span><span style="color: #800000;">"</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String helloConsumer() throws ExecutionException, InterruptedException {

    List</span>&lt;String&gt; list = <span style="color: #0000ff;">new</span> ArrayList&lt;&gt;<span style="color: #000000;">();
    HelloServiceObserveCommand command </span>= <span style="color: #0000ff;">new</span> HelloServiceObserveCommand(<span style="color: #800000;">"</span><span style="color: #800000;">hello</span><span style="color: #800000;">"</span><span style="color: #000000;">,restTemplate);
    </span><span style="color: #008000;">//</span><span style="color: #008000;">熱執行</span>
    Observable&lt;String&gt; observable =<span style="color: #000000;"> command.observe();
    </span><span style="color: #008000;">//</span><span style="color: #008000;">冷執行

// Observable<String> observable =command.toObservable();
    //訂閱
observable.subscribe(
new Observer<String>() {
        //請求完成的方法
@Override
public void onCompleted() {
System.
out.println("會聚完了所有查詢請求");
}
      
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
        //訂閱調用事件,結果會聚的地方,用集合去裝返回的結果會聚起來。
@Override
public void onNext(String s) {
System.
out.println("結果來了.....");
list.add(s);
}
});

    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> list.toString();

}

}

復制代碼

運行結果如下:

 

前面的例子有異步和同步這兩種方式,這里HystrixObservableCommand也有兩個中執行方式,分別是,冷執行,和熱執行

剛剛HystrixObservableCommand中的command.observe()熱執行方式。

什么是熱執行方式呢?

  所謂的熱執行就是不管你事件有沒有注冊完(onCompleted(),onError,onNext這三個事件注冊),就去執行我的業務方法即(HystrixObservableCommand實現類中的construct()方法).我們可以在上面的代碼中sleep(10000)一下清楚看出熱執行,如下:

復制代碼
/**
 * Created by cong on 2018/5/8.
 */
@RestController
public class ConsumerController {
@Autowired
</span><span style="color: #0000ff;">private</span><span style="color: #000000;">  RestTemplate restTemplate;

@RequestMapping(</span><span style="color: #800000;">"</span><span style="color: #800000;">/consumer</span><span style="color: #800000;">"</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String helloConsumer() throws ExecutionException, InterruptedException {

    List</span>&lt;String&gt; list = <span style="color: #0000ff;">new</span> ArrayList&lt;&gt;<span style="color: #000000;">();
    HelloServiceObserveCommand command </span>= <span style="color: #0000ff;">new</span> HelloServiceObserveCommand(<span style="color: #800000;">"</span><span style="color: #800000;">hello</span><span style="color: #800000;">"</span><span style="color: #000000;">,restTemplate);
    </span><span style="color: #008000;">//</span><span style="color: #008000;">熱執行</span>
    Observable&lt;String&gt; observable =<span style="color: #000000;"> command.observe();
    </span><span style="color: #008000;">//</span><span style="color: #008000;">冷執行

// Observable<String> observable =command.toObservable();
    Thread.sleep(10000);
    
//訂閱
observable.subscribe(new Observer<String>() {
        
//請求完成的方法
@Override
public void onCompleted() {
System.
out.println("會聚完了所有查詢請求");
}
      
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
        
//訂閱調用事件,結果會聚的地方,用集合去裝返回的結果會聚起來。
@Override
public void onNext(String s) {
System.
out.println("結果來了.....");
list.add(s);
}
});

    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> list.toString();

}

}

復制代碼

運行結果可以看到,是先執行了業務方法,在卡頓了10秒后才時間監聽方法才執行,如下所示:

過10秒后事件監聽方法才執行,如下:

 

什么是冷執行呢?

  所謂的冷執行就是,先進行事件監聽方法注冊完成后,才執行業務方法

接下來我們把Controller中的Observable<String> observable = command.observe();改成冷執行Observable<String> observable =command.toObservable();

運行結果如下:

  先卡頓了10S后,才出現如下的結果:

 

好了,現在我們有回到注解的方式層面上去實現多請求,將結果會聚起來,代碼如下:

復制代碼
/**
* Created by cong on 2018/5/9.
*/
@Service
public class HelloService {

@Autowired
private RestTemplate restTemplate;


//多請求結果會聚的注解寫法,調用還是跟手寫會聚一樣調用
//ObservableExecutionMode.EAGER熱執行 ObservableExecutionMode.LAZY冷執行
  //還可以忽略某些異常避免出現服務降級,有時候某些異常出現,但是我們並不想服務降級,異常就異常吧。參數ignoreExceptions = XXX.class
  //groupKey ="" ,threadPoolKey = "",這是線程隔離,比如我需要根據groupKey划分,如果還要對groupKey內的任務進一步划分,就要threadPoolKey,比如對groupKey組內進行
  //讀取數據的時候,是從緩存讀,還是數據庫讀
  //@CacheKey,緩存的注解方式
    @HystrixCommand(fallbackMethod = "helloFallBack",observableExecutionMode = ObservableExecutionMode.LAZY)
public Observable<String> helloService() throws ExecutionException, InterruptedException {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if (!subscriber.isUnsubscribed()){
String result = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();
subscriber.onNext(result);
String result1 = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();
subscriber.onNext(result1);
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}

public String helloFallBack(){
return "error";
}

}
復制代碼

Controller層直接調用就行了,運行結果跟上面例子的結果都是一樣的,這里就不演示了。

 

對於服務降級里面還有網絡請求,請求又失敗可以再次降級,在一級降級方法上繼續打上 @HystrixCommand注解進行級聯,然后進行二次服務降級,一般不會這樣干,因為這樣下去沒完沒了。

如果想在服務降級拿到異常,給業務一些提示,那怎么辦呢?很簡單,你在方法里面加入Throwable即可,代碼如下:
復制代碼
  @HystrixCommand(fallbackMethod = "XX降級方法",observableExecutionMode = ObservableExecutionMode.LAZY)
    public String helloFallBack(Throwable throwable){
     //網絡請求
      ........
    
return "error"; }
復制代碼

 

標簽: SpringCloud
2
0
« 上一篇: SpringCloud實戰-Ribbon客戶端負載均衡
» 下一篇: SpringCloud實戰-Hystrix線程隔離&請求緩存&請求合並
	</div>
	<div class="postDesc">posted @ <span id="post-date">2018-05-12 15:46</span> <a href="http://www.cnblogs.com/huangjuncong/">蝸居在小黑屋操控世界</a> 閱讀(<span id="post_view_count">136</span>) 評論(<span id="post_comment_count">0</span>)  <a href="https://i.cnblogs.com/EditPosts.aspx?postid=9026949" rel="nofollow">編輯</a> <a href="#" onclick="AddToWz(9026949);return false;">收藏</a></div>
</div>
posted on 2018-05-18 10:23  HackerVirus  閱讀( 2032)  評論( 0編輯  收藏


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM