hystrix的源碼分析(二)


hystrix的源碼分析(二)

​ 上文回顧: 上文我們通過HystrixCommandAspect監聽@HystrixCommand,然后通過@HystrixCommand的配置構建了一個GenericCommand這么的一個過程。

先看一下簡潔版的HystrixCommandAspect:

@Aspect
public class HystrixCommandAspect {
	...
	
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
		...
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ...
        result = CommandExecutor.execute(invokable, executionType, metaHolder);
        ...
    }
}

現在我們構建好了一個HystrixInvokable了。這篇博客主要講的就是CommandExecutor.execute這個方法的執行過程

CommandExecutor.execute代碼分析

CommandExecutor.execute執行如下:

public class CommandExecutor {
    public CommandExecutor() {
    }

    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        switch(executionType) {
        case SYNCHRONOUS:
            return castToExecutable(invokable, executionType).execute();
        case ASYNCHRONOUS:
          	...
        case OBSERVABLE:
			...
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }

    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        if (invokable instanceof HystrixExecutable) {
            return (HystrixExecutable)invokable;
        } else {
            throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
        }
    }
}
  public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
      ...
  public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
      
   public Future<R> queue() {
        final Future<R> delegate = toObservable().toBlocking().toFuture();
   		...
   }
      ...
}

​ 首先CommandExecutor.execute 方法里要判斷是需要同步,異步,觀察這個三個模式下的哪一種,我們這里走的是同步。所以代碼就會走HystrixCommand.execute() -> queue() -> toObservable()

toObservable代碼分析

下面先看一下toObservable的代碼:

  public Observable<R> toObservable() {
    .... 一些action的定義 ....
 final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        public Observable<R> call() {
                if(this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED)){
                    return Observable.never() 
                }else{
                    applyHystrixSemantics(AbstractCommand.this);
                }
            }
        };
        
        ...
        return Observable.defer(new Func0<Observable<R>>() {
            public Observable<R> call() {
                ...判斷是否開啟緩存...
                boolean requestCacheEnabled = AbstractCommand.this.isRequestCachingEnabled();
                String cacheKey = AbstractCommand.this.getCacheKey();
                if (requestCacheEnabled) {
                    	//拿去緩存,如果存在緩存的話,直接返回
                         HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

                Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
                Observable afterCache;
                if (requestCacheEnabled && cacheKey != null) {
                    ... 緩存后續的一些判斷.....
                } else {
                    afterCache = hystrixObservable;
                }

                return 	afterCache.doOnTerminate(terminateCommandCleanup)
                    .doOnUnsubscribe(unsubscribeCommandCleanup)
                    .doOnCompleted(fireOnCompletedHook);

            }
        });
}

​ 首先toObservable()這個方法的返回值是Observable ,這個是rxjava的一個觀察者,如果沒看過rxjava的小伙伴建議去看一下先,不然hystrix后面代碼會很難看懂,他是一層層的返回Observable。 我們這里直接查看返回值就行了,根據rxjava里Observable.defer(Func0<Observable >) 特性,是當Observable綁定了觀察者的時候就會觸發Func0里的call方法。這里我們先看看call里面的方法把。call里面的方法主要用途:

  • 判斷一下是否開啟了緩存,如果開啟了就直接返回
  • 沒有開啟或者還沒有緩存的時候就執行Observable.defer(applyHystrixSemantics),執行后返回。

​ 我們看到Observable.defer(applyHystrixSemantics), 也是Observable.defer這個方式,所以直接看call方法,代碼接着會執行

applyHystrixSemantics(AbstractCommand.this);

代碼如下:

    private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {
        this.executionHook.onStart(_cmd);
        //判讀是不是熔斷了。
        if (this.circuitBreaker.allowRequest()) {
           final TryableSemaphore executionSemaphore = getExecutionSemaphore();

			。。。
            //信號量的控制
            if (executionSemaphore.tryAccaquire()) {
                try {
                    this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
                   	//如果都成功的話會執行executeCommandAndObserve
                    return this.executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException var7) {
                    return Observable.error(var7);
                }
            } else {
                return this.handleSemaphoreRejectionViaFallback();
            }
        } else {
            return this.handleShortCircuitViaFallback();
        }
    }

​ 這里首先先判斷this.circuitBreaker.allowRequest()是否熔斷了,熔斷了就執行this.handleSemaphoreRejectionViaFallback()方法直接返回,否則就繼續執行下去。然后會獲取TryableSemaphore,如果我們開啟的時候信號量隔離的話這里就返回TryableSemaphore,否則就返回TryableSemaphoreNoOp。再去tryAccaquire嘗試獲取信號量,如果成功了最后執行this.executeCommandAndObserve(_cmd)方法。

熔斷器降級分析

static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

    	//熔斷器是否開啟
        /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
        private AtomicBoolean circuitOpen = new AtomicBoolean(false);

        /* when the circuit was marked open or was last allowed to try a 'singleTest' */
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

        protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            this.properties = properties;
            this.metrics = metrics;
        }

    
    //當半開半閉狀態下,如果這次請求成功而了,則把熔斷器設為false,且讓統計指標reset
        public void markSuccess() {
            if (circuitOpen.get()) {
                if (circuitOpen.compareAndSet(true, false)) {
                    //win the thread race to reset metrics
                    //Unsubscribe from the current stream to reset the health counts stream.  This only affects the health counts view,
                    //and all other metric consumers are unaffected by the reset
                    metrics.resetStream();
                }
            }
        }

        @Override
        public boolean allowRequest() {
            //判斷是否強制打開熔斷器
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            //是否強制關閉熔斷器
            if (properties.circuitBreakerForceClosed().get()) {
                isOpen();
                return true;
            }
            return !isOpen() || allowSingleTest();
        }

    
        public boolean allowSingleTest() {
            long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
            // 1) if the circuit is open
            // 2) and it's been longer than 'sleepWindow' since we opened the circuit
            //熔斷器是開啟的,且當前時間比開啟熔斷器的時間加上sleepWindow時間還要長
            if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
                // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
                //設置當前時間到timeCircuitOpenedOrWasLastTested,
                //如果半開半閉的狀態下,如果這次請求成功了則會調用markSuccess,讓熔斷器狀態設為false,
                //如果不成功,就不需要了。
                //案例:半開半合狀態下,熔斷開啟時間為00:00:00,sleepWindow為10s,如果00:00:15秒的時候調用,如果調用失敗,
                //在00:00:15至00:00:25秒這個區間都是熔斷的,
                if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                    // if this returns true that means we set the time so we'll return true to allow the singleTest
                    // if it returned false it means another thread raced us and allowed the singleTest before we did
                    return true;
                }
            }
            return false;
        }

        @Override
        public boolean isOpen() {
            //判斷是否熔斷了,circuitOpen是熔斷的狀態 ,true為熔斷,false為不熔斷
            if (circuitOpen.get()) {
                return true;
            }

            //獲取統計到的指標信息
            HealthCounts health = metrics.getHealthCounts();
		 	// 一個時間窗口(默認10s鍾)總請求次數是否大於circuitBreakerRequestVolumeThreshold 默認為20s
            if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                return false;
            }
		    // 錯誤率(總錯誤次數/總請求次數)小於circuitBreakerErrorThresholdPercentage(默認50%)
            if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                return false;
            } else {
                // 反之,熔斷狀態將從CLOSED變為OPEN,且circuitOpened==>當前時間戳
                if (circuitOpen.compareAndSet(false, true)) {
                    //並且把當前時間設置到circuitOpenedOrLastTestedTime,可待后面的時間的對比
                    circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                    return true;
                } else {
                    return true;
                }
            }
        }

    }

