Hystrix Source Code Analysis (Part 1)
The @HystrixCommand entry point
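Adding @HystrixCommand to a method is all it takes to put Hystrix to work. My guess was that AOP is used to intercept the @HystrixCommand annotation. After some searching I found the AOP code in HystrixCommandAspect, which looks like this: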
@Aspect
public class HystrixCommandAspect {
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
// the advice that AOP applies to annotated methods
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
Method method = AopUtils.getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
} else {
HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory)META_HOLDER_FACTORY_MAP
.get(HystrixCommandAspect.HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
// build the HystrixCommand implementation
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent()
?metaHolder.getCollapserExecutionType()
: metaHolder.getExecutionType();
try {
Object result;
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = this.executeObservable(invokable, executionType, metaHolder);
}
return result;
} catch (...) {
...
}
}
}
}
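HystrixCommandAspect intercepts the @HystrixCommand and @HystrixCollapser annotations. In methodsAnnotatedWithHystrixCommand, the key line is HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder), whose main purpose is to build a HystrixCommand. First, here is the class diagram relating HystrixInvokable and HystrixCommand:
As the diagram shows, HystrixCommand is a subtype of HystrixInvokable. In the Hystrix getting-started demo I wrote earlier, one of the approaches was to extend HystrixCommand directly. So what happens here is presumably that a HystrixCommand is built from the parameters you set on @HystrixCommand and then executed through
CommandExecutor.execute(invokable, executionType, metaHolder)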
The Hystrix getting-started demo: https://www.cnblogs.com/dabenxiang/p/13676116.html
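To make the interception idea concrete, here is a minimal sketch that uses only the JDK: a dynamic proxy checks each call for an annotation and returns a fallback when an annotated method fails. It is not how Hystrix does it (Hystrix uses the AspectJ aspect shown above). The names Guarded, GreetingService, FailingGreetingService and GuardedProxy are hypothetical, and the fallback value is carried directly in the annotation purely to keep the sketch short.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for @HystrixCommand; not the real annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Guarded {
    String fallback();
}

interface GreetingService {
    @Guarded(fallback = "fallback greeting")
    String greet(String name);

    String plain(String name);
}

// A target whose guarded method always fails, to exercise the fallback path.
class FailingGreetingService implements GreetingService {
    public String greet(String name) {
        throw new IllegalStateException("remote call failed");
    }

    public String plain(String name) {
        return "plain " + name;
    }
}

class GuardedProxy {
    // Wraps the target so that every call passes through the handler below,
    // which plays the role of the @Around advice.
    static <T> T wrap(T target, Class<T> iface) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (p, method, args) -> {
                    Guarded guarded = method.getAnnotation(Guarded.class);
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        if (guarded == null) {
                            // Not annotated: propagate the original failure.
                            throw e.getCause();
                        }
                        // Annotated: swallow the failure and return the fallback.
                        return guarded.fallback();
                    }
                });
        return iface.cast(proxy);
    }
}
```

Calling GuardedProxy.wrap(new FailingGreetingService(), GreetingService.class).greet("x") returns the fallback string, while plain("x") goes straight through to the target.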
Building the GenericCommand
Here we look at what mainly happens when a HystrixCommand is built.
This is the create method behind HystrixCommandFactory.getInstance().create(metaHolder):
public class HystrixCommandFactory {
private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory();
private HystrixCommandFactory() {
}
public static HystrixCommandFactory getInstance() {
return INSTANCE;
}
public HystrixInvokable create(MetaHolder metaHolder) {
Object executable;
// check whether the @HystrixCollapser annotation is present
if (metaHolder.isCollapserAnnotationPresent()) {
executable = new CommandCollapser(metaHolder);
} else if (metaHolder.isObservable()) {
executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
} else {
// this is the branch taken in our case
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return (HystrixInvokable)executable;
}
}
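Because our annotation is not @HystrixCollapser and our method is not observable, a GenericCommand is built. GenericCommand is a subclass of HystrixCommand (it also appears in the class diagram above), so construction runs through GenericCommand -> AbstractHystrixCommand -> HystrixCommand -> AbstractCommand. For the construction of GenericCommand, the part worth reading is the AbstractCommand constructor: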
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
// constructor
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key,
HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker,
HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults,
HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics,
TryableSemaphore fallbackSemaphore,
TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy,
HystrixCommandExecutionHook executionHook) {
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
// resolve the thread pool key (the pool itself is created by initThreadPool below)
this.threadPoolKey = initThreadPoolKey(threadPoolKey,
this.commandGroup,
this.properties.executionIsolationThreadPoolKeyOverride().get());
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
// initialize the circuit breaker
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(),
circuitBreaker, this.commandGroup,
this.commandKey, this.properties, this.metrics);
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup,
this.metrics, this.circuitBreaker,
this.properties);
this.executionHook = initExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;
/* execution semaphore override if applicable */
this.executionSemaphoreOverride = executionSemaphore;
}
}
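The points to focus on here are initThreadPool(), which initializes the thread pool, and initCircuitBreaker(), which initializes the circuit breaker. Keeping a marker here: we analyze thread pool initialization first and circuit breaker initialization second.
initThreadPool: thread pool initialization
The code related to initThreadPool is as follows: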
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey,
HystrixCommandGroupKey groupKey, String
threadPoolKeyOverride) {
if (threadPoolKeyOverride == null) {
// we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
// if no threadPoolKey was given, fall back to groupKey.name()
if (threadPoolKey == null) {
/* use HystrixCommandGroup if HystrixThreadPoolKey is null */
return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
} else {
return threadPoolKey;
}
} else {
// we have a property defining the thread-pool so use it instead
return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
}
}
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor,
HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
// get the default implementation of HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
return fromConstructor;
}
}
/* package */static class Factory {
// thread pools are stored in a ConcurrentHashMap
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
// look up the HystrixThreadPool by threadPoolKey: return the cached one if the factory has it, otherwise create it
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
String key = threadPoolKey.name();
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
}
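The code above does three things:
- initThreadPoolKey() builds a HystrixThreadPoolKey. If the threadPoolKeyOverride property is set, it takes precedence. Otherwise the given threadPoolKey is used when it exists, and the key is built from groupKey when it does not. The resulting threadPoolKey is then passed to initThreadPool.
- initThreadPool asks HystrixThreadPool.Factory for the HystrixThreadPool that belongs to that threadPoolKey (unless a pool was passed in through the constructor).
- HystrixThreadPool.Factory.getInstance looks up the HystrixThreadPool by threadPoolKey: if the factory already has one it is returned, otherwise a HystrixThreadPoolDefault is created. Without an override, threadPoolKey.name() here is effectively threadPoolKey == null ? groupKey.name() : threadPoolKey.name().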
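The key resolution and the "return if cached, otherwise create" behaviour described above can be sketched with plain JDK classes. This is only an illustration under stated assumptions: PoolRegistrySketch, resolveKey and poolFor are hypothetical names, keys are plain strings, and computeIfAbsent replaces the synchronized block that the Hystrix factory uses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolRegistrySketch {
    // One pool per resolved key, shared by every caller that resolves to that key.
    private static final Map<String, ExecutorService> POOLS = new ConcurrentHashMap<>();

    // Same precedence as initThreadPoolKey(): override, then threadPoolKey, then groupKey.
    static String resolveKey(String override, String threadPoolKey, String groupKey) {
        if (override != null) {
            return override;
        }
        return threadPoolKey != null ? threadPoolKey : groupKey;
    }

    // Returns the cached pool for the key, creating it on first use.
    // The pool size of 2 is an arbitrary value chosen for the sketch.
    static ExecutorService poolFor(String key) {
        return POOLS.computeIfAbsent(key, k -> Executors.newFixedThreadPool(2));
    }
}
```

Two commands that pass no thread pool key but share the group "orders" both resolve to "orders" and therefore receive the same ExecutorService instance from poolFor.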
HystrixThreadPoolDefault is built as follows:
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
// check whether the maximum size is allowed to differ from the core size
if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
} else {
// the maximum pool size equals the core size
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
}
}
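As getThreadPool() shows, the thread pool is ultimately created with new ThreadPoolExecutor.
To summarize: the key derived from threadPoolKey and groupKey is used to fetch the matching thread pool from the factory, and the pool is created if it does not exist yet. So two HystrixCommands with the same threadPoolKey share one thread pool, and when no threadPoolKey is set, commands with the same groupKey also share one thread pool.
initCircuitBreaker: circuit breaker initialization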
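The sizing branches in getThreadPool() boil down to one rule: the maximum size differs from the core size only when divergence is allowed and the configured maximum is at least the core size. A minimal sketch of that rule follows; PoolSizingSketch and its methods are hypothetical names, and a bounded LinkedBlockingQueue with a positive capacity is assumed in place of whatever getBlockingQueue() returns.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {
    // Collapses the three branches of getThreadPool() into a single decision.
    static int effectiveMaximumSize(boolean allowDivergence, int coreSize, int maximumSize) {
        if (allowDivergence && maximumSize >= coreSize) {
            return maximumSize;
        }
        // Divergence disabled, or core > maximum: fall back to the core size.
        return coreSize;
    }

    // queueCapacity must be positive here because of the assumed queue type.
    static ThreadPoolExecutor build(boolean allowDivergence, int coreSize, int maximumSize,
                                    int keepAliveMinutes, int queueCapacity) {
        int max = effectiveMaximumSize(allowDivergence, coreSize, maximumSize);
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
        return new ThreadPoolExecutor(coreSize, max, keepAliveMinutes, TimeUnit.MINUTES, queue);
    }
}
```

For example, build(true, 10, 5, 1, 10) yields a pool whose maximum size is 10 rather than 5, matching the dynamicCoreSize > dynamicMaximumSize branch above.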
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// get the default implementation of HystrixCircuitBreaker
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
return new NoOpCircuitBreaker();
}
}
public static class Factory {
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand =
new ConcurrentHashMap<String, HystrixCircuitBreaker>();
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key,
HystrixCommandGroupKey group, HystrixCommandProperties
properties,
HystrixCommandMetrics metrics) {
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if (previouslyCached != null) {
return previouslyCached;
}
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(),
new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
// this means the putIfAbsent step just created a new one so let's retrieve and return it
return circuitBreakersByCommand.get(key.name());
} else {
return cbForCommand;
}
}
}
/* package */static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
/* track whether this circuit is open/closed at any given point in time (default to false==closed) */
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
/* when the circuit was marked open or was last allowed to try a 'singleTest' */
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup,
HystrixCommandProperties properties,
HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
}
}
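The circuit breaker initialization code has much the same overall structure as thread pool initialization: a ConcurrentHashMap inside the factory class manages the circuit breakers, and the lookup key is the HystrixCommandKey. I do not have a good grasp of the initialization details yet, so I am skipping them for now. The reason is that I do not really understand the RxJava code behind this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties); the failure rate and request count statistics are initialized there. I will come back to it later.
Conclusion
This post covered one thing: AOP intercepts @HystrixCommand, and a GenericCommand is then built according to the @HystrixCommand configuration. The next post covers what happens inside CommandExecutor.execute.
A flowchart of the whole Hystrix process that I drew myself:
High-resolution flowchart: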
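The per-command caching used by the circuit breaker factory can be sketched as follows. This is only an illustration of the putIfAbsent idiom seen in Factory.getInstance: BreakerSketch, SimpleBreaker, NoOpBreaker and BreakerRegistrySketch are hypothetical names, the breaker itself is reduced to a single open/closed flag, and none of the metrics-driven logic is modelled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

interface BreakerSketch {
    boolean allowRequest();
}

// Reduced to one flag; starts closed, as circuitOpen does in the listing above.
class SimpleBreaker implements BreakerSketch {
    private final AtomicBoolean open = new AtomicBoolean(false);

    public boolean allowRequest() {
        return !open.get();
    }

    void trip() {
        open.set(true);
    }
}

// Used when the circuit breaker is disabled: every request is allowed.
class NoOpBreaker implements BreakerSketch {
    public boolean allowRequest() {
        return true;
    }
}

class BreakerRegistrySketch {
    private static final ConcurrentHashMap<String, BreakerSketch> BY_COMMAND =
            new ConcurrentHashMap<>();

    static BreakerSketch forCommand(boolean enabled, String commandKey) {
        if (!enabled) {
            return new NoOpBreaker();
        }
        BreakerSketch cached = BY_COMMAND.get(commandKey);
        if (cached != null) {
            return cached;
        }
        BreakerSketch created = new SimpleBreaker();
        // putIfAbsent returns null when our instance won the race,
        // otherwise the instance another thread registered first.
        BreakerSketch existing = BY_COMMAND.putIfAbsent(commandKey, created);
        return existing == null ? created : existing;
    }
}
```

Because the map is keyed by command key, every command instance with the same key sees the same breaker, so tripping it once affects all of them.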
https://gitee.com/gzgyc/blogimage/raw/master/hstrix執行流程圖.jpg