Ambari架構源碼解析


1. Ambari介紹

  Apache Ambari是一種基於Web的工具,支持Apache Hadoop集群的供應、管理和監控。Ambari已支持大多數Hadoop組件,包括HDFS、MapReduce、Hive、Pig、 Hbase、Zookeeper、Sqoop和Hcatalog等。

1.1 基本概念

1. Resource:Ambari把可以被管理的資源的抽象為一個Resource實例,資源可以包括服務、組件、主機節點等,一個resource實例中包含了一系列該資源的屬性;
2. Property:服務組件的指標名稱;
3. ResourceProvider和PropertyProvider:分別對應Resource和Property的提供方,獲取指標需要先獲取Resource,然后獲取Property對應的metric;
4. Query:Query是Resource的內部對象,代表了對該資源的操作;
5. Request:一個Request代表了對Resource的操作請求,包含http信息及要操作的Resource的實例,Request按照http的請求方式分為四種:GET、PUT、DELETE、POST;
6. Predicate:一個Predicate代表了一系列表達式,如and、or等;

1.2 基本組件

Ambari 可以分為 5個大的組件,分別是是 Ambari-server 、 Ambari-web 、 Ambari-agent 、 Ambari-metrics-collectorAmbari-metrics-monitor 。

1. 在集群的每一台機器上都會部署 Ambari-agent 程序。 Agent 主要負責接收來着 Server 端的命令,這些命令可以是安裝、啟動、停止 Hadoop 集群上的某一服務。同時, agent 端需要向 Ambari-server 端上 報命令執行的結果,是執行成功還是失敗。 

2. Ambari-Server 提供 REST 接口給Agent 和 Web 訪問,用戶甚至可以不用界面,而是通過 curl 命令來操控集群。 

3. Ambari-metric-collector和 Ambari-metrics-monitor 是收集群中組件 metrics 的模塊。

1.3 相關技術

Ambari充分利用了一些已有的優秀開源軟件,巧妙地把它們結合起來,使其在分布式環境中做到了集群式服務管理能力、監控能力、展示能力,這些優秀的開源軟件有: 
(1)agent端,采用了puppet管理節點。 
(2)在web端,采用ember.js作為前端MVC框架和NodeJS相關工具,用handlebars.js作為頁面渲染引擎,在CSS/HTML方面還用了Bootstrap框架。 
(3)在Server端,采用了Jetty、Spring、JAX-RS等。 
(4)同時利用了Ganglia、Nagios的分布式監控能力。

Ambari架構采用的是Server/Client的模式,主要由兩部分組成:ambari-agent和ambari-server。ambari依賴其它已經成熟的工具,例如其ambari-server 就依賴python,而ambari-agent還同時依賴ruby, puppet,facter等工具,還有它也依賴一些監控工具nagios和ganglia用於監控集群狀況。 
其中: 
1. puppet是分布式集群配置管理工具,也是典型的Server/Client模式,能夠集中式管理分布式集群的安裝配置部署,主要語言是ruby。 
2. facter是用python寫的一個節點資源采集庫,用於采集節點的系統信息,例如OS信息,主機信息等。由於ambari-agent主要是用python寫的,因此用facter可以很好地采集到節點信息。

2. Ambari項目目錄結構

2.1 總體目錄

ambari-server Ambari的Server程序,主要管理部署在每個節點上的管理監控程序
ambari-agent 部署在監控節點上運行的管理監控程序
ambari-web Ambari頁面UI的代碼,作為用戶與Ambari server交互的。
ambari-views 用於擴展Ambari Web UI中的框架
ambari-common Ambari-server 和Ambari-agent 共用的代碼
ambari-metrics 在Ambari所管理的集群中用來收集、聚合和服務Hadoop和系統計量
contrib 自定義第三方庫
docs 文檔

   

 

 

 

 

 

 

2.2 ambari-server 目錄

