Spring Cloud Netflix Hystrix介紹和使用


前面我們搭建了具有服務降級功能的Hystrix客戶端,現在我們來詳細了解下Hystrix的一些功能。

Hystrix的意思是豪豬,大家都知道,就是長滿刺的豬。。。實際上,它表明了該框架的主要功能:自我保護功能。Hystrix具有服務降級,熔斷,線程池隔離,信號量隔離,緩存等功能,基本上能覆蓋到微服務中調用依賴服務會遇到的問題。下面我們介紹下,如何理解和使用這些功能。

1、最常用的的服務降級功能

  當執行調用服務方法時,若調用方法出現問題,如:請求超時,拋出異常,線程池拒絕,熔斷這些情況下,為該方法定義降級方法,以便在出現問題時執行,實現備用返回。之前我們已經實現了服務降級功能,主要就是通過@HystrixCommand(fallbackMethod = "defaultMethod")注釋到需要在出現問題時降級的方法。fallbackMethod指定降級后執行的方法。方法定義在該類中,public,private,protected都可以。在注釋的方法出問題后,如超時未返回(execution.isolation.thread.timeoutinMilliseconds來配置),就會執行備用方法,返回備用方法的返回值。當然,降級的方法也可以定義再下一級的降級方法,實現和上面一樣。

  上面說到方法拋出異常也會觸發服務降級,但是如果我們自定義了異常,並需要將異常拋出給上層做操作,不希望Hystrix捕捉到自定義異常執行服務降級時,可以使用@HystrixCommand(ignoreExceptions = {MyException.class})來定義忽略捕捉的異常。多個異常用逗號隔開。也可以將拋出的異常通過入參傳到降級的方法,來實現不同類型異常的不同處理,需要將降級方法定義如下。

@HystrixCommand(fallbackMethod = "back")
    public String getHello(String id)
    {
        return template.getForObject("http://helloclient/hello", String.class);
    }

    public String back(String id , Throwable e)
    {
        if (e instanceof NullPointerException)
        {
            return "client 2 has some error! NullPointerException";
        }
        else
        {
            return "client 2 has some error! Exception";
        }
    }

 2、熔斷器

  熔斷器,和物理概念中的斷路器類似,斷路器在高壓高溫的過載情況下,會自動斷開,實現對電路的保護。熔斷器也是一樣,下面我們看下主要的接口類:HystrixCircuitBreaker.java,它定義了以下幾個方法,並有兩個內部實現類HystrixCircuitBreakerImpl,NoOpCircuitBreaker,斷路器主要用到HystrixCircuitBreakerImpl。NoOpCircuitBreaker這個類表明不做任何操作,默認熔斷器不打開,表明不起用熔斷功能。以下的實現方法,都是指HystrixCircuitBreakerImpl的實現。熔斷器有三個狀態,OPEN,HALF_OPEN,CLOSED,如果要自定義參數配置,下面代碼注釋中可以找到。

/**
     * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does
     * not modify any internal state, and takes into account the half-open logic which allows some requests through
     * after the circuit has been opened
     * 
     * @return boolean whether a request should be permitted
     */
    boolean allowRequest();

    /**
     * Whether the circuit is currently open (tripped).
     * 
     * @return boolean state of circuit breaker
     */
    boolean isOpen();

    /**
     * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markSuccess();

    /**
     * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     */
    void markNonSuccess();

    /**
     * Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal
     * state.
     */
    boolean attemptExecution();

(1) isOpen()方法用於判斷熔斷器是否打開。實現方法如下:

 @Override
        public boolean isOpen() {
           //判斷熔斷器是否被強制打開,如果強制打開,返回true,表示熔斷器已打開。circuitBreaker.forceOpen這個配置指定
            if (properties.circuitBreakerForceOpen().get()) {
                return true;
            }
          //判斷熔斷器是否被強制關閉。circuitBreaker.forceClosed
            if (properties.circuitBreakerForceClosed().get()) {
                return false;
            }
           //判斷上一次斷路器打開的時間是否大於零,訪問成功,該值為-1,訪問失敗,該值為訪問失敗時的系統時間。根據是否大於零,判斷熔斷器是否打開。
            return circuitOpened.get() >= 0;
        }

 

 

(2) attemptExecution(),該方法會在熔斷器開啟的時候,有訪問時,熔斷器第一個執行的方法。如果返回false,則直接執行fallback降級方法。

@Override
        public boolean attemptExecution() {
           //判斷熔斷器是否被強制打開,如果強制打開,返回false后,直接執行fallback
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
          //判斷熔斷器是否被強制關閉
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
          //如果circuitOpened為-1,返回true,正常執行
            if (circuitOpened.get() == -1) {
                return true;
            } else {
              //如果circuitOpened不為-1,則表示斷路器打開了,此時,服務會從circuitOpened起,休眠5秒(circuitBreaker.sleepWindowInMilliseconds配置,
//默認5000),直接返回false,執行fallback。若休眠時間超過5秒,並且當前熔斷狀態為打開狀態,則會將熔斷狀態置為半開狀態。如它的注釋,只有第一個
//請求滿足第一次為打開,之后的請求都為半開狀態,返回false。
if (isAfterSleepWindow()) { if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { //only the first request after sleep window should execute return true; } else { return false; } } else { return false; } } }

 

(3)markSuccess(),在執行完attemptExecution()返回true正常執行成功后(未fallback),才會執行該方法,標注成功,若之前斷路器為關閉狀態,則不做處理,若為半開狀態,則重置熔斷器。

 @Override
        public void markSuccess() {
           //如果當前狀態為半開,則將state設置成closed,關閉熔斷器。如果之前由於斷路器打開時,之后的請求,Hystrix會放開一個請求去嘗試是否服務正常,並將斷路器置為半開,
//如果正常,則將斷路器關閉,並重置斷路器。
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { //This thread wins the race to close the circuit - it resets the stream to start it over from 0 metrics.resetStream(); Subscription previousSubscription = activeSubscription.get(); if (previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = subscribeToStream(); activeSubscription.set(newSubscription); circuitOpened.set(-1L); } }

(4) markNonSuccess(),用來在正常請求下,請求失敗后調用。

 @Override
        public void markNonSuccess() {
           //如果當前為半開狀態,且請求失敗,則重新打開斷路器,將最近一次訪問失敗的時間置為當前時間。
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
                circuitOpened.set(System.currentTimeMillis());
            }
        }

(5) 熔斷器的打開。上面的方法都不會去打開熔斷器,熔斷器打開是由另一個方法去判斷的。這個觀察者的方法應該是周期執行的。

 private Subscription subscribeToStream() {
            /*
             * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
             */
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(HealthCounts hc) {
                            // check if we are past the statisticalWindowVolumeThreshold
                           //檢查時間窗內的請求總數小於配置文件中的數量(采用的是buckets,感興趣的自己研究下)。默認時間窗為10S(metrics.rollingStats.timeInMilliseconds,metrics.rollingStats.numBuckets),默認請求總數為20(circuitBreaker.requestVolumeThreshold)。
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                // we are not past the minimum volume threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                //時間窗內,統計的錯誤(失敗)請求比例是否小於配置比例,默認配置是50%,通過circuitBreaker.errorThresholdPercentage=50指定。
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                    //we are not past the minimum error threshold for the stat window,
                                    // so no change to circuit status.
                                    // if it was CLOSED, it stays CLOSED
                                    // if it was half-open, we need to wait for a successful command execution
                                    // if it was open, we need to wait for sleep window to elapse
                                } else {
                                    // our failure rate is too high, we need to set the state to OPEN
                                   //如果時間窗內請求數大於定義數,且失敗比例大於定義比例,並且當前熔斷器關閉的情況下,將熔斷器置為打開,並將circuitOpened置為當前時間。
                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                        circuitOpened.set(System.currentTimeMillis());
                                    }
                                }
                            }
                        }
                    });
        }

(6) 過程:先文字敲吧,沒畫圖工具。

  正常情況:請求——>subscribeToStream未打開熔斷器——>attemptExecution——>markSuccess

  異常情況:請求——>subscribeToStream打開熔斷器——>attemptExecution最后一個return返回false——>markNonSuccess,這個時候斷路器打開狀態,且在休眠時間窗內。

       請求——>subscribeToStream未處理——>attemptExecution在超過休眠時間窗后,放開一個請求,並把熔斷器設置成半開——>請求成功,執行markSuccess,將熔斷器從半開置為關閉,並重置熔斷器;請求失敗,則將半開狀態置為打開狀態,失敗時間起點重置成當前時間,再次循環。

3、緩存

  之前我以為是每次相同請求,會使用緩存直接返回。其實理解錯了,Hystrix的緩存是在當次請求的緩存,當次請求中,多次使用同一方法時,會使用緩存。其他請求不能用到。而且還需初始化HystrixRequestContext,不然直接使用會報錯,我們采用定義filter來初始化。不多說了,貼代碼大家看下,代碼中注釋很清楚,啟動注冊中心和服務實例后(環境搭建見之前章節),就可以測試。

(1)pom.xml,application.yml配置,大家參見之前的章節。

(2)啟動類,注意注解上@ServletComponentScan。

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@EnableCircuitBreaker
@SpringBootApplication
@EnableEurekaClient
@ServletComponentScan
public class ConsumerApplication {

