上一篇介紹了Hystrix基本功能和單獨使用的方式,今天繼續學習如何將Hystrix融入SpringCloud組件中去。
在Ribbon上使用熔斷器
在 pom.xml
文件中引入 hystrix
的 依賴spring-cloud-starter-hystrix
:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
在應用的啟動類上使用 @EnableHystrix
開啟 hystrix
的功能。
package com.rickiyang.learn;
import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RandomRule;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
@EnableHystrix
@EnableDiscoveryClient
@SpringBootApplication
public class RibbonDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RibbonDemoApplication.class, args);
}
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public IRule ribbonRule() {
return new RandomRule();//這里配置策略,和配置文件對應
}
}
使用注解 @HystrixCommand
標記調用失敗時需要熔斷的方法,fallbackMethod
屬性指定 降級方法 的 方法名 為 fallback
。
package com.rickiyang.learn.service;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.rickiyang.learn.entity.Person;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
/**
* @author rickiyang
* @date 2019-10-08
* @Desc TODO
*/
@Service
public class DemoService {
@Autowired
RestTemplate restTemplate;
/**
* 支持服務降級
* 自定義降級處理的超時時間,並發數
* 自定義執行該任務的線程池參數
* 自定義執行熔斷邏輯的異常
* @param name
* @return
*/
@HystrixCommand(
fallbackMethod = "hiError",
commandProperties={
// 降級處理超時時間設置
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "3000"),
// 任意時間點允許的最高並發數。超過該設置值后,拒絕執行請求。
@HystrixProperty(name = "fallback.isolation.semaphore.maxConcurrentRequests", value = "1000"),
},
// 配置執行的線程池
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "20"),
@HystrixProperty(name = "maxQueueSize", value = "-1"),
},
// 該異常不執行熔斷,去執行該異常拋出的自己邏輯
ignoreExceptions = {NoSuchMethodException.class}
)
public String hello(String name) {
return restTemplate.getForEntity("http://eureka-client/hello/" + name, String.class).getBody();
}
public String fail(String name) {
return "accu"+name+",error!";
}
}
在Feign上使用熔斷器
Feign
是自帶 斷路器 的,不過需要在 配置文件 中開啟 hystrix
的配置。
feign:
hystrix:
enabled: true
Hystrix
支持 降級回退 操作,當 發生熔斷 或 出現錯誤 時,調用會執行 默認代碼路徑。
package com.rickiyang.learn.service;
import com.rickiyang.learn.entity.Person;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
/**
* @author: rickiyang
* @date: 2019/10/5
* @description:
*/
@FeignClient(name= "eureka-client",fallback = HelloFailBackService.class)
public interface HelloRemote {
@RequestMapping(value = "/hello/{name}")
String hello(@PathVariable(value = "name") String name);
@PostMapping(value ="/add",produces = "application/json; charset=UTF-8")
String addPerson(@RequestBody Person person);
@GetMapping("/getPerson/{id}")
String getPerson(@PathVariable("id") Integer id);
}
通過設置 fallback
屬性為實現 降級回退 的 類,來啟用 @FeignClient
的 失敗降級。
package com.rickiyang.learn.service;
import com.rickiyang.learn.entity.Person;
public class HelloFailBackService implements HelloRemote {
@Override
public String hello(String name) {
return "";
}
@Override
public String addPerson(Person person) {
return null;
}
@Override
public String getPerson(Integer id) {
return null;
}
}
如果需要獲取導致 回退觸發 的原因,可以指定 @FeignClient
注解內部的 fallbackFactory
屬性,fallbackFactory
屬性和 fallback
屬性不能一起使用。
package com.rickiyang.learn.service;
import com.rickiyang.learn.entity.Person;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
/**
* @author: rickiyang
* @date: 2019/10/5
* @description:
*/
@FeignClient(name= "eureka-client",fallback = HelloFailBackFacgtory.class)
public inte rface HelloRemote {
@RequestMapping(value = "/hello/{name}")
String hello(@PathVariable(value = "name") String name);
@PostMapping(value ="/add",produces = "application/json; charset=UTF-8")
String addPerson(@RequestBody Person person);
@GetMapping("/getPerson/{id}")
String getPerson(@PathVariable("id") Integer id);
}
然后提供一個 FallbackFactory
的 實現類,實現類指定 泛型參數 為 HelloService
。
package com.rickiyang.learn.service;
import com.rickiyang.learn.entity.Person;
import feign.hystrix.FallbackFactory;
public class HelloFailBackFacgtory implements FallbackFactory<HelloRemote> {
@Override
public HelloRemote create(Throwable throwable) {
return new HelloRemote() {
@Override
public String hello(String name) {
return "fail reason is : " + throwable.getMessage();
}
@Override
public String addPerson(Person person) {
return "fail reason is : " + throwable.getMessage();
}
@Override
public String getPerson(Integer id) {
return "fail reason is : " + throwable.getMessage();
}
};
}
}
Hystrix Dashboard監控熔斷器的狀態
Hystrix Dashboard
是一個 監控熔斷器 狀況的組件,提供了 數據監控 和 圖形界面。
在Ribbon中使用Hystrix Dashboard
在加入 spring-cloud-starter-hystrix
依賴的基礎上,加入下面的依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix-dashboard</artifactId>
</dependency>
在應用程序 啟動類 已經加上 @EnableHystrix
的基礎上,加入 @EnableHystrixDashboard
注解,代碼如下:
package com.rickiyang.learn;
import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RandomRule;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
@EnableHystrix
@EnableHystrixDashboard
@EnableDiscoveryClient
@SpringBootApplication
public class RibbonDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RibbonDemoApplication.class, args);
}
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public IRule ribbonRule() {
return new RandomRule();//這里配置策略,和配置文件對應
}
}
Hystrix源碼分析
前文提到了Hystrix核心功能包括:隔離機制,熔斷機制,降級機制。圍繞着這3點我們一起看一下它們分別是如何實現的。
首先從 @EnableHystrix
入手,看一下開啟Hystrix功能的時候都做了什么。
package org.springframework.cloud.netflix.hystrix;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {
}
該注解的功能是為了引用@EnableCircuitBreaker
注解。
package org.springframework.cloud.client.circuitbreaker;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
/**
* Annotation to enable a CircuitBreaker implementation.
* http://martinfowler.com/bliki/CircuitBreaker.html
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
這里使用了import注解引入EnableCircuitBreakerImportSelector:
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector extends
SpringFactoryImportSelector<EnableCircuitBreaker> {
@Override
protected boolean isEnabled() {
return getEnvironment().getProperty(
"spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE);
}
}
作用是將spring.cloud.circuit.breaker.enabled
設置為啟用。那么在進行配置掃描的時候就會按照啟用的配置去進行相應的操作。
再回到EnableCircuitBreaker
,查看被引用的位置,能看到在spring-cloud-netflix-core
包的配置文件中有一個引用:
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
將 EnableCircuitBreaker 指向了 HystrixCircuitBreakerConfiguration 類:
@Configuration
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
@Bean
public HystrixShutdownHook hystrixShutdownHook() {
return new HystrixShutdownHook();
}
@Bean
public HasFeatures hystrixFeature() {
return HasFeatures.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
}
/**
* {@link DisposableBean} that makes sure that Hystrix internal state is cleared when
* {@link ApplicationContext} shuts down.
*/
private class HystrixShutdownHook implements DisposableBean {
@Override
public void destroy() throws Exception {
// Just call Hystrix to reset thread pool etc.
Hystrix.reset();
}
}
}
這里有一個關鍵的實例: HystrixCommandAspect ,從名稱就能看出是掃描 HystrixCommand 注解的切面。
在 HystrixCommandAspect 定義了兩個切面:
public class HystrixCommandAspect {
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
//拿到切面聲明的方法
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
//如果方法的注解不是這兩個不進入
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
//從工廠對象中取出當前注解所對應的執行邏輯工廠類
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
//執行工廠類邏輯
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
//這里區分是要同步執行還是異步執行
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
......
......
......
}
這里會掃描兩個注解:
- HystrixCommand
- HystrixCollapser
methodsAnnotatedWithHystrixCommand()
方法使用環繞通知攔截所有@HystrixCommand 和 @HystrixCollapser注解的方法。
這里用到的設計模式還是挺多:
- 首先構造了一個分別執行掃描到上面兩個注解的方法工廠:
CommandMetaHolderFactory
,通過工廠類來決定執行哪個注解對應的邏輯,然后把對應的邏輯封裝成MetaHolder
; - 接着會使用
HystrixCommandFactory
工廠將每一個被掃描的方法統一封裝成HystrixInvokable
對象; - 然后去執行對每一個被攔截的方法生成相應攔截配置的邏輯。
關於hystrix初始化的過程我們暫時就說這么多,了解到在哪里執行了注解掃描去做了初始化,下面我們分別詳述各個功能在Hystrix中是如何實現。
隔離機制
Hystrix可以指定為每一個請求創建獨立的線程池來執行,首先看一下@HystrixCommand
的參數說明:
public @interface HystrixCommand {
// HystrixCommand 命令所屬的組的名稱:默認注解方法類的名稱
String groupKey() default "";
// HystrixCommand 命令的key值,默認值為注解方法的名稱
String commandKey() default "";
// 線程池名稱,默認定義為groupKey
String threadPoolKey() default "";
// 定義回退方法的名稱, 此方法必須和hystrix的執行方法在相同類中
String fallbackMethod() default "";
// 配置hystrix命令的參數
HystrixProperty[] commandProperties() default {};
// 配置hystrix依賴的線程池的參數
HystrixProperty[] threadPoolProperties() default {};
// 如果hystrix方法拋出的異常包括RUNTIME_EXCEPTION,則會被封裝HystrixRuntimeException異常。我們也可以通過此方法定義哪些需要忽略的異常
Class<? extends Throwable>[] ignoreExceptions() default {};
// 定義執行hystrix observable的命令的模式,類型詳細見ObservableExecutionMode
ObservableExecutionMode observableExecutionMode() default ObservableExecutionMode.EAGER;
// 如果hystrix方法拋出的異常包括RUNTIME_EXCEPTION,則會被封裝HystrixRuntimeException異常。此方法定義需要拋出的異常
HystrixException[] raiseHystrixExceptions() default {};
// 定義回調方法:但是defaultFallback不能傳入參數,返回參數和hystrix的命令兼容
String defaultFallback() default "";
}
threadPoolKey()
可以指定線程池名稱。
還記得上一篇中我們講到 HystrixCommand / HystrixObservableCommand類,被以來的服務想要被Hystrix封裝,只用繼承這兩個類中的一個即可。只是現在使用了 @HystrixCommand注解將這一部分邏輯封裝了你無法看到。
首先我們從HystrixCommand類入手,可以看到它繼承了 AbstractCommand,一般抽象類都會先默默地做一些事情為子類分擔憂愁。我們看一下 AbstractCommand 中的邏輯:
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
//線程池的key
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
//初始化線程池
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
this.executionHook = initExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;
/* execution semaphore override if applicable */
this.executionSemaphoreOverride = executionSemaphore;
}
直接進入到線程池初始化的代碼:
/*
* 初始化線程池的key
* 如果key為空,默認使用HystrixCommandGroup的名稱作為key
*
*/
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
if (threadPoolKeyOverride == null) {
// we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
if (threadPoolKey == null) {
/* use HystrixCommandGroup if HystrixThreadPoolKey is null */
return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
} else {
return threadPoolKey;
}
} else {
// we have a property defining the thread-pool so use it instead
return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
}
}
/*
* 初始化線程池
* HystrixThreadPool 中構造了一個ConcurrentHashMap來保存所有的線程池
*
*/
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
// get the default implementation of HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
return fromConstructor;
}
}
/**
*
*從map中獲取線程池,如果不存在則構造一個線程池對象存入
*/
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// 加鎖 保證單機並發的安全性
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
//通過HystrixThreadPoolDefault類來構造線程池
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
構造線程池的代碼主要在 HystrixThreadPoolDefault 類中:
static class HystrixThreadPoolDefault implements HystrixThreadPool {
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
......
......
......
}
注意這里有一個策略模式:HystrixConcurrencyStrategy,聲明了不同的加載線程池的策略。
具體加載線程池是在:concurrencyStrategy.getThreadPool(threadPoolKey, properties)
方法中:
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
這里構造線程池的方式就是我們熟悉的:new ThreadPoolExecutor()
。
至此隔離機制中的線程池隔離我們就弄清楚了,線程池是以HystrixCommandGroupKey進行划分的,不同的CommandGroup有不同的線程池來處理。
熔斷機制
上一節我們寫過開啟熔斷器的代碼:
/**
* 一段簡單的使用HystrixCommand封裝服務隔離調用的實例
*/
public class QueryOrderIdCommand extends HystrixCommand<Integer> {
private final static Logger logger = LoggerFactory.getLogger(QueryOrderIdCommand.class);
private String orderId = "";
/**
* 構造函數中封裝了一些參數設置
* @param orderId
*/
public QueryOrderIdCommand(String orderId) {
super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("orderService"))
.andCommandKey(HystrixCommandKey.Factory.asKey("queryByOrderId"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(10)//至少有10個請求,熔斷器才進行錯誤率的計算
.withCircuitBreakerSleepWindowInMilliseconds(5000)//熔斷器中斷請求5秒后會進入半打開狀態,放部分流量過去重試
.withCircuitBreakerErrorThresholdPercentage(50)//錯誤率達到50開啟熔斷保護
)
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("orderServicePool"))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties
.Setter().withCoreSize(10)));
this.orderId = orderId;
}
......
......
......
}
上面使用 @HystrixCommand 注解進行熔斷的用法:
@HystrixCommand(
fallbackMethod = "hiError",
commandProperties={
// 降級處理超時時間設置
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "3000"),
// 任意時間點允許的最高並發數。超過該設置值后,拒絕執行請求。
@HystrixProperty(name = "fallback.isolation.semaphore.maxConcurrentRequests", value = "1000"),
},
// 配置執行的線程池
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "20"),
@HystrixProperty(name = "maxQueueSize", value = "-1"),
},
// 該異常不執行熔斷,去執行該異常拋出的自己邏輯
ignoreExceptions = {NoSuchMethodException.class}
)
public String hello(String name) {
return restTemplate.getForEntity("http://eureka-client/hello/" + name, String.class).getBody();
}
仍舊是在AbstractCommand的構造函數中,有熔斷器初始化的邏輯:
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
/*
*調用 HystrixCircuitBreaker 工廠類方法執行初始化
*
*/
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// get the default implementation of HystrixCircuitBreaker
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
return new NoOpCircuitBreaker();
}
}
同樣,在熔斷器的保存邏輯中,也是將所有的熔斷器存儲在本地Map:
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
// this should find it for all but the first time
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if (previouslyCached != null) {
return previouslyCached;
}
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
return circuitBreakersByCommand.get(key.name());
return cbForCommand;
}
}
在putIfAbsent()
方法中,保存的是一個 HystrixCircuitBreakerImpl 對象,這里定義的是斷路器所有實現的邏輯狀態。
斷路器的狀態分為3種:
enum Status {
CLOSED, OPEN, HALF_OPEN;
}
- CLOSED關閉狀態:允許流量通過。
- OPEN打開狀態:不允許流量通過,即處於降級狀態,走降級邏輯。
- HALF_OPEN半開狀態:允許某些流量通過,並關注這些流量的結果,如果出現超時、異常等情況,將進入OPEN狀態,如果成功,那么將進入CLOSED狀態。
在構造函數初始化的時候做了監聽 metrics 的HealthCountsStream信息的異步操作:
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
private Subscription subscribeToStream() {
//這里會在每次執行onNext()事件的時候來評估是否需要打開或者關閉斷路器
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(HealthCounts hc) {
//首先校驗的時在時間窗范圍內的請求次數,如果低於閾值(默認是20),不做處理,如果高於閾值,則去判斷接口請求的錯誤率
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // 如果沒有超過統計閾值的最低窗口值,就沒有必要去改變斷路器的狀態
// 當前如果斷路器是關閉的,那么就保持關閉狀態無需更改;
// 如果斷路器狀態為半開狀態,需要等待直到有成功的命令執行;
// 如果斷路器是打開狀態,需要等待休眠窗口過期。
} else {
//判斷接口請求的錯誤率(閾值默認是50),如果高於這個值,則斷路器打開
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
// 如果當前請求的錯誤率小於斷路器設置的容錯率百分比,也不會攔截請求
} else {
// 如果當前錯誤率太高則打開斷路器
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
每當metric 收到HealthCounts信息時就會調用next方法判斷當前是否需要打開斷路器。
HystrixCommandMetrics 是斷路器最核心的東西,通過時間窗的形式,記錄一段時間范圍內(默認是10秒)的接口請求的健康狀況(Command的執行狀態,包括成功、失敗、超時、線程池拒絕等)並得到對應的度量指標。
HealthCounts中保存了當前時間窗口內接口的請求狀態包括請求總數、失敗數和失敗率,如果當前的請求總數和 失敗率都達到閾值,則對接口進行熔斷將斷路器狀態設置為打開狀態記錄當前打開斷路器的時間。
失敗回退
首先從HystrixCommand的queue()
方法作為入口:
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
......
......
......
}
queue()
異步非堵塞的,它調用了toObservable().toBlocking().toFuture()方法,queue()執行完后,會創建一個新線程運行run()。
繼續走到 toObservable()
方法:
public Observable<R> toObservable() {
......
......
......
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
......
......
......
}
applyHystrixSemantics()
封裝Hystrix的核心流程,根據 HystrixCircuitBreaker 提供熔斷器狀態,確定執行run() 還是getFallback():
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
//如果回調邏輯拋出異常,那么就不會執行回調邏輯而是執行快速失敗機制
executionHook.onStart(_cmd);
/* 斷每個Hystrix命令的請求都通過它是否被執行 */
if (circuitBreaker.attemptExecution()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
/* 設置開始時間,用於監控執行超時*/
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//包裝HystrixCommand Observable,注冊觀察者
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
//信號量資源不足,拒絕執行,執行getFallBack()方法
return handleSemaphoreRejectionViaFallback();
}
} else {
//不可執行,快速失效,執行getFallBack()方法
return handleShortCircuitViaFallback();
}
}
在包裝executeCommandAndObserve的邏輯中有關於發生異常的時候對各種類型異常的判斷,然后在doOnError()
回調的時候對異常進行相應的處理。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
......
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
//如果是異常情況,判斷當前異常的類型
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
//線程提交拒絕異常
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
//執行命令超時異常
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
//請求異常
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
//hystrix自定義異常
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
......
......
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
注意到最后execution執行的是:executeCommandWithSpecifiedIsolation(_cmd)
,根據隔離級別選擇是“線程方式”隔離執行還是“信號量方式”隔離執行:
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
//如果當前配置的是線程池隔離的策略
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
//這里注冊線程執行的hook,開啟異步執行
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
//這里使用信號量機制
} else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}