目錄 描述
org.apache.ambari.server.api.services 對web接口的入口方法,處理/api/v1/* 的請求
org.apache.ambari.server.controller 對Ambari中cluster的管理處理,如新增host,更service、刪除component等
org.apache.ambari.server.controller.internal 主要存放ResourceProvider和PropertyProvider;
org.apache.ambari.service.orm.* 對數據庫的操作
org.apache.ambari.server.agent.rest 處理與Agent的接口的入口方法
org.apache.ambari.security 使用Spring Security來做權限管理

 每一種Resource都對應一個ResourceProvider,如下表所示:

 

Resource.Type ResourceProvider
Workflow WorkflowResourceProvider
Job JobResourceProvider
TaskAttempt TaskAttemptResourceProvider
View ViewResourceProvider
ViewInstance ViewInstanceResourceProvider
Blueprint BlueprintResourceProvider
Cluster ClusterResourceProvider
Service ServiceResourceProvider
Component ComponentResourceProvider
Host HostResourceProvider
HostComponent HostComponentResourceProvider
Configuration ConfigurationResourceProvider
Action ActionResourceProvider
Request RequestResourceProvider
Task TaskResourceProvider
User UserResourceProvider
Stack StackResourceProvider
StackVersion StackVersionResourceProvider
StackService StackServiceResourceProvider
StackServiceComponent StackServiceComponentResourceProvider
StackConfiguration StackConfigurationResourceProvider
OperatingSystem OperatingSystemResourceProvider
Repository RepositoryResourceProvider
RootService RootServiceResourceProvider
RootServiceComponent RootServiceComponentResourceProvider
RootServiceHostComponent RootServiceHostComponentResourceProvider
ConfigGroup ConfigGroupResourceProvider
RequestSchedule RequestScheduleResourceProvider  

 

2.3 Ambari-agent目錄

3. Ambari-server

3.1 ambari-server架構

  ambari-server是一個有狀態的,它維護着自己的一個有限狀態機FSM,同時這些狀態機存儲在數據庫中,默認數據庫為postgressql數據庫。 

1. Ambarii-Server提供ambari web,rest api,ambari shell三大方式操作機群; 
2. ambari將集群的配置、各個服務的配置等信息存在ambari server端的DB中; 
3. ambari server與ambari agent的交流走RPC,即agent向server報告心跳,server將command通過respons發回給agent,agent本地執行命令,比如:agent端執行相應的python腳本; 
4. ambari有自己的一套監控、告警、鏡像服務,以可插拔的形式供上層服務調用; 

 

 

  Ambari-Server是一個WEB Server,提供統一的REST API接口,同時向web和agent開放了兩個不同的端口(默認前者是8080, 后者是8440或者8441)。它是由Jetty Server容器構建起來的,通過Spring Framework構建出來的WEB服務器,其中大量采用了google提供的Guice注解完成spring框架所需要的注入功能,REST服務由JAX-RS標准來實現。


 

如下圖所示,server端主要維護三類狀態: 
1. Live Cluster State:集群現有狀態,各個節點匯報上來的狀態信息會更改該狀態; 
2. Desired State:用戶希望該節點所處狀態,是用戶在頁面進行了一系列的操作,需要更改某些服務的狀態,這些狀態還沒有在節點上產生作用; 
3. Action State:操作狀態,是狀態改變時的請求狀態,也可以看作是一種中間狀態,這種狀態可以輔助Live Cluster State向Desired State狀態轉變。

Ambari-server的Heartbeat Handler模塊用於接收各個agent的心跳請求(心跳請求里面主要包含兩類信息:節點狀態信息和返回的操作結果),把節點狀態信息傳遞給FSM狀態機去維護着該節點的狀態,並且把返回的操作結果信息返回給Action Manager去做進一步的處理。 
Coordinator模塊又可以稱為API handler,主要在接收WEB端操作請求后,會檢查它是否符合要求,stage planner分解成一組操作,最后提供給Action Manager去完成執行操作。

  因此,從上圖就可以看出,Ambari-Server的所有狀態信息的維護和變更都會記錄在數據庫中,用戶做一些更改服務的操作都會在數據庫上做一些相應的記錄,同時,agent通過心跳來獲得數據庫的變更歷史。

 


 

  Ambari Server 會讀取 Stack 和 Service 的配置文件。當用 Ambari 創建集群的時候,Ambari Server 傳送 Stack 和 Service 的配置文件以及 Service 生命周期的控制腳本到 Ambari Agent。Agent 拿到配置文件后,會下載安裝公共源里軟件包(Redhat,就是使用 yum 服務)。安裝完成后,Ambari Server 會通知 Agent 去啟動 Service。之后 Ambari Server 會定期發送命令到 Agent 檢查 Service 的狀態,Agent 上報給 Server,並呈現在 Ambari 的 GUI 上。 
Ambari Server 支持 Rest API,這樣可以很容易的擴展和定制化 Ambari。甚至於不用登陸 Ambari 的 GUI,只需要在命令行通過 curl 就可以控制 Ambari,以及控制 Hadoop 的 cluster。具體的 API 可以參見 Apache Ambari 的官方網頁 API reference。 

4. Ambari-agent

4.1 ambari-agent架構

ambari-agent是無狀態的,其功能主要分兩部分:

  • 采集所在節點的信息並且匯總發心跳匯報給ambari-server
  • 處理ambari-server的執行請求

因此它有兩種隊列:

  • 消息隊列MessageQueue,或為ResultQueue。包括節點狀態信息(包括注冊信息)和執行結果信息,並且匯總后通過心跳發送給ambari-server;
  • 操作隊列ActionQueue。用於接收ambari-server返回過來的狀態操作,然后能過執行器按序調用puppet或python腳本等模塊完成任務。 

 

4.2 Ambari-agent引導流程

分別是用SSH和人工手動的非SSH 
步驟: 
1. Ambari Server通過調用bootstrap.py來初始化整個bootstrap進程 
2. Server端通過SSH Keys在Agent上配置Ambari Repo:通過scp 命令將Ambari Server上的ambari.repo文件拷貝到Agent Host上。 
3. 復制Ambari Agent Setup script:利用scp命令將setupAgent.py腳本復制到Agent host上。 
4. 在各個Agent上執行Ambari Agent Setup script:SSH到各個Agent Host上然后執行setupAgent.py。 
5. 在Agent上安裝epel-release:用yum工具來安裝epel-release包 
6. 在Agent上安裝Ambari-agent:用yum工具來安裝Ambari-Agent包 
7. 配置Ambari-agent.ini:修改/etc/ambari-agent/conf/ambari-agent.ini,並設置agent host上的hostname 
8. 啟動Ambari-agent:啟動Ambari-agent進程 
9. 開始Ambari Agent注冊:agent開始registration進程

人工手動引導 
具體步驟內容基本同上

 

4.3 Agent注冊流程

步驟 
1. 連接握手端口8441:Ambari Agent連接到Ambari Server的握手端口8441。 
2. 下載Server Certification:Ambari Agent下載Server Certification。 
3. 請求簽署Agent Certification:Ambari Agent請求Ambari Server來簽署Agent證書。 
4. 簽署Agent Cert:Ambari Server通過密碼簽署Agent證書。 
5. 下載Agent Cert並斷掉連接:Ambari Agent下載Agent證書,然后斷掉之前的連接。 
6. 連接注冊端口8440:Ambari Agent連接到Ambari Server的注冊端口8441 
7. 用Agent Cert執行2WAY auth:在Agent和Server之間完成2WAY權限認證。 
8. 獲取FQDN:Ambari Agent host獲取Fully Qualified Domain Name(FQDN) 
9. 注冊Host:利用FQDN,host向Ambari Server提出注冊。 
10. 完成Host注冊:Ambari Server完成host的注冊過程,把host加入到Ambari數據庫 。
11. Agent心跳程序啟動:Ambari Agent向Ambari Server開啟心跳程序,確認各種命令的執行 。

5. Ambari-web內部架構

Ambari-web使用了一個流行的前端Embar.js MVC框架實現,Embar.js是一個TodoMVC框架,它涵蓋了現今典型的單頁面應用(single page application)幾乎所有的行為。

使用了nodejs

使用brunch 作為項目的構建管理工具

Brunch ,是一個超快的HTML5構建工具。它有如下功能:

(1)編譯你的腳本、模板、樣式、鏈接它們。

(2)將腳本和模板封裝進common.js/AMD模塊里,鏈接腳本和樣式。

(3)為鏈接文件生成源地圖,復制資源和靜態文件。

(4)通過縮減代碼和優化圖片來收縮輸出,看管你的文件更改。

(5)並通過控制台和系統提示通知你錯誤。

Nodejs 是一個基於Chrome JavaScript運行時建立的一個平台,用來方便的搭建快速的易於擴展的網絡應用,NodeJS借助事件驅動,非阻塞I/O模型變得輕量和高效,非常適合運行在分布式設備的數據密集型的實時應用。

6. 源碼分析

6.1 ambari-server處理ambari-agent請求

  Agent發送過來的心跳請求由org.apache.ambari.server.agent.HeartBeatHandler.handleHeartBeat(HeartBeat)來處理,執行完后,同時會返回org.apache.ambari.server.agent.HeartBeatResponse給agent。 org.apache.ambari.server.agent.HeartBeat里面主要含了兩類信息:節點的狀態信息nodeStatus和服務狀態信息componentStatus。

public class HeartBeatHandler {
  ...
    public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
      throws AmbariException {
    long now = System.currentTimeMillis();
    if (heartbeat.getAgentEnv() != null && heartbeat.getAgentEnv().getHostHealth() != null) {
      heartbeat.getAgentEnv().getHostHealth().setServerTimeStampAtReporting(now);
    }

    String hostname = heartbeat.getHostname();
    Long currentResponseId = hostResponseIds.get(hostname);
    HeartBeatResponse response;

    if (currentResponseId == null) {
      //Server restarted, or unknown host.
      LOG.error("CurrentResponseId unknown for " + hostname + " - send register command");
      // 無responseId, 新請求,就進行注冊, responseId =0
      return createRegisterCommand();
    }

    LOG.debug("Received heartbeat from host"
        + ", hostname=" + hostname
        + ", currentResponseId=" + currentResponseId
        + ", receivedResponseId=" + heartbeat.getResponseId());

    if (heartbeat.getResponseId() == currentResponseId - 1) {
      LOG.warn("Old responseId received - response was lost - returning cached response");
      return hostResponses.get(hostname);
    } else if (heartbeat.getResponseId() != currentResponseId) {
      LOG.error("Error in responseId sequence - sending agent restart command");
      // 心跳是歷史記錄,那么就要求其重啟,重新注冊,responseId 不變
      return createRestartCommand(currentResponseId);
    }

    response = new HeartBeatResponse();
    //responseId 加 1 , 返回一個新的responseId,下次心跳又要把這個responseId帶回來。
    response.setResponseId(++currentResponseId);

    Host hostObject;
    try {
      hostObject = clusterFsm.getHost(hostname);
    } catch (HostNotFoundException e) {
      LOG.error("Host: {} not found. Agent is still heartbeating.", hostname);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Host associated with the agent heratbeat might have been " +
          "deleted", e);
      }
      // For now return empty response with only response id.
      return response;
    }
    //失去心跳,要求重新注冊, responseId=0
    if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
      // After loosing heartbeat agent should reregister
      LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
      return createRegisterCommand();
    }

    hostResponseIds.put(hostname, currentResponseId);
    hostResponses.put(hostname, response);

    // If the host is waiting for component status updates, notify it
    //如果主機正在等待組件狀態更新,請通知它
    //節點已經進行了注冊,但是該節點還沒有匯報相關狀態信息,等待服務狀態更新
    if (heartbeat.componentStatus.size() > 0
        && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
      try {
        LOG.debug("Got component status updates");
        //更新服務狀態機
        hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
      } catch (InvalidStateTransitionException e) {
        LOG.warn("Failed to notify the host about component status updates", e);
      }
    }

    if (heartbeat.getRecoveryReport() != null) {
      RecoveryReport rr = heartbeat.getRecoveryReport();
      processRecoveryReport(rr, hostname);
    }

    try {
      if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
        //向狀態機發送更新事件,更新節點至正常狀態
        hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
            heartbeat.getAgentEnv(), heartbeat.getMounts()));
      } else { // 把節點列入不健康
        hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null));
      }
    } catch (InvalidStateTransitionException ex) {
      LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex);
      hostObject.setState(HostState.INIT);
      return createRegisterCommand();
    }

    /**
     * A host can belong to only one cluster. Though getClustersForHost(hostname)
     * returns a set of clusters, it will have only one entry.
     *主機只能屬於一個集群。 通過getClustersForHost(hostname)返回一組集群,它只有一個條目。
     *
     * TODO: Handle the case when a host is a part of multiple clusters.
     * 處理 主機是多個集群的一部分時的 情況。
     */
    Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);

    if (clusters.size() > 0) {
      String clusterName = clusters.iterator().next().getClusterName();

      if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) {
        RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
        response.setRecoveryConfig(rc);

        if (response.getRecoveryConfig() != null) {
          LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString());
        }
      }
    }

    heartbeatProcessor.addHeartbeat(heartbeat);

    // Send commands if node is active
    if (hostObject.getState().equals(HostState.HEALTHY)) {
      sendCommands(hostname, response);
      annotateResponse(hostname, response);
    }

    return response;
  }

  ...
}

6.2 Ambari-Agent執行流程

  安裝ambari-agent 服務時會把相應在的python代碼置於python執行的環境上下文中,例如其入口代碼可能是/usr/lib/python2.6/site-packages/ambari_agent/main.py,並且進行相關初始化工作(例如驗證參數,與server建立連接,初始化安全驗證證書),最后會產生一個新的控制器Controller子線程來統一管理節點的狀態。Controller線程里面有一個動作隊列ActionQueue線程,並且開啟向Server注冊和發心跳服務。可以看出來,ambari-agent主要由兩個線程組成,Controller線程向Server發送注冊或心跳請求,請求到的Action數據放到ActionQueue線程里面,ActionQueue線程維護着兩個隊列:CommandQueue和ResultQueue。ActionQueue線程會監聽CommandQueue的狀況。

class Controller(threading.Thread):    
  def __init__(self, config, range=30):  
  // 在初始化Controller之前,ambari-agent就會在main.py里面進行判斷:ambari-server是否正常,正常才會初始化Controller  
  // 省略初始化代碼
  def run(self):
    try:
      // 初始化隊列線程
      self.actionQueue = ActionQueue(self.config, controller=self)
      self.actionQueue.start()
      // 初始化注冊類
      self.register = Register(self.config)
      // 初始化心跳類
      self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())

      opener = urllib2.build_opener()
      urllib2.install_opener(opener)

      while True:
        self.repeatRegistration = False
        //開始注冊 並且 定時發心跳 
        self.registerAndHeartbeat()
        if not self.repeatRegistration:
          logger.info("Finished heartbeating and registering cycle")
          break
    except:
      logger.exception("Controller thread failed with exception:")
      raise

    logger.info("Controller thread has successfully finished")

CommandQueue隊列主要有3類command: 
1. REGISTER_COMMAND:該類命令主要通知agent重新向server發送注冊請求。 
2. STATUS_COMMAND:該類命令主要告訴agent需要向server發送某組件的狀態信息。 
3. EXECUTION_COMMAND:要求agent執行puppet或者軟件集升級任務

三、獲取指標流程:

  1. jersy接口接收到請求,創建一個ResourceInstance實例;
  2. 解析http請求構造一個Request對象,然后交給reques的process()方法來處理;
  3. reques解析url或http_body得到一個Predicate對象;
  4. 根據http類型獲取handler,GET請求對應ReadHandler;
  5. handler向Query對象中添加分頁、Render、Predicate等屬性后,然后讓query.execute();
  6. 根據Resource.Type獲得對應的ResourceProvider對象,調用其getResources方法得到Set;
  7. 調用對應的PropertyProvider填充Resource;
  8. 處理結果,放回json結果

Ambari-Server啟動

Ambari-Server接受來自兩處的REST請求,Agent過來的請求處理邏輯由包org.apache.ambari.server.agent處理, 而API所的處理邏輯來自org.apache.ambari.server.api。詳見如下代碼:

“`

“`

Ambari-Server有一個狀態機管理模塊,所有節點的狀態信息更改都最終提供給狀態機進行更改操作,因此狀態機是一個很忙的組件。在Ambari-Server里面,把每一次更改操作都把它當作是一類事件,采用事件驅動機制完成對應的任務。這種思想有點借鑒已經運用在hadoop 2.x YARN里面的事件驅動機制。事件驅動機制能夠一種高效的異步RPC請求方式,直接調用需要執行相應的代碼邏輯,而事件驅動只需要產生事件統一提交給事件處理器,因此事件驅動需要一個更復雜的有限狀態機結合起來一同使用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM