上一節講到了hystrix提供的五個功能,這一節我們首先來講hystrix中提供實時執行metrics信息的實現。為什么先講metrics,因為很多功能都是基於metrics的數據來實現的,它是很多功能實現的基礎。
首先來看一下通過hystrix調用服務的過程中會產生那些類型的metrics信息:
1.某一事件的持續指標。
2.某一事件窗口時間內持續指標。
3.某一事件窗口時間內最大指標。
4.某一事件窗口時間內指標分布。
在來看一下這些數據在hystrix中是如何產生、計算和流轉的。

hystrix在執行服務調用的過程中會產生各類事件,執行模塊首先將這些事件發送的metrics接受流中,而metrics統計流會監聽metrics接受流,計算出各類統計數據。
metrics接收流
hystrix有以下接收流和對應接收的消息
| 接收流 | 接收消息 | 說明 |
| HystrixCommandStartStream | HystrixCommandExecutionStarted | 命令開始執行消息流 |
| HystrixCommandCompletionStream | HystrixCommandCompletion | 命令完成執行消息流 |
| HystrixThreadPoolStartStream | HystrixCommandExecutionStarted | 線程池開始執行消息流 |
| HystrixThreadPoolCompletionStream | HystrixCommandCompletion | 線程池執行完成消息流 |
| HystrixCollapserEventStream | HystrixCollapserEvent | 合並命令執行消息流 |
metrics接收流使用單例模式,HystrixCommandKey,HystrixThreadPoolKey,HystrixCollapserKey分別對應同一個(HystrixCommandStartStream、HystrixCommandCompletionStream),(HystrixThreadPoolStartStream,HystrixThreadPoolCompletionStream),(HystrixCollapserEventStream)。
內部使用rxjava來實現消息機制
HystrixCommandStartStream(final HystrixCommandKey commandKey) {
this.commandKey = commandKey;
this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create());
this.readOnlyStream = writeOnlySubject.share();
}
此外還提供了HystrixThreadEventStream統一執行接收消息然后發送到各個消息接收流類。
metrics接受流消息體
上面講了hystrix metrics的接收流,接下來我們看看接收流具體接收的內容。
| 消息體 | 內容 |
| HystrixCommandExecutionStarted | 內部包括了該命令的執行策略和並發數。 |
| HystrixCommandCompletion | 內部包含執行結果對象ExecutionResult和請求上下文對象HystrixRequestContext |
| ExecutionResult | private final EventCounts eventCounts;//事件數量
private final Exception failedExecutionException;//失敗異常
private final Exception executionException; //執行異常
private final long startTimestamp;//命令開始執行時間
private final int executionLatency; //執行run的時間
private final int userThreadLatency; //請求提交到執行結束的時間
private final boolean executionOccurred;//ture 執行過命令 false 未執行過命令
private final boolean isExecutedInThread;//ture 使用線程池執行 false 不是使用線程池執行
private final HystrixCollapserKey collapserKey; |
| EventCounts | private final BitSet events;事件類型 private final int numEmissions//emission次數 private final int numFallbackEmissions;//fallback次數 private final int numCollapsed;//合並格式 |
| HystrixCollapserEvent | private final HystrixCollapserKey collapserKey;//合並命令key private final HystrixEventType.Collapser eventType;事件類型 |
事件類型:
HystrixCommand只返回一個數據,當返回值時發生SUCCESS事件,執行失敗時,發生FAILURE事件,HystrixObservableCommand可以返回多個值,當返回值時發生EMIT事件,當命令完成時,發生SUCCESS事件,執行失敗時,發生FAILURE事件。
| 名稱 | 描述 | 是否fallback |
| EMIT | value返回,只在HystrixObservableCommand | NO |
| SUCCESS | 執行成功 | NO |
| FAILURE | 執行拋出異常 | YES |
| TIMEOUT | 超時 | YES |
| BAD_REQUEST | 拋出HystrixBadRequestException | NO |
| SHORT_CIRCUITED | 熔斷 | YES |
| THREAD_POOL_REJECTED | 線程池拒絕 | YES |
| SEMAPHORE_REJECTED | 信號量拒絕 | YES |
Fallback事件類型
| 名稱 | 描述 | 是否拋出異常 |
| FALLBACK_EMIT | fallback 返回值,只在HystrixObservableCommand | NO |
| FALLBACK_SUCCESS | fallback 執行完成 | NO |
| FALLBACK_FAILURE | fallback執行失敗 | YES |
| FALLBACK_REJECTION | fallback拒絕執行 | YES |
| FALLBACK_MISSING | 沒有fallback實現 | YES |
其他命令類型
| 名稱 | 描述 |
| EXCEPTION_THROWN | 執行命令值拋出異常 |
| RESPONSE_FROM_CACHE | 從緩存中獲取值 |
| CALLAPSED | 命令聚合執行 |
線程池類型
| 名稱 | 描述 |
| EXECUTED | 線程池執行一個命令 |
| REJECTED | 線程池拒絕執行命令 |
聚合事件類型
| 名稱 | 描述 |
| BATCH_EXECUTED | 執行一個batch批量執行 |
| ADDED_TO_BATCH | 參數添加到batch中 |
| RESPONSE_FROM_CACHE | 從緩存中獲取值 |
metrics統計流
hystrix有以下統計流
| 類別 | 統計流 | 監聽接收流 | 說明 |
| 窗口時間內持續統計 | RollingCommandEventCounterStream | HystrixCommandCompletionStream | 統計各種消息類型窗口期內次數 |
| RollingCollapserEventCounterStream | HystrixCollapserEventStream | 統計各種消息類型窗口期內次數 | |
| RollingThreadPoolEventCounterStream | HystrixThreadPoolCompletionStream | 統計各種消息類型窗口期內次數 | |
| HealthCountsStream | HystrixThreadPoolCompletionStream | 統計總調用次數,失敗次數,失敗率 | |
| 持續統計流 | CumulativeCommandEventCounterStream | HystrixCommandCompletionStream | 持久統計各種消息類型次數 |
| CumulativeCollapserEventCounterStream | HystrixCollapserEventStream | 持久統計各種消息類型次數 | |
| CumulativeThreadPoolEventCounterStream | HystrixThreadPoolCompletionStream | 持久統計各種消息類型次數 | |
| 窗口時間內分布統計 | RollingCommandLatencyDistributionStream | HystrixCommandCompletionStream消息流的executelatency事件 | 通過Histogram計算窗口期內的分布 |
| RollingCommandUserLatencyDistributionStream | HystrixCommandCompletionStream消息流的totalLatency事件 | 通過Histogram計算窗口期內的分布 | |
| RollingCollapserBatchSizeDistributionStream | HystrixCollapserEventStream消息流的ADDED_TO_BATCH消息 | 通過Histogram計算窗口期內的分布 | |
| 窗口時間內最大值統計流 | RollingCommandMaxConcurrencyStream | HystrixCommandStartStream | 窗口期內的執行並發量取最大值 |
| RollingThreadPoolMaxConcurrencyStream | HystrixThreadPoolStartStream | 窗口期內的執行並發量取最大值 |
窗口時間內持續統計流首先監聽一個消息接受流,統計一段時間內各個類型消息的累計數據(時間為:metrics.rollingStats.timeInMilliseconds/metrics.rollingStats.numBuckets)。然后再對累計的數據進行累加(個數為:metrics.rollingStats.numBuckets),即為最終累計數據。
持續統計流首先監聽一個消息流(開始消息流或者完成消息流),統計一段時間內各個類型消息的累計數據(時間為:metrics.rollingStats.timeInMilliseconds/metrics.rollingStats.numBuckets)。然后不斷的累加累計數據。
窗口時間內分布統計流首先監聽一個消息流,統計一段時間內各個類型消息存放在Histogram對象中(時間為:metrics.rollingStats.timeInMilliseconds/metrics.rollingStats.numBuckets),然后對(個數為:metrics.rollingStats.numBuckets)內的Histogram對象進行運算操作,即為窗口期內某一時間的分布。
RollingConcurrencyStream監聽一個消息流,例如HystrixCommandStartStream,然后通過RX java對一段時間內的執行並發量取最大值,重新發射,對窗口期內的執行並發量取最大值,重新發射。
metrics統計流使用單例模式,每個統計流分別對應一個HystrixCommandKey,HystrixThreadPoolKey,HystrixCollapserKey。
metrics模塊
hystrix中可以通過metrics模塊來獲取執行過程中的數據,主要有三部分數據:命令執行metrics,線程池metrics,合並命令執行metrics,每個HystrixCommandKey、HystrixThreadPoolKey、HystrixCollapserKey對應一個相應的metrics(HystrixCommandMetrics,HystrixThreadPoolMetrics,HystrixCollapserMetrics)。metrics模塊內部是通過監聽消息流來獲取各個指標的統計數據。
命令執行metrics
HystrixCommandMetrics為命令執行模塊的metrics,在其初始化時會創建6個統計數據流:HealthCountsStream、RollingCommandEventCounterStream、CumulativeCommandEventCounterStream、RollingCommandLatencyDistributionStream、RollingCommandUserLatencyDistributionStream、RollingCommandMaxConcurrencyStream,通過這些統計數據流來獲取相應metrics信息。
private HealthCountsStream healthCountsStream; private final RollingCommandEventCounterStream rollingCommandEventCounterStream; private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream; private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream; private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream; private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream; /* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) { super(null); healthCountsStream = HealthCountsStream.getInstance(key, properties); rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties); cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties); rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties); rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties); rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties); }
//獲取指定事件窗口期內數據指標 public long getRollingCount(HystrixEventType eventType) { return rollingCommandEventCounterStream.getLatest(eventType); }
//獲取指定事件持續的數據指標 public long getCumulativeCount(HystrixEventType eventType) { return cumulativeCommandEventCounterStream.getLatest(eventType); }//獲取某一百分比的執行時間public int getExecutionTimePercentile(double percentile) { return rollingCommandLatencyDistributionStream.getLatestPercentile(percentile); }//獲取平均的執行時間 public int getExecutionTimeMean() { return rollingCommandLatencyDistributionStream.getLatestMean(); } //獲取某一百分比的總時間 public int getTotalTimePercentile(double percentile) { return rollingCommandUserLatencyDistributionStream.getLatestPercentile(percentile); }//獲取平均的總時間 public int getTotalTimeMean() { return rollingCommandUserLatencyDistributionStream.getLatestMean(); } //獲取窗口期內最大並發量 public long getRollingMaxConcurrentExecutions() { return rollingCommandMaxConcurrencyStream.getLatestRollingMax(); }//獲取當前並發量 public int getCurrentConcurrentExecutionCount() { return concurrentExecutionCount.get(); } //獲取命令執行健康情況 public HealthCounts getHealthCounts() { return healthCountsStream.getLatest(); }
線程池metrics
HystrixThreadPoolMetrics為線程池執行模塊的metrics,在其初始化時會獲取3個數據流:RollingThreadPoolEventCounterStream、CumulativeThreadPoolEventCounterStream、RollingThreadPoolMaxConcurrencyStream通過這些統計流獲得相應的統計數據。
private final RollingThreadPoolEventCounterStream rollingCounterStream;
private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;
private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
super(null);
this.threadPoolKey = threadPoolKey;
this.threadPool = threadPool;
this.properties = properties;
rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties);
}
/**
獲取窗口期內線程池執行的個數*/
public long getRollingCountThreadsExecuted() {
return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
}
/**
獲取持續的線程池執行個數*/
public long getCumulativeCountThreadsExecuted() {
return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
}
/**
獲取窗口期內線程池拒絕的個數*/
public long getRollingCountThreadsRejected() {
return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
}
/**
獲取持續內線程池拒絕的個數*/
public long getCumulativeCountThreadsRejected() {
return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
}
//獲取指定事件窗口期內數據指標
public long getRollingCount(HystrixEventType.ThreadPool event) {
return rollingCounterStream.getLatestCount(event);
}
//獲取指定事件持續的數據指標
public long getCumulativeCount(HystrixEventType.ThreadPool event) {
return cumulativeCounterStream.getLatestCount(event);
}/**
獲取窗口期內最大並發量*/
public long getRollingMaxActiveThreads() {
return rollingThreadPoolMaxConcurrencyStream.getLatestRollingMax();
}
還有一些根據線程池獲取線程池當前指標
public Number getCurrentActiveCount() {
return threadPool.getActiveCount();
}
public Number getCurrentCompletedTaskCount() {
return threadPool.getCompletedTaskCount();
}
public Number getCurrentCorePoolSize() {
return threadPool.getCorePoolSize();
}
public Number getCurrentLargestPoolSize() {
return threadPool.getLargestPoolSize();
}
public Number getCurrentMaximumPoolSize() {
return threadPool.getMaximumPoolSize();
}
public Number getCurrentPoolSize() {
return threadPool.getPoolSize();
}
public Number getCurrentTaskCount() {
return threadPool.getTaskCount();
}
public Number getCurrentQueueSize() {
return threadPool.getQueue().size();
}
合並命令執行metrics
HystrixCollapserMetrics為合並命令執行模塊的metrics,在其初始化時會創建3個數據流:RollingCollapserEventCounterStream、CumulativeCollapserEventCounterStream、RollingCollapserBatchSizeDistributionStream,通過這些統計流獲得相應的統計數據。
private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;
/* package */HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) {
super(null);
rollingCollapserEventCounterStream = RollingCollapserEventCounterStream.getInstance(key, properties);
cumulativeCollapserEventCounterStream = CumulativeCollapserEventCounterStream.getInstance(key, properties);
rollingCollapserBatchSizeDistributionStream = RollingCollapserBatchSizeDistributionStream.getInstance(key, properties);
}
//獲取指定事件窗口期內數據指標
public long getRollingCount(HystrixEventType.Collapser collapserEventType) {
return rollingCollapserEventCounterStream.getLatest(collapserEventType);
}
//獲取指定事件持續的數據指標
public long getCumulativeCount(HystrixEventType.Collapser collapserEventType) {
return cumulativeCollapserEventCounterStream.getLatest(collapserEventType);
}
//獲取指定百分比的batchsize
public int getBatchSizePercentile(double percentile) {
return rollingCollapserBatchSizeDistributionStream.getLatestPercentile(percentile);
}
//獲取平均的batchsize
public int getBatchSizeMean() {
return rollingCollapserBatchSizeDistributionStream.getLatestMean();
}
其他流
HystrixConfigurationStream
該數據流定時將hystrix的最新properties配置,發送到該消息流中。com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet就是用該流來獲取配置信息。
public HystrixConfigurationStream(final int intervalInMilliseconds) {
this.intervalInMilliseconds = intervalInMilliseconds; this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS) .map(getAllConfig) .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() .onBackpressureDrop(); } private static final Func1<Long, HystrixConfiguration> getAllConfig = new Func1<Long, HystrixConfiguration>() { @Override public HystrixConfiguration call(Long timestamp) { return HystrixConfiguration.from( getAllCommandConfig.call(timestamp), getAllThreadPoolConfig.call(timestamp), getAllCollapserConfig.call(timestamp) ); } };
private static final Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>> getAllCommandConfig =
new Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>>() {
@Override
public Map<HystrixCommandKey, HystrixCommandConfiguration> call(Long timestamp) { Map<HystrixCommandKey, HystrixCommandConfiguration> commandConfigPerKey = new HashMap<HystrixCommandKey, HystrixCommandConfiguration>(); for (HystrixCommandMetrics commandMetrics: HystrixCommandMetrics.getInstances()) { HystrixCommandKey commandKey = commandMetrics.getCommandKey(); HystrixThreadPoolKey threadPoolKey = commandMetrics.getThreadPoolKey(); HystrixCommandGroupKey groupKey = commandMetrics.getCommandGroup(); commandConfigPerKey.put(commandKey, sampleCommandConfiguration(commandKey, threadPoolKey, groupKey, commandMetrics.getProperties())); } return commandConfigPerKey; } }; private static final Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> getAllThreadPoolConfig = new Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>>() { @Override public Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> call(Long timestamp) { Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> threadPoolConfigPerKey = new HashMap<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>(); for (HystrixThreadPoolMetrics threadPoolMetrics: HystrixThreadPoolMetrics.getInstances()) { HystrixThreadPoolKey threadPoolKey = threadPoolMetrics.getThreadPoolKey(); threadPoolConfigPerKey.put(threadPoolKey, sampleThreadPoolConfiguration(threadPoolKey, threadPoolMetrics.getProperties())); } return threadPoolConfigPerKey; } }; private static final Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>> getAllCollapserConfig = new Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>>() { @Override public Map<HystrixCollapserKey, HystrixCollapserConfiguration> call(Long timestamp) { Map<HystrixCollapserKey, HystrixCollapserConfiguration> collapserConfigPerKey = new HashMap<HystrixCollapserKey, HystrixCollapserConfiguration>(); for (HystrixCollapserMetrics collapserMetrics: HystrixCollapserMetrics.getInstances()) { HystrixCollapserKey collapserKey = collapserMetrics.getCollapserKey(); collapserConfigPerKey.put(collapserKey, sampleCollapserConfiguration(collapserKey, collapserMetrics.getProperties())); } return collapserConfigPerKey; } };
metrics發布
有時,我們需要發布Hystrix中的metrics到其他地方,Hystrix提供了相應的接口(HystrixMetricsPublisherCollapser,HystrixMetricsPublisherCommand,HystrixMetricsPublisherThreadPool),實現這些接口,並在initial方法中實現發送hystrix的metrics的邏輯。實現HystrixMetricsPublisher,來創建這些實現類。其實hystrix對metrics的發布只是定義了接口和initial方法。Hystrix運行時,HystrixMetricsPublisherFactory通過HystrixPlugins獲取HystrixMetricsPublisher的實現類。並且通過該實現類來創建(HystrixMetricsPublisherCollapser,HystrixMetricsPublisherCommand,HystrixMetricsPublisherThreadPool)的實現類,並在初次創建時調用其initial方法。
例如:
通過coda hale 實現了將hystrix 的metrics信息輸出到指定metrics監控系統中。
引入jar包:
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-codahale-metrics-publisher</artifactId>
<version>1.5.9</version>
</dependency>
創建HystrixMetricsPublisher對象並注冊到HystrixPlugins:
@Bean
HystrixMetricsPublisher hystrixMetricsPublisher() {
HystrixCodaHaleMetricsPublisher publisher = new HystrixCodaHaleMetricsPublisher(metricRegistry);
HystrixPlugins.getInstance().registerMetricsPublisher(publisher);
return publisher;
}
coda hale實現源碼如下:
public class HystrixCodaHaleMetricsPublisher extends HystrixMetricsPublisher { private final String metricsRootNode; private final MetricRegistry metricRegistry; public HystrixCodaHaleMetricsPublisher(MetricRegistry metricRegistry) { this(null, metricRegistry); } public HystrixCodaHaleMetricsPublisher(String metricsRootNode, MetricRegistry metricRegistry) { this.metricsRootNode = metricsRootNode; this.metricRegistry = metricRegistry; } @Override public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) { return new HystrixCodaHaleMetricsPublisherCommand(metricsRootNode, commandKey, commandGroupKey, metrics, circuitBreaker, properties, metricRegistry); } @Override public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) { return new HystrixCodaHaleMetricsPublisherThreadPool(metricsRootNode, threadPoolKey, metrics, properties, metricRegistry); } @Override public HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) { return new HystrixCodaHaleMetricsPublisherCollapser(collapserKey, metrics, properties, metricRegistry); } }
HystrixCodaHaleMetricsPublisher負責創建HystrixCodaHaleMetricsPublisherCommand,HystrixCodaHaleMetricsPublisherThreadPool,HystrixCodaHaleMetricsPublisherCollapser。這三個對象實現基本邏輯是在initialize方法中向metricRegistry中設置相應信息。
public void initialize() { metricRegistry.register(createMetricName("isCircuitBreakerOpen"), new Gauge<Boolean>() { @Override public Boolean getValue() { return circuitBreaker.isOpen(); } ..... }
