0.簡介
Ambari作為一款針對大數據平台的運維管理工具,提供了集群的創建,管理,監控,升級等多項功能,目前在業界已經得到廣泛使用。
Ambari指標系統( Ambari Metrics System,以下簡稱AMS)主要負責監控平台各類服務及主機的運行情況,提供各類服務及主機的相關指標,從而達到判斷集群健康情況的目的,其重要性不言而喻。
本文是在深入閱讀AMS源代碼的基礎之上,力求能夠從監控指標的采集、存儲、聚合及指標獲取4個層面詳細闡述AMS的整個工作機制。

圖 1 AMS架構圖
1.AMS指標采集
對於 AMS 本身來說,涉及的主要模塊有 Metrics Monitor、Hadoop Sinks(此處統稱,其中還包含kafka,storm,flume等服務的sinks,嚴格地來說應叫service sinks) 以及 Metrics Collector。
AMS 也是一個 Master-Slave 結構的框架。Master 模塊便是 Metrics Collector,Slave 則是 Metrics Monitor 和 Hadoop Sinks。Slave 模塊負責收集信息,並發送給 Collector。
當然 Metrics Monitor 和 Hadoop Sinks 也有不同的職責,前者主要負責收集機器本身相關的指標,例如 CPU、Mem、Disk 相關信息;后者則負責收集 Hadoop 相關 Service 模塊的性能數據,例如該模塊Namenode占用了多少 Mem,以及該模塊的 CPU 占用率等。
1.1 指標收集
關於指標的采集,此處以Flume服務為例,剖析AMS是如何采集Flume運行的相應指標的。Ambari內置了FlumeTimelineMetricsSink這樣的jar包,通過Ambari啟動flume服務,ambari會在flume的啟動腳本參數中加入以下兩項:
-Dflume.monitoring.type=org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink -Dflume.monitoring.node=<AMS_HOST>:6188
其中即為AMS collector的節點名字,而6188則是collector中的Timeline Server對外提供的默認端口,以此來向Timeline Server推送數據。
接下來再看一下FlumeTimelineMetricsSink jar包的結構,其中就包含一個FlumeTimelineMetricsSink類,繼承自AbstractTimelineMetricsSink抽象類並實現MetricsSink接口,如上所示的所有服務的sink包基本都采用這樣的結構。
FlumeTimelineMetricsSink類中內置一個TimelineMetricsCollector線程,在flume啟動FlumeTimelineMetricsSink jar包時,其就通過其start方法中的線程調度器來輪詢調度TimelineMetricsCollector線程,而在此線程中主段代碼如下所示。
@Override public void start() { LOG.info("Starting Flume Metrics Sink"); TimelineMetricsCollector timelineMetricsCollector = new TimelineMetricsCollector(); if (scheduledExecutorService == null || scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) { scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); } scheduledExecutorService.scheduleWithFixedDelay(timelineMetricsCollector, 0, pollFrequency, TimeUnit.MILLISECONDS); } }
從上面可看出Start方法中采取線程池的方法,以pollFrequency(可配置)的周期間隔,調度TimelineMetricsCollector線程,再細看一下TimelineMetricsCollector線程的詳細說明,其主要代碼如下所示。
class TimelineMetricsCollector implements Runnable { @Override public void run() { try { Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); long currentTimeMillis = System.currentTimeMillis(); for (String component : metricsMap.keySet()) { Map<String, String> attributeMap = metricsMap.get(component); LOG.debug("Attributes for component " + component); processComponentAttributes(currentTimeMillis, component, attributeMap); } }
TimelineMetricsCollector線程輪循從服務的JMX端口中獲取指標數據,形成Map對象,並通過processComponentAttributes方法進行邏輯轉換后再發送。
1.2 指標推送
由上面源碼可以看出,本質上,AMS的監控數據還是從各服務JMX端口中取得的,再通過processComponentAttributes方法邏輯上轉換成AMS的內部的TimelineMetrics,通過emitMetrics方法post到Timeline Server(emitMetrics方法正是從AbstractTimelineMetricsSink類繼承而來),其接口為:
http://:6188/ws/v1/timeline/metrics
如下是emitMetrics方法的部分片段,從中可以看出,emit方法最終還是將指標數據轉化成json格式的數據,通過接口推送至TimelineServer。
protected void emitMetrics(TimelineMetrics metrics) throws IOException { String connectUrl = getCollectorUri(); try { String jsonData = mapper.writeValueAsString(metrics); StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8"); PostMethod postMethod = new PostMethod(connectUrl); postMethod.setRequestEntity(requestEntity); int statusCode = httpClient.executeMethod(postMethod);
若是轉換成curl命令的形式,則通過以下這樣一條命令進行推送數據:
curl -i -X POST -H "Content-Type: application/json" -d "${json}" ${url}
其中json為轉化成json的metrics數據,url為上面接口。
emitMetrics方法或curl命令發送的url最終會被Timeline server所截獲,再通過TimelineMetricStore類以phonenix接口方式存儲到hbase數據庫中,如下文TimelineWebServices類代碼所示。
@Path("/metrics") @POST @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) public TimelinePutResponse postMetrics( @Context HttpServletRequest req, @Context HttpServletResponse res, TimelineMetrics metrics) { init(res); if (metrics == null) { return new TimelinePutResponse(); } try { // TODO: Check ACLs for MetricEntity using the TimelineACLManager. // TODO: Save owner of the MetricEntity. return timelineMetricStore.putMetrics(metrics); }
上文描述了Flume服務指標推送的大概過程,服務運行時主動推送指標, AMS接收推送指標。其它各類服務如hadoop,kafka,storm均以此種方式進行指標的推送,在此不再作詳細討論。
2.AMS指標存儲
AMS采集到的服務指標通過http/https的方式推送到timeline server,timeline server內部嵌入了一個hbase,通過phoenix(nosql之上的sql查詢)將指標數據存入到hbase中。
Hbase庫中總共有7張表,其相應表名如下表所示。
表2-1 Metrics指標存儲表

雖然庫中一共有7張表,但是實際存儲指標數據的只有METRIC_RECORD表,其它各表是在其基礎之上做相應的統計及聚合而成的,下表是METRIC_RECORD表詳細說明。
表2-2 METRIC_RECORD表字段說明

該表是所有表中唯一存儲實際metrics數據的表,其它表都是在此表的基礎之上進行時間段的相應統計。
(1)針對采集的hosts指標,即由monitor發送的指標值
采集的metric記錄,由{時間戳1:值,時間戳2:值,….}這樣的記錄組成,其中數目表現在Metric_count上,對於monitor發送的metric。為12條,每條間隔5秒種,然后一分鍾向timelineServer發送一次,存入表中。
(2)針對采集的hadoop sink指標
采集的metric記錄,由{時間戳1:值,時間戳2:值,….}這樣的記錄組成,每條間隔10秒鍾,每隔70秒發送一次,采集7條,所以metric_count為7,一分鍾向timelineServer發送一次,存入表中。
表METRIC_RECORD_MINUTE是按分鍾進行統計的,默認一次統計時間是5min(可配置),該表實則是以METRIC_RECORD表的數據作為統計的基准。下表對METRIC_RECORD_MINUTE做了詳細說明。
表2-3 METRIC_RECORD_MINUTE字段說明

假設5分鍾統計一次,以mem_free為例,則本次統計是以主機為單位,假設在metric_record表中,某主機每隔一分鍾發送一條mem_free的Record,一條record中有12條metric values,則本次統計共有5條Record,metric_count則為60條。同樣的,這五分鍾內的最大,最小和總和,只需要比對提取Metric_record中這60條的Record的最大,最小,以及5條總和即能統計出這5分鍾內相應的屬性。
類似於這樣幾條語句得以統計:
(1)select hostname,max(metric_max) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname;------統計5分鍾內,每台主機上該metric的最大值。 (2)select hostname,min(metric_min) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname; ------統計5分鍾內,每台主機上該metric的最小值。 (3)select hostname,sum(metric_sum) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname; ------統計5分鍾內,每台主機上該metric值總和。 (4)select hostname,sum(metric_count) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname; ------統計5分鍾內,每台主機上統計的該metric數量和。
至於METRIC_RECORD_HOURLY以及METRIC_RECORD_DAILY表其原理均是參照MINUTE表的原理,只是時間區間擴大了,已經參照的數據表變更了,METRIC_RECORD_HOURLY以METRIC_RECORD_MINUTE的數據為基准,而METRIC_RECORD_DAILY則以METRIC_RECORD_HOURLY的數據為基准進行統計,在此就不再描述了。
3.AMS指標聚集
上文中所統計的7張表,除了以METRIC_RECORD前綴的表之外,還有METRIC_AGGREGATE作為前綴的表,這就是集群的指標聚集表,在聚集表中不區分host,只是以service(APP_ID)
進行分組統計,其數據來源也是從METRIC_RECORD表中進行查詢后然后再進行聚集的,下表是表字段的詳細說明。
表3-1 METRIC_AGGREGATE表字段詳細說明

以Metric_Name為主要指標,由於是集群級別的統計,所以不再有HOSTNAME相關字段的說明,在此表中增加了HOSTS_COUNT的字段,即聚集的Metric來自主機的數量。
實際中不存在表METRIC_AGGREGATE_MINUTE,但是在圖3-1中可以看到一個單獨的TimelineMetricClusterAggregatorMinute聚集線程類,每2分鍾聚集一次,其聚集結果采取了分片的方式,30秒一個片區記錄,即0-30秒,30-60秒,60-90秒,90-120秒分成4個片區,每個時間段采集的記錄分別存到對應的片區記錄中,而各個片區記錄直接存入到METRIC_AGGREGATE表中,該表間隔記錄為30秒一條,是在表METRIC_AGGREGATE_MINUTE的基礎之上進行分片的,之所以這么設定,也是為了防止頻繁的聚集對AMS造成過大負載。
這個分片時間是由參數timeline.metrics.cluster.aggregator.minute.timeslice.interval進行設定的,所以METRIC_AGGREGATE表其實是METRIC_AGGREGATE_MINUTE表聚合統計的結果之上,再對其結果按時間進行分片而形成的。
指標聚集由TimelineMetricAggregator起始,其作為一個接口,也是一個線程,其實現子類如下圖所示。

圖3-1 TimelineMetricAggregator層次圖
在AbstractTimelineAggregator類中實現了run方法,其中通過輪循調度doWork方法來實現聚集,調度時間可配置,如下是doWork程序代碼。
@Override public boolean doWork(long startTime, long endTime) { boolean success = true; Condition condition = prepareMetricQueryCondition(startTime, endTime); Connection conn = null; PreparedStatement stmt = null; ResultSet rs = null; try { conn = hBaseAccessor.getConnection(); stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); if (condition.doUpdate()) { int rows = stmt.executeUpdate(); conn.commit(); } else { rs = stmt.executeQuery(); } aggregate(rs, startTime, endTime); }
通過以上代碼可以看到,在doWork方法中,首先還是通過phoenix接口查詢到數據集,再對數據集進行聚集(aggregate方法中),針對不同的AbstractTimelineAggregator的子類,具有不同的aggregate方法,最終存入到METRIC_AGGREGATE表中,如此實現整個AMS的指標聚集功能。
4.AMS指標獲取
AMS提供了2種獲取指標的接口,分別是Collector提供的API以及Ambari Server的API接口。其中前一種方式更接近原生的指標數據,而后一種方式更為常用,應該說整個Ambari上層獲取指標的方式都是采取后者,而后者在底層本質上還是調用的第一種方式,拿到庫中的原生數據,再進行加工及邏輯處理,最后返回到WEB端。
4.1 Collector API
http://<AMS_HOST>:6188/ws/v1/timeline/metrics?metricNames=<>&hostname=<>&appId=<>&startTime=<>&endTime=<>&precision=<>
如上是AMS Collector的總體API,其參數說明如下表所示。
表4-1 Collector API參數說明

當以此接口獲取指標數據時,首先此URL會被TimelineWebServices捕獲到,其類中相應代碼如下所示。
@GET @Path("/metrics") @Produces({ MediaType.APPLICATION_JSON }) public TimelineMetrics getTimelineMetrics( @Context HttpServletRequest req, @Context HttpServletResponse res, @QueryParam("metricNames") String metricNames, @QueryParam("appId") String appId, @QueryParam("instanceId") String instanceId, @QueryParam("hostname") String hostname, @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime, @QueryParam("precision") String precision, @QueryParam("limit") String limit, @QueryParam("grouped") String grouped, @QueryParam("topN") String topN, @QueryParam("topNFunction") String topNFunction, @QueryParam("isBottomN") String isBottomN, @QueryParam("seriesAggregateFunction") String seriesAggregateFunction ) { init(res); try { return timelineMetricStore.getTimelineMetrics( parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, instanceId, parseLongStr(startTime), parseLongStr(endTime), Precision.getPrecision(precision), parseIntStr(limit), parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN), seriesAggregateFunction); }
如上代碼所示,在Timeline server捕獲到請求后,會調用TimelineMetricStore.getTimelineMetrics方法,並傳入相應的請求參數,獲取指標數據。再深入到TimelineMetricStore中可以看到此類為一個接口,唯一的實現子類為HBaseTimelineMetricStore,通過其getTimelineMetrics方法取得指標數據,其主要代碼如下所示。
@Override public TimelineMetrics getTimelineMetrics(List<String> metricNames, List<String> hostnames, String applicationId, String instanceId, Long startTime, Long endTime, Precision precision, Integer limit, boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException { TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null; ………………………………………. Multimap<String, List<Function>> metricFunctions = parseMetricNamesToAggregationFunctions(metricNames); ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet())) .hostnames(hostnames) .appId(applicationId) .instanceId(instanceId) .startTime(startTime) .endTime(endTime) .precision(precision) .limit(limit) .grouped(groupedByHosts); …………………………………………. ……………………………………… Condition condition = conditionBuilder.build(); TimelineMetrics metrics; if (hostnames == null || hostnames.isEmpty()) { metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions); } else { metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions); } metrics = postProcessMetrics(metrics); ……………………………….. return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics); }
從上文代碼中也可以看出,HBaseTimelineMetricStore類在其內部通過注解映射查詢的條件構建用於查詢的Condition對象,其次向HBaseAccessor傳入此條件用於查詢,根據查詢參數中是否有無hostname從而決定是查詢聚集表還是主機表,最終取得相應的查詢結果,其中代碼在此不再詳述,感興趣可以自行閱讀,至此,通過Collector API取得指標數據的流程就打通了。
4.2 Ambari Server API
Ambari Server API總體上分為3個層次,具有3種類型的API,無論是哪種類型的API,在其底層取數據時,最終都是利用Collector的API,如下是整體的架構詳細圖。

圖4-1 AMS工作機制圖
4.2.1 主機類型指標API
http://<ambari-server>:8080/api/v1/clusters/<cluster-name>/hosts/<host-name>?fields=metrics/cpu/cpu_user[1430844610,1430848210,15]
此API是主機類型的API,在其中具有/hosts/的前綴和參數的補充,fileds后面跟的是指標名稱,[]內描述的是時間起始。上文中說過,Server的API在其底層還是調用Collector的API,那么此API在其對應的底層便是Collector的API加上hostname及appId可選參數,其中appId設為HOST即可,其后的時間戳便是在startTime和endTime中描述。
4.2.2 組件類型指標API
http://<ambari-server>:8080/api/v1/clusters/<cluster-name>/services/HDFS/components/DATANODE?fields=metrics/dfs/datanode/DfsUsed[1430844610,1430848210,15]
組件類型的API取消了hostname的參數,其主要針對服務整體的聚集查詢,其所查詢的表也是METRIC_AGGREGATE類型的表,對應於Collector的API則是在其中取消了hostname的參數字段,則API默認去聚集表(METRIC_AGGREGATE)中查詢。
4.2.3 主機組件類型指標API
http://<ambari-server>:8080/api/v1/clusters/<cluster-name>/hosts/<host-name>/host_components/NAMENODE?fields=metrics/jvm/memHeapCommittedM[1430847303,1430850903,15]
- 1
主機組件類型的API實則和主機類型的API類似,只是主機類型直接是針對主機相關指標如cpu,mem類型的指標的獲取,而主機組件則是真多host之上的部署的服務的指標的獲取,所以在其API中新增了host_components參數。對應於底層Collector的API,只需要將主機類型對應的Collector的API中的appId由HOST替換成相應的服務名稱即可。
以上3種類型的API,無論哪一種,在其底層最終都是調用Collector API取得metric數據的,主機類型指標則是在Collector API中補充上hostname屬性,組件指標則是在Collector API去掉hostname屬性,讓其做聚集查詢(去METRIC_AGGREGATE查詢數據),而主機組件指標類型則是同時補充上appId與hostname兩個參數進行查詢,下圖描述了通過Server API獲取metric數據時主要實現類及其繼承結構。

圖 4-2 AMSPropertyProvider類層次圖
當向Server發送Metric的請求URL時,最終都會在Server端通過AMSPropertyProvider轉化成一個MetricRequest,MetricRequest是AMSPropertyProvider的內部類,再由MetricRequest.populateResources方法進行請求的處理,下文是其主要核心代碼。
public Collection<Resource> populateResources() throws SystemException {
if (!hostComponentHostMetrics.isEmpty()) {
String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1);
setQueryParams(hostComponentHostMetricParams, hostnames, true, componentName);
TimelineMetrics metricsResponse = null;
try {
metricsResponse = getTimelineMetricsFromCache(
getTimelineAppMetricCacheKey(hostComponentHostMetrics,
componentName, uriBuilder.toString(),hostnames), componentName);
}
此方法首先針對metric的請求類型做了判斷,針對不同的指標分別進行不同的處理。此處便是hostComponentHostMetrics散列表不為空的情況下(即此為主機指標的請求),針對此請求,首先利用setQueryParams查詢參數設置,其次執行getTimelineMetricsFromCache取得返回的metrics,在其中傳入了uriBuilder,此對象即為Collector的API,構建完成之后即通過此URL取得相應的metrics數據。
接下來直接進入getTimelineMetricsFromCache方法,其代碼如下所示。
private TimelineMetrics getTimelineMetricsFromCache(TimelineAppMetricCacheKey metricCacheKey,String componentName) throws IOException { // Cache only the component level metrics // No point in time metrics are cached if (metricCache != null && !StringUtils.isEmpty(componentName) && !componentName.equalsIgnoreCase("HOST") && metricCacheKey.getTemporalInfo() != null) { return metricCache.getAppTimelineMetricsFromCache(metricCacheKey); } return requestHelper.fetchTimelineMetrics(metricCacheKey.getSpec()); }
從代碼中看到,其針對有無AMS緩存的情況分別進行了處理,在原始無緩存的情況下,則是通過requestHelper.fetchTimelineMetrics的方法取得相應的metric數據,而傳入的參數實則就是上文的URL。
緊接着進入fetchTimelineMetrics方法,從下文代碼可以看到,最終是通過streamProvider.readFrom(spec)此方法取得metrics數據的,而streamProvider則是一個封裝好的URL讀取數據對象。
public TimelineMetrics fetchTimelineMetrics(String spec) throws IOException { LOG.debug("Metrics request url = " + spec); BufferedReader reader = null; TimelineMetrics timelineMetrics = null; try { reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec))); timelineMetrics = timelineObjectReader.readValue(reader); }
5.總結
本文主要是從AMS指標的采集、存儲、聚集、獲取四個層面,詳細描述了AMS整個內在工作機制,並詳細整理了AMS對外各API不同點及其來龍去脈。由於源碼的復雜性,本文只羅列相關的主要功能代碼及其相應的流程,並未細化到最底層的實現,不足之處,敬請見諒。同時,也期待您的指導與幫助。
參考資料:
1.http://www.ibm.com/developerworks/cn/opensource/os-cn-ambari-metrics/index.html
2.http://blog.csdn.net/bluishglc/article/details/48155265