Several attempts at optimizing YARN RM FairScheduler scheduling performance on Hadoop 2.8.4


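For a typical small company, YARN's scheduling capacity is probably sufficient. On large clusters of 1,000 or 2,000+ nodes, however, YARN's scheduling performance becomes a bottleneck.

I happened to come across a very good article on this: https://tech.meituan.com/2019/08/01/hadoop-yarn-scheduling-performance-optimization-practice.html

Following YARN-5969, I found that Hadoop 2.9.0 already fixes this issue, and in my tests the fix did improve scheduling performance.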

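FairScheduler has two scheduling modes:

Heartbeat scheduling: each YARN NodeManager periodically reports its status to the ResourceManager through a heartbeat. Each such RPC causes the ResourceManager to invoke nodeUpdate(), which performs one round of resource scheduling for that node.

Continuous scheduling: a dedicated daemon thread runs at very short intervals and allocates resources in near real time, asynchronously and in parallel with the scheduling triggered by NodeManager heartbeats.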

  • Each time a NodeManager sends a heartbeat, it is handled as an event by the method below
FairScheduler class
 @Override
  public void handle(SchedulerEvent event) {
    switch (event.getType()) {
    case NODE_ADDED:
      if (!(event instanceof NodeAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
      addNode(nodeAddedEvent.getContainerReports(),
          nodeAddedEvent.getAddedRMNode());
      break;
    case NODE_REMOVED:
      if (!(event instanceof NodeRemovedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
      removeNode(nodeRemovedEvent.getRemovedRMNode());
      break;
    case NODE_UPDATE:
      if (!(event instanceof NodeUpdateSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
      break;
    case APP_ADDED:
      if (!(event instanceof AppAddedSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;

Every nodeUpdate follows the same logic.

Both continuous scheduling and heartbeat scheduling go through attemptScheduling(node):
    // If the node is decommissioning, send an update to have the total
    // resource equal to the used resource, so no available resource to
    // schedule.
    if (nm.getState() == NodeState.DECOMMISSIONING) {
      this.rmContext
          .getDispatcher()
          .getEventHandler()
          .handle(
              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
                  .newInstance(getSchedulerNode(nm.getNodeID())
                      .getUsedResource(), 0)));
    }

    if (continuousSchedulingEnabled) {
      if (!completedContainers.isEmpty()) {  // continuous scheduling is on: schedule here only when containers have just completed
        attemptScheduling(node);
      }
    } else {
      attemptScheduling(node);  // heartbeat scheduling
    }

    // Updating node resource utilization
    node.setAggregatedContainersUtilization(
        nm.getAggregatedContainersUtilization());
    node.setNodeUtilization(nm.getNodeUtilization());

Continuous scheduling runs in a separate daemon thread.

It calls continuousSchedulingAttempt() once every getContinuousSchedulingSleepMs() milliseconds.
 
         
  /**
   * Thread which attempts scheduling resources continuously,
   * asynchronous to the node heartbeats.
   */
  private class ContinuousSchedulingThread extends Thread {

    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          continuousSchedulingAttempt();
          Thread.sleep(getContinuousSchedulingSleepMs());
        } catch (InterruptedException e) {
          LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
          return;
        }
      }
    }
  }
 
        

It then sorts the nodes by how much free resource each one has.

void continuousSchedulingAttempt() throws InterruptedException {
    long start = getClock().getTime();
    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
    // Sort the nodes by space available on them, so that we offer
    // containers on emptier nodes first, facilitating an even spread. This
    // requires holding the scheduler lock, so that the space available on a
    // node doesn't change during the sort.
    synchronized (this) {
      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
    }

    // iterate all nodes
    for (NodeId nodeId : nodeIdList) {
      FSSchedulerNode node = getFSSchedulerNode(nodeId);
      try {
        if (node != null && Resources.fitsIn(minimumAllocation,
            node.getAvailableResource())) {
        attemptScheduling(node);
        }
      } catch (Throwable ex) {
        LOG.error("Error while attempting scheduling for node " + node +
            ": " + ex.toString(), ex);
        if ((ex instanceof YarnRuntimeException) &&
            (ex.getCause() instanceof InterruptedException)) {
          // AsyncDispatcher translates InterruptedException to
          // YarnRuntimeException with cause InterruptedException.
          // Need to throw InterruptedException to stop schedulingThread.
          throw (InterruptedException)ex.getCause();
        }
      }
    }

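It then iterates over the nodes and assigns containers on each in turn.

queueMgr.getRootQueue().assignContainer(node) walks the queue tree from the root, traversing the queues and apps that abstract the resource requests: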
    boolean validReservation = false;
    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
    if (reservedAppSchedulable != null) {
      validReservation = reservedAppSchedulable.assignReservedContainer(node);
    }
    if (!validReservation) {
      // No reservation, schedule at queue which is farthest below fair share
      int assignedContainers = 0;
      Resource assignedResource = Resources.clone(Resources.none());
      Resource maxResourcesToAssign =
          Resources.multiply(node.getAvailableResource(), 0.5f);
      while (node.getReservedContainer() == null) {
        boolean assignedContainer = false;
        Resource assignment = queueMgr.getRootQueue().assignContainer(node);
        if (!assignment.equals(Resources.none())) { // was a container assigned?
          assignedContainers++;
          assignedContainer = true;
          Resources.addTo(assignedResource, assignment);
        }
        if (!assignedContainer) { break; }
        if (!shouldContinueAssigning(assignedContainers,
            maxResourcesToAssign, assignedResource)) {
          break;
        }
      }
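Next, inside assignContainer, the child queues are sorted with a policy-specific comparator; here it is the FairScheduler's.
  @Override
  public Resource assignContainer(FSSchedulerNode node) { // for each node, recursively search the resource tree once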
    Resource assigned = Resources.none();

    // If this queue is over its limit, reject
    if (!assignContainerPreCheck(node)) {
      return assigned;
    }

    // Hold the write lock when sorting childQueues
    writeLock.lock();
    try {
     Collections.sort(childQueues, policy.getComparator());
    } finally {
      writeLock.unlock();
    }

The apps under each leaf queue are sorted as well. The parent queue then walks its sorted children:

/*
     * We are releasing the lock between the sort and iteration of the
     * "sorted" list. There could be changes to the list here:
     * 1. Add a child queue to the end of the list, this doesn't affect
     * container assignment.
     * 2. Remove a child queue, this is probably good to take care of so we
     * don't assign to a queue that is going to be removed shortly.
     */
    readLock.lock();
    try {
      for (FSQueue child : childQueues) {
        assigned = child.assignContainer(node);
        if (!Resources.equals(assigned, Resources.none())) {
          break;
        }
      }
    } finally {
      readLock.unlock();
    }
    return assigned;
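assignContainer may be invoked on an app or on a queue. For a queue it recurses until it reaches an app: the root (an FSParentQueue) calls assignContainer() recursively, and actual allocation only begins once the call reaches the assignContainer() of a leaf node.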
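The recursion described above can be sketched with a toy tree. This is a minimal illustration, not the YARN implementation: QueueTreeSketch, Node, Leaf, and Parent are hypothetical names, resources are reduced to a single integer, and "least used first" stands in for the real policy comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified model of the queue tree. These are NOT the real
// YARN classes; names and fields are assumptions made for illustration.
class QueueTreeSketch {

  /** A tree node: either a parent queue or a leaf standing in for an app. */
  interface Node {
    int usage();   // resources currently in use under this node
    int assign();  // returns the amount assigned, 0 if nothing was assigned
  }

  /** Leaf: hands out one unit if it still has demand. */
  static class Leaf implements Node {
    final String name;
    int demand;
    int used;

    Leaf(String name, int demand, int used) {
      this.name = name;
      this.demand = demand;
      this.used = used;
    }

    @Override public int usage() { return used; }

    @Override public int assign() {
      if (demand <= 0) {
        return 0;
      }
      demand--;
      used++;
      return 1;
    }
  }

  /** Parent: sorts its children, then recurses until one of them assigns. */
  static class Parent implements Node {
    final List<Node> children = new ArrayList<>();

    @Override public int usage() {
      int total = 0;
      for (Node child : children) {
        total += child.usage();
      }
      return total;
    }

    @Override public int assign() {
      // Stand-in for policy.getComparator(): least-used child first.
      children.sort(Comparator.comparingInt(Node::usage));
      for (Node child : children) {
        int assigned = child.assign();
        if (assigned != 0) {
          return assigned;  // stop at the first child that got something
        }
      }
      return 0;
    }
  }

  public static void main(String[] args) {
    Leaf app1 = new Leaf("app1", 1, 5);
    Leaf app2 = new Leaf("app2", 0, 1);
    Leaf app3 = new Leaf("app3", 1, 2);
    Parent queueA = new Parent();
    queueA.children.add(app1);
    Parent queueB = new Parent();
    queueB.children.add(app2);
    queueB.children.add(app3);
    Parent root = new Parent();
    root.children.add(queueA);
    root.children.add(queueB);

    int assigned = root.assign();
    System.out.println("assigned=" + assigned + ", app3.used=" + app3.used);
  }
}
```

In this toy tree queueB is less used than queueA, and app2 has no demand, so the single unit goes to app3. Note that every call sorts the children at every level, which is why the comparator cost matters so much.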

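 Optimization 1: optimize the queue comparator

 What we care about here is the sorting.

In Hadoop 2.8.4, the comparator in FairSharePolicy sorts by weight, resource demand, and memory usage ratio, and calls getResourceUsage() several times per comparison.

getResourceUsage() computes its result dynamically on every call, which is expensive, so the repeated calls produce a large amount of duplicate computation.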
    @Override
    public int compare(Schedulable s1, Schedulable s2) {
      double minShareRatio1, minShareRatio2;
      double useToWeightRatio1, useToWeightRatio2;
      Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
          s1.getMinShare(), s1.getDemand());
      Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
          s2.getMinShare(), s2.getDemand());
      boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
          s1.getResourceUsage(), minShare1);
      boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
          s2.getResourceUsage(), minShare2);
      minShareRatio1 = (double) s1.getResourceUsage().getMemorySize()
          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
      minShareRatio2 = (double) s2.getResourceUsage().getMemorySize()
          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
      useToWeightRatio1 = s1.getResourceUsage().getMemorySize() /
          s1.getWeights().getWeight(ResourceType.MEMORY);
      useToWeightRatio2 = s2.getResourceUsage().getMemorySize() /
          s2.getWeights().getWeight(ResourceType.MEMORY);
      int res = 0;
      if (s1Needy && !s2Needy)
        res = -1;
      else if (s2Needy && !s1Needy)
        res = 1;
      else if (s1Needy && s2Needy)
        res = (int) Math.signum(minShareRatio1 - minShareRatio2);
      else
        // Neither schedulable is needy
        res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
      if (res == 0) {
        // Apps are tied in fairness ratio. Break the tie by submit time and job
        // name to get a deterministic ordering, which is useful for unit tests.
        res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
        if (res == 0)
          res = s1.getName().compareTo(s2.getName());
      }
      return res;
    }
  }

The optimized comparator in the newer version looks like this:

@Override
    public int compare(Schedulable s1, Schedulable s2) {
      int res = compareDemand(s1, s2);

      // Pre-compute resource usages to avoid duplicate calculation
      Resource resourceUsage1 = s1.getResourceUsage();
      Resource resourceUsage2 = s2.getResourceUsage();

      if (res == 0) {
        res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
      }

      if (res == 0) {
        res = compareFairShareUsage(s1, s2, resourceUsage1, resourceUsage2);
      }

      // Break the tie by submit time
      if (res == 0) {
        res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
      }

      // Break the tie by job name
      if (res == 0) {
        res = s1.getName().compareTo(s2.getName());
      }

      return res;
    }


    private int compareDemand(Schedulable s1, Schedulable s2) {
      int res = 0;
      Resource demand1 = s1.getDemand();
      Resource demand2 = s2.getDemand();
      if (demand1.equals(Resources.none()) && Resources.greaterThan(
              RESOURCE_CALCULATOR, null, demand2, Resources.none())) {
        res = 1;
      } else if (demand2.equals(Resources.none()) && Resources.greaterThan(
              RESOURCE_CALCULATOR, null, demand1, Resources.none())) {
        res = -1;
      }
      return res;
    }


    private int compareMinShareUsage(Schedulable s1, Schedulable s2,
                                     Resource resourceUsage1, Resource resourceUsage2) {
      int res;
      Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
              s1.getMinShare(), s1.getDemand());
      Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
              s2.getMinShare(), s2.getDemand());
      boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
              resourceUsage1, minShare1);
      boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
              resourceUsage2, minShare2);

      if (s1Needy && !s2Needy) {
        res = -1;
      } else if (s2Needy && !s1Needy) {
        res = 1;
      } else if (s1Needy && s2Needy) {
        double minShareRatio1 = (double) resourceUsage1.getMemorySize() /
                Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE)
                        .getMemorySize();
        double minShareRatio2 = (double) resourceUsage2.getMemorySize() /
                Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE)
                        .getMemorySize();
        res = (int) Math.signum(minShareRatio1 - minShareRatio2);
      } else {
        res = 0;
      }

      return res;
    }


    /**
     * To simplify computation, use weights instead of fair shares to calculate
     * fair share usage.
     */
    private int compareFairShareUsage(Schedulable s1, Schedulable s2,
                                      Resource resourceUsage1, Resource resourceUsage2) {
      double weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
      double weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
      double useToWeightRatio1;
      double useToWeightRatio2;
      if (weight1 > 0.0 && weight2 > 0.0) {
        useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1;
        useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2;
      } else { // Either weight1 or weight2 equals to 0
        if (weight1 == weight2) {
          // If they have same weight, just compare usage
          useToWeightRatio1 = resourceUsage1.getMemorySize();
          useToWeightRatio2 = resourceUsage2.getMemorySize();
        } else {
          // By setting useToWeightRatios to negative weights, we give the
          // zero-weight one less priority, so the non-zero weight one will
          // be given slots.
          useToWeightRatio1 = -weight1;
          useToWeightRatio2 = -weight2;
        }
      }

      return (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
    }

  }
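To make the saving concrete, here is a small sketch that counts how often an expensive getter is called while sorting, once with a comparator that re-reads the usage three times per side (as the 2.8.4 code does) and once with a comparator that reads it once per side. Sched, CALLS, decide, and the sample data are hypothetical stand-ins, not YARN classes, and resources are reduced to plain numbers.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Schedulable; not the real YARN API.
class UsageCachingSketch {
  static final AtomicLong CALLS = new AtomicLong();

  static class Sched {
    final long used;
    final long minShare;
    final double weight;

    Sched(long used, long minShare, double weight) {
      this.used = used;
      this.minShare = minShare;
      this.weight = weight;
    }

    /** Pretend this is expensive; every call is counted. */
    long getResourceUsage() {
      CALLS.incrementAndGet();
      return used;
    }
  }

  // Shared decision logic so both comparators produce the same ordering.
  private static int decide(boolean needy1, boolean needy2,
      double minRatio1, double minRatio2, double wRatio1, double wRatio2) {
    if (needy1 && !needy2) return -1;
    if (needy2 && !needy1) return 1;
    if (needy1 && needy2) return (int) Math.signum(minRatio1 - minRatio2);
    return (int) Math.signum(wRatio1 - wRatio2);
  }

  // Old style: the getter is called three times for each side.
  static final Comparator<Sched> OLD = (s1, s2) -> decide(
      s1.getResourceUsage() < s1.minShare,
      s2.getResourceUsage() < s2.minShare,
      (double) s1.getResourceUsage() / Math.max(s1.minShare, 1),
      (double) s2.getResourceUsage() / Math.max(s2.minShare, 1),
      s1.getResourceUsage() / s1.weight,
      s2.getResourceUsage() / s2.weight);

  // New style: read the usage once per side and reuse it.
  static final Comparator<Sched> CACHED = (s1, s2) -> {
    long u1 = s1.getResourceUsage();
    long u2 = s2.getResourceUsage();
    return decide(u1 < s1.minShare, u2 < s2.minShare,
        (double) u1 / Math.max(s1.minShare, 1),
        (double) u2 / Math.max(s2.minShare, 1),
        u1 / s1.weight, u2 / s2.weight);
  };

  /** Deterministic, made-up sample data. */
  static List<Sched> sample() {
    List<Sched> items = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      items.add(new Sched((i * 37) % 11, i % 3, 1 + i % 2));
    }
    return items;
  }

  /** Sorts a copy of the list and returns how many getter calls that took. */
  static long countCalls(Comparator<Sched> cmp, List<Sched> items) {
    CALLS.set(0);
    List<Sched> copy = new ArrayList<>(items);
    copy.sort(cmp);
    return CALLS.get();
  }

  public static void main(String[] args) {
    List<Sched> items = sample();
    System.out.println("old getter calls:    " + countCalls(OLD, items));
    System.out.println("cached getter calls: " + countCalls(CACHED, items));
  }
}
```

Both comparators return the same result for every pair, so the sort performs the same comparisons; the only difference is that the old style calls the getter three times as often.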

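On a test cluster I compared the queue sorting time before and after the change.

 The comparison in the figure is rather crude; please bear with it ^-^

 The upper red box is the new version and the lower red box is the old version. No stress test was run for this comparison, but both versions handled the same scheduling workload, so the result is convincing. On a large cluster, where this method is invoked tens of millions or even hundreds of millions of times per second, the gain becomes obvious.

In a stress test in production, with 1,000 queues, 1,500 pending jobs, and 600 running jobs, scheduling performance doubled, which is a clear improvement.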

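Optimization 2: optimize the YARN scheduling logic

Idea: resource utilization on our large cluster was poor, so we enabled continuous scheduling to raise it. In practice utilization did go up, but the cluster's scheduling capacity stayed weak: the number of containers allocated and released did not improve.

The cause turned out to be that heartbeat scheduling and continuous scheduling both go through the synchronized attemptScheduling method. They contend for the same lock, so both allocation and release become slow. On top of that, the queue sorting done during allocation is extremely slow when the cluster has a huge number of pending jobs.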
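The contention described above can be reproduced in miniature. The sketch below is an assumption-laden toy, not FairScheduler code: SchedulerLockSketch and its fields are hypothetical, and the method body does no real scheduling. It only shows that two threads calling the same synchronized method are fully serialized, so adding a second scheduling path does not add throughput.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical toy: two callers share one synchronized method, the way the
// heartbeat path and the continuous-scheduling thread share attemptScheduling.
class SchedulerLockSketch {
  private final AtomicInteger inside = new AtomicInteger();
  private final AtomicInteger maxInside = new AtomicInteger();
  private int calls;  // guarded by the monitor of this object

  synchronized void attemptScheduling() {
    int now = inside.incrementAndGet();
    maxInside.accumulateAndGet(now, Math::max);  // highest concurrency seen
    calls++;                                     // the "work"
    inside.decrementAndGet();
  }

  /** Returns {max threads seen inside the method at once, total calls}. */
  static int[] run(int callsPerThread) {
    SchedulerLockSketch scheduler = new SchedulerLockSketch();
    Runnable loop = () -> {
      for (int i = 0; i < callsPerThread; i++) {
        scheduler.attemptScheduling();
      }
    };
    Thread heartbeat = new Thread(loop, "heartbeat");
    Thread continuous = new Thread(loop, "continuous");
    heartbeat.start();
    continuous.start();
    try {
      heartbeat.join();
      continuous.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return new int[] {scheduler.maxInside.get(), scheduler.calls};
  }

  public static void main(String[] args) {
    int[] result = run(100_000);
    System.out.println("max threads inside at once: " + result[0]);
    System.out.println("total calls: " + result[1]);
  }
}
```

Because the method is synchronized, at most one thread is ever inside it, regardless of how many threads call it. Any time one path spends sorting queues inside the lock is time the other path spends waiting.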

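Optimizations: 1. Enable continuous scheduling and disable heartbeat scheduling.

    2. Run continuous scheduling in batches, which indirectly reduces the time cost of queue sorting.

    3. Release non-essential locks to free up performance.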

Let's get to it.

Enable YARN continuous scheduling with the following configuration:

 <property>
    <name>yarn.scheduler.fair.continuous-scheduling-enabled</name>
    <value>true</value>
    <description>Whether to enable continuous scheduling</description>
  </property>
 <property>

Continuous scheduling runs the method below every 5 ms, iterating over the nodes in turn:

void continuousSchedulingAttempt() throws InterruptedException {
    long start = getClock().getTime();
    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
    // Sort the nodes by space available on them, so that we offer
    // containers on emptier nodes first, facilitating an even spread. This
    // requires holding the scheduler lock, so that the space available on a
    // node doesn't change during the sort.
    synchronized (this) {
      Collections.sort(nodeIdList, nodeAvailableResourceComparator); // sort all nodes by available resources
    }

    // iterate all nodes
    for (NodeId nodeId : nodeIdList) {  // visit every node
      FSSchedulerNode node = getFSSchedulerNode(nodeId);
      try {
        if (node != null && Resources.fitsIn(minimumAllocation,
            node.getAvailableResource())) {  // does the node still have at least the minimum allocation available?
          attemptScheduling(node);           // run attemptScheduling
        }
      } catch (Throwable ex) {
        LOG.error("Error while attempting scheduling for node " + node +
            ": " + ex.toString(), ex);
        if ((ex instanceof YarnRuntimeException) &&
            (ex.getCause() instanceof InterruptedException)) {
          // AsyncDispatcher translates InterruptedException to
          // YarnRuntimeException with cause InterruptedException.
          // Need to throw InterruptedException to stop schedulingThread.
          throw (InterruptedException)ex.getCause();
        }
      }
    }

Now let's look at the attemptScheduling method:

@VisibleForTesting
  synchronized void attemptScheduling(FSSchedulerNode node) {
    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }

    final NodeId nodeID = node.getNodeID();
    if (!nodes.containsKey(nodeID)) {  // validity check
      // The node might have just been removed while this thread was waiting
      // on the synchronized lock before it entered this synchronized method
      LOG.info("Skipping scheduling as the node " + nodeID +
          " has been removed");
      return;
    }

    // Assign new containers...
    // 1. Check for reserved applications
    // 2. Schedule if there are no reservations

    boolean validReservation = false;
    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
    if (reservedAppSchedulable != null) {
      validReservation = reservedAppSchedulable.assignReservedContainer(node);
    }
    if (!validReservation) { // validity check: no valid reservation on this node
      // No reservation, schedule at queue which is farthest below fair share
      int assignedContainers = 0;
      Resource assignedResource = Resources.clone(Resources.none());
      Resource maxResourcesToAssign =
          Resources.multiply(node.getAvailableResource(), 0.5f); // by default, assign at most 50% of the node's available resources
      while (node.getReservedContainer() == null) {
        boolean assignedContainer = false;
        Resource assignment = queueMgr.getRootQueue().assignContainer(node);  // the key call: walk the tree from root down to an app and assign a container on this node
        if (!assignment.equals(Resources.none())) { // something was assigned
          assignedContainers++; // one more container assigned
          assignedContainer = true;
          Resources.addTo(assignedResource, assignment);
        }
        if (!assignedContainer) { break; }  // nothing matched, stop
        if (!shouldContinueAssigning(assignedContainers,  // per configuration: stop once the per-node container limit is reached or the assigned resources exceed maxResourcesToAssign
            maxResourcesToAssign, assignedResource)) {
          break;
        }
      }
    }
    updateRootQueueMetrics();
  }

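I modified the source above as follows.

Continuous scheduling now handles five nodes per batch, which reduces the time spent on the sorting that each node's allocation pass performs.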

 void continuousSchedulingAttempt() throws InterruptedException {
    long start = getClock().getTime();
    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
    // Sort the nodes by space available on them, so that we offer
    // containers on emptier nodes first, facilitating an even spread. This
    // requires holding the scheduler lock, so that the space available on a
    // node doesn't change during the sort.
    synchronized (this) {
      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
    }
    ArrayList<ArrayList<NodeId>> newNodeList = inBatchesNodes(nodeIdList, batchNodeAssigon); // split the nodes into batches
    for (ArrayList<NodeId> nodeList : newNodeList) {
      // check every node in the batch
      ArrayList<FSSchedulerNode> fsSchedulerNodeList = new ArrayList<>();
      try {
        for (NodeId nodeId : nodeList) {
          FSSchedulerNode node = getFSSchedulerNode(nodeId);
          if (node != null && Resources.fitsIn(minimumAllocation,
                  node.getAvailableResource())) {

            fsSchedulerNodeList.add(node);
          }
        }

        attemptSchedulings(fsSchedulerNodeList); // run attemptScheduling for the whole batch
      } catch (Exception e) {
        LOG.error("Processing attemptSchedulings error for nodes "
                + fsSchedulerNodeList, e);
        fsSchedulerNodeList.stream().filter(Objects::nonNull).forEach(node -> {
          try {
            attemptScheduling(node); // on error, fall back to the original per-node logic
          } catch (Throwable ex) {
            LOG.error("Error while attempting scheduling for node " + node +
                    ": " + ex.toString(), ex);
          }
        });
      }
    }

    long duration = getClock().getTime() - start;
    fsOpDurations.addContinuousSchedulingRunDuration(duration);
  }

 

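  /**
   * Split the given list into batches of the given size.
   * @param list the list to split
   * @param size the batch size
   * @return the list of batches
   */
  private ArrayList<ArrayList<NodeId>> inBatchesNodes(List<NodeId> list, int size) {
    int listSize = list.size();
    // number of batches needed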
    int count = (list.size() % size == 0 ? list.size() / size : list.size() / size + 1);
    ArrayList<ArrayList<NodeId>> returnList = new ArrayList<>(count);
    for (int i = 0; i < listSize; i += size) {
      if (i + size > list.size()) {
        size = listSize - i;
      }
      ArrayList<NodeId> newList = new ArrayList<>(size);
      for (int j = i; j < i + size; j++) {
        newList.add(list.get(j));
      }
      returnList.add(newList);
    }
    return returnList;
  }
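For comparison, the same batching can be sketched more compactly with List.subList. This is an illustrative alternative rather than the code that was deployed; BatchSketch and inBatches are hypothetical names, and plain integers stand in for NodeId.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a list into consecutive batches.
class BatchSketch {

  static <T> List<List<T>> inBatches(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += batchSize) {
      int end = Math.min(start + batchSize, items.size());
      // Copy the view so later changes to the source list do not leak in.
      batches.add(new ArrayList<>(items.subList(start, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Integer> nodes = new ArrayList<>();
    for (int i = 0; i < 12; i++) {
      nodes.add(i);
    }
    for (List<Integer> batch : inBatches(nodes, 5)) {
      System.out.println(batch);
    }
  }
}
```

With 12 nodes and a batch size of 5 this yields batches of 5, 5, and 2 nodes, preserving the sorted order, so emptier nodes are still offered first.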

 

A new method is added to the Schedulable interface:
  /**
   * Assign containers on the given list of nodes if possible, and return the
   * amounts of resources assigned.
   */
  public List<Resource> assignContainers(List<FSSchedulerNode> nodes);

 

@VisibleForTesting
  protected void attemptSchedulings(ArrayList<FSSchedulerNode> fsSchedulerNodeList) {
    if (rmContext.isWorkPreservingRecoveryEnabled()
            && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }
    List<FSSchedulerNode> fsSchedulerNodes = new ArrayList<>(); // new list holding the scheduler nodes that pass the checks
    fsSchedulerNodeList.stream().forEach(node -> {
      final NodeId nodeID = node.getNodeID();
      if (nodes.containsKey(nodeID)) {
        // Assign new containers...// 1. Check for reserved applications
        // 2. Schedule if there are no reservations
        boolean validReservation = false;
        FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
        if (reservedAppSchedulable != null) {
          validReservation = reservedAppSchedulable.assignReservedContainer(node);
        }
        if (!validReservation) { // passed the reservation check
          if (node.getReservedContainer() == null) {  // no container is reserved on this node
            fsSchedulerNodes.add(node);
          }
        }
      } else {
        LOG.info("Skipping scheduling as the node " + nodeID +
            " has been removed");
      }
    });
    if (fsSchedulerNodes.isEmpty()) {
      LOG.error("Handle fsSchedulerNodes empty and return");
      return;
    }
    LOG.info("Eligible nodes: " + fsSchedulerNodes.size());
    List<Resource> resources = queueMgr.getRootQueue().assignContainers(fsSchedulerNodes); // pass in the node list for batch assignment
    fsOpDurations.addDistributiveContainer(resources.size());
    LOG.info("Containers assigned in this round: " + resources.size());
    updateRootQueueMetrics();
  }
Implementation added to the FSParentQueue class:
  @Override
  public List<Resource> assignContainers(List<FSSchedulerNode> nodes) {
    List<Resource> assignedsNeed = new ArrayList<>();
    ArrayList<FSSchedulerNode> fsSchedulerNodes = new ArrayList<>();
    for (FSSchedulerNode node : nodes) {
      if (assignContainerPreCheck(node)) {
        fsSchedulerNodes.add(node);
      }
    }
    if (fsSchedulerNodes.isEmpty()) {
      LOG.info("Nodes is empty, skip this assign around");
      return assignedsNeed;
    }

    // Hold the write lock when sorting childQueues
    writeLock.lock();
    try {
      Collections.sort(childQueues, policy.getComparator()); // sorting, and yet more sorting, haha
    } finally {
      writeLock.unlock();
    }

    /*
     * We are releasing the lock between the sort and iteration of the
     * "sorted" list. There could be changes to the list here:
     * 1. Add a child queue to the end of the list, this doesn't affect
     * container assignment.
     * 2. Remove a child queue, this is probably good to take care of so we
     * don't assign to a queue that is going to be removed shortly.
     */
    readLock.lock();
    try {
      for (FSQueue child : childQueues) {
        List<Resource> assigneds = child.assignContainers(fsSchedulerNodes); // again pass in the node list
        if (!assigneds.isEmpty()) {
          for (Resource assign : assigneds) {
            assignedsNeed.add(assign);
          }
          break;
        }
      }
    } finally {
      readLock.unlock();
    }

    return assignedsNeed;
  }

 

Apps are finally handled at the FSLeafQueue node (first version):

@Override
  public List<Resource> assignContainers(List<FSSchedulerNode> nodes) {
    Resource assigned = Resources.none();
    List<Resource> assigneds = new ArrayList<>();
    ArrayList<FSSchedulerNode> fsSchedulerNodes = new ArrayList<>();
    for (FSSchedulerNode node : nodes) {
      if (assignContainerPreCheck(node)) {
        fsSchedulerNodes.add(node);
      }
    }
    if (fsSchedulerNodes.isEmpty()) {
      LOG.info("Nodes is empty, skip this assign around");
      return assigneds;
    }
    // Apps that have resource demands.
    TreeSet<FSAppAttempt> pendingForResourceApps =
            new TreeSet<FSAppAttempt>(policy.getComparator());
    readLock.lock();
    try {
      for (FSAppAttempt app : runnableApps) {  // all apps in the queue, running or pending, are sorted in turn
        Resource pending = app.getAppAttemptResourceUsage().getPending();
        if (!pending.equals(Resources.none())) { // only apps with pending demand enter the sorted set
          pendingForResourceApps.add(app);
        }
      }
    } finally {
      readLock.unlock();
    }

    int count = 0; // containers assigned on the current node
    Set<String> repeatApp = new HashSet<>(); // apps already served, used for de-duplication
    for (FSSchedulerNode node : fsSchedulerNodes) {  // iterate over nodes
      count = 0;
      for (FSAppAttempt sched : pendingForResourceApps) {  // iterate over apps
        // One node just allocate for one app once
        if (repeatApp.contains(sched.getId())) {  // skip apps already served
          continue;
        }
        if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) { // skip if the app has blacklisted this node
          continue;
        }
        if (node.getReservedContainer() == null
            && Resources.fitsIn(minimumAllocation, node.getAvailableResource())) { // does the node still have resources?
          assigned = sched.assignContainer(node); // the actual container assignment
          if (!assigned.equals(Resources.none())) { // a container was assigned on this node
            count++;
            repeatApp.add(sched.getId());
            assigneds.add(assigned);
            if (LOG.isDebugEnabled()) {
              LOG.debug("Assigned container in queue:" + getName() + " " +
                      "container:" + assigned);
            }
          }
        }
        if (count >= maxNodeContainerAssign) { // per-node assignment limit reached; move on to the next node
          break;
        }
      }
    }
    return assigneds;
  }


After several rounds of fixes it became the following (second version):

  @Override
  public int assignContainers(List<FSSchedulerNode> nodes) {
    int result = 0;
    getMetrics().assignCount.incr();
    Resource assigned = Resources.none();
    ArrayList<FSSchedulerNode> fsSchedulerNodes = new ArrayList<>();
    long start = System.currentTimeMillis();
    for (FSSchedulerNode node : nodes) {
      if (assignContainerPreCheck(node)) {
        fsSchedulerNodes.add(node);
      }
    }
    long preCheckTime = System.currentTimeMillis();
    getMetrics().preCheckTime.incr((preCheckTime - start));

    if (fsSchedulerNodes.isEmpty()) {
      LOG.info("Nodes is empty, skip this assign around");
      return result;
    }
    long runAppSortStart = System.currentTimeMillis();
    // Apps that have resource demands.
    TreeSet<FSAppAttempt> pendingForResourceApps =
            new TreeSet<FSAppAttempt>(policy.getComparator());
    readLock.lock();
    try {
      for (FSAppAttempt app : runnableApps) {
        Resource pending = app.getAppAttemptResourceUsage().getPending();
        if (!pending.equals(Resources.none())) {
          pendingForResourceApps.add(app);
        }
      }
    } finally {
      readLock.unlock();
    }
    long runAppSortEnd = System.currentTimeMillis();
    getMetrics().runAppSortTime.incr((runAppSortEnd - runAppSortStart));
    int count = 0;
    long appAssignStart, appAssignEnd = 0;
    Set<Integer> repeatApp = new HashSet<>();
    for (FSSchedulerNode node : fsSchedulerNodes) {
      count = 0;
      for (FSAppAttempt sched : pendingForResourceApps) {
        // One node just allocate for one app once
        if (repeatApp.contains(sched.getApplicationId().hashCode())) {
          continue;
        }
        if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
          continue;
        }
        appAssignStart = System.currentTimeMillis();
        if (node.getReservedContainer() == null
            && Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
          assigned = sched.assignContainer(node);
          if (!assigned.equals(Resources.none())) {
            repeatApp.add(sched.getApplicationId().hashCode());
            count++;
            result++;
            if (LOG.isDebugEnabled()) {
              LOG.debug("Queue " + getName() + " assigned container in queue:" + getName() + " " +
                      "container:" + assigned);
            }
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Queue " + getName() + " app " + sched.getId() + " skip node " + node.getNodeName());
            }
          }
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Queue " + getName() + " node " + node.getNodeName() + " has resource "
              + node.getAvailableResource()
              + " left, node reserved is " + (node.getReservedContainer() == null));
          }
        }
        appAssignEnd = System.currentTimeMillis();
        getMetrics().appContainerAssignTime.incr((appAssignEnd - appAssignStart));
        if (appAssignEnd - appAssignStart > 2) {
          LOG.info("Queue " + getName() + " app assign container use " + (appAssignEnd - appAssignStart) + " ms");
        }
        if (count == maxNodeContainerAssign) {
          break;
        }
      }
    }
    return result;
  }

 

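With this round of optimization done, scheduling performance was roughly four times higher than before, and the job backlog in production was effectively resolved.

The nodeUpdate latency comparison after the optimization is shown below.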

 

 

