Fixing the Yarn RM bug of writing ZNode data beyond the ZNode size limit


Problem Background

Our production clusters have seen several incidents in which the Yarn RM wrote more data to a ZK ZNode than the ZNode size limit allows. Both RM instances then fell into Standby state, users could no longer submit jobs, and the whole cluster hung. The subsequent investigation showed that an abnormal job had written far too much data to its ZNode, exceeding the limit, which in turn prevented the state of every other submitted job from being written to ZK. To keep this from happening again, we optimized the RM's ZNode-writing logic so that a single abnormal job can no longer trigger a cluster-wide avalanche.

1. Reproducing the Problem

The most direct way to reproduce the problem is to set ZK's jute max buffer to 512 B and restart the ZK and Yarn services. Both ZK and the RM then fail: ZK reports java.io.IOException: Len error 614, meaning a client write larger than 512 B cannot be stored in ZK, while the RM reports "code:CONNECTIONLOSS" and cannot connect to ZK. Both RMs end up in Standby state and the cluster becomes unavailable.
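
For reference, here is a minimal sketch of that reproduction setup. It assumes the ZooKeeper server JVM flags are injected via conf/java.env; the exact file (or management-tool safety valve) differs per deployment.

# Reproduction sketch (assumption: ZK JVM flags are managed via conf/java.env;
# with Cloudera Manager / Ambari use the equivalent env safety valve instead).
# jute.maxbuffer is the JVM system property that caps the request size ZooKeeper accepts.
export SERVER_JVMFLAGS="$SERVER_JVMFLAGS -Djute.maxbuffer=512"
# Restart the ZooKeeper ensemble and both ResourceManagers; any RM state write
# larger than 512 B is then rejected on the ZK side with "java.io.IOException: Len error ...".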

Exception messages on the ZK leader:

2020-12-07 16:00:11,869 INFO org.apache.zookeeper.server.ZooKeeperServer: Client attempting to renew session 0x1763c3707800002 at /10.197.1.96:32892
2020-12-07 16:00:11,869 INFO org.apache.zookeeper.server.ZooKeeperServer: Established session 0x1763c3707800002 with negotiated timeout 40000 for client /10.197.1.96:32892
2020-12-07 16:00:11,870 WARN org.apache.zookeeper.server.NIOServerCnxn: Exception causing close of session 0x1763c3707800002 due to java.io.IOException: Len error 614
2020-12-07 16:00:11,870 INFO org.apache.zookeeper.server.NIOServerCnxn: Closed socket connection for client /10.197.1.96:32892 which had sessionid 0x1763c3707800002
2020-12-07 16:00:12,216 INFO org.apache.zookeeper.server.NIOServerCnxnFactory: Accepted socket connection from /10.197.1.141:56492
2020-12-07 16:00:12,216 INFO org.apache.zookeeper.server.ZooKeeperServer: Client attempting to establish new session at /10.197.1.141:56492
2020-12-07 16:00:12,218 INFO org.apache.zookeeper.server.ZooKeeperServer: Established session 0x3763c3707830001 with negotiated timeout 40000 for client /10.197.1.141:56492
2020-12-07 16:00:12,219 WARN org.apache.zookeeper.server.NIOServerCnxn: Exception causing close of session 0x3763c3707830001 due to java.io.IOException: Len error 614
2020-12-07 16:00:12,220 INFO org.apache.zookeeper.server.NIOServerCnxn: Closed socket connection for client /10.197.1.141:56492 which had sessionid 0x3763c3707830001
2020-12-07 16:00:14,275 INFO org.apache.zookeeper.server.NIOServerCnxnFactory: Accepted socket connection from /10.197.1.141:56510
2020-12-07 16:00:14,275 INFO org.apache.zookeeper.server.ZooKeeperServer: Client attempting to renew session 0x3763c3707830001 at /10.197.1.141:56510
2020-12-07 16:00:14,276 INFO org.apache.zookeeper.server.ZooKeeperServer: Established session 0x3763c3707830001 with negotiated timeout 40000 for client /10.197.1.141:56510
2020-12-07 16:00:14,276 WARN org.apache.zookeeper.server.NIOServerCnxn: Exception causing close of session 0x3763c3707830001 due to java.io.IOException: Len error 614
2020-12-07 16:00:14,276 INFO org.apache.zookeeper.server.NIOServerCnxn: Closed socket connection for client /10.197.1.141:56510 which had sessionid 0x3763c3707830001
2020-12-07 16:00:16,000 INFO org.apache.zookeeper.server.ZooKeeperServer: Expiring session 0x1763c3707800000, timeout of 5000ms exceeded

Yarn RM log:

2020-12-07 16:00:10,938 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.
2020-12-07 16:00:10,938 INFO org.apache.hadoop.ha.ActiveStandbyElector: Ignore duplicate monitor lock-node request.
2020-12-07 16:00:11,038 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session disconnected. Entering neutral mode...
2020-12-07 16:00:11,647 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server slave-prd-10-197-1-236.v-bj-5.kwang.lan/10.197.1.236:2181. Will not attempt to authenticate using SASL (unknown error)
2020-12-07 16:00:11,647 INFO org.apache.zookeeper.ClientCnxn: Socket connection established, initiating session, client: /10.197.1.141:56854, server: slave-prd-10-197-1-236.v-bj-5.kwang.lan/10.197.1.236:2181
2020-12-07 16:00:11,649 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server slave-prd-10-197-1-236.v-bj-5.kwang.lan/10.197.1.236:2181, sessionid = 0x1763c3707800001, negotiated timeout = 40000
2020-12-07 16:00:11,649 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.
2020-12-07 16:00:11,650 INFO org.apache.hadoop.ha.ActiveStandbyElector: Ignore duplicate monitor lock-node request.
2020-12-07 16:00:11,650 INFO org.apache.zookeeper.ClientCnxn: Unable to read additional data from server sessionid 0x1763c3707800001, likely server has closed socket, closing socket connection and attempting reconnect
2020-12-07 16:00:11,750 FATAL org.apache.hadoop.ha.ActiveStandbyElector: Received create error from Zookeeper. code:CONNECTIONLOSS for path /yarn-leader-election/yarnRM/ActiveStandbyElectorLock. Not retrying further znode create connection errors.
2020-12-07 16:00:12,210 INFO org.apache.zookeeper.ZooKeeper: Session: 0x1763c3707800001 closed
2020-12-07 16:00:12,212 WARN org.apache.hadoop.ha.ActiveStandbyElector: Ignoring stale result from old client with sessionId 0x1763c3707800001
2020-12-07 16:00:12,212 WARN org.apache.hadoop.ha.ActiveStandbyElector: Ignoring stale result from old client with sessionId 0x1763c3707800001
2020-12-07 16:00:12,212 INFO org.apache.zookeeper.ClientCnxn: EventThread shut down
2020-12-07 16:00:12,213 ERROR org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Received RMFatalEvent of type EMBEDDED_ELECTOR_FAILED, caused by Received create error from Zookeeper. code:CONNECTIONLOSS for path /yarn-leader-election/yarnRM/ActiveStandbyElectorLock. Not retrying further znode create connection errors.
2020-12-07 16:00:12,213 WARN org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Transitioning the resource manager to standby.
2020-12-07 16:00:12,214 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Transitioning RM to Standby mode
2020-12-07 16:00:12,214 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Already in standby state
2020-12-07 16:00:12,214 INFO org.apache.hadoop.ha.ActiveStandbyElector: Yielding from election
2020-12-07 16:00:12,214 INFO org.apache.zookeeper.ZooKeeper: Initiating client connection, connectString=slave-prd-10-197-1-236.v-bj-5.kwang.lan:2181,slave-prd-10-197-1-96.v-bj-5.kwang.lan:2181,slave-prd-10-197-1-141.v-bj-5.kwang.lan:2181 sessionTimeout=60000 watcher=org.apache.hadoop.ha.ActiveStandbyElector$WatcherWithClientRef@67b6359c
2020-12-07 16:00:12,215 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server slave-prd-10-197-1-141.v-bj-5.kwang.lan/10.197.1.141:2181. Will not attempt to authenticate using SASL (unknown error)
2020-12-07 16:00:12,216 INFO org.apache.zookeeper.ClientCnxn: Socket connection established, initiating session, client: /10.197.1.141:56492, server: slave-prd-10-197-1-141.v-bj-5.kwang.lan/10.197.1.141:2181
2020-12-07 16:00:12,218 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server slave-prd-10-197-1-141.v-bj-5.kwang.lan/10.197.1.141:2181, sessionid = 0x3763c3707830001, negotiated timeout = 40000
2020-12-07 16:00:12,219 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.
2020-12-07 16:00:12,220 INFO org.apache.zookeeper.ClientCnxn: Unable to read additional data from server sessionid 0x3763c3707830001, likely server has closed socket, closing socket connection and attempting reconnect
2020-12-07 16:00:12,320 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session disconnected. Entering neutral mode...
2020-12-07 16:00:12,320 WARN org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService: Lost contact with Zookeeper. Transitioning to standby in 60000 ms if connection is not reestablished.

 

2. How the RM Interacts with ZK ZNodes

2.1 How RM state is stored in ZK

Whether or not high availability is enabled, the RM, as the core service component of Yarn, not only communicates with the ApplicationMaster of every application but also exchanges heartbeats with every NodeManager. Many applications register with the RM, each managed across its whole lifecycle by an ApplicationMaster. Because the RM plays such a central role, its state has to be persisted; otherwise, if the RM process exits abnormally, all application state is lost and the restarted RM cannot recover the jobs that were running before.

 

Now that the goal of saving application state is clear, how is it saved, and what data is stored?

Yarn offers four implementations for storing RM application state:

  • MemoryRMStateStore — keeps the state in memory.

  • FileSystemRMStateStore — persists the state to the HDFS file system.

  • NullRMStateStore — does nothing; no application state is saved.

  • ZKRMStateStore — stores the state in ZooKeeper.

Because our Yarn cluster runs with RM HA enabled, ZKRMStateStore is the only one of the four that can be used here (it is the store that provides fencing between the two RMs, as the RM_ZK_FENCING_LOCK znode below shows).
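
For context, this choice is made in yarn-site.xml by enabling RM recovery and pointing the store class at ZKRMStateStore. A minimal sketch of the relevant properties (shown only for illustration; the values are the usual ones, not copied from this cluster's configuration):

<property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
</property>
<property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>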

 

So what state does the RM actually store in ZK? The layout below shows the znode hierarchy used for RM state. ZK mainly stores Application data (job state information) and SECRET_MANAGER data (job token information):

    ROOT_DIR_PATH
      |--- VERSION_INFO
      |--- EPOCH_NODE
      |--- RM_ZK_FENCING_LOCK
      |--- RM_APP_ROOT
      |     |----- (#ApplicationId1)
      |     |        |----- (#ApplicationAttemptIds)
      |     |
      |     |----- (#ApplicationId2)
      |     |       |----- (#ApplicationAttemptIds)
      |     ....
      |
      |--- RM_DT_SECRET_MANAGER_ROOT
      |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
      |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
      |       |----- Token_1
      |       |----- Token_2
      |       ....
      |
      |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
      |      |----- Key_1
      |      |----- Key_2
      ....
      |--- AMRMTOKEN_SECRET_MANAGER_ROOT
      |----- currentMasterKey
      |----- nextMasterKey
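
These znodes can be browsed directly with the ZooKeeper CLI. A sketch, assuming the default parent path /rmstore (configurable via yarn.resourcemanager.zk-state-store.parent-path); adjust the path if it was changed:

# Browse the RM state znodes with the ZooKeeper CLI
zkCli.sh -server <zk-host>:2181
# inside the CLI:
#   ls  /rmstore/ZKRMStateRoot/RMAppRoot                    -> one child znode per application
#   get /rmstore/ZKRMStateRoot/RMAppRoot/<applicationId>    -> serialized ApplicationStateData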

2.2 How RM state is stored and updated in ZK

Every job submitted to Yarn enters through the YarnClient API; the concrete entry point is submitApplication().

// Location: org/apache/hadoop/yarn/client/api/YarnClient.java
  public abstract ApplicationId submitApplication(
      ApplicationSubmissionContext appContext) throws YarnException,
      IOException;
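
To make the entry point concrete, here is a minimal client-side sketch of a submission (the application name and queue are placeholders, and a real submission would also need the AM ContainerLaunchContext and resource settings filled in):

// Example sketch only; this class is not part of the Hadoop source tree.
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SubmitAppSketch {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();

    // Ask the RM for a new application id and fill in the submission context.
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext context = app.getApplicationSubmissionContext();
    context.setApplicationName("znode-size-demo");   // placeholder
    context.setQueue("root.exquery");                // placeholder
    // ... set the AM ContainerLaunchContext and Resource here for a real job ...

    // The entry point discussed above; on the RM side it eventually leads to
    // RMStateStore#storeNewApplication and a write to ZK.
    ApplicationId appId = yarnClient.submitApplication(context);
    System.out.println("Submitted " + appId);
    yarnClient.stop();
  }
}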

 

After submission, the request goes through a series of event transitions and is handled by different state machines. The StoreAppTransition handler is responsible for saving the application's state, writing its metadata to ZK.

// Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  public void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationStateData appState =
        ApplicationStateData.newInstance(
            app.getSubmitTime(), app.getStartTime(), context, app.getUser());
    // Send an RMStateStoreEventType.STORE_APP event to the dispatcher
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }

 

Here an RMStateStoreEventType.STORE_APP event is sent to the dispatcher, and the StoreAppTransition handler is registered for it:

// Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    .addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_APP, new StoreAppTransition())

 

StoreAppTransition ultimately calls ZKRMStateStore#storeApplicationStateInternal(), which persists the application metadata to ZK.

// Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  @Override
  public synchronized void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());

    if (LOG.isDebugEnabled()) {
      LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
    }
    byte[] appStateData = appStateDataPB.getProto().toByteArray();
    createWithRetries(nodeCreatePath, appStateData, zkAcl,
              CreateMode.PERSISTENT);
  }

 

After the application state has been saved to ZK, the application eventually moves to the ACCEPTED state. At that point the StartAppAttemptTransition handler fires and the ApplicationAttempt state is stored as well.

// Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  @Override
  public synchronized void storeApplicationAttemptStateInternal(
      ApplicationAttemptId appAttemptId,
      ApplicationAttemptStateData attemptStateDataPB)
      throws Exception {
    String appDirPath = getNodePath(rmAppRoot,
        appAttemptId.getApplicationId().toString());
    String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());

    if (LOG.isDebugEnabled()) {
      LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
          + nodeCreatePath);
    }
    byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
    createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
                    CreateMode.PERSISTENT);
  }

 

When a job finishes running, the Application and ApplicationAttempt states are updated, and this update path is where exceptions are most likely to occur. The two methods below store or update application and attempt state in ZK. While scheduling a job, Yarn may retry it several times depending on network, hardware, and resource conditions; if saving the retry state to ZK fails, org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.ZKAction.runWithRetries() retries the operation.

// Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  // Update the Application state
  @Override
  public synchronized void updateApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());

    if (LOG.isDebugEnabled()) {
      LOG.debug("Storing final state info for app: " + appId + " at: "
          + nodeUpdatePath);
    }
    byte[] appStateData = appStateDataPB.getProto().toByteArray();

    if (existsWithRetries(nodeUpdatePath, false) != null) {
      setDataWithRetries(nodeUpdatePath, appStateData, -1);
    } else {
      createWithRetries(nodeUpdatePath, appStateData, zkAcl,
              CreateMode.PERSISTENT);
      LOG.debug(appId + " znode didn't exist. Created a new znode to"
              + " update the application state.");
    }
  }

  // Update the ApplicationAttempt state
  @Override
  public synchronized void updateApplicationAttemptStateInternal(
      ApplicationAttemptId appAttemptId,
      ApplicationAttemptStateData attemptStateDataPB)
      throws Exception {
    String appIdStr = appAttemptId.getApplicationId().toString();
    String appAttemptIdStr = appAttemptId.toString();
    String appDirPath = getNodePath(rmAppRoot, appIdStr);
    String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
          + " at: " + nodeUpdatePath);
    }
    byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();

    if (existsWithRetries(nodeUpdatePath, false) != null) {
      setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
    } else {
      createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
              CreateMode.PERSISTENT);
      LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
              + " update the application attempt state.");
    }
  }

 

With Yarn HA enabled, the retry interval is derived from yarn.resourcemanager.zk-timeout-ms (the ZK session timeout; 1 minute, i.e. 60000 ms, in our production environment) and yarn.resourcemanager.zk-num-retries (the number of retries after a failed operation; 1000 in production), as follows:

retry interval (yarn.resourcemanager.zk-retry-interval-ms) = yarn.resourcemanager.zk-timeout-ms (ZK session timeout) / yarn.resourcemanager.zk-num-retries (retry count)

 

In production this means: retry interval = 60000 ms / 1000 retries = 60 ms per retry. In other words, if an operation keeps failing, it is retried 1000 times at 60 ms intervals, which can also lead to RM heap memory exhaustion. Reference: https://my.oschina.net/dabird/blog/3089265

 

The retry interval is determined by the following code:

// Location: src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  @Override
  public synchronized void initInternal(Configuration conf) throws Exception {
    zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
    if (zkHostPort == null) {
      throw new YarnRuntimeException("No server address specified for " +
          "zookeeper state store for Resource Manager recovery. " +
          YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
    }
    // Number of ZK connection retries
    numRetries =
        conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
            YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
    znodeWorkingPath =
        conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
            YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);

    // ZK session timeout
    zkSessionTimeout =
        conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
            YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
    zknodeLimit =
        conf.getInt(YarnConfiguration.RM_ZK_ZNODE_SIZE_LIMIT_BYTES,
            YarnConfiguration.DEFAULT_RM_ZK_ZNODE_SIZE_LIMIT_BYTES);

    if (HAUtil.isHAEnabled(conf)) {
      zkRetryInterval = zkSessionTimeout / numRetries;
    } else {
      zkRetryInterval =
          conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
              YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
    }
 }

 

At this point it is clear how job state in the RM is saved to ZK and how it is updated.

2.3 How RM state is deleted from ZK

Having seen how RM job state is saved to ZK, a natural question follows: once stored, does that state stay in ZK forever? It does not; job state is also deleted from ZK. So what does the deletion logic look like?

The core of the deletion logic is the removeApplication() call inside RMAppManager#checkAppNumCompletedLimit(): when the number of completed applications kept in the ZK state store or in memory exceeds the corresponding limit, the oldest application state is removed.

// Location: org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  /*
   * check to see if hit the limit for max # completed apps kept
   */
  protected synchronized void checkAppNumCompletedLimit() {
    // check apps kept in state store.
    while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
      ApplicationId removeId =
          completedApps.get(completedApps.size() - completedAppsInStateStore);
      RMApp removeApp = rmContext.getRMApps().get(removeId);
      LOG.info("Max number of completed apps kept in state store met:"
          + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
          + ", removing app " + removeApp.getApplicationId()
          + " from state store.");
      rmContext.getStateStore().removeApplication(removeApp);
      completedAppsInStateStore--;
    }

    // check apps kept in memorty.
    while (completedApps.size() > this.maxCompletedAppsInMemory) {
      ApplicationId removeId = completedApps.remove();
      LOG.info("Application should be expired, max number of completed apps"
          + " kept in memory met: maxCompletedAppsInMemory = "
          + this.maxCompletedAppsInMemory + ", removing app " + removeId
          + " from memory: ");
      rmContext.getRMApps().remove(removeId);
      this.applicationACLsManager.removeApplication(removeId);
    }
  }

 

Let's see how the related parameters are set. The maximum number of completed applications kept in the ZK state store defaults to the same value as the in-memory maximum, 10000 (our production environment also uses the default 10000), and the number of jobs kept in the ZK state store can never exceed the number kept in memory.

// Location: org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  public RMAppManager(RMContext context,
      YarnScheduler scheduler, ApplicationMasterService masterService,
      ApplicationACLsManager applicationACLsManager, Configuration conf) {
    ...
    // Maximum number of completed apps kept in memory
    this.maxCompletedAppsInMemory = conf.getInt(
        YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
        YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
    // Maximum number of completed apps kept in the ZK state store; by default equal to the in-memory maximum
    this.maxCompletedAppsInStateStore =
        conf.getInt(
          YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
          YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS);

    // The number of apps kept in the ZK state store must not exceed the number kept in memory
    if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
      this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
    }
  }

// Location: org/apache/hadoop/yarn/conf/YarnConfiguration.java
  // Definition of maxCompletedAppsInMemory
  /** The maximum number of completed applications RM keeps. */ 
  public static final String RM_MAX_COMPLETED_APPLICATIONS =
    RM_PREFIX + "max-completed-applications";
  public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;

  // Definition of maxCompletedAppsInStateStore; defaults to maxCompletedAppsInMemory
  /**
   * The maximum number of completed applications RM state store keeps, by
   * default equals to DEFAULT_RM_MAX_COMPLETED_APPLICATIONS
   */
  public static final String RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
      RM_PREFIX + "state-store.max-completed-applications";
  public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
      DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
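
If completed-application state is piling up in ZK, lowering these two limits shrinks the RM's footprint there. A sketch of the corresponding yarn-site.xml properties (property names follow the definitions above; the values below are purely illustrative, not recommendations):

<property>
    <name>yarn.resourcemanager.max-completed-applications</name>
    <value>5000</value>
</property>
<property>
    <name>yarn.resourcemanager.state-store.max-completed-applications</name>
    <value>5000</value>
</property>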

 

The actual deletion then removes the over-limit application state stored in ZK:

// Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  @Override
  public synchronized void removeApplicationStateInternal(
      ApplicationStateData  appState)
      throws Exception {
    String appId = appState.getApplicationSubmissionContext().getApplicationId()
        .toString();
    String appIdRemovePath = getNodePath(rmAppRoot, appId);
    ArrayList<Op> opList = new ArrayList<Op>();

    // Delete the AppAttempt znodes stored in ZK
    for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
      String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
      opList.add(Op.delete(attemptRemovePath, -1));
    }
    opList.add(Op.delete(appIdRemovePath, -1));

    if (LOG.isDebugEnabled()) {
      LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
          + " and its attempts.");
    }
    // Delete the Application znode stored in ZK (executed together with the attempt deletes)
    doDeleteMultiWithRetries(opList);
  }

 

3. Solutions

3.1 The fix before Hadoop 2.9.0

In the path that writes RM state to ZK, the RM is the client and ZK is the server. Before Hadoop 2.9.0, the usual way to handle this exception was to raise the jute.maxbuffer parameter on the ZK side, increasing the maximum amount of data an RM job may write to ZK. This approach has three drawbacks:

  1. Allowing the ZK server to accept very large ZNodes hurts ZK read/write performance and puts pressure on ZK memory.

  2. Both the ZK server and the RM client have to be restarted, which carries a high operational cost (even higher if other services depend on the same ZK ensemble and must be restarted too).

  3. The amount of data an abnormal job writes to a ZNode is unbounded, so in some cases writes will still exceed the raised limit.

 

Q: Why limit the ZNode size in ZK at all?

A: ZK is a high-throughput system. To keep reads fast, ZK does not read requested data from files; everything is served directly from memory. In other words, every server in a ZK ensemble holds the full data set, and all of it is loaded into memory. Moreover, ZNode data does not support append operations; every write replaces the whole value. If a ZNode is too large, reading and writing it causes unpredictable latency (for example, slow data synchronization between servers), and oversized ZNodes consume ZK server memory. This is also why ZK is not suitable for storing large amounts of data.

3.2 The fix in Hadoop 2.9.0 and later

Hadoop 2.9.0 and later add the yarn.resourcemanager.zk-max-znode-size.bytes parameter to yarn-site.xml. It defines the maximum amount of data, in bytes, that may be stored in a single ZK ZNode; the default is 1024*1024 bytes, i.e. 1 MB. With this approach we no longer need to touch the ZK server configuration: changing the Yarn configuration and restarting the RM is enough to cap the amount of data the RM writes to ZK, which also improves the availability of the ZK service.

The core of the fix adds a check against the ZNode size limit to the storeApplicationStateInternal(), updateApplicationStateInternal(), storeApplicationAttemptStateInternal(), and updateApplicationAttemptStateInternal() methods of ZKRMStateStore, so that a single job writing too much data can no longer make the RM and ZK services unavailable. Part of the code is shown below:

// Location: org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  // Check the size limit when storing the Application state to a ZNode
  @Override
  public synchronized void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());

    if (LOG.isDebugEnabled()) {
      LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
    }
    byte[] appStateData = appStateDataPB.getProto().toByteArray();
    if (appStateData.length <= zknodeLimit) {
      createWithRetries(nodeCreatePath, appStateData, zkAcl,
              CreateMode.PERSISTENT);
      LOG.debug("Store application state data size for " + appId + " is " + appStateData.length);
    } else {
      LOG.info("Store application state data size for " + appId + " is " + appStateData.length +
        ". exceeds the maximum allowed size " + zknodeLimit + " for application data.");
    }
  }

  // Check the ZNode size limit when updating the Application state
  @Override
  public synchronized void updateApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());

    if (LOG.isDebugEnabled()) {
      LOG.debug("Storing final state info for app: " + appId + " at: "
          + nodeUpdatePath);
    }
    byte[] appStateData = appStateDataPB.getProto().toByteArray();

    if (appStateData.length <= zknodeLimit) {
      if (existsWithRetries(nodeUpdatePath, false) != null) {
        setDataWithRetries(nodeUpdatePath, appStateData, -1);
      } else {
        createWithRetries(nodeUpdatePath, appStateData, zkAcl,
                CreateMode.PERSISTENT);
        LOG.debug(appId + " znode didn't exist. Created a new znode to"
                + " update the application state.");
      }
      LOG.debug("Update application state data size for " + appId + " is " + appStateData.length);
    } else {
      LOG.info("Update application state data size for " + appId + " is " + appStateData.length +
              ". exceeds the maximum allowed size " + zknodeLimit + " for application data.");
    }
  }

 

3.3 Testing

Set the maximum amount of data a Yarn app may write to a ZNode, then restart the active RM.

Parameter: ResourceManager Advanced Configuration Snippet (Safety Valve) for yarn-site.xml
Value:
<property>
    <name>yarn.resourcemanager.zk-max-znode-size.bytes</name>
    <value>512</value>
</property>

 

Test job:

hadoop jar /opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/hadoop-mapreduce-examples-2.6.0-cdh5.14.4.jar  pi -Dmapred.job.queue.name=root.exquery 20 10

 

When the job fails, the RM log shows the following: the job's state data exceeded the ZNode limit, so ZK does not store that job's state, but both the ZK and RM services keep serving requests normally and normal use of the cluster is unaffected.

# tailf hadoop-cmf-yarn-RESOURCEMANAGER-slave-prd-10-197-1-141.v-bj-5.vivo.lan.log.out  |grep "the maximum allowed size"
2020-12-10 16:53:37,544 INFO org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore: Application state data size for application_1607589684539_0001 is 1515. exceeds the maximum allowed size 512 for application data.
2020-12-10 16:53:48,086 INFO org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore: Application state data size for application_1607590418121_0001 is 1515. exceeds the maximum allowed size 512 for application data.

# The corresponding RM warning:
2020-12-10 16:53:49,377 WARN org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:kwang (auth:SIMPLE) cause:org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException: Application with id 'application_1607590418121_0001' doesn't exist in RM.
2020-12-10 16:53:49,377 INFO org.apache.hadoop.ipc.Server: IPC Server handler 0 on 8032, call org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.getApplicationReport from 10.197.1.141:56026 Call#63 Retry#0
org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException: Application with id 'application_1607590418121_0001' doesn't exist in RM.
        at org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:324)
        at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:170)
        at org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:401)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2281)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2277)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2275)

 

4. Applying the Fix

4.1 Apply the patch and change the parameter

Set the maximum amount of data a Yarn app may write to a ZNode to 4*1024*1024 B (4 MB) and restart the RMs.

Parameter: ResourceManager Advanced Configuration Snippet (Safety Valve) for yarn-site.xml
Value:
<property>
    <name>yarn.resourcemanager.zk-max-znode-size.bytes</name>
    <value>4194304</value>
</property>

 

4.2 Recommended parameter change

As analyzed in section 2.2, when updating Application or ApplicationAttempt state, a job writes to the ZK ZNode with retries; in our production environment the defaults work out to 1000 retries at a 60 ms interval. Once a job misbehaves, this high-frequency writing puts real pressure on ZK and the RM, so it is worth lowering the retry count to reduce that pressure.

Parameter: ResourceManager Advanced Configuration Snippet (Safety Valve) for yarn-site.xml
Value:
<property>
    <name>yarn.resourcemanager.zk-num-retries</name>
    <value>100</value>
</property>

 

References

  1. https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

  2. https://issues.apache.org/jira/browse/YARN-2368

  3. https://cloud.tencent.com/developer/article/1629687

  4. https://blog.csdn.net/Androidlushangderen/article/details/48224707

 