    @Bean
    @LoadBalanced
    RestTemplate template()
    {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

(3)Filter類,用於初始化HystrixRequestContext。

package com.example.demo;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import java.io.IOException;

@WebFilter(filterName = "HystrixRequestContextServletFilter",urlPatterns = "/*",asyncSupported = true)
public class HystrixRequestContextServletFilter implements Filter {

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();

        try
        {
            chain.doFilter(request,response);
        }
        finally {
            context.shutdown();
        }
    }

    @Override
    public void destroy() {

    }
}

(4)controller類。

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerContorller {

    @Autowired
    HystrixServer server;

    //注意,在這個controller中調用具有緩存功能的方法才會具備緩存效果。
    @RequestMapping("/hello")
    public String sayHello()
    {
        System.out.println("請求了一次hello2");
        server.getHello2("1","ibethfy");
        System.out.println("請求了二次hello2,不會打印hello2 initinized");
        server.getHello2("1","ibethfy");
        System.out.println("請求了三次hello2,清空緩存,會打印hello2 initinized");
        server.updateHello2("1","ibethfy");
        server.getHello2("1","ibethfy");
        System.out.println("請求了四次hello2,入參不同,會打印hello2 initinized");
        server.getHello2("1","ibethfy1");
        return server.getHello2("1","ibethfy1");
    }
}

(5)server類。

package com.example.demo;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import com.netflix.hystrix.contrib.javanica.cache.annotation.CacheKey;
import com.netflix.hystrix.contrib.javanica.cache.annotation.CacheRemove;
import com.netflix.hystrix.contrib.javanica.cache.annotation.CacheResult;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class HystrixServer {

    @Autowired
    RestTemplate template;

    //通過指定生成緩存key的方法生成key,commandKey指定一個HystrixCommand的key,表示注解@HystrixCommand的方法的key。groupKey表示一個類型分組的key。threadPoolKey指定線程池的key。
    //fallbackMethod指定降級方法,commandProperties指定該HystrixCommand方法的參數,是個數組類型,里面的值是@HystrixProperty,多個用逗號隔開。
    @CacheResult(cacheKeyMethod = "generateCacheKey")
    @HystrixCommand(commandKey = "getHello1",groupKey = "getHello",threadPoolKey = "getHelloThreadPool",fallbackMethod = "back",commandProperties = {
            @HystrixProperty(name="execution.isolation.thread.timeoutinMilliseconds", value = "5000")
    })
    public String getHello1()
    {
        System.out.println("hello1 initinized");
        return template.getForObject("http://helloclient/hello", String.class);
    }

    private String generateCacheKey()
    {
        return "myHelloKey";
    }


    //若不指定cache的key,默認使用方法的所有參數作為key
    @CacheResult
    @HystrixCommand(commandKey = "getHello2",groupKey = "getHello",threadPoolKey = "getHelloThreadPool")
    public String getHello2(String id,String name)
    {
        System.out.println("hello2 initinized");
        return template.getForObject("http://helloclient/hello", String.class);
    }

    //使用@CacheRemove在數據更新時,移除對應key的緩存,需要指定commandKey,@HystrixCommand里面的參數可以指定亦可以不用
    @CacheRemove(commandKey = "getHello2")
    @HystrixCommand(commandKey = "getHello2",groupKey = "getHello",threadPoolKey = "getHelloThreadPool")
    public void updateHello2(String id,String name)
    {
        System.out.println("hello2 id = "+ id + ", name = "+ name + " removed");
    }

    //使用@CacheKey指定參數作為key
    @CacheResult
    @HystrixCommand(commandKey = "getHello3",groupKey = "getHello",threadPoolKey = "getHelloThreadPool")
    public String getHello3(@CacheKey("id") String id, String name)
    {
        System.out.println("請求了一次hello3");
        return "hello3 " + id + name;
    }

    public String back(Throwable e)
    {
        if (e instanceof NullPointerException)
        {
            return "client 2 has some error! NullPointerException";
        }
        else
        {
            return "client 2 has some error! Exception";
        }
    }

}


4、線程隔離和信號量隔離。

  Hystrix為了避免多個不同服務間的調用影響,使用了線程隔離模式,它為每個依賴服務單獨創建自己的線程池,就算某個服務延遲或問題阻塞,其余的服務也能正常執行。總之,使得我們的服務更加健壯。當然,創建這么多線程池,可能會對性能造成影響,但Hystrix測試后,獨立線程池帶來的好處,遠大於性能損耗的壞處。所以,大家可以放心使用。

  ExecutionIsolationStrategy枚舉中定義了兩個THREAD, SEMAPHORE,一個是線程池,一個是信號量,Hystix默認使用線程池。通過execution.isolation.strategy可以切換。

  Hystrix默認情況下,會讓配置了同組名的groupKey的command使用同一線程池,但也支持使用threadPoolKey配置線程池key。

  對於那些本來延遲就比較小的請求(例如訪問本地緩存成功率很高的請求)來說,線程池帶來的開銷是非常高的,這時,可以考慮采用非阻塞信號量(不支持超時),來實現依賴服務的隔離,使用信號量的開銷很小。但絕大多數情況下,Netflix 更偏向於使用線程池來隔離依賴服務,因為其帶來的額外開銷可以接受,並且能支持包括超時在內的所有功能。

 

好了,Hystrix的主要功能基本介紹完了,碼字不容易呀,,,,

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM