hystrix(2) metrics


  上一節講到了hystrix提供的五個功能,這一節我們首先來講hystrix中提供實時執行metrics信息的實現。為什么先講metrics,因為很多功能都是基於metrics的數據來實現的,它是很多功能實現的基礎。

  首先來看一下通過hystrix調用服務的過程中會產生那些類型的metrics信息:

  1.某一事件的持續指標。

  2.某一事件窗口時間內持續指標。

  3.某一事件窗口時間內最大指標。

  4.某一事件窗口時間內指標分布。

  在來看一下這些數據在hystrix中是如何產生、計算和流轉的。

  hystrix在執行服務調用的過程中會產生各類事件,執行模塊首先將這些事件發送的metrics接受流中,而metrics統計流會監聽metrics接受流,計算出各類統計數據。

metrics接收流

  hystrix有以下接收流和對應接收的消息

接收流 接收消息 說明
HystrixCommandStartStream HystrixCommandExecutionStarted 命令開始執行消息流
HystrixCommandCompletionStream HystrixCommandCompletion 命令完成執行消息流
HystrixThreadPoolStartStream HystrixCommandExecutionStarted 線程池開始執行消息流
HystrixThreadPoolCompletionStream HystrixCommandCompletion 線程池執行完成消息流
HystrixCollapserEventStream HystrixCollapserEvent 合並命令執行消息流

  metrics接收流使用單例模式,HystrixCommandKey,HystrixThreadPoolKey,HystrixCollapserKey分別對應同一個(HystrixCommandStartStream、HystrixCommandCompletionStream),(HystrixThreadPoolStartStream,HystrixThreadPoolCompletionStream),(HystrixCollapserEventStream)。

  內部使用rxjava來實現消息機制

    HystrixCommandStartStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;
        this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create());
        this.readOnlyStream = writeOnlySubject.share();
    }

  此外還提供了HystrixThreadEventStream統一執行接收消息然后發送到各個消息接收流類。

metrics接受流消息體

  上面講了hystrix metrics的接收流,接下來我們看看接收流具體接收的內容。

消息體 內容
HystrixCommandExecutionStarted 內部包括了該命令的執行策略和並發數。
HystrixCommandCompletion 內部包含執行結果對象ExecutionResult和請求上下文對象HystrixRequestContext
ExecutionResult
    private final EventCounts eventCounts;//事件數量
    private final Exception failedExecutionException;//失敗異常
    private final Exception executionException; //執行異常
    private final long startTimestamp;//命令開始執行時間
    private final int executionLatency; //執行run的時間
    private final int userThreadLatency; //請求提交到執行結束的時間
    private final boolean executionOccurred;//ture 執行過命令 false 未執行過命令
    private final boolean isExecutedInThread;//ture 使用線程池執行 false 不是使用線程池執行
    private final HystrixCollapserKey collapserKey;
EventCounts
   private final BitSet events;事件類型
   private final int numEmissions//emission次數
   private final int numFallbackEmissions;//fallback次數
   private final int numCollapsed;//合並格式
HystrixCollapserEvent
private final HystrixCollapserKey collapserKey;//合並命令key
private final HystrixEventType.Collapser eventType;事件類型
private final int count;次數

事件類型:

  HystrixCommand只返回一個數據,當返回值時發生SUCCESS事件,執行失敗時,發生FAILURE事件,HystrixObservableCommand可以返回多個值,當返回值時發生EMIT事件,當命令完成時,發生SUCCESS事件,執行失敗時,發生FAILURE事件。

名稱 描述 是否fallback
EMIT value返回,只在HystrixObservableCommand NO
SUCCESS 執行成功 NO
FAILURE 執行拋出異常 YES
TIMEOUT 超時 YES
BAD_REQUEST 拋出HystrixBadRequestException NO
SHORT_CIRCUITED 熔斷 YES
THREAD_POOL_REJECTED 線程池拒絕 YES
SEMAPHORE_REJECTED 信號量拒絕 YES

Fallback事件類型

名稱 描述 是否拋出異常
FALLBACK_EMIT fallback 返回值,只在HystrixObservableCommand NO
FALLBACK_SUCCESS fallback 執行完成 NO
FALLBACK_FAILURE fallback執行失敗 YES
FALLBACK_REJECTION fallback拒絕執行 YES
FALLBACK_MISSING 沒有fallback實現 YES

其他命令類型

名稱 描述
EXCEPTION_THROWN 執行命令值拋出異常
RESPONSE_FROM_CACHE 從緩存中獲取值
CALLAPSED 命令聚合執行

線程池類型

名稱 描述
EXECUTED 線程池執行一個命令
REJECTED 線程池拒絕執行命令

聚合事件類型

名稱 描述
BATCH_EXECUTED 執行一個batch批量執行
ADDED_TO_BATCH 參數添加到batch中
RESPONSE_FROM_CACHE 從緩存中獲取值

 

metrics統計流

  hystrix有以下統計流

類別 統計流 監聽接收流 說明
窗口時間內持續統計 RollingCommandEventCounterStream HystrixCommandCompletionStream 統計各種消息類型窗口期內次數
RollingCollapserEventCounterStream HystrixCollapserEventStream 統計各種消息類型窗口期內次數
RollingThreadPoolEventCounterStream HystrixThreadPoolCompletionStream 統計各種消息類型窗口期內次數
HealthCountsStream HystrixThreadPoolCompletionStream 統計總調用次數,失敗次數,失敗率
持續統計流 CumulativeCommandEventCounterStream HystrixCommandCompletionStream 持久統計各種消息類型次數
CumulativeCollapserEventCounterStream HystrixCollapserEventStream 持久統計各種消息類型次數
CumulativeThreadPoolEventCounterStream HystrixThreadPoolCompletionStream 持久統計各種消息類型次數
窗口時間內分布統計 RollingCommandLatencyDistributionStream HystrixCommandCompletionStream消息流的executelatency事件 通過Histogram計算窗口期內的分布
RollingCommandUserLatencyDistributionStream HystrixCommandCompletionStream消息流的totalLatency事件 通過Histogram計算窗口期內的分布
RollingCollapserBatchSizeDistributionStream HystrixCollapserEventStream消息流的ADDED_TO_BATCH消息 通過Histogram計算窗口期內的分布
窗口時間內最大值統計流 RollingCommandMaxConcurrencyStream HystrixCommandStartStream 窗口期內的執行並發量取最大值
RollingThreadPoolMaxConcurrencyStream HystrixThreadPoolStartStream 窗口期內的執行並發量取最大值

  窗口時間內持續統計流首先監聽一個消息接受流,統計一段時間內各個類型消息的累計數據(時間為:metrics.rollingStats.timeInMilliseconds/metrics.rollingStats.numBuckets)。然后再對累計的數據進行累加(個數為:metrics.rollingStats.numBuckets),即為最終累計數據。

  持續統計流首先監聽一個消息流(開始消息流或者完成消息流),統計一段時間內各個類型消息的累計數據(時間為:metrics.rollingStats.timeInMilliseconds/metrics.rollingStats.numBuckets)。然后不斷的累加累計數據。

  窗口時間內分布統計流首先監聽一個消息流,統計一段時間內各個類型消息存放在Histogram對象中(時間為:metrics.rollingStats.timeInMilliseconds/metrics.rollingStats.numBuckets),然后對(個數為:metrics.rollingStats.numBuckets)內的Histogram對象進行運算操作,即為窗口期內某一時間的分布。

  RollingConcurrencyStream監聽一個消息流,例如HystrixCommandStartStream,然后通過RX java對一段時間內的執行並發量取最大值,重新發射,對窗口期內的執行並發量取最大值,重新發射。

  metrics統計流使用單例模式,每個統計流分別對應一個HystrixCommandKey,HystrixThreadPoolKey,HystrixCollapserKey。

metrics模塊

  hystrix中可以通過metrics模塊來獲取執行過程中的數據,主要有三部分數據:命令執行metrics,線程池metrics,合並命令執行metrics,每個HystrixCommandKey、HystrixThreadPoolKey、HystrixCollapserKey對應一個相應的metrics(HystrixCommandMetrics,HystrixThreadPoolMetrics,HystrixCollapserMetrics)。metrics模塊內部是通過監聽消息流來獲取各個指標的統計數據。

命令執行metrics

  HystrixCommandMetrics為命令執行模塊的metrics,在其初始化時會創建6個統計數據流:HealthCountsStream、RollingCommandEventCounterStream、CumulativeCommandEventCounterStream、RollingCommandLatencyDistributionStream、RollingCommandUserLatencyDistributionStream、RollingCommandMaxConcurrencyStream,通過這些統計數據流來獲取相應metrics信息。


