hystrix總結之限流


  hystrix使用艙壁隔離模式來隔離和限制各個請求,設計了兩種隔離方式:信號量和線程池。線程池隔離:對每個command創建一個自己的線程池,執行調用。通過線程池隔離來保證不同調用不會相互干擾和每一個調用的並發限制。信號量隔熱:對每個command創建一個自己的計數器,當並發量超過計數器指定值時,直接拒絕。使用信號量和線程池的一個區別是,信號量沒有timeout機制。

  線程池隔離的本質是,如果在線程池執行模式下,調用響應的線程池,如果執行數量超過指定限制,線程池就會拋出異常。

if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                        ...
                        try {
                          ...
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                   ...
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        }

  信號量隔離的本質是,針對每一個command使用一個原子變量,定義當前其執行並發量,如果在SEMAPHORE執行時,會嘗試獲取這個原子變量,如果超過了限制執行fallback降級流程。

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
       ...
    final TryableSemaphore executionSemaphore = getExecutionSemaphore();
if (executionSemaphore.tryAcquire()) { try { ... return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } ... }
protected TryableSemaphore getExecutionSemaphore() {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
            if (executionSemaphoreOverride == null) {
                TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
                if (_s == null) {
                    // we didn't find one cache so setup
                    executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                    // assign whatever got set (this or another thread)
                    return executionSemaphorePerCircuit.get(commandKey.name());
                } else {
                    return _s;
                }
            } else {
                return executionSemaphoreOverride;
            }
        } else {
            return TryableSemaphoreNoOp.DEFAULT;
        }
    }

  TryableSemaphoreActual封裝了一個原子

static class TryableSemaphoreActual implements TryableSemaphore {
        protected final HystrixProperty<Integer> numberOfPermits;
        private final AtomicInteger count = new AtomicInteger(0);

        public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
            this.numberOfPermits = numberOfPermits;
        }

        @Override
        public boolean tryAcquire() {
            int currentCount = count.incrementAndGet();
            if (currentCount > numberOfPermits.get()) {
                count.decrementAndGet();
                return false;
            } else {
                return true;
            }
        }

        @Override
        public void release() {
            count.decrementAndGet();
        }

        @Override
        public int getNumberOfPermitsUsed() {
            return count.get();
        }

    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM