深入Ambari Metrics 機制分析


0.簡介

  Ambari作為一款針對大數據平台的運維管理工具,提供了集群的創建,管理,監控,升級等多項功能,目前在業界已經得到廣泛使用。

Ambari指標系統( Ambari Metrics System,以下簡稱AMS)主要負責監控平台各類服務及主機的運行情況,提供各類服務及主機的相關指標,從而達到判斷集群健康情況的目的,其重要性不言而喻。 

本文是在深入閱讀AMS源代碼的基礎之上,力求能夠從監控指標的采集、存儲、聚合及指標獲取4個層面詳細闡述AMS的整個工作機制。 


這里寫圖片描述 
圖 1 AMS架構圖 

 

1.AMS指標采集

  對於 AMS 本身來說,涉及的主要模塊有 Metrics Monitor、Hadoop Sinks(此處統稱,其中還包含kafka,storm,flume等服務的sinks,嚴格地來說應叫service sinks) 以及 Metrics Collector。

AMS 也是一個 Master-Slave 結構的框架。Master 模塊便是 Metrics Collector,Slave 則是 Metrics Monitor 和 Hadoop Sinks。Slave 模塊負責收集信息,並發送給 Collector。

當然 Metrics Monitor 和 Hadoop Sinks 也有不同的職責,前者主要負責收集機器本身相關的指標,例如 CPU、Mem、Disk 相關信息;后者則負責收集 Hadoop 相關 Service 模塊的性能數據,例如該模塊Namenode占用了多少 Mem,以及該模塊的 CPU 占用率等。

1.1 指標收集

  關於指標的采集,此處以Flume服務為例,剖析AMS是如何采集Flume運行的相應指標的。Ambari內置了FlumeTimelineMetricsSink這樣的jar包,通過Ambari啟動flume服務,ambari會在flume的啟動腳本參數中加入以下兩項:

-Dflume.monitoring.type=org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink 
-Dflume.monitoring.node=<AMS_HOST>:6188

   其中即為AMS collector的節點名字,而6188則是collector中的Timeline Server對外提供的默認端口,以此來向Timeline Server推送數據。 
   接下來再看一下FlumeTimelineMetricsSink jar包的結構,其中就包含一個FlumeTimelineMetricsSink類,繼承自AbstractTimelineMetricsSink抽象類並實現MetricsSink接口,如上所示的所有服務的sink包基本都采用這樣的結構。 
FlumeTimelineMetricsSink類中內置一個TimelineMetricsCollector線程,在flume啟動FlumeTimelineMetricsSink jar包時,其就通過其start方法中的線程調度器來輪詢調度TimelineMetricsCollector線程,而在此線程中主段代碼如下所示。

@Override
  public void start() {
    LOG.info("Starting Flume Metrics Sink");
    TimelineMetricsCollector timelineMetricsCollector = new TimelineMetricsCollector();
    if (scheduledExecutorService == null || scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) {
      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    }
    scheduledExecutorService.scheduleWithFixedDelay(timelineMetricsCollector, 0,
        pollFrequency, TimeUnit.MILLISECONDS);
  }  }

  從上面可看出Start方法中采取線程池的方法,以pollFrequency(可配置)的周期間隔,調度TimelineMetricsCollector線程,再細看一下TimelineMetricsCollector線程的詳細說明,其主要代碼如下所示。

class TimelineMetricsCollector implements Runnable {
    @Override
    public void run() {
      try {
        Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
        long currentTimeMillis = System.currentTimeMillis();
        for (String component : metricsMap.keySet()) {
          Map<String, String> attributeMap = metricsMap.get(component);
          LOG.debug("Attributes for component " + component);
          processComponentAttributes(currentTimeMillis, component, attributeMap);
        }
      } 

   TimelineMetricsCollector線程輪循從服務的JMX端口中獲取指標數據,形成Map對象,並通過processComponentAttributes方法進行邏輯轉換后再發送。

1.2 指標推送

   由上面源碼可以看出,本質上,AMS的監控數據還是從各服務JMX端口中取得的,再通過processComponentAttributes方法邏輯上轉換成AMS的內部的TimelineMetrics,通過emitMetrics方法post到Timeline Server(emitMetrics方法正是從AbstractTimelineMetricsSink類繼承而來),其接口為: 
http://:6188/ws/v1/timeline/metrics 
如下是emitMetrics方法的部分片段,從中可以看出,emit方法最終還是將指標數據轉化成json格式的數據,通過接口推送至TimelineServer。

protected void emitMetrics(TimelineMetrics metrics) throws IOException {
    String connectUrl = getCollectorUri();
    try {
      String jsonData = mapper.writeValueAsString(metrics);
      StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
      PostMethod postMethod = new PostMethod(connectUrl);
      postMethod.setRequestEntity(requestEntity);
      int statusCode = httpClient.executeMethod(postMethod);

  若是轉換成curl命令的形式,則通過以下這樣一條命令進行推送數據:

curl -i -X POST -H "Content-Type: application/json" -d "${json}" ${url}

   其中json為轉化成json的metrics數據,url為上面接口。 
emitMetrics方法或curl命令發送的url最終會被Timeline server所截獲,再通過TimelineMetricStore類以phonenix接口方式存儲到hbase數據庫中,如下文TimelineWebServices類代碼所示。

@Path("/metrics")
@POST
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
  public TimelinePutResponse postMetrics(
    @Context HttpServletRequest req,
    @Context HttpServletResponse res,
    TimelineMetrics metrics) {
    init(res);
    if (metrics == null) {
      return new TimelinePutResponse();
    }
    try {
      // TODO: Check ACLs for MetricEntity using the TimelineACLManager.
      // TODO: Save owner of the MetricEntity.
      return timelineMetricStore.putMetrics(metrics);
}

  上文描述了Flume服務指標推送的大概過程,服務運行時主動推送指標, AMS接收推送指標。其它各類服務如hadoop,kafka,storm均以此種方式進行指標的推送,在此不再作詳細討論。

2.AMS指標存儲

  AMS采集到的服務指標通過http/https的方式推送到timeline server,timeline server內部嵌入了一個hbase,通過phoenix(nosql之上的sql查詢)將指標數據存入到hbase中。 
   Hbase庫中總共有7張表,其相應表名如下表所示。 
  


表2-1 Metrics指標存儲表 
這里寫圖片描述 


  雖然庫中一共有7張表,但是實際存儲指標數據的只有METRIC_RECORD表,其它各表是在其基礎之上做相應的統計及聚合而成的,下表是METRIC_RECORD表詳細說明。 
  


表2-2 METRIC_RECORD表字段說明 
這里寫圖片描述 


  該表是所有表中唯一存儲實際metrics數據的表,其它表都是在此表的基礎之上進行時間段的相應統計。 
  (1)針對采集的hosts指標,即由monitor發送的指標值 
   采集的metric記錄,由{時間戳1:值,時間戳2:值,….}這樣的記錄組成,其中數目表現在Metric_count上,對於monitor發送的metric。為12條,每條間隔5秒種,然后一分鍾向timelineServer發送一次,存入表中。 
  (2)針對采集的hadoop sink指標 
   采集的metric記錄,由{時間戳1:值,時間戳2:值,….}這樣的記錄組成,每條間隔10秒鍾,每隔70秒發送一次,采集7條,所以metric_count為7,一分鍾向timelineServer發送一次,存入表中。 
  表METRIC_RECORD_MINUTE是按分鍾進行統計的,默認一次統計時間是5min(可配置),該表實則是以METRIC_RECORD表的數據作為統計的基准。下表對METRIC_RECORD_MINUTE做了詳細說明。 
  


表2-3 METRIC_RECORD_MINUTE字段說明 
這里寫圖片描述 


   假設5分鍾統計一次,以mem_free為例,則本次統計是以主機為單位,假設在metric_record表中,某主機每隔一分鍾發送一條mem_free的Record,一條record中有12條metric values,則本次統計共有5條Record,metric_count則為60條。同樣的,這五分鍾內的最大,最小和總和,只需要比對提取Metric_record中這60條的Record的最大,最小,以及5條總和即能統計出這5分鍾內相應的屬性。 
  類似於這樣幾條語句得以統計:

1select hostname,max(metric_max) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname;------統計5分鍾內,每台主機上該metric的最大值。
  (2select hostname,min(metric_min) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname; ------統計5分鍾內,每台主機上該metric的最小值。
  (3select hostname,sum(metric_sum) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname; ------統計5分鍾內,每台主機上該metric值總和。
  (4select hostname,sum(metric_count) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname; ------統計5分鍾內,每台主機上統計的該metric數量和。

  至於METRIC_RECORD_HOURLY以及METRIC_RECORD_DAILY表其原理均是參照MINUTE表的原理,只是時間區間擴大了,已經參照的數據表變更了,METRIC_RECORD_HOURLY以METRIC_RECORD_MINUTE的數據為基准,而METRIC_RECORD_DAILY則以METRIC_RECORD_HOURLY的數據為基准進行統計,在此就不再描述了。

3.AMS指標聚集

   上文中所統計的7張表,除了以METRIC_RECORD前綴的表之外,還有METRIC_AGGREGATE作為前綴的表,這就是集群的指標聚集表,在聚集表中不區分host,只是以service(APP_ID) 
進行分組統計,其數據來源也是從METRIC_RECORD表中進行查詢后然后再進行聚集的,下表是表字段的詳細說明。 


表3-1 METRIC_AGGREGATE表字段詳細說明 
這里寫圖片描述 


   以Metric_Name為主要指標,由於是集群級別的統計,所以不再有HOSTNAME相關字段的說明,在此表中增加了HOSTS_COUNT的字段,即聚集的Metric來自主機的數量。 
  實際中不存在表METRIC_AGGREGATE_MINUTE,但是在圖3-1中可以看到一個單獨的TimelineMetricClusterAggregatorMinute聚集線程類,每2分鍾聚集一次,其聚集結果采取了分片的方式,30秒一個片區記錄,即0-30秒,30-60秒,60-90秒,90-120秒分成4個片區,每個時間段采集的記錄分別存到對應的片區記錄中,而各個片區記錄直接存入到METRIC_AGGREGATE表中,該表間隔記錄為30秒一條,是在表METRIC_AGGREGATE_MINUTE的基礎之上進行分片的,之所以這么設定,也是為了防止頻繁的聚集對AMS造成過大負載。 
  這個分片時間是由參數timeline.metrics.cluster.aggregator.minute.timeslice.interval進行設定的,所以METRIC_AGGREGATE表其實是METRIC_AGGREGATE_MINUTE表聚合統計的結果之上,再對其結果按時間進行分片而形成的。 
  指標聚集由TimelineMetricAggregator起始,其作為一個接口,也是一個線程,其實現子類如下圖所示。 
  


   這里寫圖片描述 
圖3-1 TimelineMetricAggregator層次圖 


   在AbstractTimelineAggregator類中實現了run方法,其中通過輪循調度doWork方法來實現聚集,調度時間可配置,如下是doWork程序代碼。

@Override
  public boolean doWork(long startTime, long endTime) {
    boolean success = true;
    Condition condition = prepareMetricQueryCondition(startTime, endTime);
    Connection conn = null;
    PreparedStatement stmt = null;
    ResultSet rs = null;
    try {
      conn = hBaseAccessor.getConnection();
      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
      if (condition.doUpdate()) {
        int rows = stmt.executeUpdate();
        conn.commit();
      } else {
        rs = stmt.executeQuery();
      }
      aggregate(rs, startTime, endTime);
      }

  通過以上代碼可以看到,在doWork方法中,首先還是通過phoenix接口查詢到數據集,再對數據集進行聚集(aggregate方法中),針對不同的AbstractTimelineAggregator的子類,具有不同的aggregate方法,最終存入到METRIC_AGGREGATE表中,如此實現整個AMS的指標聚集功能。

4.AMS指標獲取

  AMS提供了2種獲取指標的接口,分別是Collector提供的API以及Ambari Server的API接口。其中前一種方式更接近原生的指標數據,而后一種方式更為常用,應該說整個Ambari上層獲取指標的方式都是采取后者,而后者在底層本質上還是調用的第一種方式,拿到庫中的原生數據,再進行加工及邏輯處理,最后返回到WEB端。

4.1 Collector API

http://<AMS_HOST>:6188/ws/v1/timeline/metrics?metricNames=<>&hostname=<>&appId=<>&startTime=<>&endTime=<>&precision=<>

   如上是AMS Collector的總體API,其參數說明如下表所示。 
  


表4-1 Collector API參數說明 
這里寫圖片描述 


  當以此接口獲取指標數據時,首先此URL會被TimelineWebServices捕獲到,其類中相應代碼如下所示。

@GET
@Path("/metrics")
@Produces({ MediaType.APPLICATION_JSON })
  public TimelineMetrics getTimelineMetrics(
    @Context HttpServletRequest req,
    @Context HttpServletResponse res,
    @QueryParam("metricNames") String metricNames,
    @QueryParam("appId") String appId,
    @QueryParam("instanceId") String instanceId,
    @QueryParam("hostname") String hostname,
    @QueryParam("startTime") String startTime,
    @QueryParam("endTime") String endTime,
    @QueryParam("precision") String precision,
    @QueryParam("limit") String limit,
    @QueryParam("grouped") String grouped,
    @QueryParam("topN") String topN,
    @QueryParam("topNFunction") String topNFunction,
    @QueryParam("isBottomN") String isBottomN,
    @QueryParam("seriesAggregateFunction") String seriesAggregateFunction
 ) {
    init(res);
    try {
      return timelineMetricStore.getTimelineMetrics(
        parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, instanceId,
        parseLongStr(startTime), parseLongStr(endTime),
        Precision.getPrecision(precision), parseIntStr(limit),
        parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN),
        seriesAggregateFunction);
}

   如上代碼所示,在Timeline server捕獲到請求后,會調用TimelineMetricStore.getTimelineMetrics方法,並傳入相應的請求參數,獲取指標數據。再深入到TimelineMetricStore中可以看到此類為一個接口,唯一的實現子類為HBaseTimelineMetricStore,通過其getTimelineMetrics方法取得指標數據,其主要代碼如下所示。

@Override
  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
      List<String> hostnames, String applicationId, String instanceId,
      Long startTime, Long endTime, Precision precision, Integer limit,
      boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException {
    TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null;
    ……………………………………….
    Multimap<String, List<Function>> metricFunctions =
      parseMetricNamesToAggregationFunctions(metricNames);
    ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet()))
      .hostnames(hostnames)
      .appId(applicationId)
      .instanceId(instanceId)
      .startTime(startTime)
      .endTime(endTime)
      .precision(precision)
      .limit(limit)
      .grouped(groupedByHosts);
………………………………………….
………………………………………
    Condition condition = conditionBuilder.build();
    TimelineMetrics metrics;
    if (hostnames == null || hostnames.isEmpty()) {
      metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions);
    } else {
      metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions);
    }
    metrics = postProcessMetrics(metrics);
    ………………………………..
    return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics);
  }

 

 

   從上文代碼中也可以看出,HBaseTimelineMetricStore類在其內部通過注解映射查詢的條件構建用於查詢的Condition對象,其次向HBaseAccessor傳入此條件用於查詢,根據查詢參數中是否有無hostname從而決定是查詢聚集表還是主機表,最終取得相應的查詢結果,其中代碼在此不再詳述,感興趣可以自行閱讀,至此,通過Collector API取得指標數據的流程就打通了。

4.2 Ambari Server API

  Ambari Server API總體上分為3個層次,具有3種類型的API,無論是哪種類型的API,在其底層取數據時,最終都是利用Collector的API,如下是整體的架構詳細圖。 


   這里寫圖片描述 
圖4-1 AMS工作機制圖 

 

4.2.1 主機類型指標API

http://<ambari-server>:8080/api/v1/clusters/<cluster-name>/hosts/<host-name>?fields=metrics/cpu/cpu_user[1430844610,1430848210,15]

   此API是主機類型的API,在其中具有/hosts/的前綴和參數的補充,fileds后面跟的是指標名稱,[]內描述的是時間起始。上文中說過,Server的API在其底層還是調用Collector的API,那么此API在其對應的底層便是Collector的API加上hostname及appId可選參數,其中appId設為HOST即可,其后的時間戳便是在startTime和endTime中描述。

4.2.2 組件類型指標API

http://<ambari-server>:8080/api/v1/clusters/<cluster-name>/services/HDFS/components/DATANODE?fields=metrics/dfs/datanode/DfsUsed[1430844610,1430848210,15]

   組件類型的API取消了hostname的參數,其主要針對服務整體的聚集查詢,其所查詢的表也是METRIC_AGGREGATE類型的表,對應於Collector的API則是在其中取消了hostname的參數字段,則API默認去聚集表(METRIC_AGGREGATE)中查詢。

4.2.3 主機組件類型指標API

http://<ambari-server>:8080/api/v1/clusters/<cluster-name>/hosts/<host-name>/host_components/NAMENODE?fields=metrics/jvm/memHeapCommittedM[1430847303,1430850903,15]
  • 1

  主機組件類型的API實則和主機類型的API類似,只是主機類型直接是針對主機相關指標如cpu,mem類型的指標的獲取,而主機組件則是真多host之上的部署的服務的指標的獲取,所以在其API中新增了host_components參數。對應於底層Collector的API,只需要將主機類型對應的Collector的API中的appId由HOST替換成相應的服務名稱即可。 
以上3種類型的API,無論哪一種,在其底層最終都是調用Collector API取得metric數據的,主機類型指標則是在Collector API中補充上hostname屬性,組件指標則是在Collector API去掉hostname屬性,讓其做聚集查詢(去METRIC_AGGREGATE查詢數據),而主機組件指標類型則是同時補充上appId與hostname兩個參數進行查詢,下圖描述了通過Server API獲取metric數據時主要實現類及其繼承結構。 


這里寫圖片描述 
圖 4-2 AMSPropertyProvider類層次圖 


  當向Server發送Metric的請求URL時,最終都會在Server端通過AMSPropertyProvider轉化成一個MetricRequest,MetricRequest是AMSPropertyProvider的內部類,再由MetricRequest.populateResources方法進行請求的處理,下文是其主要核心代碼。

 

public Collection<Resource> populateResources() throws SystemException {
if (!hostComponentHostMetrics.isEmpty()) {
        String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1);
         setQueryParams(hostComponentHostMetricParams, hostnames, true, componentName);
          TimelineMetrics metricsResponse = null;
          try {
            metricsResponse = getTimelineMetricsFromCache(
              getTimelineAppMetricCacheKey(hostComponentHostMetrics,
                componentName, uriBuilder.toString(),hostnames), componentName);
          } 

   此方法首先針對metric的請求類型做了判斷,針對不同的指標分別進行不同的處理。此處便是hostComponentHostMetrics散列表不為空的情況下(即此為主機指標的請求),針對此請求,首先利用setQueryParams查詢參數設置,其次執行getTimelineMetricsFromCache取得返回的metrics,在其中傳入了uriBuilder,此對象即為Collector的API,構建完成之后即通過此URL取得相應的metrics數據。 
   接下來直接進入getTimelineMetricsFromCache方法,其代碼如下所示。

private TimelineMetrics getTimelineMetricsFromCache(TimelineAppMetricCacheKey metricCacheKey,String componentName) throws IOException {
      // Cache only the component level metrics
      // No point in time metrics are cached
      if (metricCache != null
          && !StringUtils.isEmpty(componentName)
          && !componentName.equalsIgnoreCase("HOST")
          && metricCacheKey.getTemporalInfo() != null) {
        return metricCache.getAppTimelineMetricsFromCache(metricCacheKey);
      }
      return requestHelper.fetchTimelineMetrics(metricCacheKey.getSpec());
    }

   從代碼中看到,其針對有無AMS緩存的情況分別進行了處理,在原始無緩存的情況下,則是通過requestHelper.fetchTimelineMetrics的方法取得相應的metric數據,而傳入的參數實則就是上文的URL。 
  緊接着進入fetchTimelineMetrics方法,從下文代碼可以看到,最終是通過streamProvider.readFrom(spec)此方法取得metrics數據的,而streamProvider則是一個封裝好的URL讀取數據對象。

public TimelineMetrics fetchTimelineMetrics(String spec) throws IOException {
    LOG.debug("Metrics request url = " + spec);
    BufferedReader reader = null;
    TimelineMetrics timelineMetrics = null;
    try {
      reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec)));
      timelineMetrics = timelineObjectReader.readValue(reader);
}

5.總結

  本文主要是從AMS指標的采集、存儲、聚集、獲取四個層面,詳細描述了AMS整個內在工作機制,並詳細整理了AMS對外各API不同點及其來龍去脈。由於源碼的復雜性,本文只羅列相關的主要功能代碼及其相應的流程,並未細化到最底層的實現,不足之處,敬請見諒。同時,也期待您的指導與幫助。

參考資料:

1.http://www.ibm.com/developerworks/cn/opensource/os-cn-ambari-metrics/index.html 
2.http://blog.csdn.net/bluishglc/article/details/48155265


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM