hystrix使用艙壁隔離模式來隔離和限制各個請求,設計了兩種隔離方式:信號量和線程池。線程池隔離:對每個command創建一個自己的線程池,執行調用。通過線程池隔離來保證不同調用不會相互干擾和每一個調用的並發限制。信號量隔熱:對每個command創建一個自己的計數器,當並發量超過計數器指定值時,直接拒絕。使用信號量和線程池的一個區別是,信號量沒有timeout機制。
線程池隔離的本質是,如果在線程池執行模式下,調用響應的線程池,如果執行數量超過指定限制,線程池就會拋出異常。
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { ... try { ... executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } ... } }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); }
信號量隔離的本質是,針對每一個command使用一個原子變量,定義當前其執行並發量,如果在SEMAPHORE執行時,會嘗試獲取這個原子變量,如果超過了限制執行fallback降級流程。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { ...
final TryableSemaphore executionSemaphore = getExecutionSemaphore(); if (executionSemaphore.tryAcquire()) { try { ... return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } ... }
protected TryableSemaphore getExecutionSemaphore() { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) { if (executionSemaphoreOverride == null) { TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // we didn't find one cache so setup executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests())); // assign whatever got set (this or another thread) return executionSemaphorePerCircuit.get(commandKey.name()); } else { return _s; } } else { return executionSemaphoreOverride; } } else { return TryableSemaphoreNoOp.DEFAULT; } }
TryableSemaphoreActual封裝了一個原子
static class TryableSemaphoreActual implements TryableSemaphore { protected final HystrixProperty<Integer> numberOfPermits; private final AtomicInteger count = new AtomicInteger(0); public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) { this.numberOfPermits = numberOfPermits; } @Override public boolean tryAcquire() { int currentCount = count.incrementAndGet(); if (currentCount > numberOfPermits.get()) { count.decrementAndGet(); return false; } else { return true; } } @Override public void release() { count.decrementAndGet(); } @Override public int getNumberOfPermitsUsed() { return count.get(); } }