​ HystrixCircuitBreakerImpl這個類就是在構建AbstractCommand的時候創建的。this.circuitBreaker.allowRequest() 這個方法做了以下幾件事:

  1. 判斷是否強制開啟熔斷器和強制關閉熔斷器,如果不是調用返回!isOpen() || allowSingleTest();

  2. isOpen 首先判斷熔斷是否開啟,然后判斷是否需要熔斷,熔斷的條件如下:

    • 時間窗口內(默認10s鍾)總請求次數大於20次
    • 時間窗口內(默認10s鍾)失敗率大於50%

    如果同時滿足這兩個條件則做以下操作:

    • 把熔斷狀態從false設為true
    • 把熔斷時間設置為當前時間
  3. 如果是熔斷的情況下就執行allowSingleTest,allowSingleTest的作用是:讓開啟熔斷的都能往下執行,滿足條件:

    • circuitOpen.get() 為true,確保是普通的熔斷,而不是強制熔斷
    • 當前時間比開啟熔斷器的時間加上sleepWindow時間還要長

    如果同時滿足這個條件則讓熔斷開始時間設置為當前時間,且返回true(讓程序執行走下去,而不是熔斷了)。這里有個點是需要知道的,舉個例子:熔斷開啟時間為00:00:00,sleepWindow為10s,如果00:00:15秒的時候調用,如果調用失敗,在00:00:15至00:00:25秒這個區間都是熔斷的。 半開半閉狀態下如果這次請求為false的話,下次不會被熔斷的時間可能就是這個時間加上睡眠時間了。

  4. 如果在半開半必的狀態下,這次請求成功了,他回去調用markSuccess()方法,這個方法主要功能:

    • 把熔斷器的狀態從開啟設為關閉
    • 讓metrics統計指標重新統計

Tips:allowSingleTest返回true的簡單的可以叫為半開半閉狀態。

信號量隔離的分析

  /* package */static class TryableSemaphoreActual implements TryableSemaphore {
        protected final HystrixProperty<Integer> numberOfPermits;
        private final AtomicInteger count = new AtomicInteger(0);

        public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
            this.numberOfPermits = numberOfPermits;
        }

        @Override
        public boolean tryAcquire() {
            int currentCount = count.incrementAndGet();
            if (currentCount > numberOfPermits.get()) {
                count.decrementAndGet();
                return false;
            } else {
                return true;
            }
        }
    }
        
        
    /* package */static class TryableSemaphoreNoOp implements TryableSemaphore {

        public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

        @Override
        public boolean tryAcquire() {
            return true;
        }
    }

executionSemaphore.tryAccaquire()的執行,主要他有兩種情況

  • 開啟了信號量隔離,TryableSemaphoreActual會把信號量增加1,如果currentCount > numberOfPermits.get()的時候就返回false,信號量降級。
  • 沒有開啟信號量隔離,TryableSemaphoreNoOp.tryAcquire()永遠都是返回true。

executeCommandAndObserve方法解析

​ 如果沒有被熔斷隔離和信號量隔離的話,進入executeCommandAndObserve這個方法,代碼如下:

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        ....
        Observable<R> execution;
        //判斷是否超時隔離
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        //markEmits,markOnCompleted,handleFallback,setRequestContext都是匿名內部類,都在這個方法里定義了,
        //這我覺得無關緊要就把他們復制進來。他們就是一些狀態的設置
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

判斷是否開啟超時隔離:

  • 超時隔離executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator (_cmd));
  • 不是超時隔離executeCommandWithSpecifiedIsolation(_cmd)

​ 其實是不是超時隔離都會執行executeCommandWithSpecifiedIsolation(_cmd),超時隔離額外加了一個Obserable.lift(new HystrixObservableTimeoutOperator (_cmd));

超時隔離分析

​ Obserable.lift可以認為是給這個Obserable加了一個裝飾器,把傳進來的參數進行加工,然后再傳出到Obserable.onNext中,所以這里我們看HystrixObservableTimeoutOperator.call方法就行了。因為是call方法中進行加工的

​ HystrixObservableTimeoutOperator (_cmd)代碼如下:

  private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }

        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);
            //超時的時候拋出new HystrixTimeoutException()
            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
                @Override
                public void run() {
                    child.onError(new HystrixTimeoutException());
                }
            });

            //設置定時調度
            TimerListener listener = new TimerListener() {

                //定時觸發的方法
                @Override
                public void tick() {
                    //把狀態從未執行設為timeout
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                        // shut down the original request
                        s.unsubscribe();
                        timeoutRunnable.run();
                    }
                }
                //獲取定時的的時間
                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);
            /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
            Subscriber<R> parent = new Subscriber<R>() {
				...
            };

            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);

            return parent;
        }

    }

HystrixTimer:

    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };
//getIntervalTimeInMilliseconds獲取定時時間
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }

​ ObservableTimeoutOperator.call主要做了:定義了一個定時器TimerListener,里面定時的時間就是我們設置的@HystrixCommand的超時的時間(體現的位置:originalCommand.properties.executionTimeoutInMilliseconds().get()),然后當超時了,會執行以下操作:

  • 把狀態從NOT_EXECUTED設置為TIMED_OUT
  • 發送TIMEOUT事件
  • s.unsubscribe()取消事件訂閱
  • timeoutRunnable.run();拋出timeoutRunnable異常

​ 簡單來說就是,設置了一個定時器,定時時間是我們設置的超時時間,如果定時時間到了,我們就改變相應的狀態,發送相應的內部事件,取消Obserable的訂閱,拋出異常,而做到一個超時的隔離。

executeCommandWithSpecifiedIsolation方法的執行

​ 代碼如下:

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                  	...
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
              			...
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
               				....
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            //最后執行這個
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(...).doOnUnsubscribe(...)
              .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
          ...
        }
    }

​ 這里返回的Obserable是Observable.defer(...).subscribeOn(...) , Observable.defer之前說過了。而且call方法中也沒什么好分析的可以直接看到return getUserExecutionObservable(_cmd);這個方法了。

​ 而Observable.subscribeOn這個方法是用於指定一個線程池去執行我們被觀察者observable觸發時的方法,可以看到threadPool.getScheduler(...)。

指定線程池執行方法

​ 指定相應線程池的代碼如下:

    /* package */static class HystrixThreadPoolDefault implements HystrixThreadPool {
        private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolDefault.class);

        private final HystrixThreadPoolProperties properties;
        private final BlockingQueue<Runnable> queue;
        private final ThreadPoolExecutor threadPool;
        private final HystrixThreadPoolMetrics metrics;
        private final int queueSize;

		...
 

        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            touchConfig();
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }

        //動態調整線程池的大小
        // allow us to change things via fast-properties by setting it each time
        private void touchConfig() {
            final int dynamicCoreSize = properties.coreSize().get();
            final int configuredMaximumSize = properties.maximumSize().get();
            int dynamicMaximumSize = properties.actualMaximumSize();
            final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
            boolean maxTooLow = false;

            if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
                dynamicMaximumSize = dynamicCoreSize;
                maxTooLow = true;
            }

            // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
            if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
          		...
                threadPool.setCorePoolSize(dynamicCoreSize);
                threadPool.setMaximumPoolSize(dynamicMaximumSize);
            }

            threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
        }
}
public class HystrixContextScheduler extends Scheduler {

    private final HystrixConcurrencyStrategy concurrencyStrategy;
    private final Scheduler actualScheduler;
    private final HystrixThreadPool threadPool;
    
	。。。
  
    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }

    @Override
    public Worker createWorker() {
         	// 構建一個默認的Worker,這里的actualScheduler就是ThreadPoolScheduler
        //actualScheduler.createWorker()就是ThreadPoolWorker
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }

    
    //HystrixContextSchedulerWorker類
    private class HystrixContextSchedulerWorker extends Worker {

        private final Worker worker;

        private HystrixContextSchedulerWorker(Worker actualWorker) {
            this.worker = actualWorker;
        }

   		...

        @Override
        public Subscription schedule(Action0 action) {
            if (threadPool != null) {
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            //這里的worker其實就是ThreadPoolWorker
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }

    }

    //ThreadPoolScheduler類
    private static class ThreadPoolScheduler extends Scheduler {

        private final HystrixThreadPool threadPool;
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

        @Override
        public Worker createWorker() {
            //默認的worker為:ThreadPoolWorker
            return new ThreadPoolWorker(threadPool, shouldInterruptThread);
        }

    }

    
//ThreadPoolWorker類
    private static class ThreadPoolWorker extends Worker {

        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }
		...
        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            // This is internal RxJava API but it is too useful.
            ScheduledAction sa = new ScheduledAction(action);

            subscription.add(sa);
            sa.addParent(subscription);

            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }

       ...
    }


}

touchConfig() 方法主要是重新設置最大線程池actualMaximumSize的,這里默認的allowMaximumSizeToDivergeFromCoreSize是false。和動態調整線程池的核心數大小

HystrixContextScheduler類中有HystrixContextSchedulerWorkerThreadPoolSchedulerThreadPoolWorker 這幾個內部類。看看它們的作用:

  • HystrixContextSchedulerWorker: 對外提供schedule()方法,這里會判斷線程池隊列是否已經滿,如果滿了這會拋出異常:Rejected command because thread-pool queueSize is at rejection threshold。 如果配置的隊列大小為-1 則默認返回true。然后繼續調用actualScheduler.createWorker().schedule() , actualScheduler就是ThreadPoolScheduler。
  • ThreadPoolScheduler:執行createWorker()方法,默認使用ThreadPoolWorker()
  • ThreadPoolWorker: 執行command的核心邏輯
private static class ThreadPoolWorker extends Worker {

    private final HystrixThreadPool threadPool;
    private final CompositeSubscription subscription = new CompositeSubscription();
    private final Func0<Boolean> shouldInterruptThread;

    @Override
    public Subscription schedule(final Action0 action) {
        if (subscription.isUnsubscribed()) {
            return Subscriptions.unsubscribed();
        }

        ScheduledAction sa = new ScheduledAction(action);
        subscription.add(sa);
        sa.addParent(subscription);
        // 獲取線程池
        ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
        // 將包裝后的HystrixCommand submit到線程池,然后返回FutureTask
        FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
        sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

        return sa;
    }
}

​ 這里我們可以看到了,獲取線程池,並且將包裝后的HystrixCommand submit到線程池,然后返回FutureTask。

getUserExecutionObservable方法執行

    private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;

        try {
            userObservable = getExecutionObservable();
        } catch (Throwable ex) {
            // the run() method is a user provided implementation so can throw instead of using Observable.onError
            // so we catch it here and turn it into Observable.error
            userObservable = Observable.error(ex);
        }

        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }

HystrixCommand類中的

   @Override
    final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    //可以看到run()方法了。 HystrixCommand.run()其實就是我們自己寫的代碼里的方法
                    return Observable.just(run());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Save thread on which we get subscribed so that we can interrupt it later if needed
                executionThread.set(Thread.currentThread());
            }
        });
    }

最后可以看到會調用Observable.just(run()) ,這個就是我們我們自己寫的代碼里的方法,到這里就是我們整體的執行過程了。

額外補充

​ 為什么我們沒有看到Observable.subscribe去訂閱觀察者呢。其實在HystrixCommand.queue()的方法中有這么一個代碼:toObservable().toBlocking().toFuture()。跟蹤一下代碼:toObservable().toBlocking() -> BlockingObservable.from(this) -> new BlockingObservable(o) 得到的是BlockingObservable ,然后BlockingObservable.toFuture -> BlockingOperatorToFuture.toFuture(this.o) 看下 BlockingOperatorToFuture.toFuture代碼:

 public static <T> Future<T> toFuture(Observable<? extends T> that) {
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicReference<T> value = new AtomicReference();
        final AtomicReference<Throwable> error = new AtomicReference();
     	//observable.subscribe 訂閱的位置
        final Subscription s = that.single().subscribe(new Subscriber<T>() {
            public void onCompleted() {
                finished.countDown();
            }

            public void onError(Throwable e) {
                error.compareAndSet((Object)null, e);
                finished.countDown();
            }

            public void onNext(T v) {
                value.set(v);
            }
        });
        return new Future<T>() {
            ...
        };
    }

final Subscription s = that.single().subscribe(...) 這里就是訂閱的位置了。

結尾

​ 總結: 這篇博文主要是講了HystrixCommand.execute整個的執行的流程,里面已經涵蓋了熔斷,超時,信號量,線程的代碼了。最后附上一張我自己畫的一張流程圖,如果想自己走一遍流程的話可以看一下我這個流程圖:

hstrix執行流程圖

高清流程圖:

https://gitee.com/gzgyc/blogimage/raw/master/hstrix執行流程圖.jpg


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM