說明
原創不易,如若轉載 請標明來源!
歡迎關注本人微信公眾號:壹枝花算不算浪漫
更多內容也可查看本人博客:一枝花算不算浪漫
前言
前情回顧
上一講我們講了配置了feign.hystrix.enabled=true之后,默認的Targeter就會構建成HystrixTargter
, 然后通過對應的HystrixInvocationHandler
生成對應的動態代理。
本講目錄
這一講開始講解Hystrix相關代碼,當然還是基於上一個組件Feign的基礎上開始講解的,這里默認你已經對Feign有過大致了解。
目錄如下:
- 線程池初始化過程
- HystrixCommand通過線程池執行原理
由於這里面代碼比較多,所以我都是將一些主要核心代碼發出來,這里后面會匯總一個流程圖,可以參考流程圖 自己一點點調試。
這里建議在回調的地方都加上斷點,而且修改feign和hystrix超時時間,瀏覽器發送請求后,一步步debug代碼。
源碼分析
線程池初始化過程
上一講已經講過激活Hystrix后,構造的InvocationHandler為HystrixInvocationHandler
,所以當調用FeignClient服務實例的時候,會先執行HystrixInvocationHandler.invoke()
方法,這里我們先跟進這個方法:
final class HystrixInvocationHandler implements InvocationHandler {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
// 構建一個HystrixCommand
// HystrixCommand構造參數需要Setter對象
HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
// 執行SynchronousMethodHandler.invoke方法
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
}
// 省略部分代碼...
return hystrixCommand.execute();
}
}
這里主要是構造HystrixCommand
,我們先看看它的構造函數以及線程池池初始化的代碼:
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
protected HystrixCommand(HystrixCommandGroupKey group) {
super(group, null, null, null, null, null, null, null, null, null, null, null);
}
}
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
// 初始化線程池
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
// 省略部分代碼...
}
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
// get the default implementation of HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
return fromConstructor;
}
}
}
public interface HystrixThreadPool {
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// 這個線程池的key就是我們feignClient定義的value名稱,其他服務的projectName
// 在我們的demo中:key = serviceA
String key = threadPoolKey.name();
// threadPools是一個map,key就是serviceA
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// 初始化線程池
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
}
public abstract class HystrixThreadPoolProperties {
/* defaults */
static int default_coreSize = 10;
static int default_maximumSize = 10;
static int default_keepAliveTimeMinutes = 1;
static int default_maxQueueSize = -1;
static boolean default_allow_maximum_size_to_diverge_from_core_size = false;
static int default_queueSizeRejectionThreshold = 5;
static int default_threadPoolRollingNumberStatisticalWindow = 10000;
static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10;
// 省略部分代碼...
}
這里主要是初始化線程池的邏輯,從HystrixCommand
一直到HystrixThreadPoolProperties
。這里的threadPools
是一個Map,一個serviceName會對應一個線程池。
線程池的默認配置都在HystrixThreadPoolProperties
中。線程池的核心線程和最大線程數都是10,隊列的大小為-1,這里意思是不使用隊列。
HystrixCommand
構造函數需要接收一個Setter
對象,Setter中包含兩個很重要的屬性,groupKey
和commandKey
, 這里看下Setter是如何構造的:
final class HystrixInvocationHandler implements InvocationHandler {
HystrixInvocationHandler(Target<?> target, Map<Method, MethodHandler> dispatch,
SetterFactory setterFactory, FallbackFactory<?> fallbackFactory) {
this.target = checkNotNull(target, "target");
this.dispatch = checkNotNull(dispatch, "dispatch");
this.fallbackFactory = fallbackFactory;
this.fallbackMethodMap = toFallbackMethod(dispatch);
this.setterMethodMap = toSetters(setterFactory, target, dispatch.keySet());
}
static Map<Method, Setter> toSetters(SetterFactory setterFactory, Target<?> target,
Set<Method> methods) {
Map<Method, Setter> result = new LinkedHashMap<Method, Setter>();
for (Method method : methods) {
method.setAccessible(true);
result.put(method, setterFactory.create(target, method));
}
return result;
}
}
public interface SetterFactory {
HystrixCommand.Setter create(Target<?> target, Method method);
final class Default implements SetterFactory {
@Override
public HystrixCommand.Setter create(Target<?> target, Method method) {
// groupKey既是調用的服務服務名稱:serviceA
String groupKey = target.name();
// commandKey即是方法的名稱+入參定義等,一個commandKey能夠確定這個類中唯一的一個方法
String commandKey = Feign.configKey(target.type(), method);
return HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
}
}
}
}
構建一個HystrixCommand時必須要傳入這兩個參數。
groupKey
: 就是調用的服務名稱,例如我們demo中的ServiceA,groupKey
對應着一個線程池。commandKey
: 一個FeignClient接口中的一個方法就是一個commandKey
, 其組成為方法名和入參等信息。
groupkey
和commandKey
是一對多的關系,例如ServiceA中的2個方法,那么groupKey就對應着這個ServiceA中的2個commandKey。
groupKey -> target.name() -> ServiceA -> @FeignClient注解里設置的服務名稱
commanKey -> ServiceAFeignClient#sayHello(String)
這里回調函數執行HystrixInvocationHandler.this.dispatch.get(method).invoke(args)
其實就是執行SynchronousMethodHandler.invoke()
方法了。但是什么時候才會回調回來呢?后面接着看吧。
HystrixCommand通過線程池執行原理
上面已經看了線程池的初始化過程,當一個服務第一次被調用的時候,會判斷threadPools
(數據結構為ConcurrentHashMap) 中是否存在這個serviceName對應的線程池,如果沒有的話則會初始化一個對應的線程池。線程池默認配置屬性在HystrixThreadPoolProperties
中可以看到。
Hystrix線程池默認是不使用隊列進行線程排隊的,核心線程數為10。接下來我們看看創建HystrixCommand
后,線程池是如何將HystrixCommand
命令提交的:
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}
return res;
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
default:
// these are errors we throw from queue() as they as rejection type errors
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
return f;
}
}
這里又是一堆的回調函數,我們可以在每個回調函數中打上斷點,然后一點點調試。
這里主要是通過toObservable()
方法構造了一個Future<R>
, 然后包裝此Future,添加了中斷等邏輯,后面使用f.get()
阻塞獲取線程執行結果,最后返回Future對象。
這里我們的重點在於尋找哪里將HystrixCommand丟入線程池,然后返回一個Future的。
接着往后跟進代碼:
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
public Observable<R> toObservable() {
// _cmd就是HystrixInvocationHandler對象
// 里面包含要請求的method信息,threadPool信息,groupKey,commandKey等信息
final AbstractCommand<R> _cmd = this;
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
// 省略部分回調函數代碼...
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 是否使用請求緩存,默認為false
final boolean requestCacheEnabled = isRequestCachingEnabled();
// 請求緩存相關
final String cacheKey = getCacheKey();
// 省略部分代碼...
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// 省略部分代碼...
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
}
});
}
}
toObservable()
是比較核心的代碼,這里也是定義了很多回調函數,上面代碼做了精簡,留下一些核心邏輯,在defer()
中構造返回了一個Observable
對象,這個Observable
是包含上面的一些回調函數的。
通過debug代碼,這里會直接執行到applyHystrixSemantics
這個構造函數Func0中的call()
方法中,通過語意 我們可以大致猜到這個函數的意思:應用Hystrix語義
接着往下跟進代碼:
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
// 判斷是否短路
if (circuitBreaker.attemptExecution()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// 如果不使用Semaphore配置,那么tryAcquire使用的是TryableSemaphoreNoOp中的方法,返回true
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
}
這里面我們默認使用的線程池的隔離配置,所以executionSemaphore.tryAcquire()
都會返回true,這里有個重要的方法:executeCommandAndObserve(_cmd)
, 我們繼續往后跟進這個方法:
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 省略部分回調函數...
Observable<R> execution;
// 默認配置timeOutEnabled為true
if (properties.executionTimeoutEnabled().get()) {
// 執行指定的隔離執行命令
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
}
對於Hystrix來說,默認是開啟超時機制的,這里會執行executeCommandWithSpecifiedIsolation()
, 返回一個執行的Observable
.還是通過方法名我們可以猜測這個方法是:使用指定的隔離執行命令
繼續往里面跟進:
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
}
}
}
這里就是我們千辛萬苦需要找的核心方法了,里面仍然是一個回調函數,通過斷點調試,這里會先執行:subscribeOn
回調函數,執行threadPool.getScheduler
方法,我們進一步往后跟進:
public interface HystrixThreadPool {
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
private void touchConfig() {
final int dynamicCoreSize = properties.coreSize().get();
final int configuredMaximumSize = properties.maximumSize().get();
int dynamicMaximumSize = properties.actualMaximumSize();
final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
boolean maxTooLow = false;
// 動態調整最大線程池的數量
if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
//if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
dynamicMaximumSize = dynamicCoreSize;
maxTooLow = true;
}
// In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
if (maxTooLow) {
logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ". Maximum size will be set to " +
dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
}
threadPool.setCorePoolSize(dynamicCoreSize);
threadPool.setMaximumPoolSize(dynamicMaximumSize);
}
threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}
}
public class HystrixContextScheduler extends Scheduler {
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
@Override
public Worker createWorker() {
// 構建一個默認的Worker
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
private static class ThreadPoolScheduler extends Scheduler {
private final HystrixThreadPool threadPool;
private final Func0<Boolean> shouldInterruptThread;
public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
@Override
public Worker createWorker() {
// 默認的worker為:ThreadPoolWorker
return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}
}
private class HystrixContextSchedulerWorker extends Worker {
// 執行schedule方法
@Override
public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
// 默認的worker為:ThreadPoolWorker
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
}
// 執行command的核心類
private static class ThreadPoolWorker extends Worker {
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final Func0<Boolean> shouldInterruptThread;
public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
@Override
public void unsubscribe() {
subscription.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return subscription.isUnsubscribed();
}
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
// This is internal RxJava API but it is too useful.
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
throw new IllegalStateException("Hystrix does not support delayed scheduling");
}
}
}
touchConfig()
方法主要是重新設置最大線程池actualMaximumSize的,這里默認的allowMaximumSizeToDivergeFromCoreSize是false。
在HystrixContextScheduler
類中有HystrixContextSchedulerWorker
、ThreadPoolScheduler
、ThreadPoolWorker
這幾個內部類。看看它們的作用:
-
HystrixContextSchedulerWorker
: 對外提供schedule()
方法,這里會判斷線程池隊列是否已經滿,如果滿了這會拋出異常:Rejected command because thread-pool queueSize is at rejection threshold。 如果配置的隊列大小為-1 則默認返回true。 -
ThreadPoolScheduler
:執行createWorker()
方法,默認使用ThreadPoolWorker()
類 -
ThreadPoolWorker
:執行command的核心邏輯
private static class ThreadPoolWorker extends Worker {
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final Func0<Boolean> shouldInterruptThread;
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
// 獲取線程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
// 將包裝后的HystrixCommand submit到線程池,然后返回FutureTask
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
}
原來一個command就是在這里被提交到線程池的,再次回到AbstractCommand.executeCommandWithSpecifiedIsolation()
方法中,這里會回調到這個回調函數的call()
方法中,這里一路執行邏輯如下:
getUserExecutionObservable(_cmd)
>getExecutionObservable()
>hystrixCommand.run()
==>SynchronousMethodHandler.invoke()
這里最后執行到HystrixInvocationHandler
中的invoke()
方法中的回調函數run()
中,最后執行SynchronousMethodHandler.invoke()
方法。
一個正常的feign請求,經過hystrix走一遍也就返回對應的response。
總結
上面一頓分析,不知道大家有沒有對hystrix 線程池及command執行是否有些理解了?
這個是一個正向流程,沒有涉及超時、熔斷、降級等代碼。關於這些異常降級的源碼會在后面一篇文章涉及。
還是之前的建議,大家可以在每個相關的回調函數打上斷點,然后一點點調試。
最后再總結一下簡單的流程:
- 瀏覽器發送請求,執行HystrixTargter
- 創建HystrixCommand,根據serviceName構造線程池
- AbstractCommand中一堆回調函數,最后將command交由線程池submit處理
畫一張流程圖加深理解:
高清大圖:https://www.processon.com/view/link/5e1c128ce4b0169fb51ce77e
申明
本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