1. Ambari介紹
Apache Ambari是一種基於Web的工具,支持Apache Hadoop集群的供應、管理和監控。Ambari已支持大多數Hadoop組件,包括HDFS、MapReduce、Hive、Pig、 Hbase、Zookeeper、Sqoop和Hcatalog等。
1.1 基本概念
1. Resource:Ambari把可以被管理的資源的抽象為一個Resource實例,資源可以包括服務、組件、主機節點等,一個resource實例中包含了一系列該資源的屬性;
2. Property:服務組件的指標名稱;
3. ResourceProvider和PropertyProvider:分別對應Resource和Property的提供方,獲取指標需要先獲取Resource,然后獲取Property對應的metric;
4. Query:Query是Resource的內部對象,代表了對該資源的操作;
5. Request:一個Request代表了對Resource的操作請求,包含http信息及要操作的Resource的實例,Request按照http的請求方式分為四種:GET、PUT、DELETE、POST;
6. Predicate:一個Predicate代表了一系列表達式,如and、or等;
1.2 基本組件
Ambari 可以分為 5個大的組件,分別是是 Ambari-server 、 Ambari-web 、 Ambari-agent 、 Ambari-metrics-collector 和 Ambari-metrics-monitor 。
1. 在集群的每一台機器上都會部署 Ambari-agent 程序。 Agent 主要負責接收來着 Server 端的命令,這些命令可以是安裝、啟動、停止 Hadoop 集群上的某一服務。同時, agent 端需要向 Ambari-server 端上 報命令執行的結果,是執行成功還是失敗。
2. Ambari-Server 提供 REST 接口給Agent 和 Web 訪問,用戶甚至可以不用界面,而是通過 curl 命令來操控集群。
3. Ambari-metric-collector和 Ambari-metrics-monitor 是收集群中組件 metrics 的模塊。
1.3 相關技術
Ambari充分利用了一些已有的優秀開源軟件,巧妙地把它們結合起來,使其在分布式環境中做到了集群式服務管理能力、監控能力、展示能力,這些優秀的開源軟件有:
(1)agent端,采用了puppet管理節點。
(2)在web端,采用ember.js作為前端MVC框架和NodeJS相關工具,用handlebars.js作為頁面渲染引擎,在CSS/HTML方面還用了Bootstrap框架。
(3)在Server端,采用了Jetty、Spring、JAX-RS等。
(4)同時利用了Ganglia、Nagios的分布式監控能力。
Ambari架構采用的是Server/Client的模式,主要由兩部分組成:ambari-agent和ambari-server。ambari依賴其它已經成熟的工具,例如其ambari-server 就依賴python,而ambari-agent還同時依賴ruby, puppet,facter等工具,還有它也依賴一些監控工具nagios和ganglia用於監控集群狀況。
其中:
1. puppet是分布式集群配置管理工具,也是典型的Server/Client模式,能夠集中式管理分布式集群的安裝配置部署,主要語言是ruby。
2. facter是用python寫的一個節點資源采集庫,用於采集節點的系統信息,例如OS信息,主機信息等。由於ambari-agent主要是用python寫的,因此用facter可以很好地采集到節點信息。
2. Ambari項目目錄結構
2.1 總體目錄
ambari-server | Ambari的Server程序,主要管理部署在每個節點上的管理監控程序 |
ambari-agent | 部署在監控節點上運行的管理監控程序 |
ambari-web | Ambari頁面UI的代碼,作為用戶與Ambari server交互的。 |
ambari-views | 用於擴展Ambari Web UI中的框架 |
ambari-common | Ambari-server 和Ambari-agent 共用的代碼 |
ambari-metrics | 在Ambari所管理的集群中用來收集、聚合和服務Hadoop和系統計量 |
contrib | 自定義第三方庫 |
docs | 文檔 |
2.2 ambari-server 目錄
目錄 | 描述 |
---|---|
org.apache.ambari.server.api.services | 對web接口的入口方法,處理/api/v1/* 的請求 |
org.apache.ambari.server.controller | 對Ambari中cluster的管理處理,如新增host,更service、刪除component等 |
org.apache.ambari.server.controller.internal | 主要存放ResourceProvider和PropertyProvider; |
org.apache.ambari.service.orm.* | 對數據庫的操作 |
org.apache.ambari.server.agent.rest | 處理與Agent的接口的入口方法 |
org.apache.ambari.security | 使用Spring Security來做權限管理 |
每一種Resource都對應一個ResourceProvider,如下表所示:
Resource.Type | ResourceProvider |
---|---|
Workflow | WorkflowResourceProvider |
Job | JobResourceProvider |
TaskAttempt | TaskAttemptResourceProvider |
View | ViewResourceProvider |
ViewInstance | ViewInstanceResourceProvider |
Blueprint | BlueprintResourceProvider |
Cluster | ClusterResourceProvider |
Service | ServiceResourceProvider |
Component | ComponentResourceProvider |
Host | HostResourceProvider |
HostComponent | HostComponentResourceProvider |
Configuration | ConfigurationResourceProvider |
Action | ActionResourceProvider |
Request | RequestResourceProvider |
Task | TaskResourceProvider |
User | UserResourceProvider |
Stack | StackResourceProvider |
StackVersion | StackVersionResourceProvider |
StackService | StackServiceResourceProvider |
StackServiceComponent | StackServiceComponentResourceProvider |
StackConfiguration | StackConfigurationResourceProvider |
OperatingSystem | OperatingSystemResourceProvider |
Repository | RepositoryResourceProvider |
RootService | RootServiceResourceProvider |
RootServiceComponent | RootServiceComponentResourceProvider |
RootServiceHostComponent | RootServiceHostComponentResourceProvider |
ConfigGroup | ConfigGroupResourceProvider |
RequestSchedule | RequestScheduleResourceProvider |
2.3 Ambari-agent目錄
3. Ambari-server
3.1 ambari-server架構
ambari-server是一個有狀態的,它維護着自己的一個有限狀態機FSM,同時這些狀態機存儲在數據庫中,默認數據庫為postgressql數據庫。
1. Ambarii-Server提供ambari web,rest api,ambari shell三大方式操作機群;
2. ambari將集群的配置、各個服務的配置等信息存在ambari server端的DB中;
3. ambari server與ambari agent的交流走RPC,即agent向server報告心跳,server將command通過respons發回給agent,agent本地執行命令,比如:agent端執行相應的python腳本;
4. ambari有自己的一套監控、告警、鏡像服務,以可插拔的形式供上層服務調用;
Ambari-Server是一個WEB Server,提供統一的REST API接口,同時向web和agent開放了兩個不同的端口(默認前者是8080, 后者是8440或者8441)。它是由Jetty Server容器構建起來的,通過Spring Framework構建出來的WEB服務器,其中大量采用了google提供的Guice注解完成spring框架所需要的注入功能,REST服務由JAX-RS標准來實現。
如下圖所示,server端主要維護三類狀態:
1. Live Cluster State:集群現有狀態,各個節點匯報上來的狀態信息會更改該狀態;
2. Desired State:用戶希望該節點所處狀態,是用戶在頁面進行了一系列的操作,需要更改某些服務的狀態,這些狀態還沒有在節點上產生作用;
3. Action State:操作狀態,是狀態改變時的請求狀態,也可以看作是一種中間狀態,這種狀態可以輔助Live Cluster State向Desired State狀態轉變。
Ambari-server的Heartbeat Handler模塊用於接收各個agent的心跳請求(心跳請求里面主要包含兩類信息:節點狀態信息和返回的操作結果),把節點狀態信息傳遞給FSM狀態機去維護着該節點的狀態,並且把返回的操作結果信息返回給Action Manager去做進一步的處理。
Coordinator模塊又可以稱為API handler,主要在接收WEB端操作請求后,會檢查它是否符合要求,stage planner分解成一組操作,最后提供給Action Manager去完成執行操作。
因此,從上圖就可以看出,Ambari-Server的所有狀態信息的維護和變更都會記錄在數據庫中,用戶做一些更改服務的操作都會在數據庫上做一些相應的記錄,同時,agent通過心跳來獲得數據庫的變更歷史。
Ambari Server 會讀取 Stack 和 Service 的配置文件。當用 Ambari 創建集群的時候,Ambari Server 傳送 Stack 和 Service 的配置文件以及 Service 生命周期的控制腳本到 Ambari Agent。Agent 拿到配置文件后,會下載安裝公共源里軟件包(Redhat,就是使用 yum 服務)。安裝完成后,Ambari Server 會通知 Agent 去啟動 Service。之后 Ambari Server 會定期發送命令到 Agent 檢查 Service 的狀態,Agent 上報給 Server,並呈現在 Ambari 的 GUI 上。
Ambari Server 支持 Rest API,這樣可以很容易的擴展和定制化 Ambari。甚至於不用登陸 Ambari 的 GUI,只需要在命令行通過 curl 就可以控制 Ambari,以及控制 Hadoop 的 cluster。具體的 API 可以參見 Apache Ambari 的官方網頁 API reference。
4. Ambari-agent
4.1 ambari-agent架構
ambari-agent是無狀態的,其功能主要分兩部分:
- 采集所在節點的信息並且匯總發心跳匯報給ambari-server
- 處理ambari-server的執行請求
因此它有兩種隊列:
- 消息隊列MessageQueue,或為ResultQueue。包括節點狀態信息(包括注冊信息)和執行結果信息,並且匯總后通過心跳發送給ambari-server;
- 操作隊列ActionQueue。用於接收ambari-server返回過來的狀態操作,然后能過執行器按序調用puppet或python腳本等模塊完成任務。
4.2 Ambari-agent引導流程
分別是用SSH和人工手動的非SSH
步驟:
1. Ambari Server通過調用bootstrap.py來初始化整個bootstrap進程
2. Server端通過SSH Keys在Agent上配置Ambari Repo:通過scp 命令將Ambari Server上的ambari.repo文件拷貝到Agent Host上。
3. 復制Ambari Agent Setup script:利用scp命令將setupAgent.py腳本復制到Agent host上。
4. 在各個Agent上執行Ambari Agent Setup script:SSH到各個Agent Host上然后執行setupAgent.py。
5. 在Agent上安裝epel-release:用yum工具來安裝epel-release包
6. 在Agent上安裝Ambari-agent:用yum工具來安裝Ambari-Agent包
7. 配置Ambari-agent.ini:修改/etc/ambari-agent/conf/ambari-agent.ini,並設置agent host上的hostname
8. 啟動Ambari-agent:啟動Ambari-agent進程
9. 開始Ambari Agent注冊:agent開始registration進程
人工手動引導
具體步驟內容基本同上
4.3 Agent注冊流程
步驟
1. 連接握手端口8441:Ambari Agent連接到Ambari Server的握手端口8441。
2. 下載Server Certification:Ambari Agent下載Server Certification。
3. 請求簽署Agent Certification:Ambari Agent請求Ambari Server來簽署Agent證書。
4. 簽署Agent Cert:Ambari Server通過密碼簽署Agent證書。
5. 下載Agent Cert並斷掉連接:Ambari Agent下載Agent證書,然后斷掉之前的連接。
6. 連接注冊端口8440:Ambari Agent連接到Ambari Server的注冊端口8441
7. 用Agent Cert執行2WAY auth:在Agent和Server之間完成2WAY權限認證。
8. 獲取FQDN:Ambari Agent host獲取Fully Qualified Domain Name(FQDN)
9. 注冊Host:利用FQDN,host向Ambari Server提出注冊。
10. 完成Host注冊:Ambari Server完成host的注冊過程,把host加入到Ambari數據庫 。
11. Agent心跳程序啟動:Ambari Agent向Ambari Server開啟心跳程序,確認各種命令的執行 。
5. Ambari-web內部架構
Ambari-web使用了一個流行的前端Embar.js MVC框架實現,Embar.js是一個TodoMVC框架,它涵蓋了現今典型的單頁面應用(single page application)幾乎所有的行為。
使用了nodejs
使用brunch 作為項目的構建管理工具
Brunch ,是一個超快的HTML5構建工具。它有如下功能:
(1)編譯你的腳本、模板、樣式、鏈接它們。
(2)將腳本和模板封裝進common.js/AMD模塊里,鏈接腳本和樣式。
(3)為鏈接文件生成源地圖,復制資源和靜態文件。
(4)通過縮減代碼和優化圖片來收縮輸出,看管你的文件更改。
(5)並通過控制台和系統提示通知你錯誤。
Nodejs 是一個基於Chrome JavaScript運行時建立的一個平台,用來方便的搭建快速的易於擴展的網絡應用,NodeJS借助事件驅動,非阻塞I/O模型變得輕量和高效,非常適合運行在分布式設備的數據密集型的實時應用。
6. 源碼分析
6.1 ambari-server處理ambari-agent請求
Agent發送過來的心跳請求由org.apache.ambari.server.agent.HeartBeatHandler.handleHeartBeat(HeartBeat)來處理,執行完后,同時會返回org.apache.ambari.server.agent.HeartBeatResponse給agent。 org.apache.ambari.server.agent.HeartBeat里面主要含了兩類信息:節點的狀態信息nodeStatus和服務狀態信息componentStatus。
public class HeartBeatHandler { ... public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat) throws AmbariException { long now = System.currentTimeMillis(); if (heartbeat.getAgentEnv() != null && heartbeat.getAgentEnv().getHostHealth() != null) { heartbeat.getAgentEnv().getHostHealth().setServerTimeStampAtReporting(now); } String hostname = heartbeat.getHostname(); Long currentResponseId = hostResponseIds.get(hostname); HeartBeatResponse response; if (currentResponseId == null) { //Server restarted, or unknown host. LOG.error("CurrentResponseId unknown for " + hostname + " - send register command"); // 無responseId, 新請求,就進行注冊, responseId =0 return createRegisterCommand(); } LOG.debug("Received heartbeat from host" + ", hostname=" + hostname + ", currentResponseId=" + currentResponseId + ", receivedResponseId=" + heartbeat.getResponseId()); if (heartbeat.getResponseId() == currentResponseId - 1) { LOG.warn("Old responseId received - response was lost - returning cached response"); return hostResponses.get(hostname); } else if (heartbeat.getResponseId() != currentResponseId) { LOG.error("Error in responseId sequence - sending agent restart command"); // 心跳是歷史記錄,那么就要求其重啟,重新注冊,responseId 不變 return createRestartCommand(currentResponseId); } response = new HeartBeatResponse(); //responseId 加 1 , 返回一個新的responseId,下次心跳又要把這個responseId帶回來。 response.setResponseId(++currentResponseId); Host hostObject; try { hostObject = clusterFsm.getHost(hostname); } catch (HostNotFoundException e) { LOG.error("Host: {} not found. Agent is still heartbeating.", hostname); if (LOG.isDebugEnabled()) { LOG.debug("Host associated with the agent heratbeat might have been " + "deleted", e); } // For now return empty response with only response id. return response; } //失去心跳,要求重新注冊, responseId=0 if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) { // After loosing heartbeat agent should reregister LOG.warn("Host is in HEARTBEAT_LOST state - sending register command"); return createRegisterCommand(); } hostResponseIds.put(hostname, currentResponseId); hostResponses.put(hostname, response); // If the host is waiting for component status updates, notify it //如果主機正在等待組件狀態更新,請通知它 //節點已經進行了注冊,但是該節點還沒有匯報相關狀態信息,等待服務狀態更新 if (heartbeat.componentStatus.size() > 0 && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) { try { LOG.debug("Got component status updates"); //更新服務狀態機 hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now)); } catch (InvalidStateTransitionException e) { LOG.warn("Failed to notify the host about component status updates", e); } } if (heartbeat.getRecoveryReport() != null) { RecoveryReport rr = heartbeat.getRecoveryReport(); processRecoveryReport(rr, hostname); } try { if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) { //向狀態機發送更新事件,更新節點至正常狀態 hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now, heartbeat.getAgentEnv(), heartbeat.getMounts())); } else { // 把節點列入不健康 hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null)); } } catch (InvalidStateTransitionException ex) { LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex); hostObject.setState(HostState.INIT); return createRegisterCommand(); } /** * A host can belong to only one cluster. Though getClustersForHost(hostname) * returns a set of clusters, it will have only one entry. *主機只能屬於一個集群。 通過getClustersForHost(hostname)返回一組集群,它只有一個條目。 * * TODO: Handle the case when a host is a part of multiple clusters. * 處理 主機是多個集群的一部分時的 情況。 */ Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname); if (clusters.size() > 0) { String clusterName = clusters.iterator().next().getClusterName(); if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) { RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname); response.setRecoveryConfig(rc); if (response.getRecoveryConfig() != null) { LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString()); } } } heartbeatProcessor.addHeartbeat(heartbeat); // Send commands if node is active if (hostObject.getState().equals(HostState.HEALTHY)) { sendCommands(hostname, response); annotateResponse(hostname, response); } return response; } ... }
6.2 Ambari-Agent執行流程
安裝ambari-agent 服務時會把相應在的python代碼置於python執行的環境上下文中,例如其入口代碼可能是/usr/lib/python2.6/site-packages/ambari_agent/main.py,並且進行相關初始化工作(例如驗證參數,與server建立連接,初始化安全驗證證書),最后會產生一個新的控制器Controller子線程來統一管理節點的狀態。Controller線程里面有一個動作隊列ActionQueue線程,並且開啟向Server注冊和發心跳服務。可以看出來,ambari-agent主要由兩個線程組成,Controller線程向Server發送注冊或心跳請求,請求到的Action數據放到ActionQueue線程里面,ActionQueue線程維護着兩個隊列:CommandQueue和ResultQueue。ActionQueue線程會監聽CommandQueue的狀況。
class Controller(threading.Thread): def __init__(self, config, range=30): // 在初始化Controller之前,ambari-agent就會在main.py里面進行判斷:ambari-server是否正常,正常才會初始化Controller // 省略初始化代碼 def run(self): try: // 初始化隊列線程 self.actionQueue = ActionQueue(self.config, controller=self) self.actionQueue.start() // 初始化注冊類 self.register = Register(self.config) // 初始化心跳類 self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector()) opener = urllib2.build_opener() urllib2.install_opener(opener) while True: self.repeatRegistration = False //開始注冊 並且 定時發心跳 self.registerAndHeartbeat() if not self.repeatRegistration: logger.info("Finished heartbeating and registering cycle") break except: logger.exception("Controller thread failed with exception:") raise logger.info("Controller thread has successfully finished")
CommandQueue隊列主要有3類command:
1. REGISTER_COMMAND:該類命令主要通知agent重新向server發送注冊請求。
2. STATUS_COMMAND:該類命令主要告訴agent需要向server發送某組件的狀態信息。
3. EXECUTION_COMMAND:要求agent執行puppet或者軟件集升級任務
三、獲取指標流程:
- jersy接口接收到請求,創建一個ResourceInstance實例;
- 解析http請求構造一個Request對象,然后交給reques的process()方法來處理;
- reques解析url或http_body得到一個Predicate對象;
- 根據http類型獲取handler,GET請求對應ReadHandler;
- handler向Query對象中添加分頁、Render、Predicate等屬性后,然后讓query.execute();
- 根據Resource.Type獲得對應的ResourceProvider對象,調用其getResources方法得到Set;
- 調用對應的PropertyProvider填充Resource;
- 處理結果,放回json結果
Ambari-Server啟動
Ambari-Server接受來自兩處的REST請求,Agent過來的請求處理邏輯由包org.apache.ambari.server.agent處理, 而API所的處理邏輯來自org.apache.ambari.server.api。詳見如下代碼:
“`
“`
Ambari-Server有一個狀態機管理模塊,所有節點的狀態信息更改都最終提供給狀態機進行更改操作,因此狀態機是一個很忙的組件。在Ambari-Server里面,把每一次更改操作都把它當作是一類事件,采用事件驅動機制完成對應的任務。這種思想有點借鑒已經運用在hadoop 2.x YARN里面的事件驅動機制。事件驅動機制能夠一種高效的異步RPC請求方式,直接調用需要執行相應的代碼邏輯,而事件驅動只需要產生事件統一提交給事件處理器,因此事件驅動需要一個更復雜的有限狀態機結合起來一同使用。