轉 Hystrix超時實現機制


HystrixCommand在執行的過程中如何探測超時,本篇主要對此進行介紹說明。

1.主入口:executeCommandAndObserve

#com.netflix.hystrix.AbstractCommand#executeCommandAndObserve private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { ···省略部分代碼··· Observable<R> execution; //判斷是否開啟超時監測 if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); } 

executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

可以簡單的認為lift 里面的對前面的Observable包含,類似裝飾者,后面的parent就是指上層的Observable。其中 HystrixObservableTimeoutOperator 就是關鍵的部分。

2.關鍵點: HystrixObservableTimeoutOperator

先看下HystrixObservableTimeoutOperator.call(),TimerListener的實現

TimerListener listener = new TimerListener() { @Override public void tick() { if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { // 標記事件,可以認為是開的hook,這里暫忽略 originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); //取消原Obserable的訂閱 s.unsubscribe(); final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { @Override public void run() { child.onError(new HystrixTimeoutException()); } }); timeoutRunnable.run(); } } //獲取配置的超時時間配置 @Override public int getIntervalTimeInMilliseconds() { return originalCommand.properties.executionTimeoutInMilliseconds().get(); } }; 

這段代碼的意思就是,給當前command的超時狀態置為超時,如果設置成功就拋出HystrixTimeoutException異常,緊接着被command的 doOnErron接收走 fallback邏輯

fallback private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); ................................. final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { circuitBreaker.markNonSuccess(); Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { //此處catch到超時異常 return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; ................................. return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); } 

同時s.unsubscribe()通知正在執行的線程,終止任務。如何終止呢?

executeCommandWithSpecifiedIsolation.subscribeOn()

subscribeOne的參數就是HystrixContextScheduler, Rxjava里 scheduler具體干活的是 worker,我們先看下Hystrix自定義scheduler的結構示意圖

 


那么我們直奔主題,直接看 ThreadPoolWorker
//ThreadPoolWorker.schedule @Override public Subscription schedule(final Action0 action) { if (subscription.isUnsubscribed()) { return Subscriptions.unsubscribed(); } ScheduledAction sa = new ScheduledAction(action); subscription.add(sa); sa.addParent(subscription); ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(); FutureTask<?> f = (FutureTask<?>) executor.submit(sa); sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor)); return sa; } 

1.開始的時候判斷observable是否被訂閱
2.被訂閱后,將任務 submit到線程池
3.FutureCompleterWithConfigurableInterrupt scheduler在執行的時候,增加了observable的中斷探測

private static class FutureCompleterWithConfigurableInterrupt implements Subscription { private final FutureTask<?> f; private final Func0<Boolean> shouldInterruptThread; private final ThreadPoolExecutor executor; private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) { this.f = f; this.shouldInterruptThread = shouldInterruptThread; this.executor = executor; } @Override public void unsubscribe() { executor.remove(f); if (shouldInterruptThread.call()) { f.cancel(true); } else { f.cancel(false); } } .....省略代碼....... } 

當observable 取消訂閱時,就會把當前任務移除,並中斷任務

到這里只是講說了超時后的處理,如何認定執行超時呢?

3.匠心之巧

這里有個很巧妙的設計,再探HystrixObservableTimeoutOperator

final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener); #com.netflix.hystrix.util.HystrixTimer#addTimerListener public Reference<TimerListener> addTimerListener(final TimerListener listener) { startThreadIfNeeded(); // add the listener Runnable r = new Runnable() { @Override public void run() { try { listener.tick(); } catch (Exception e) { logger.error("Failed while ticking TimerListener", e); } } }; ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS); return new TimerReference(listener, f); } 

利用了ScheduledThreadPoolExecutor,延遲執行,延遲時間就是我們設定的超時時間,我們再看下

#HystrixObservableTimeoutOperator Subscriber<R> parent = new Subscriber<R>() { @Override public void onCompleted() { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onCompleted(); } } @Override public void onError(Throwable e) { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onError(e); } } ..... ..... ..... ..... ..... ..... ..... ..... ..... private boolean isNotTimedOut() { // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED); } }; 

這里parent就是指上層的obserable,這里可以抽象的認為是我們的HystrixCommand執行線程, 當command執行線程執行完成的時候或異常的時候,會執行 tl.clear(), 也就是Future.cancel()會中斷 TimerListener 的ScheduledFuture 線程,迫使超時機制失效。

// tl.clear() private static class TimerReference extends SoftReference<TimerListener> { private final ScheduledFuture<?> f; .... .... .... .... .... @Override public void clear() { super.clear(); // stop this ScheduledFuture from any further executions f.cancel(false); } } 

4.回歸文字

HystrixCommand里有個 TimedOutStatus 超時狀態

 


現在可以認為有兩個線程,一個是hystrixCommand任務執行線程,一個是等着給hystrixCommand判定超時的線程,現在兩個線程看誰能先把hystrixCommand的狀態置換,只要任何一個線程對hystrixCommand打上標就意味着超時判定結束。

 

系列文章推薦



作者:青芒v5
鏈接:https://www.jianshu.com/p/60074fe1bd86
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM