HystrixPlugins
HystrixPlugins 用於獲取以下 6 類插件:並發相關類(HystrixConcurrencyStrategy)、事件通知類(HystrixEventNotifier)、度量信息類(HystrixMetricsPublisher)、Properties配置類(HystrixPropertiesStrategy)、HystrixCommand回調函數類(HystrixCommandExecutionHook)以及 HystrixDynamicProperties。
插件獲取:
HystrixPlugins 首先會去properties(HystrixDynamicProperties)配置中尋找相應類型的實現類,屬性名為 hystrix.plugin.<類名>.implementation。
private static <T> T getPluginImplementationViaProperties(Class<T> pluginClass, HystrixDynamicProperties dynamicProperties) { String classSimpleName = pluginClass.getSimpleName(); // Check Archaius for plugin class. String propertyName = "hystrix.plugin." + classSimpleName + ".implementation"; String implementingClass = dynamicProperties.getString(propertyName, null).get();
....
如果返回null,則通過ServiceLoader獲取相應類型的實現類。
/**
 * Resolves the implementation for a plugin type. The property-configured implementation is
 * consulted first; the {@link ServiceLoader} lookup is used only when that yields {@code null}.
 *
 * @param pluginClass plugin type to resolve
 * @return the resolved implementation, or {@code null} when neither lookup produces one
 */
private <T> T getPluginImplementation(Class<T> pluginClass) {
    final T configured = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
    return (configured != null) ? configured : findService(pluginClass, classLoader);
}

/**
 * Returns the first non-null provider that {@link ServiceLoader} yields for the given type.
 *
 * @param spi         service type to look up
 * @param classLoader class loader handed to {@link ServiceLoader#load(Class, ClassLoader)}
 * @return the first non-null provider, or {@code null} when the loader yields none
 * @throws ServiceConfigurationError propagated from the service loader
 */
private static <T> T findService(Class<T> spi, ClassLoader classLoader) throws ServiceConfigurationError {
    for (T provider : ServiceLoader.load(spi, classLoader)) {
        if (provider != null) {
            return provider;
        }
    }
    return null;
}
獲取到的實現類(如果仍為null,則使用默認實現)會通過compareAndSet存儲到對應的AtomicReference變量中。
final AtomicReference<HystrixEventNotifier> notifier = new AtomicReference<HystrixEventNotifier>();
public HystrixEventNotifier getEventNotifier() { if (notifier.get() == null) { // check for an implementation from Archaius first Object impl = getPluginImplementation(HystrixEventNotifier.class); if (impl == null) { // nothing set via Archaius so initialize with default notifier.compareAndSet(null, HystrixEventNotifierDefault.getInstance()); // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from Archaius so use it notifier.compareAndSet(null, (HystrixEventNotifier) impl); } } return notifier.get(); }
獲取HystrixDynamicProperties對象的方式有點不同:首先使用HystrixDynamicPropertiesSystemProperties(系統屬性)配置來尋找相應類型的實現類;其次,如果查詢不到,則通過ServiceLoader查找;如果仍然沒有,會默認使用Archaius;如果依然沒有Archaius支持,則使用HystrixDynamicPropertiesSystemProperties。
/**
 * Picks the {@code HystrixDynamicProperties} implementation to use.
 * <p>
 * Sources are tried in this order, and the first non-null result is returned:
 * <ol>
 *   <li>the class named by the {@code hystrix.plugin.HystrixDynamicProperties.implementation}
 *       property, read through {@code HystrixDynamicPropertiesSystemProperties};</li>
 *   <li>a provider found through {@code findService};</li>
 *   <li>{@code HystrixArchaiusHelper.createArchaiusDynamicProperties()};</li>
 *   <li>{@code HystrixDynamicPropertiesSystemProperties.getInstance()} as the final fallback.</li>
 * </ol>
 *
 * @param classLoader class loader passed to the service lookup
 * @param logSupplier supplies the logger; it is only asked for a logger at the point of logging
 * @return the selected implementation
 */
private static HystrixDynamicProperties resolveDynamicProperties(ClassLoader classLoader, LoggerSupplier logSupplier) {
    final HystrixDynamicProperties fromSystemProperty = getPluginImplementationViaProperties(
            HystrixDynamicProperties.class, HystrixDynamicPropertiesSystemProperties.getInstance());
    if (fromSystemProperty != null) {
        logSupplier.getLogger().debug(
                "Created HystrixDynamicProperties instance from System property named "
                        + "\"hystrix.plugin.HystrixDynamicProperties.implementation\". Using class: {}",
                fromSystemProperty.getClass().getCanonicalName());
        return fromSystemProperty;
    }

    final HystrixDynamicProperties fromServiceLoader = findService(HystrixDynamicProperties.class, classLoader);
    if (fromServiceLoader != null) {
        logSupplier.getLogger().debug(
                "Created HystrixDynamicProperties instance by loading from ServiceLoader. Using class: {}",
                fromServiceLoader.getClass().getCanonicalName());
        return fromServiceLoader;
    }

    final HystrixDynamicProperties fromArchaius = HystrixArchaiusHelper.createArchaiusDynamicProperties();
    if (fromArchaius != null) {
        logSupplier.getLogger().debug(
                "Created HystrixDynamicProperties. Using class : {}",
                fromArchaius.getClass().getCanonicalName());
        return fromArchaius;
    }

    // last resort: plain system properties
    final HystrixDynamicProperties systemProperties = HystrixDynamicPropertiesSystemProperties.getInstance();
    logSupplier.getLogger().info(
            "Using System Properties for HystrixDynamicProperties! Using class: {}",
            systemProperties.getClass().getCanonicalName());
    return systemProperties;
}
HystrixConcurrencyStrategy
並發相關的策略類。
獲取線程池,實際根據配置創建ThreadPoolExecutor。
/** 獲取線程池*/ public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); final int dynamicCoreSize = corePoolSize.get(); final int dynamicMaximumSize = maximumPoolSize.get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); } } public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get(); final int dynamicCoreSize = threadPoolProperties.coreSize().get(); final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize); if (allowMaximumSizeToDivergeFromCoreSize) { final int dynamicMaximumSize = threadPoolProperties.maximumSize().get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize 
= " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) { if (!PlatformSpecific.isAppEngineStandardEnvironment()) { return new ThreadFactory() { private final AtomicInteger threadNumber = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()); thread.setDaemon(true); return thread; } }; } else { return PlatformSpecific.getAppEngineThreadFactory(); } } public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) { if (maxQueueSize <= 0) { return new SynchronousQueue<Runnable>(); } else { return new LinkedBlockingQueue<Runnable>(maxQueueSize); } }
在執行callable前,提供給用戶對callable進行封裝的機會。
/**
 * Hook for decorating a {@link Callable}; this implementation hands the argument back untouched.
 *
 * @param callable the callable to wrap
 * @return the same {@code callable} instance
 */
public <T> Callable<T> wrapCallable(Callable<T> callable) {
    // pass-through: no decoration is applied here
    return callable;
}
獲取HystrixRequestVariable
/**
 * Builds the request variable for the given lifecycle by wrapping it in a
 * {@code HystrixLifecycleForwardingRequestVariable}.
 *
 * @param rv lifecycle handed to the forwarding request variable
 * @return a new forwarding request variable for {@code rv}
 */
public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) {
    final HystrixRequestVariable<T> forwarding = new HystrixLifecycleForwardingRequestVariable<T>(rv);
    return forwarding;
}
HystrixCommandExecutionHook
hystrix命令執行過程中的回調接口,提供了以下回調方法。
/** 在命令執行前被調用*/ public <T> void onStart(HystrixInvokable<T> commandInstance) { //do nothing by default } /** 當接收到數據時被調用*/ public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) { return value; //by default, just pass through } /** *當命令異常時被調用 * @since 1.2 */ public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) { return e; //by default, just pass through } /** *當執行成功后被調用 * @since 1.4 */ public <T> void onSuccess(HystrixInvokable<T> commandInstance) { //do nothing by default } /** 在線程池執行命令前執行*/ public <T> void onThreadStart(HystrixInvokable<T> commandInstance) { //do nothing by default } /** 在線程池執行完成后調用*/ public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) { // do nothing by default } /** 當開始執行命令時調用*/ public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) { //do nothing by default } /** 當執行命令emits a value時調用 * * @param commandInstance The executing HystrixInvokable instance. * @param value value emitted * * @since 1.4 */ public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) { return value; //by default, just pass through } /** 執行拋出異常時調用*/ public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) { return e; //by default, just pass through } /** 當調用命令成功執行完調用 * @since 1.4 */ public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) { //do nothing by default } /** * Invoked when the fallback method in {@link HystrixInvokable} starts. * * @param commandInstance The executing HystrixInvokable instance. * * @since 1.2 */ public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) { //do nothing by default } /** * Invoked when the fallback method in {@link HystrixInvokable} emits a value. * * @param commandInstance The executing HystrixInvokable instance. 
* @param value value emitted * * @since 1.4 */ public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) { return value; //by default, just pass through } /** * Invoked when the fallback method in {@link HystrixInvokable} fails with an Exception. * * @param commandInstance The executing HystrixInvokable instance. * @param e exception object * * @since 1.2 */ public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) { //by default, just pass through return e; } /** * Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully. * * @param commandInstance The executing HystrixInvokable instance. * * @since 1.4 */ public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) { //do nothing by default } /** 如果從緩存中獲取數據時調用*/ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) { //do nothing by default } /** 當取消掛載時被調用 * @since 1.5.9 */ public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) { //do nothing by default }
HystrixEventNotifier
hystrix命令執行過程中,接收相應的事件通知。
/** 接收到消息后執行
接受一下消息:
RESPONSE_FROM_CACHE如果該command從緩存中獲取數據
CANCELLED 如果command在完成前unsubscrible
EXCEPTION_THROWN 如果command出現異常
EMIT 如果command發送數據
THREAD_POOL_REJECTED 如果線程池拋出reject異常
FAILURE 執行失敗異常
FALLBACK_EMIT執行fallback返回數據
FALLBACK_SUCCESS執行fallback成功
FALLBACK_FAILURE執行fallback發生異常
FALLBACK_REJECTION超過信號量,fallback未被執行。
FALLBACK_MISSING
SEMAPHORE_REJECTED信號量模型執行被拒絕。
SHORT_CIRCUITED熔斷
*/ public void markEvent(HystrixEventType eventType, HystrixCommandKey key) { // do nothing } /** * Called after a command is executed using thread isolation. * Will not get called if a command is rejected, short-circuited etc.*/ public void markCommandExecution(HystrixCommandKey key, ExecutionIsolationStrategy isolationStrategy, int duration, List<HystrixEventType> eventsDuringExecution) { // do nothing }
HystrixMetricsPublisher
HystrixMetricsPublisherFactory使用該插件來創建HystrixMetricsPublisherCommand、HystrixMetricsPublisherThreadPool、HystrixMetricsPublisherCollapser,並調用相應的initialize方法,這些類用來向第三方publish hystrix的metrics信息。
private final ConcurrentHashMap<String, HystrixMetricsPublisherThreadPool> threadPoolPublishers = new ConcurrentHashMap<String, HystrixMetricsPublisherThreadPool>(); /* package */ HystrixMetricsPublisherThreadPool getPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) { // attempt to retrieve from cache first HystrixMetricsPublisherThreadPool publisher = threadPoolPublishers.get(threadPoolKey.name()); if (publisher != null) { return publisher; } // it doesn't exist so we need to create it publisher = HystrixPlugins.getInstance().getMetricsPublisher().getMetricsPublisherForThreadPool(threadPoolKey, metrics, properties); // attempt to store it (race other threads) HystrixMetricsPublisherThreadPool existing = threadPoolPublishers.putIfAbsent(threadPoolKey.name(), publisher); if (existing == null) { // we won the thread-race to store the instance we created so initialize it publisher.initialize(); // done registering, return instance that got cached return publisher; } else { // we lost so return 'existing' and let the one we created be garbage collected // without calling initialize() on it return existing; } }
HystrixPropertiesStrategy
創建HystrixCommandProperties、HystrixThreadPoolProperties、HystrixCollapserProperties、HystrixTimerThreadPoolProperties
/**
 * Creates the {@code HystrixCommandProperties} for a command.
 *
 * @param commandKey key of the command
 * @param builder    setter carrying the values passed to the properties object
 * @return a new {@code HystrixPropertiesCommandDefault}
 */
public HystrixCommandProperties getCommandProperties(HystrixCommandKey commandKey, HystrixCommandProperties.Setter builder) {
    final HystrixCommandProperties commandProperties = new HystrixPropertiesCommandDefault(commandKey, builder);
    return commandProperties;
}

/**
 * Creates the {@code HystrixThreadPoolProperties} for a thread pool.
 *
 * @param threadPoolKey key of the thread pool
 * @param builder       setter carrying the values passed to the properties object
 * @return a new {@code HystrixPropertiesThreadPoolDefault}
 */
public HystrixThreadPoolProperties getThreadPoolProperties(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter builder) {
    final HystrixThreadPoolProperties poolProperties = new HystrixPropertiesThreadPoolDefault(threadPoolKey, builder);
    return poolProperties;
}

/**
 * Creates the {@code HystrixCollapserProperties} for a collapser.
 *
 * @param collapserKey key of the collapser
 * @param builder      setter carrying the values passed to the properties object
 * @return a new {@code HystrixPropertiesCollapserDefault}
 */
public HystrixCollapserProperties getCollapserProperties(HystrixCollapserKey collapserKey, HystrixCollapserProperties.Setter builder) {
    final HystrixCollapserProperties collapserProperties = new HystrixPropertiesCollapserDefault(collapserKey, builder);
    return collapserProperties;
}

/**
 * Creates the {@code HystrixTimerThreadPoolProperties}.
 *
 * @return a new {@code HystrixPropertiesTimerThreadPoolDefault}
 */
public HystrixTimerThreadPoolProperties getTimerThreadPoolProperties() {
    final HystrixTimerThreadPoolProperties timerProperties = new HystrixPropertiesTimerThreadPoolDefault();
    return timerProperties;
}