一、簡介
hystrix經常被我們用於服務的熔斷,降級等領域,基於RxJava(一種基於觀察者模式的響應式編程框架)實現,具備服務降級、服務熔斷、線程與信號隔離、請求緩存、請求合並以及服務監控等強大功能。
二、基本原理
當我們需要調用某個方法時(一般是遠程調用),通過 Hystrix 將方法調用包裹起來,交由 Hystrix 來完成,從而享受 Hystrix 帶來保護。
Hystrix 提供了兩個請求命令:HystrixCommand、HystrixObservableCommand,可以使用這兩個對象來包裹待執行的任務。
HystrixCommand用在依賴服務返回單個操作結果的時候:
execute():同步執行,從依賴的服務返回一個單一的結果對象,或是在發生錯誤的時候拋出異常。
queue():異步執行,直接返回一個Future對象,其中包含了服務執行結束時要返回的單一結果對象。
HystrixObservableCommand用在依賴服務返回多個操作結果的時候:
observe():返回Obervable對象,他代表了操作的多個結果,它是一個Hot Observable。
toObservable():同樣返回Observable對象,也代表了操作多個結果,但它返回的是一個Cold Observable。
三、基本用法
hystrix可以使用手動自定義Command、注解、結合feign的方式來實現,手動創建和結合feign的這里就不介紹了,主要看一下注解的實現方式。
1、開啟hystrix,啟動類上加上@EnableHystrix注解
2、在需要降級的方法上使用@HystrixCommand
public class UserServiceImpl{
@Autowired private UserDao userDao;
@HystrixCommand(fallbackMethod = "getUserNameFallBack") @Override public String getUserName(String userId){ int i = 1/0 return userDao.getNameById(userId); } public String getUserNameFallBack(String userId){ return "服務暫時不可用,請稍后再試"; } }
四、初始化
從@EnableHystrix注解看起
//開啟EnableCircuitBreaker @EnableCircuitBreaker public @interface EnableHystrix { } //導入EnableCircuitBreakerImportSelector @Import(EnableCircuitBreakerImportSelector.class) public @interface EnableCircuitBreaker { }
在注解上又開啟了一個注解@EnableCircuitBreaker,並導入了一個Selector
@Order(Ordered.LOWEST_PRECEDENCE - 100) public class EnableCircuitBreakerImportSelector extends SpringFactoryImportSelector<EnableCircuitBreaker> { @Override protected boolean isEnabled() { return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE); } }
這是hystrix生效的一個關鍵點,繼承了SpringFactoryImportSelector,此類在初始化后,會執行selectImports(AnnotationMetadata metadata)的方法。此方法會根據注解啟動的注解(這里指@EnableCircuitBreaker)從spring.factories文件中獲取其配置需要初始化@Configuration類,看下關鍵代碼
List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader)));
看一下spring.factories文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\ org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\ org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\ org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\ org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
配置類HystrixCircuitBreakerConfiguration
@Configuration(proxyBeanMethods = false) public class HystrixCircuitBreakerConfiguration { @Bean public HystrixCommandAspect hystrixCommandAspect() { return new HystrixCommandAspect(); } }
向spring注入了HystrixCommandAspect
@Aspect public class HystrixCommandAspect { private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP; static { META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder() .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory()) .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory()) .build(); } //切點是所有使用了HystrixCommand注解的地方 @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)") public void hystrixCommandAnnotationPointcut() { } //切點是所有使用了HystrixCollapser注解的地方 @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)") public void hystrixCollapserAnnotationPointcut() { } //環繞通知 @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { Method method = getMethodFromTarget(joinPoint); Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint); //兩個注解不能同時作用 if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) { throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " + "annotations at the same time"); } //META_HOLDER_FACTORY_MAP預先初始化了兩個工廠類 //@HystrixCommand:CommandMetaHolderFactory //@HystrixCollapser:CollapserMetaHolderFactory MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method)); //創建,把切點封裝進了MetaHolder MetaHolder metaHolder = metaHolderFactory.create(joinPoint); // 創建HystrixInvokable,只是一個空接口,沒有任何方法,只是用來標記具備可執行的能力 // 具體的執行由實現類來做 HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); //執行類型 ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType(); Object result; try { if (!metaHolder.isObservable()) { // 利用工具CommandExecutor來執行 result = CommandExecutor.execute(invokable, executionType, metaHolder); } else { result = executeObservable(invokable, executionType, metaHolder); } } catch (HystrixBadRequestException e) { throw e.getCause(); } catch (HystrixRuntimeException e) { throw hystrixRuntimeExceptionToThrowable(metaHolder, e); } return result; } ··· }
通過AOP編程,創建方法的代理,決定執行何種邏輯
創建參數包裝類
public MetaHolder create(final ProceedingJoinPoint joinPoint) { //獲取方法對象 Method method = getMethodFromTarget(joinPoint); //目標對象 Object obj = joinPoint.getTarget(); //方法的參數列表 Object[] args = joinPoint.getArgs(); //代理對象 Object proxy = joinPoint.getThis(); //調用子類create方法 return create(proxy, method, obj, args, joinPoint); } public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) { //獲取HystrixCommand注解的信息 HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class); //判斷方法的返回值類型Future:異步,Observable:rxjava中的被觀察者,其他:同步 ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType()); //獲取MetaHolder的builder對象 MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint); if (isCompileWeaving()) { builder.ajcMethod(getAjcMethodFromTarget(joinPoint)); } //建造者模式,創建MetaHolder包裝類 return builder.defaultCommandKey(method.getName()) .hystrixCommand(hystrixCommand) .observableExecutionMode(hystrixCommand.observableExecutionMode()) .executionType(executionType) .observable(ExecutionType.OBSERVABLE == executionType) .build(); }
創建命令執行器
public HystrixInvokable create(MetaHolder metaHolder) { HystrixInvokable executable; //@HystrixCollapser if (metaHolder.isCollapserAnnotationPresent()) { executable = new CommandCollapser(metaHolder); //@HystrixCommand 並且 返回值類型是Observable } else if (metaHolder.isObservable()) { executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } else { //其他情況 把HystrixCommandBuilder封裝進GenericCommand executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } return executable; } public GenericCommand(HystrixCommandBuilder builder) { super(builder); } protected AbstractHystrixCommand(HystrixCommandBuilder builder) { super(builder.getSetterBuilder().build()); //命令形式 包含需要執行的方法 fallback方法 this.commandActions = builder.getCommandActions(); this.collapsedRequests = builder.getCollapsedRequests(); this.cacheResultInvocationContext = builder.getCacheResultInvocationContext(); this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext(); this.ignoreExceptions = builder.getIgnoreExceptions(); //執行類型 ASYNCHRONOUS SYNCHRONOUS OBSERVABLE this.executionType = builder.getExecutionType(); } public HystrixCommand.Setter build() throws HystrixPropertyException { //分組key:類名,命令key:方法名,線程池key HystrixCommand.Setter setter = HystrixCommand.Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey)) .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey)); if (StringUtils.isNotBlank(threadPoolKey)) { setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey)); } try { //初始化線程池的配置 setter.andThreadPoolPropertiesDefaults(HystrixPropertiesManager.initializeThreadPoolProperties(threadPoolProperties)); } catch (IllegalArgumentException e) { throw new HystrixPropertyException("Failed to set Thread Pool properties. " + getInfo(), e); } try { //初始化命令執行配置 setter.andCommandPropertiesDefaults(HystrixPropertiesManager.initializeCommandProperties(commandProperties)); } catch (IllegalArgumentException e) { throw new HystrixPropertyException("Failed to set Command properties. " + getInfo(), e); } return setter; }
執行命令
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder)
throws RuntimeException { Validate.notNull(invokable); Validate.notNull(metaHolder); switch (executionType) { case SYNCHRONOUS: { //轉換成子接口HystrixExecutable return castToExecutable(invokable, executionType).execute(); } case ASYNCHRONOUS: { HystrixExecutable executable = castToExecutable(invokable, executionType); if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { return new FutureDecorator(executable.queue()); } return executable.queue(); } case OBSERVABLE: { HystrixObservable observable = castToObservable(invokable); return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()
? observable.observe() : observable.toObservable(); } default: throw new RuntimeException("unsupported execution type: " + executionType); } } public R execute() { try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } }
HystrixCommand的四種執行方式
測試類
public class HystrixCommandTest { private <T> com.netflix.hystrix.HystrixCommand<T> createCommand(T message) { com.netflix.hystrix.HystrixCommand.Setter setter = com.netflix.hystrix.HystrixCommand.Setter .withGroupKey(HystrixCommandGroupKey.Factory.asKey("serviceA")) .andCommandKey(HystrixCommandKey.Factory.asKey("methodA")); return new com.netflix.hystrix.HystrixCommand<T>(setter) { @Override protected T run() throws Exception { System.out.println("HystrixCommand執行了!!!" + System.currentTimeMillis()); return message; } }; } @Test public void test01() { com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test01"); System.out.println(command.execute()); } @Test public void test02() throws ExecutionException, InterruptedException { com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test02"); Future<String> f = command.queue(); System.out.println("queue之后,command執行:" + System.currentTimeMillis()); Thread.sleep(1000); System.out.println(f.get()); } @Test public void test03() throws InterruptedException { HystrixCommand<String> command = createCommand("this is test03"); // observe直接執行run方法,稱為Hot Observable Observable<String> observe = command.observe(); System.out.println("observe之后,command執行:" + System.currentTimeMillis()); Thread.sleep(1000); observe.subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println("通過訂閱,獲取執行結果:" + s); } }); } @Test public void test04() throws InterruptedException { HystrixCommand<String> command = createCommand("this is test04"); // toObservable不直接執行run方法 Observable<String> observe = command.toObservable(); System.out.println("未訂閱,command不執行:" + System.currentTimeMillis()); Thread.sleep(1000); observe.subscribe(); System.out.println("訂閱后,command執行了" + System.currentTimeMillis()); Thread.sleep(1000); } @Test public void test05() throws InterruptedException, ExecutionException { HystrixCommand<String> command = createCommand("this is test05"); // toObservable不直接執行run方法 Future<String> f = command.toObservable().toBlocking().toFuture(); System.out.println("轉成future執行:" + System.currentTimeMillis()); Thread.sleep(1000); System.out.println(f.get()); } }
回到hystrix源碼中queue()方法
public Future<R> queue() { // toObservable轉換為Observable // toBlocking轉換為BlockingObservable // toFuture轉換為Future // 完成了Observable的創建和訂閱 final Future<R> delegate = toObservable().toBlocking().toFuture(); // 代理future,對於cancel操作做特殊處理 // 因為toObservable().toBlocking().toFuture()返回的future無法通過cancel方法中斷執行線程。 final Future<R> f = new Future<R>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { if (delegate.isCancelled()) { return false; } // 如果 execution.isolation.thread.interruptOnFutureCancel = true(默認false) if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) { // 設置標志位 interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning); } // 執行目標future的cancel final boolean res = delegate.cancel(interruptOnFutureCancel.get()); // 如果command還沒執行完成 且 需要中斷執行的線程 if (!isExecutionComplete() && interruptOnFutureCancel.get()) { // 獲取執行線程 final Thread t = executionThread.get(); // 執行線程非當前線程則中斷線程 if (t != null && !t.equals(Thread.currentThread())) { t.interrupt(); } } return res; } ``` }; //判斷是否執行完成 if (f.isDone()) { try { //獲取結果 f.get(); return f; } catch (Exception e) { ··· } } return f; }
這里用到了RxJava框架的響應式編程,會執行到具體Command(之前封裝的GenericCommand)的run方法
public class GenericCommand extends AbstractHystrixCommand<Object> { private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class); public GenericCommand(HystrixCommandBuilder builder) { super(builder); } //執行目標的方法 @Override protected Object run() throws Exception { LOGGER.debug("execute command: {}", getCommandKey().name()); return process(new Action() { @Override Object execute() { return getCommandAction().execute(getExecutionType()); } }); } //執行fallback方法 @Override protected Object getFallback() { final CommandAction commandAction = getFallbackAction(); if (commandAction != null) { try { return process(new Action() { @Override Object execute() { MetaHolder metaHolder = commandAction.getMetaHolder(); Object[] args = createArgsForFallback(metaHolder, getExecutionException()); return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args); } }); } catch (Throwable e) { LOGGER.error(FallbackErrorMessageBuilder.create() .append(commandAction, e).build()); throw new FallbackInvocationException(unwrapCause(e)); } } else { return super.getFallback(); } } }
五、Hystrix上下文
當feign結合Hystrix使用線程隔離時,如果我們想要使用ThreadLocal傳遞參數是不行的,存在跨線程傳遞的問題,Hystrix提供了一個上下文類HystrixRequestContext,以傳遞traceId為例
參考鏈接:https://blog.csdn.net/alex_xfboy/article/details/89844066
參考鏈接:https://juejin.cn/column/6960847703521624094