Metric Overview
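HystrixCommands and HystrixObservableCommands produce execution data while they run, and this data is very useful for observing how calls perform.
Once a command produces data, Metrics aggregates it along three main dimensions: cumulative counts within a time window (the rolling window), counts accumulated continuously (cumulative), and the distribution of values within a time window.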
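Metric Implementation
The main flow of the Metrics implementation is as follows:
1. Before a command starts executing, it sends a start message (HystrixCommandExecutionStarted) to the start stream (HystrixCommandStartStream).
2. If the command runs on a thread pool, it also sends a start message (HystrixCommandExecutionStarted) to the thread pool start stream (HystrixThreadPoolStartStream) before executing.
3. If the command runs on a thread pool, it sends a completion message (HystrixCommandCompletion) to the thread pool completion stream (HystrixThreadPoolCompletionStream) after executing.
4. When the command finishes executing, it sends a completion message (HystrixCommandCompletion) to the completion stream (HystrixCommandCompletionStream).
5. The various statistics streams subscribe to the start stream or the completion stream and aggregate according to the content of the messages they receive.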
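The five steps above amount to a publish/subscribe pipeline. The Java sketch below models it with plain collections instead of RxJava; `MessageStream`, `EventKind`, `Completion` and `PipelineSketch` are hypothetical names, not Hystrix classes, and the thread pool streams of steps 2 and 3 are omitted for brevity.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins for Hystrix event types and the completion message.
enum EventKind { SUCCESS, FAILURE, TIMEOUT }

final class Completion {
    final String commandKey;
    final EventKind kind;

    Completion(String commandKey, EventKind kind) {
        this.commandKey = commandKey;
        this.kind = kind;
    }
}

// A minimal "message stream": write() pushes the message to every subscriber.
final class MessageStream<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) { subscribers.add(subscriber); }

    void write(T message) { subscribers.forEach(s -> s.accept(message)); }
}

final class PipelineSketch {
    final MessageStream<String> startStream = new MessageStream<>();
    final MessageStream<Completion> completionStream = new MessageStream<>();
    final Map<EventKind, Integer> counts = new EnumMap<>(EventKind.class);
    int started = 0;

    PipelineSketch() {
        // Statistics "streams" are simply subscribers that aggregate what they receive.
        startStream.subscribe(key -> started++);
        completionStream.subscribe(c -> counts.merge(c.kind, 1, Integer::sum));
    }

    // Mirrors steps 1 and 4: announce the start, then announce the outcome.
    void execute(String commandKey, EventKind outcome) {
        startStream.write(commandKey);
        completionStream.write(new Completion(commandKey, outcome));
    }

    int count(EventKind kind) { return counts.getOrDefault(kind, 0); }
}
```

In this model, every statistics stream described in the following sections is just another subscriber attached to `startStream` or `completionStream`.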
Hystrix Message Types
HystrixCommandCompletion carries the following message types:
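Rolling Statistics
A statistics stream first subscribes to a message stream (the start stream or the completion stream) and accumulates the counts of each message type over a short interval, a bucket lasting metrics.rollingStats.timeInMilliseconds / metrics.rollingStats.numBuckets milliseconds. It then sums the most recent metrics.rollingStats.numBuckets buckets, and that sum is the final rolling count.
RollingCommandEventCounterStream subscribes to HystrixCommandCompletionStream and counts each message type.
RollingCollapserEventCounterStream subscribes to HystrixCollapserEventStream and counts each message type.
RollingThreadPoolEventCounterStream subscribes to HystrixThreadPoolCompletionStream and counts each message type.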
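A minimal sketch of this two-stage aggregation, assuming the caller closes a bucket every timeInMilliseconds / numBuckets milliseconds. Hystrix drives the bucketing with RxJava windowing; here it is an explicit `closeBucket()` call so the example stays deterministic. `RollingCounterSketch` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Stage 1 reduces the events of one bucket into a count array;
// stage 2 sums the most recent numBuckets closed buckets.
final class RollingCounterSketch {
    private final int numBuckets;
    private final int numEventTypes;
    private final Deque<long[]> buckets = new ArrayDeque<>();
    private long[] current;

    RollingCounterSketch(int numBuckets, int numEventTypes) {
        this.numBuckets = numBuckets;
        this.numEventTypes = numEventTypes;
        this.current = new long[numEventTypes];
    }

    // Stage 1: count one event (identified by its ordinal) in the open bucket.
    void record(int eventOrdinal) { current[eventOrdinal]++; }

    // Called once per bucket interval (timeInMilliseconds / numBuckets).
    void closeBucket() {
        buckets.addLast(current);
        if (buckets.size() > numBuckets) {
            buckets.removeFirst(); // the oldest bucket slides out of the window
        }
        current = new long[numEventTypes];
    }

    // Stage 2: sum the closed buckets that are still inside the window.
    long[] rollingCounts() {
        long[] sum = new long[numEventTypes];
        for (long[] bucket : buckets) {
            for (int i = 0; i < numEventTypes; i++) {
                sum[i] += bucket[i];
            }
        }
        return sum;
    }

    static long bucketSizeMs(long timeInMilliseconds, int numBuckets) {
        return timeInMilliseconds / numBuckets;
    }
}
```

With the default settings of 10000 ms and 10 buckets, each bucket covers 1000 ms and the rolling counts always reflect the last ten closed buckets.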
HealthCountsStream subscribes to HystrixCommandCompletionStream, counts successes and failures, and computes the error percentage.
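How windowed counts become a health snapshot can be sketched as follows. The class is hypothetical, and treating failures, timeouts, and both kinds of rejection as errors is an assumption of this sketch rather than a statement of Hystrix's exact rule.

```java
// Hypothetical health snapshot derived from the counts of one rolling window.
final class HealthCountsSketch {
    final long totalRequests;
    final long errorCount;

    HealthCountsSketch(long success, long failure, long timeout,
                       long threadPoolRejected, long semaphoreRejected) {
        // Assumption: failures, timeouts and rejections all count as errors.
        this.errorCount = failure + timeout + threadPoolRejected + semaphoreRejected;
        this.totalRequests = success + errorCount;
    }

    // Integer error percentage; 0 when the window saw no requests.
    int errorPercentage() {
        return totalRequests > 0 ? (int) (errorCount * 100 / totalRequests) : 0;
    }
}
```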
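Cumulative Statistics
A statistics stream first subscribes to a message stream (the start stream or the completion stream) and accumulates the counts of each message type per bucket (metrics.rollingStats.timeInMilliseconds / metrics.rollingStats.numBuckets milliseconds). It then keeps adding each bucket's counts to a running total.
CumulativeCommandEventCounterStream subscribes to HystrixCommandCompletionStream and counts each message type.
CumulativeCollapserEventCounterStream subscribes to HystrixCollapserEventStream and counts each message type.
CumulativeThreadPoolEventCounterStream subscribes to HystrixThreadPoolCompletionStream and counts each message type.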
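The only difference from the rolling case is that old buckets are never dropped. A hypothetical sketch that folds each closed bucket into a running total:

```java
// Hypothetical cumulative counter: every closed bucket is added, nothing is evicted.
final class CumulativeCounterSketch {
    private final long[] totals;

    CumulativeCounterSketch(int numEventTypes) { totals = new long[numEventTypes]; }

    // Called with the per-type counts of each bucket as it closes.
    void addBucket(long[] bucketCounts) {
        for (int i = 0; i < totals.length; i++) {
            totals[i] += bucketCounts[i];
        }
    }

    long total(int eventOrdinal) { return totals[eventOrdinal]; }
}
```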
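Rolling Distribution Statistics
RollingDistributionStream subscribes to a message stream (for example HystrixCommandCompletionStream) and uses RxJava to reduce the values within each bucket into a Histogram object, which it re-emits. It then combines the Histogram objects within the rolling window into aggregate statistics and re-emits those.
The subclass RollingCommandLatencyDistributionStream subscribes to HystrixCommandCompletionStream, uses RxJava to collect the executionLatency of each command within the window, and computes the latency distribution for the window with a Histogram.
The subclass RollingCommandUserLatencyDistributionStream subscribes to HystrixCommandCompletionStream, uses RxJava to collect the totalLatency of each command within the window, and computes the latency distribution for the window with a Histogram.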
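The subclass RollingCollapserBatchSizeDistributionStream subscribes to HystrixCollapserEventStream, uses RxJava to collect batch sizes (the number of ADDED_TO_BATCH events) within the window, and computes the batch-size distribution for the window with a Histogram.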
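To illustrate what the distribution streams answer, here is a hypothetical sketch that keeps one window's latencies and computes percentiles with the nearest-rank method on a sorted copy. This is a deliberately simpler technique than the Histogram object used by the streams above; it trades memory for clarity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical per-window latency distribution using nearest-rank percentiles.
final class LatencyDistributionSketch {
    private final List<Integer> latencies = new ArrayList<>();

    void record(int latencyMs) { latencies.add(latencyMs); }

    // Nearest rank: the smallest sample with at least p% of samples at or below it.
    int percentile(double p) {
        if (latencies.isEmpty()) return 0;
        List<Integer> sorted = new ArrayList<>(latencies);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.size());
        return sorted.get(Math.max(rank, 1) - 1);
    }

    // Integer mean of the recorded latencies (truncated toward zero).
    int mean() {
        if (latencies.isEmpty()) return 0;
        long sum = 0;
        for (int l : latencies) sum += l;
        return (int) (sum / latencies.size());
    }
}
```

Fed the latencies 111, 15, 21, 957 and 77 ms (values taken from the request events sample later in this section), the median is 77 ms while the 90th percentile is 957 ms, which shows why percentiles say more than the mean about tail latency.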
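RollingConcurrencyStream subscribes to a message stream, for example HystrixCommandStartStream, and uses RxJava to take the maximum execution concurrency within each bucket and re-emit it; it then takes the maximum over the buckets in the rolling window and re-emits that.
The subclass RollingCommandMaxConcurrencyStream subscribes to HystrixCommandStartStream and uses RxJava to take the maximum execution concurrency within the window.
The subclass RollingThreadPoolMaxConcurrencyStream subscribes to HystrixThreadPoolStartStream and uses RxJava to take the maximum execution concurrency within the window.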
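A hypothetical sketch of the two-level maximum: the highest concurrency observed in each bucket, then the highest bucket value in the window. Starting each new bucket from the number of executions still in flight is a design choice of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical rolling max-concurrency tracker.
final class MaxConcurrencySketch {
    private final int numBuckets;
    private final Deque<Integer> bucketMaxes = new ArrayDeque<>();
    private int active = 0;
    private int currentBucketMax = 0;

    MaxConcurrencySketch(int numBuckets) { this.numBuckets = numBuckets; }

    // A command starts: raise live concurrency and this bucket's maximum.
    void onStart() {
        active++;
        currentBucketMax = Math.max(currentBucketMax, active);
    }

    // A command finishes: lower live concurrency.
    void onDone() { active--; }

    // Bucket boundary: keep this bucket's max and evict buckets outside the window.
    void closeBucket() {
        bucketMaxes.addLast(currentBucketMax);
        if (bucketMaxes.size() > numBuckets) {
            bucketMaxes.removeFirst();
        }
        currentBucketMax = active; // executions still running count toward the next bucket
    }

    // Window max = maximum of the per-bucket maxima.
    int rollingMax() {
        int max = 0;
        for (int m : bucketMaxes) max = Math.max(max, m);
        return max;
    }
}
```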
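Other Data Streams
Some data streams are independent of the message streams and are also very helpful for understanding the state of the system.
The configuration stream HystrixConfigurationStream periodically provides the latest Hystrix properties configuration; com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet uses this stream to serve configuration information.
Data format: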
data: {"type":"HystrixConfig","commands":{"CreditCardCommand":{"threadPoolKey":"CreditCard","groupKey":"CreditCard","execution":{"isolationStrategy":"THREAD","threadPoolKeyOverride":null,"requestCacheEnabled":true,"requestLogEnabled":true,"timeoutEnabled":true,"fallbackEnabled":true,"timeoutInMilliseconds":3000,"semaphoreSize":10,"fallbackSemaphoreSize":10,"threadInterruptOnTimeout":true},"metrics":{"healthBucketSizeInMs":500,"percentileBucketSizeInMilliseconds":60000,"percentileBucketCount":10,"percentileEnabled":true,"counterBucketSizeInMilliseconds":10000,"counterBucketCount":10},"circuitBreaker":{"enabled":true,"isForcedOpen":false,"isForcedClosed":false,"requestVolumeThreshold":20,"errorPercentageThreshold":50,"sleepInMilliseconds":5000}},"GetUserAccountCommand":{"threadPoolKey":"User","groupKey":"User","execution":{"isolationStrategy":"THREAD","threadPoolKeyOverride":null,"requestCacheEnabled":true,"requestLogEnabled":true,"timeoutEnabled":true,"fallbackEnabled":true,"timeoutInMilliseconds":50,"semaphoreSize":10,"fallbackSemaphoreSize":10,"threadInterruptOnTimeout":true},"metrics":{"healthBucketSizeInMs":500,"percentileBucketSizeInMilliseconds":60000,"percentileBucketCount":10,"percentileEnabled":true,"counterBucketSizeInMilliseconds":10000,"counterBucketCount":10},"circuitBreaker":{"enabled":true,"isForcedOpen":false,"isForcedClosed":false,"requestVolumeThreshold":20,"errorPercentageThreshold":50,"sleepInMilliseconds":5000}},"GetOrderCommand":{"threadPoolKey":"Order","groupKey":"Order","execution":{"isolationStrategy":"THREAD","threadPoolKeyOverride":null,"requestCacheEnabled":true,"requestLogEnabled":true,"timeoutEnabled":true,"fallbackEnabled":true,"timeoutInMilliseconds":1000,"semaphoreSize":10,"fallbackSemaphoreSize":10,"threadInterruptOnTimeout":true},"metrics":{"healthBucketSizeInMs":500,"percentileBucketSizeInMilliseconds":60000,"percentileBucketCount":10,"percentileEnabled":true,"counterBucketSizeInMilliseconds":10000,"counterBucketCount":10},"circuitBreaker":{"enabled":true,"isForcedOpen":false,"isForcedClosed":false,"requestVolumeThreshold":20,"errorPercentageThreshold":50,"sleepInMilliseconds":5000}},"GetPaymentInformationCommand":{"threadPoolKey":"PaymentInformation","groupKey":"PaymentInformation","execution":{"isolationStrategy":"THREAD","threadPoolKeyOverride":null,"requestCacheEnabled":true,"requestLogEnabled":true,"timeoutEnabled":true,"fallbackEnabled":true,"timeoutInMilliseconds":1000,"semaphoreSize":10,"fallbackSemaphoreSize":10,"threadInterruptOnTimeout":true},"metrics":{"healthBucketSizeInMs":500,"percentileBucketSizeInMilliseconds":60000,"percentileBucketCount":10,"percentileEnabled":true,"counterBucketSizeInMilliseconds":10000,"counterBucketCount":10},"circuitBreaker":{"enabled":true,"isForcedOpen":false,"isForcedClosed":false,"requestVolumeThreshold":20,"errorPercentageThreshold":50,"sleepInMilliseconds":5000}}},"threadpools":{"User":{"coreSize":8,"maxQueueSize":-1,"queueRejectionThreshold":5,"keepAliveTimeInMinutes":1,"counterBucketSizeInMilliseconds":10000,"counterBucketCount":10},"CreditCard":{"coreSize":8,"maxQueueSize":-1,"queueRejectionThreshold":5,"keepAliveTimeInMinutes":1,"counterBucketSizeInMilliseconds":10000,"counterBucketCount":10},"Order":{"coreSize":8,"maxQueueSize":-1,"queueRejectionThreshold":5,"keepAliveTimeInMinutes":1,"counterBucketSizeInMilliseconds":10000,"counterBucketCount":10},"PaymentInformation":{"coreSize":8,"maxQueueSize":-1,"queueRejectionThreshold":5,"keepAliveTimeInMinutes":1,"counterBucketSizeInMilliseconds":10000,"counterBucketCount":10}},"collapsers":{}}
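The utilization stream HystrixUtilizationStream provides concurrency, thread pool status, and similar information; com.netflix.hystrix.contrib.sample.stream.HystrixUtilizationSseServlet uses this stream to serve utilization information.
Data format: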
data: {"type":"HystrixUtilization","commands":{"CreditCardCommand":{"activeCount":0},"GetUserAccountCommand":{"activeCount":0},"GetOrderCommand":{"activeCount":1},"GetPaymentInformationCommand":{"activeCount":0}},"threadpools":{"User":{"activeCount":0,"queueSize":0,"corePoolSize":8,"poolSize":2},"CreditCard":{"activeCount":0,"queueSize":0,"corePoolSize":8,"poolSize":1},"Order":{"activeCount":1,"queueSize":0,"corePoolSize":8,"poolSize":2},"PaymentInformation":{"activeCount":0,"queueSize":0,"corePoolSize":8,"poolSize":2}}}
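The request events stream HystrixRequestEventsStream provides information about HTTP requests, namely the commands executed within each request together with their events and latencies; com.netflix.hystrix.contrib.requests.stream.HystrixRequestEventsSseServlet uses this stream to serve request event information.
Data format: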
data: [[{"name":"GetOrderCommand","events":["SUCCESS"],"latencies":[111]},{"name":"GetPaymentInformationCommand","events":["SUCCESS"],"latencies":[15]},{"name":"GetUserAccountCommand","events":["TIMEOUT","FALLBACK_SUCCESS"],"latencies":[59],"cached":2},{"name":"CreditCardCommand","events":["SUCCESS"],"latencies":[957]}],[{"name":"GetUserAccountCommand","events":["SUCCESS"],"latencies":[3],"cached":2},{"name":"GetOrderCommand","events":["SUCCESS"],"latencies":[77]},{"name":"GetPaymentInformationCommand","events":["SUCCESS"],"latencies":[21]},{"name":"CreditCardCommand","events":["SUCCESS"],"latencies":[1199]}]]
MetricPublisher
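Sometimes we need to publish Hystrix metrics elsewhere. Hystrix provides the corresponding interfaces (HystrixMetricsPublisherCollapser, HystrixMetricsPublisherCommand, HystrixMetricsPublisherThreadPool): implement them and send the Hystrix metrics from their initialize method. Then extend HystrixMetricsPublisher so that it creates instances of these implementations.
How Hystrix uses them:
At runtime, HystrixMetricsPublisherFactory obtains the HystrixMetricsPublisher implementation through HystrixPlugins and uses it to create the implementations of HystrixMetricsPublisherCollapser, HystrixMetricsPublisherCommand, and HystrixMetricsPublisherThreadPool, calling each one's initialize method when it is first created.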
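The create-once, initialize-once behavior can be sketched with a ConcurrentHashMap. `CommandPublisherSketch` and `PublisherFactorySketch` are hypothetical stand-ins, not the real HystrixMetricsPublisherFactory; the point is that when two threads race, only the instance that actually lands in the map has initialize() called.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical stand-in for a per-command metrics publisher.
interface CommandPublisherSketch {
    void initialize();
}

// Hypothetical factory: one publisher per command key, initialized exactly once.
final class PublisherFactorySketch {
    private final ConcurrentMap<String, CommandPublisherSketch> publishers = new ConcurrentHashMap<>();
    private final Function<String, CommandPublisherSketch> creator;

    PublisherFactorySketch(Function<String, CommandPublisherSketch> creator) {
        this.creator = creator;
    }

    CommandPublisherSketch getOrCreate(String commandKey) {
        CommandPublisherSketch existing = publishers.get(commandKey);
        if (existing != null) {
            return existing; // fast path: already created and initialized
        }
        CommandPublisherSketch created = creator.apply(commandKey);
        CommandPublisherSketch raced = publishers.putIfAbsent(commandKey, created);
        if (raced != null) {
            return raced; // another thread won the race; ours is discarded uninitialized
        }
        created.initialize(); // only the instance stored in the map is initialized
        return created;
    }
}
```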
The Coda Hale implementation publishes Hystrix metrics into a designated metrics monitoring system.
Add the dependency:
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-codahale-metrics-publisher</artifactId>
<version>1.5.9</version>
</dependency>
Create the HystrixMetricsPublisher object and register it with HystrixPlugins:
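@Bean
HystrixMetricsPublisher hystrixMetricsPublisher(MetricRegistry metricRegistry) {
    // Wrap the Coda Hale MetricRegistry, injected by Spring as a bean-method parameter
    HystrixCodaHaleMetricsPublisher publisher = new HystrixCodaHaleMetricsPublisher(metricRegistry);
    // Register the publisher with Hystrix's global plugin registry
    HystrixPlugins.getInstance().registerMetricsPublisher(publisher);
    return publisher;
}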
The source code of the Coda Hale implementation is as follows:
import com.codahale.metrics.MetricRegistry;
import com.netflix.hystrix.*;
import com.netflix.hystrix.strategy.metrics.*;

public class HystrixCodaHaleMetricsPublisher extends HystrixMetricsPublisher {
    // Optional prefix for all metric names (null means no prefix)
    private final String metricsRootNode;
    private final MetricRegistry metricRegistry;

    public HystrixCodaHaleMetricsPublisher(MetricRegistry metricRegistry) {
        this(null, metricRegistry);
    }

    public HystrixCodaHaleMetricsPublisher(String metricsRootNode, MetricRegistry metricRegistry) {
        this.metricsRootNode = metricsRootNode;
        this.metricRegistry = metricRegistry;
    }

    // Called once per command key to create that command's publisher
    @Override
    public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) {
        return new HystrixCodaHaleMetricsPublisherCommand(metricsRootNode, commandKey, commandGroupKey, metrics, circuitBreaker, properties, metricRegistry);
    }

    // Called once per thread pool key
    @Override
    public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
        return new HystrixCodaHaleMetricsPublisherThreadPool(metricsRootNode, threadPoolKey, metrics, properties, metricRegistry);
    }

    // Called once per collapser key
    @Override
    public HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) {
        return new HystrixCodaHaleMetricsPublisherCollapser(collapserKey, metrics, properties, metricRegistry);
    }
}
HystrixCodaHaleMetricsPublisher is responsible for creating HystrixCodaHaleMetricsPublisherCommand, HystrixCodaHaleMetricsPublisherThreadPool, and HystrixCodaHaleMetricsPublisherCollapser. The core logic of all three is to register the corresponding metrics with metricRegistry in their initialize method:
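public void initialize() {
    // A gauge is evaluated every time the registry is read, so it always reflects the breaker's current state
    metricRegistry.register(createMetricName("isCircuitBreakerOpen"), new Gauge<Boolean>() {
        @Override
        public Boolean getValue() {
            return circuitBreaker.isOpen();
        }
    });
    // ..... (the remaining metrics are registered the same way)
}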
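HystrixCodaHaleMetricsPublisherCommand registers metrics with metricRegistry such as the following:
isCircuitBreakerOpen: whether the circuit is open, read from the circuit breaker.
currentTime: the current system time.
countBadRequests: read from HystrixCommandMetrics.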
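A gauge does not store a value; it is read on demand. The hypothetical registry below shows why isCircuitBreakerOpen always reflects the breaker's current state: the registered Supplier is evaluated on every read. It is not the Coda Hale MetricRegistry API, and the metric name format is made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical mini registry: a gauge is a named Supplier evaluated at read time.
final class GaugeRegistrySketch {
    private final Map<String, Supplier<?>> gauges = new TreeMap<>();

    // This sketch rejects duplicate names so a metric cannot be silently replaced.
    void register(String name, Supplier<?> gauge) {
        if (gauges.putIfAbsent(name, gauge) != null) {
            throw new IllegalArgumentException("A metric named " + name + " already exists");
        }
    }

    // Each read calls the Supplier again, so the value is always current.
    Object read(String name) {
        return gauges.get(name).get();
    }
}
```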
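Metrics Implementation Classes
Hystrix's Metrics module stores measurements related to Hystrix's runtime behavior in three main kinds of classes: HystrixCommandMetrics, HystrixThreadPoolMetrics, and HystrixCollapserMetrics. The first two are described below.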
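1) HystrixCommandMetrics:
Stores the execution metrics of Hystrix commands.
markCommandStart: called when a command starts executing.
markCommandDone: called when a command finishes executing.
getRollingCount: returns the count of a given event type within the rolling window.
getCumulativeCount: returns the cumulative count of a given event type.
getExecutionTimePercentile: returns the execution time at a given percentile.
getExecutionTimeMean: returns the mean execution time.
getTotalTimePercentile: returns the total (end-to-end) time at a given percentile.
getTotalTimeMean: returns the mean total time.
getRollingMaxConcurrentExecutions: returns the maximum concurrency within the rolling window.
getHealthCountsStream: returns the failure count, total count, and error percentage within the rolling window.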
The following event types are recorded:
HystrixRollingNumberEvent.SUCCESS: the command executed successfully
FAILURE, TIMEOUT, SHORT_CIRCUITED, THREAD_POOL_REJECTED, SEMAPHORE_REJECTED, BAD_REQUEST, FALLBACK_SUCCESS, FALLBACK_FAILURE, FALLBACK_REJECT
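2) HystrixThreadPoolMetrics: stores the execution metrics of Hystrix thread pools.
markThreadCompletion: called when the thread pool finishes a task.
markThreadExecution: called when the thread pool starts executing a task.
getRollingCount: returns the count of a given event type within the rolling window.
getCumulativeCount: returns the cumulative count of a given event type.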
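HystrixCommandMetrics implementation:
When markCommandStart is called, it writes a HystrixCommandExecutionStarted message to the HystrixCommandStartStream message stream. A message stream acts as messaging middleware; internally it is an RxJava Subject, and listeners receive messages by subscribing to it. In thread-isolation mode, a HystrixCommandExecutionStarted message is also written to HystrixThreadPoolStartStream.
When markCommandDone is called, it writes a HystrixCommandCompletion message to HystrixCommandCompletionStream. In thread-isolation mode, a HystrixCommandCompletion message is also written to HystrixThreadPoolCompletionStream.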
When collapserResponseFromCache is called, it writes a HystrixCollapserEvent message to HystrixCollapserEventStream.
When collapserBatchExecuted is called, it likewise writes a HystrixCollapserEvent message to HystrixCollapserEventStream.
When getRollingCount is called, the data comes from the RollingCommandEventCounterStream stream, which subscribes to HystrixCommandCompletionStream and uses RxJava to count each message type over the rolling window.
When getCumulativeCount is called, the data comes from CumulativeCommandEventCounterStream, which subscribes to HystrixCommandCompletionStream and uses RxJava to keep continuous counts for each message type.
When getExecutionTimePercentile or getExecutionTimeMean is called, the data comes from RollingCommandLatencyDistributionStream, which subscribes to HystrixCommandCompletionStream and uses RxJava to compute the distribution of executionLatency over the requests in the window.
When getTotalTimePercentile or getTotalTimeMean is called, the data comes from RollingCommandUserLatencyDistributionStream, which subscribes to HystrixCommandCompletionStream and uses RxJava to compute the distribution of totalLatency over the requests in the window.
When getHealthCountsStream is called, the data comes from HealthCountsStream, which subscribes to HystrixCommandCompletionStream and uses RxJava to count successes, failures, and timeouts within the window, yielding the failure count, the total count, and the error percentage.
When getRollingMaxConcurrentExecutions is called, the data comes from RollingCommandMaxConcurrencyStream, which subscribes to HystrixCommandStartStream and uses RxJava to obtain the maximum concurrency within the window.
HystrixThreadPoolMetrics implementation:
When getRollingCount is called, the data comes from RollingThreadPoolEventCounterStream, which subscribes to HystrixThreadPoolCompletionStream and counts events over the rolling window.
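Metrics also reflects the current health of the service, including the total number of calls and the number of failed calls. From these counts the circuit breaker computes the current failure rate and compares it with the configured threshold to decide whether to switch state, which is why the Metrics implementation is so important.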
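The threshold comparison can be sketched as a pure function. The class and method names are hypothetical; the thresholds used in the usage note (requestVolumeThreshold 20, errorPercentageThreshold 50) are the values shown in the configuration stream sample above.

```java
// Hypothetical trip decision: enough traffic AND a high enough error percentage.
final class TripDecisionSketch {
    static boolean shouldOpen(long totalRequests, int errorPercentage,
                              int requestVolumeThreshold, int errorPercentageThreshold) {
        if (totalRequests < requestVolumeThreshold) {
            return false; // too little traffic in the window to judge the service
        }
        return errorPercentage >= errorPercentageThreshold;
    }
}
```

With those values, 19 requests never trip the breaker however many of them fail, while 20 requests with a 50% error rate do. The volume check keeps a handful of early failures from opening the circuit.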
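HystrixRollingNumber computes counts over a period of time, and its basic idea is segmented counting. Suppose we want the QPS, that is, the total number of requests within 1 second. As shown in the figure below, we can split the 1 s into 10 segments of 100 ms each: during the first 100 ms, counts go into the first segment; during the second 100 ms, into the second segment; and so on. To get the QPS at any moment, we sum the counts of the 10 segments covering the preceding 1 s.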
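A single-threaded sketch of this segmented counting, with an injectable clock so it can be tested. `RingCounterSketch` is a hypothetical name and, unlike HystrixRollingNumber, it makes no attempt at concurrency; each ring slot remembers which time segment it holds and is reset when a newer segment reuses it.

```java
import java.util.Arrays;
import java.util.function.LongSupplier;

// Hypothetical ring of fixed-length time segments covering one window.
final class RingCounterSketch {
    private final long bucketMs;
    private final long[] counts;
    private final long[] bucketStart; // start time of the segment held by each slot
    private final LongSupplier clock;

    RingCounterSketch(long windowMs, int numBuckets, LongSupplier clock) {
        this.bucketMs = windowMs / numBuckets;
        this.counts = new long[numBuckets];
        this.bucketStart = new long[numBuckets];
        Arrays.fill(bucketStart, Long.MIN_VALUE); // no segment stored yet
        this.clock = clock;
    }

    void increment() {
        long now = clock.getAsLong();
        long start = now - Math.floorMod(now, bucketMs);
        int slot = (int) Math.floorMod(start / bucketMs, (long) counts.length);
        if (bucketStart[slot] != start) { // slot holds an old segment: reuse it
            bucketStart[slot] = start;
            counts[slot] = 0;
        }
        counts[slot]++;
    }

    // Sum of segments that start within the last window, current segment included.
    long sum() {
        long now = clock.getAsLong();
        long currentStart = now - Math.floorMod(now, bucketMs);
        long oldestAllowed = currentStart - bucketMs * (counts.length - 1);
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (bucketStart[i] >= oldestAllowed && bucketStart[i] <= currentStart) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

With a 1000 ms window and 10 segments, two requests at t = 0 ms and one at t = 150 ms give a sum of 3 at t = 999 ms; at t = 1050 ms the [0, 100) segment has left the window and the sum drops to 1.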