How Hystrix Works


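I. Introduction

  Hystrix is commonly used for circuit breaking and service degradation (fallback). It is built on RxJava, a reactive programming library based on the observer pattern, and provides service fallback, circuit breaking, thread and semaphore isolation, request caching, request collapsing, and service monitoring.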

II. Basic Principles

  When we need to call a method (usually a remote call), we wrap the call in Hystrix and let Hystrix execute it, so the call gains the protection Hystrix provides.
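To make the idea of "wrapping a call" concrete, here is a minimal JDK-only sketch. It is not Hystrix code: GuardedCall and its parameters are hypothetical names, and it only imitates two of the protections (a timeout and a fallback) with a plain thread pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not part of Hystrix: runs a task on a separate thread
// and returns a fallback value when the task fails or takes too long.
final class GuardedCall {

    private GuardedCall() {
    }

    static <T> T call(Supplier<T> task, Supplier<T> fallback, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<T> callable = task::get;
        Future<T> future = pool.submit(callable);
        try {
            // Wait for the result, but never longer than the timeout.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Timeout, interruption, or an exception thrown by the task.
            future.cancel(true);
            return fallback.get();
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A call such as GuardedCall.call(() -> remote.getName(id), () -> "unavailable", 1000) would then return "unavailable" whenever the (assumed) remote call throws or exceeds one second. Real Hystrix additionally tracks failure rates and opens a circuit, which this sketch leaves out.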

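Hystrix provides two command types, HystrixCommand and HystrixObservableCommand; either can be used to wrap the task to execute.

HystrixCommand is used when the dependency returns a single result:

  execute(): synchronous execution; returns a single result object from the dependency, or throws an exception if an error occurs.

  queue(): asynchronous execution; immediately returns a Future that will hold the single result once the service call completes.

HystrixObservableCommand is used when the dependency returns multiple results:

  observe(): returns an Observable representing the multiple results of the operation; it is a Hot Observable.

  toObservable(): also returns an Observable representing the multiple results, but it is a Cold Observable.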
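The hot/cold distinction comes down to when the work starts. The sketch below is a JDK-only analogy, not the RxJava or Hystrix API: HotColdDemo and remoteCall are made-up names, a CompletableFuture stands in for the eager (hot) case, and a Supplier stands in for the lazy (cold) case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// JDK-only analogy: "hot" starts the work at creation time,
// "cold" starts it only when a consumer asks for the result.
final class HotColdDemo {

    // Counts how many times the simulated remote call has actually run.
    static final AtomicInteger runs = new AtomicInteger();

    private HotColdDemo() {
    }

    static String remoteCall() {
        runs.incrementAndGet();
        return "result";
    }

    // Eager, similar in spirit to observe(): the call is started immediately.
    static CompletableFuture<String> hot() {
        return CompletableFuture.supplyAsync(HotColdDemo::remoteCall);
    }

    // Lazy, similar in spirit to toObservable(): nothing runs until get() is called.
    static Supplier<String> cold() {
        return HotColdDemo::remoteCall;
    }
}
```

Creating the cold Supplier leaves the counter untouched until get() is called, whereas hot() schedules the call right away even if nobody ever reads the future.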

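III. Basic Usage

  Hystrix can be used through hand-written custom Commands, through annotations, or through Feign integration. Custom Commands and Feign integration are not covered here; we focus on the annotation approach.

1. Enable Hystrix by adding the @EnableHystrix annotation to the application's main class.

2. Put @HystrixCommand on the method that needs a fallback.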

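// UserService is assumed here: the original snippet uses @Override but omits the interface
public class UserServiceImpl implements UserService {

    @Autowired
    private UserDao userDao;

    @HystrixCommand(fallbackMethod = "getUserNameFallBack")
    @Override
    public String getUserName(String userId) {
        // deliberately throw an ArithmeticException to trigger the fallback
        int i = 1 / 0;
        return userDao.getNameById(userId);
    }

    // fallback: must accept the same parameters as the protected method
    public String getUserNameFallBack(String userId) {
        return "Service temporarily unavailable, please try again later";
    }
}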

IV. Initialization

Start from the @EnableHystrix annotation:

// enables EnableCircuitBreaker
@EnableCircuitBreaker
public @interface EnableHystrix {
}
// imports EnableCircuitBreakerImportSelector
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}

The annotation itself carries another annotation, @EnableCircuitBreaker, which in turn imports a selector:

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
        extends SpringFactoryImportSelector<EnableCircuitBreaker> {

    @Override
    protected boolean isEnabled() {
        return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
                Boolean.class, Boolean.TRUE);
    }

}

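This is a key point in how Hystrix takes effect. The selector extends SpringFactoryImportSelector; once it has been initialized, its selectImports(AnnotationMetadata metadata) method is called. Using the enabling annotation (here @EnableCircuitBreaker) as the key, that method looks up in spring.factories the @Configuration classes that need to be initialized. The key code is: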

        List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
                .loadFactoryNames(this.annotationClass, this.beanClassLoader)));

Here is the spring.factories file:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.ReactiveHystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
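The lookup against the file above can be pictured with plain java.util.Properties. This is only a simplified illustration of the idea, not Spring's SpringFactoriesLoader (which also scans every META-INF/spring.factories on the classpath); the FactoriesLookup class is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Simplified illustration of a spring.factories lookup; not Spring's own loader.
final class FactoriesLookup {

    private FactoriesLookup() {
    }

    // Returns the class names registered under the given key, in declaration order.
    static List<String> loadFactoryNames(String factoriesContent, String key) {
        Properties props = new Properties();
        try {
            // Properties understands the "key=\" line-continuation syntax used by the file.
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new IllegalStateException("cannot parse factories content", e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }
}
```

With the file content shown above and the key org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker, the result is the single entry HystrixCircuitBreakerConfiguration, which is the configuration class discussed next.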

The configuration class HystrixCircuitBreakerConfiguration:

@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {

    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }

}

It registers a HystrixCommandAspect bean with Spring:

@Aspect
public class HystrixCommandAspect {

    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

    static {
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                .build();
    }
    
    // pointcut: every method annotated with @HystrixCommand
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }
    // pointcut: every method annotated with @HystrixCollapser
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }
    
    // around advice
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        // the two annotations cannot be applied to the same method
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        // META_HOLDER_FACTORY_MAP is pre-populated with two factories:
        // @HystrixCommand   -> CommandMetaHolderFactory
        // @HystrixCollapser -> CollapserMetaHolderFactory
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        // create the MetaHolder, which wraps the join point
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        // create the HystrixInvokable; it is an empty marker interface with no methods,
        // used only to mark something as executable. The implementation classes do the actual work.
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        // execution type
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            if (!metaHolder.isObservable()) {
                // execute through the CommandExecutor utility
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }

   ···

}

Through AOP, a proxy is created around the method, and the aspect decides which logic to execute.
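The same "intercept, try, fall back" shape can be sketched with a JDK dynamic proxy. This is an illustration under assumptions, not how javanica is implemented: @WithFallback, NameService, FailingNameService and FallbackProxy are all hypothetical names, and the annotation sits on the interface method because that is the Method object a JDK proxy hands to its handler.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical annotation standing in for @HystrixCommand(fallbackMethod = ...).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface WithFallback {
    String value();
}

interface NameService {
    @WithFallback("getNameFallback")
    String getName(String id);

    String getNameFallback(String id);
}

// Target object whose primary method always fails, like the 1/0 example earlier.
class FailingNameService implements NameService {
    @Override
    public String getName(String id) {
        throw new IllegalStateException("remote call failed");
    }

    @Override
    public String getNameFallback(String id) {
        return "fallback for " + id;
    }
}

final class FallbackProxy {

    private FallbackProxy() {
    }

    // Wraps the target in a proxy whose handler plays the role of the @Around advice.
    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> iface, T target) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                (proxy, method, args) -> {
                    WithFallback meta = method.getAnnotation(WithFallback.class);
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        if (meta == null) {
                            // Not annotated: rethrow the original exception unchanged.
                            throw e.getCause();
                        }
                        // Primary method failed: call the fallback with the same arguments.
                        Method fallback = iface.getMethod(meta.value(), method.getParameterTypes());
                        return fallback.invoke(target, args);
                    }
                });
    }
}
```

Calling getName("42") on FallbackProxy.wrap(NameService.class, new FailingNameService()) goes through the handler, fails in the target, and ends up in getNameFallback. HystrixCommandAspect does considerably more (it builds a command object and runs it with isolation and metrics), but the interception point is comparable.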

Creating the parameter wrapper (MetaHolder)

    public MetaHolder create(final ProceedingJoinPoint joinPoint) {
        // the Method object being invoked
        Method method = getMethodFromTarget(joinPoint);
        // the target object
        Object obj = joinPoint.getTarget();
        // the method's argument list
        Object[] args = joinPoint.getArgs();
        // the proxy object
        Object proxy = joinPoint.getThis();
        // delegate to the subclass's create method
        return create(proxy, method, obj, args, joinPoint);
    }

    public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
        // read the @HystrixCommand annotation
        HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
        // derive the execution type from the return type:
        // Future -> asynchronous, Observable (the RxJava observable) -> observable, anything else -> synchronous
        ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
        // obtain the MetaHolder builder
        MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
        if (isCompileWeaving()) {
            builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
        }
        // builder pattern: assemble the MetaHolder wrapper
        return builder.defaultCommandKey(method.getName())
                .hystrixCommand(hystrixCommand)
                .observableExecutionMode(hystrixCommand.observableExecutionMode())
                .executionType(executionType)
                .observable(ExecutionType.OBSERVABLE == executionType)
                .build();
    }

Creating the command executor

    public HystrixInvokable create(MetaHolder metaHolder) {
        HystrixInvokable executable;
        // @HystrixCollapser
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        // @HystrixCommand and the return type is Observable
        } else if (metaHolder.isObservable()) {
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {
            // everything else: wrap the HystrixCommandBuilder in a GenericCommand
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }
        return executable;
    }
    public GenericCommand(HystrixCommandBuilder builder) {
        super(builder);
    }
    
    protected AbstractHystrixCommand(HystrixCommandBuilder builder) {
        super(builder.getSetterBuilder().build());
        // the command actions: the method to execute and the fallback method
        this.commandActions = builder.getCommandActions();
        this.collapsedRequests = builder.getCollapsedRequests();
        this.cacheResultInvocationContext = builder.getCacheResultInvocationContext();
        this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext();
        this.ignoreExceptions = builder.getIgnoreExceptions();
        // execution type: ASYNCHRONOUS, SYNCHRONOUS or OBSERVABLE
        this.executionType = builder.getExecutionType();
    }
    
    public HystrixCommand.Setter build() throws HystrixPropertyException {
        // group key: class name; command key: method name; plus an optional thread pool key
        HystrixCommand.Setter setter = HystrixCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
        if (StringUtils.isNotBlank(threadPoolKey)) {
            setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
        }
        try {
            // initialize the thread pool properties
            setter.andThreadPoolPropertiesDefaults(HystrixPropertiesManager.initializeThreadPoolProperties(threadPoolProperties));
        } catch (IllegalArgumentException e) {
            throw new HystrixPropertyException("Failed to set Thread Pool properties. " + getInfo(), e);
        }
        try {
            // initialize the command execution properties
            setter.andCommandPropertiesDefaults(HystrixPropertiesManager.initializeCommandProperties(commandProperties));
        } catch (IllegalArgumentException e) {
            throw new HystrixPropertyException("Failed to set Command properties. " + getInfo(), e);
        }
        return setter;
    }

Executing the command

    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder)
            throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS: {
                // cast to the sub-interface HystrixExecutable
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()
                        ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }

    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

The four ways to execute a HystrixCommand

Test class

public class HystrixCommandTest {

    private <T> com.netflix.hystrix.HystrixCommand<T> createCommand(T message) {
        com.netflix.hystrix.HystrixCommand.Setter setter = com.netflix.hystrix.HystrixCommand.Setter
                .withGroupKey(HystrixCommandGroupKey.Factory.asKey("serviceA"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("methodA"));
        return new com.netflix.hystrix.HystrixCommand<T>(setter) {
            @Override
            protected T run() throws Exception {
                System.out.println("HystrixCommand executed!!! " + System.currentTimeMillis());
                return message;
            }
        };
    }

    @Test
    public void test01() {
        com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test01");
        System.out.println(command.execute());
    }


    @Test
    public void test02() throws ExecutionException, InterruptedException {
        com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test02");
        Future<String> f = command.queue();
        System.out.println("after queue(), command is executing: " + System.currentTimeMillis());
        Thread.sleep(1000);
        System.out.println(f.get());
    }

    @Test
    public void test03() throws InterruptedException {
        com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test03");
        // observe() starts executing run() right away; the result is a Hot Observable
        Observable<String> observe = command.observe();
        System.out.println("after observe(), command is executing: " + System.currentTimeMillis());
        Thread.sleep(1000);
        observe.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("result obtained via subscription: " + s);
            }
        });
    }

    @Test
    public void test04() throws InterruptedException {
        com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test04");
        // toObservable() does not execute run() until something subscribes
        Observable<String> observe = command.toObservable();
        System.out.println("not subscribed yet, command has not run: " + System.currentTimeMillis());
        Thread.sleep(1000);
        observe.subscribe();
        System.out.println("after subscribing, command ran: " + System.currentTimeMillis());
        Thread.sleep(1000);
    }

    @Test
    public void test05() throws InterruptedException, ExecutionException {
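        com.netflix.hystrix.HystrixCommand<String> command = createCommand("this is test05");
        // toObservable() alone does not run the command; toBlocking().toFuture() subscribes and starts it
        Future<String> f = command.toObservable().toBlocking().toFuture();
        System.out.println("converted to a Future, executing: " + System.currentTimeMillis());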
        Thread.sleep(1000);
        System.out.println(f.get());
    }
}

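Back in the Hystrix source, the queue() method:

    public Future<R> queue() {
        // toObservable() produces an Observable
        // toBlocking() converts it to a BlockingObservable
        // toFuture() converts that to a Future
        // together they create the Observable and subscribe to it
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        // wrap the future so that cancel() gets special handling, because the future returned by
        // toObservable().toBlocking().toFuture() cannot interrupt the executing thread through cancel()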
        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }
                // if execution.isolation.thread.interruptOnFutureCancel = true (default false)
                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    // set the flag
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }
                // cancel the underlying future
                final boolean res = delegate.cancel(interruptOnFutureCancel.get());
                // if the command has not finished and the executing thread should be interrupted
                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    // get the executing thread
                    final Thread t = executionThread.get();
                    // interrupt it only if it is not the current thread
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }

                return res;
            }

            ···
            
        };

        // check whether execution has already completed
        if (f.isDone()) {
            try {
                // fetch the result
                f.get();
                return f;
            } catch (Exception e) {
                ···
            }
        }

        return f;
    }

RxJava's reactive pipeline is used here; execution eventually reaches the run() method of the concrete command (the GenericCommand created earlier).

public class GenericCommand extends AbstractHystrixCommand<Object> {

    private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);

    public GenericCommand(HystrixCommandBuilder builder) {
        super(builder);
    }

    // executes the target method
    @Override
    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", getCommandKey().name());
        return process(new Action() {
            @Override
            Object execute() {
                return getCommandAction().execute(getExecutionType());
            }
        });
    }
    
    // executes the fallback method
    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }

}

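V. Hystrix Context

  When Feign is combined with Hystrix under thread isolation, passing parameters through a ThreadLocal does not work, because the command runs on a different thread from the caller. Hystrix provides a context class, HystrixRequestContext, for this purpose; passing a traceId is a typical example.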
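The cross-thread problem itself can be reproduced with the JDK alone. The sketch below is not Hystrix code: TraceContext is a hypothetical holder, and the single-thread executor merely stands in for a Hystrix thread pool. It shows that a ThreadLocal value is invisible on the worker thread unless the task is wrapped to carry it over, which is roughly the job that a custom HystrixConcurrencyStrategy.wrapCallable (together with an initialized HystrixRequestContext) performs in a real setup.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only illustration; TraceContext is a hypothetical holder, not a Hystrix class.
final class TraceContext {

    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private TraceContext() {
    }

    // Captures the caller's traceId now and restores it on whichever thread runs the task.
    static <T> Callable<T> propagate(Callable<T> task) {
        final String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return task.call();
            } finally {
                // Put back the worker's previous value so pooled threads do not leak ids.
                TRACE_ID.set(previous);
            }
        };
    }

    // Runs the task on a separate thread, as thread isolation does, and returns its result.
    static <T> T runIsolated(Callable<T> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException("isolated task failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

After TraceContext.TRACE_ID.set("trace-1") on the calling thread, runIsolated(() -> TraceContext.TRACE_ID.get()) yields null, while the same task passed through propagate(...) yields "trace-1".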

 

 

Reference: https://blog.csdn.net/alex_xfboy/article/details/89844066

Reference: https://juejin.cn/column/6960847703521624094