private HealthCountsStream healthCountsStream; private final RollingCommandEventCounterStream rollingCommandEventCounterStream; private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream; private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream; private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream; private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream; /* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) { super(null); healthCountsStream = HealthCountsStream.getInstance(key, properties); rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties); cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties); rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties); rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties); rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties); }
  //獲取指定事件窗口期內數據指標 public long getRollingCount(HystrixEventType eventType) { return rollingCommandEventCounterStream.getLatest(eventType); }
  //獲取指定事件持續的數據指標 public long getCumulativeCount(HystrixEventType eventType) { return cumulativeCommandEventCounterStream.getLatest(eventType); }//獲取某一百分比的執行時間public int getExecutionTimePercentile(double percentile) { return rollingCommandLatencyDistributionStream.getLatestPercentile(percentile); }//獲取平均的執行時間 public int getExecutionTimeMean() { return rollingCommandLatencyDistributionStream.getLatestMean(); } //獲取某一百分比的總時間 public int getTotalTimePercentile(double percentile) { return rollingCommandUserLatencyDistributionStream.getLatestPercentile(percentile); }//獲取平均的總時間 public int getTotalTimeMean() { return rollingCommandUserLatencyDistributionStream.getLatestMean(); } //獲取窗口期內最大並發量 public long getRollingMaxConcurrentExecutions() { return rollingCommandMaxConcurrencyStream.getLatestRollingMax(); }//獲取當前並發量 public int getCurrentConcurrentExecutionCount() { return concurrentExecutionCount.get(); } //獲取命令執行健康情況 public HealthCounts getHealthCounts() { return healthCountsStream.getLatest(); }

 

線程池metrics

  HystrixThreadPoolMetrics為線程池執行模塊的metrics,在其初始化時會獲取3個數據流:RollingThreadPoolEventCounterStream、CumulativeThreadPoolEventCounterStream、RollingThreadPoolMaxConcurrencyStream通過這些統計流獲得相應的統計數據。

    private final RollingThreadPoolEventCounterStream rollingCounterStream;
    private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
    private final RollingThreadPoolMaxConcurrencyStream rollingThreadPoolMaxConcurrencyStream;
    private HystrixThreadPoolMetrics(HystrixThreadPoolKey threadPoolKey, ThreadPoolExecutor threadPool, HystrixThreadPoolProperties properties) {
        super(null);
        this.threadPoolKey = threadPoolKey;
        this.threadPool = threadPool;
        this.properties = properties;
        rollingCounterStream = RollingThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
        cumulativeCounterStream = CumulativeThreadPoolEventCounterStream.getInstance(threadPoolKey, properties);
        rollingThreadPoolMaxConcurrencyStream = RollingThreadPoolMaxConcurrencyStream.getInstance(threadPoolKey, properties);
    }
/**
     獲取窗口期內線程池執行的個數*/
    public long getRollingCountThreadsExecuted() {
        return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
    }

    /**
     獲取持續的線程池執行個數*/
    public long getCumulativeCountThreadsExecuted() {
        return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
    }

    /**
    獲取窗口期內線程池拒絕的個數*/
    public long getRollingCountThreadsRejected() {
        return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
    }

    /**
    獲取持續內線程池拒絕的個數*/
    public long getCumulativeCountThreadsRejected() {
        return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
    }
    //獲取指定事件窗口期內數據指標
    public long getRollingCount(HystrixEventType.ThreadPool event) {
        return rollingCounterStream.getLatestCount(event);
    }
   //獲取指定事件持續的數據指標
    public long getCumulativeCount(HystrixEventType.ThreadPool event) {
        return cumulativeCounterStream.getLatestCount(event);
    }/**
    獲取窗口期內最大並發量*/
    public long getRollingMaxActiveThreads() {
        return rollingThreadPoolMaxConcurrencyStream.getLatestRollingMax();
    }

 還有一些根據線程池獲取線程池當前指標

    public Number getCurrentActiveCount() {
        return threadPool.getActiveCount();
    }
    public Number getCurrentCompletedTaskCount() {
        return threadPool.getCompletedTaskCount();
    }
    public Number getCurrentCorePoolSize() {
        return threadPool.getCorePoolSize();
    }
    public Number getCurrentLargestPoolSize() {
        return threadPool.getLargestPoolSize();
    }
    public Number getCurrentMaximumPoolSize() {
        return threadPool.getMaximumPoolSize();
    }
    public Number getCurrentPoolSize() {
        return threadPool.getPoolSize();
    }
    public Number getCurrentTaskCount() {
        return threadPool.getTaskCount();
    }
    public Number getCurrentQueueSize() {
        return threadPool.getQueue().size();
    }

 合並命令執行metrics

  HystrixCollapserMetrics為合並命令執行模塊的metrics,在其初始化時會創建3個數據流:RollingCollapserEventCounterStream、CumulativeCollapserEventCounterStream、RollingCollapserBatchSizeDistributionStream,通過這些統計流獲得相應的統計數據。

    private final RollingCollapserEventCounterStream rollingCollapserEventCounterStream;
    private final CumulativeCollapserEventCounterStream cumulativeCollapserEventCounterStream;
    private final RollingCollapserBatchSizeDistributionStream rollingCollapserBatchSizeDistributionStream;

    /* package */HystrixCollapserMetrics(HystrixCollapserKey key, HystrixCollapserProperties properties) {
        super(null);
        rollingCollapserEventCounterStream = RollingCollapserEventCounterStream.getInstance(key, properties);
        cumulativeCollapserEventCounterStream = CumulativeCollapserEventCounterStream.getInstance(key, properties);
        rollingCollapserBatchSizeDistributionStream = RollingCollapserBatchSizeDistributionStream.getInstance(key, properties);
    }
    //獲取指定事件窗口期內數據指標
    public long getRollingCount(HystrixEventType.Collapser collapserEventType) {
        return rollingCollapserEventCounterStream.getLatest(collapserEventType);
    }
    //獲取指定事件持續的數據指標
    public long getCumulativeCount(HystrixEventType.Collapser collapserEventType) {
        return cumulativeCollapserEventCounterStream.getLatest(collapserEventType);
    }
    //獲取指定百分比的batchsize
    public int getBatchSizePercentile(double percentile) {
        return rollingCollapserBatchSizeDistributionStream.getLatestPercentile(percentile);
    }
    //獲取平均的batchsize
    public int getBatchSizeMean() {
        return rollingCollapserBatchSizeDistributionStream.getLatestMean();
    }

 

其他流

HystrixConfigurationStream

  該數據流定時將hystrix的最新properties配置,發送到該消息流中。com.netflix.hystrix.contrib.sample.stream.HystrixConfigSseServlet就是用該流來獲取配置信息。

public HystrixConfigurationStream(final int intervalInMilliseconds) {
        this.intervalInMilliseconds = intervalInMilliseconds; this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS) .map(getAllConfig) .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() .onBackpressureDrop(); } private static final Func1<Long, HystrixConfiguration> getAllConfig = new Func1<Long, HystrixConfiguration>() { @Override public HystrixConfiguration call(Long timestamp) { return HystrixConfiguration.from( getAllCommandConfig.call(timestamp), getAllThreadPoolConfig.call(timestamp), getAllCollapserConfig.call(timestamp) ); } };
private static final Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>> getAllCommandConfig =
            new Func1<Long, Map<HystrixCommandKey, HystrixCommandConfiguration>>() {
                @Override
                public Map<HystrixCommandKey, HystrixCommandConfiguration> call(Long timestamp) { Map<HystrixCommandKey, HystrixCommandConfiguration> commandConfigPerKey = new HashMap<HystrixCommandKey, HystrixCommandConfiguration>(); for (HystrixCommandMetrics commandMetrics: HystrixCommandMetrics.getInstances()) { HystrixCommandKey commandKey = commandMetrics.getCommandKey(); HystrixThreadPoolKey threadPoolKey = commandMetrics.getThreadPoolKey(); HystrixCommandGroupKey groupKey = commandMetrics.getCommandGroup(); commandConfigPerKey.put(commandKey, sampleCommandConfiguration(commandKey, threadPoolKey, groupKey, commandMetrics.getProperties())); } return commandConfigPerKey; } }; private static final Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> getAllThreadPoolConfig = new Func1<Long, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>>() { @Override public Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> call(Long timestamp) { Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> threadPoolConfigPerKey = new HashMap<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>(); for (HystrixThreadPoolMetrics threadPoolMetrics: HystrixThreadPoolMetrics.getInstances()) { HystrixThreadPoolKey threadPoolKey = threadPoolMetrics.getThreadPoolKey(); threadPoolConfigPerKey.put(threadPoolKey, sampleThreadPoolConfiguration(threadPoolKey, threadPoolMetrics.getProperties())); } return threadPoolConfigPerKey; } }; private static final Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>> getAllCollapserConfig = new Func1<Long, Map<HystrixCollapserKey, HystrixCollapserConfiguration>>() { @Override public Map<HystrixCollapserKey, HystrixCollapserConfiguration> call(Long timestamp) { Map<HystrixCollapserKey, HystrixCollapserConfiguration> collapserConfigPerKey = new HashMap<HystrixCollapserKey, HystrixCollapserConfiguration>(); for (HystrixCollapserMetrics collapserMetrics: HystrixCollapserMetrics.getInstances()) { HystrixCollapserKey collapserKey = collapserMetrics.getCollapserKey(); collapserConfigPerKey.put(collapserKey, sampleCollapserConfiguration(collapserKey, collapserMetrics.getProperties())); } return collapserConfigPerKey; } };

metrics發布

  有時,我們需要發布Hystrix中的metrics到其他地方,Hystrix提供了相應的接口(HystrixMetricsPublisherCollapser,HystrixMetricsPublisherCommand,HystrixMetricsPublisherThreadPool),實現這些接口,並在initial方法中實現發送hystrix的metrics的邏輯。實現HystrixMetricsPublisher,來創建這些實現類。其實hystrix對metrics的發布只是定義了接口和initial方法。Hystrix運行時,HystrixMetricsPublisherFactory通過HystrixPlugins獲取HystrixMetricsPublisher的實現類。並且通過該實現類來創建(HystrixMetricsPublisherCollapser,HystrixMetricsPublisherCommand,HystrixMetricsPublisherThreadPool)的實現類,並在初次創建時調用其initial方法。

例如:

  通過coda hale 實現了將hystrix 的metrics信息輸出到指定metrics監控系統中。

  引入jar包:

     <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-codahale-metrics-publisher</artifactId>
            <version>1.5.9</version>
        </dependency>

  創建HystrixMetricsPublisher對象並注冊到HystrixPlugins:

  @Bean

    HystrixMetricsPublisher hystrixMetricsPublisher() {

        HystrixCodaHaleMetricsPublisher publisher = new HystrixCodaHaleMetricsPublisher(metricRegistry);

        HystrixPlugins.getInstance().registerMetricsPublisher(publisher);

        return publisher;

    }

  coda hale實現源碼如下:

public class HystrixCodaHaleMetricsPublisher extends HystrixMetricsPublisher {

      private final String metricsRootNode;

      private final MetricRegistry metricRegistry;

      public HystrixCodaHaleMetricsPublisher(MetricRegistry metricRegistry) {

          this(null, metricRegistry);

      }

      public HystrixCodaHaleMetricsPublisher(String metricsRootNode, MetricRegistry metricRegistry) {

          this.metricsRootNode = metricsRootNode;

          this.metricRegistry = metricRegistry;

      }

      @Override

      public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) {

          return new HystrixCodaHaleMetricsPublisherCommand(metricsRootNode, commandKey, commandGroupKey, metrics, circuitBreaker, properties, metricRegistry);

      }

      @Override

      public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {

          return new HystrixCodaHaleMetricsPublisherThreadPool(metricsRootNode, threadPoolKey, metrics, properties, metricRegistry);

      }

      @Override

      public HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) {

          return new HystrixCodaHaleMetricsPublisherCollapser(collapserKey, metrics, properties, metricRegistry);

      }

  }

  HystrixCodaHaleMetricsPublisher負責創建HystrixCodaHaleMetricsPublisherCommand,HystrixCodaHaleMetricsPublisherThreadPool,HystrixCodaHaleMetricsPublisherCollapser。這三個對象實現基本邏輯是在initialize方法中向metricRegistry中設置相應信息。

public void initialize() {      metricRegistry.register(createMetricName("isCircuitBreakerOpen"), new Gauge<Boolean>() {

      @Override

      public Boolean getValue() {

           return circuitBreaker.isOpen();

      }

    .....

 }

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM