For a typical small company, YARN's scheduling capacity is probably sufficient, but on large clusters of 1000-2000+ nodes its scheduling performance quickly becomes the bottleneck.

I happened upon a very good article on exactly this topic: https://tech.meituan.com/2019/08/01/hadoop-yarn-scheduling-performance-optimization-practice.html

Following its pointer to YARN-5969, we found that Hadoop 2.9.0 has already fixed that issue; our own tests confirmed the scheduling-performance improvement.
FairScheduler has two scheduling modes:

Heartbeat scheduling: each NodeManager periodically reports its state to the ResourceManager via heartbeat; that RPC triggers the ResourceManager's nodeUpdate() method, which performs one round of resource scheduling for the reporting node.

Continuous scheduling: a dedicated daemon thread runs a scheduling pass at a short fixed interval, allocating resources in near real time, asynchronously and in parallel with the heartbeat-triggered scheduling.
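Both cadences are configurable. For reference, heartbeat-driven scheduling fires as often as NodeManagers heartbeat; a minimal sketch of that knob (a standard YARN property, shown with its usual default):

```xml
<!-- how often each NodeManager heartbeats the ResourceManager, and hence
     how often heartbeat-driven scheduling fires per node -->
<property>
  <name>yarn.resourcemanager.nodemanagers.heartbeat-interval-ms</name>
  <value>1000</value>
</property>
```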
- Each time a NodeManager heartbeat arrives, it is dispatched as an event to the following method in the FairScheduler class:
```java
@Override
public void handle(SchedulerEvent event) {
  switch (event.getType()) {
  case NODE_ADDED:
    if (!(event instanceof NodeAddedSchedulerEvent)) {
      throw new RuntimeException("Unexpected event type: " + event);
    }
    NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
    addNode(nodeAddedEvent.getContainerReports(),
        nodeAddedEvent.getAddedRMNode());
    break;
  case NODE_REMOVED:
    if (!(event instanceof NodeRemovedSchedulerEvent)) {
      throw new RuntimeException("Unexpected event type: " + event);
    }
    NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
    removeNode(nodeRemovedEvent.getRemovedRMNode());
    break;
  case NODE_UPDATE:
    if (!(event instanceof NodeUpdateSchedulerEvent)) {
      throw new RuntimeException("Unexpected event type: " + event);
    }
    NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
    nodeUpdate(nodeUpdatedEvent.getRMNode());
    break;
  case APP_ADDED:
    if (!(event instanceof AppAddedSchedulerEvent)) {
      throw new RuntimeException("Unexpected event type: " + event);
    }
    AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
    // ... (remaining cases omitted)
```
Every nodeUpdate runs through the same logic; attemptScheduling(node) is reached by both continuous scheduling and heartbeat scheduling:
```java
// If the node is decommissioning, send an update to have the total
// resource equal to the used resource, so no available resource to
// schedule.
if (nm.getState() == NodeState.DECOMMISSIONING) {
  this.rmContext
      .getDispatcher()
      .getEventHandler()
      .handle(
          new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
              .newInstance(getSchedulerNode(nm.getNodeID())
                  .getUsedResource(), 0)));
}

if (continuousSchedulingEnabled) {
  if (!completedContainers.isEmpty()) {
    // continuous scheduling enabled: the heartbeat path only schedules
    // when this heartbeat reported completed containers
    attemptScheduling(node);
  }
} else {
  attemptScheduling(node); // heartbeat scheduling
}

// Updating node resource utilization
node.setAggregatedContainersUtilization(
    nm.getAggregatedContainersUtilization());
node.setNodeUtilization(nm.getNodeUtilization());
```
Continuous scheduling runs on a separate daemon thread that calls continuousSchedulingAttempt() once every getContinuousSchedulingSleepMs() milliseconds:
```java
/**
 * Thread which attempts scheduling resources continuously,
 * asynchronous to the node heartbeats.
 */
private class ContinuousSchedulingThread extends Thread {

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        continuousSchedulingAttempt();
        Thread.sleep(getContinuousSchedulingSleepMs());
      } catch (InterruptedException e) {
        LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
        return;
      }
    }
  }
}
```
It then sorts the nodes once, by how much spare resource each node has:
```java
void continuousSchedulingAttempt() throws InterruptedException {
  long start = getClock().getTime();
  List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
  // Sort the nodes by space available on them, so that we offer
  // containers on emptier nodes first, facilitating an even spread. This
  // requires holding the scheduler lock, so that the space available on a
  // node doesn't change during the sort.
  synchronized (this) {
    Collections.sort(nodeIdList, nodeAvailableResourceComparator);
  }

  // iterate all nodes
  for (NodeId nodeId : nodeIdList) {
    FSSchedulerNode node = getFSSchedulerNode(nodeId);
    try {
      if (node != null && Resources.fitsIn(minimumAllocation,
          node.getAvailableResource())) {
        attemptScheduling(node);
      }
    } catch (Throwable ex) {
      LOG.error("Error while attempting scheduling for node " + node +
          ": " + ex.toString(), ex);
      if ((ex instanceof YarnRuntimeException) &&
          (ex.getCause() instanceof InterruptedException)) {
        // AsyncDispatcher translates InterruptedException to
        // YarnRuntimeException with cause InterruptedException.
        // Need to throw InterruptedException to stop schedulingThread.
        throw (InterruptedException)ex.getCause();
      }
    }
  }
  // ... (duration bookkeeping omitted)
```
Each node is then visited in turn to have containers assigned; queueMgr.getRootQueue().assignContainer(node) walks the tree from root, over the schedulable queues and applications:
```java
boolean validReservation = false;
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
  validReservation = reservedAppSchedulable.assignReservedContainer(node);
}
if (!validReservation) {
  // No reservation, schedule at queue which is farthest below fair share
  int assignedContainers = 0;
  Resource assignedResource = Resources.clone(Resources.none());
  Resource maxResourcesToAssign =
      Resources.multiply(node.getAvailableResource(), 0.5f);
  while (node.getReservedContainer() == null) {
    boolean assignedContainer = false;
    Resource assignment = queueMgr.getRootQueue().assignContainer(node);
    if (!assignment.equals(Resources.none())) { // did we get a container?
      assignedContainers++;
      assignedContainer = true;
      Resources.addTo(assignedResource, assignment);
    }
    if (!assignedContainer) { break; }
    if (!shouldContinueAssigning(assignedContainers,
        maxResourcesToAssign, assignedResource)) {
      break;
    }
  }
  // ...
```
Next, inside assignContainer, the child queues are sorted with the scheduling policy's comparator; here that is the fair-share policy:
```java
@Override
public Resource assignContainer(FSSchedulerNode node) {
  // for each node, do one recursive search down the resource tree
  Resource assigned = Resources.none();

  // If this queue is over its limit, reject
  if (!assignContainerPreCheck(node)) {
    return assigned;
  }

  // Hold the write lock when sorting childQueues
  writeLock.lock();
  try {
    Collections.sort(childQueues, policy.getComparator());
  } finally {
    writeLock.unlock();
  }
```
Apps under a leaf queue are sorted the same way. The sorted children are then tried in order:
```java
  /*
   * We are releasing the lock between the sort and iteration of the
   * "sorted" list. There could be changes to the list here:
   * 1. Add a child queue to the end of the list, this doesn't affect
   * container assignment.
   * 2. Remove a child queue, this is probably good to take care of so we
   * don't assign to a queue that is going to be removed shortly.
   */
  readLock.lock();
  try {
    for (FSQueue child : childQueues) {
      assigned = child.assignContainer(node);
      if (!Resources.equals(assigned, Resources.none())) {
        break;
      }
    }
  } finally {
    readLock.unlock();
  }
  return assigned;
```
assignContainer may be handed an app or a queue. Given a queue, it recurses until it reaches an app: starting at root (FSParentQueue), assignContainer() calls itself down the tree until it reaches a leaf queue's assignContainer(), where the actual allocation happens.
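To make the descent concrete, here is a minimal toy sketch of that composite recursion (made-up types, not Hadoop classes): a parent queue tries its policy-sorted children in order and recurses, a leaf performs the actual allocation, and the first success propagates back up.

```java
import java.util.List;

// Toy sketch of the queue-tree recursion; 0 plays the role of Resources.none()
public class QueueTreeSketch {
  static final int NONE = 0;

  interface Assignable { int assignContainer(String node); }

  static class ParentQueue implements Assignable {
    final List<Assignable> children; // kept sorted by the policy comparator
    ParentQueue(List<Assignable> children) { this.children = children; }
    public int assignContainer(String node) {
      for (Assignable child : children) {
        int assigned = child.assignContainer(node); // recurse down the tree
        if (assigned != NONE) {
          return assigned; // first successful assignment wins
        }
      }
      return NONE;
    }
  }

  static class LeafQueue implements Assignable {
    final int demandMb;
    LeafQueue(int demandMb) { this.demandMb = demandMb; }
    public int assignContainer(String node) {
      // a real FSLeafQueue iterates its sorted apps; one "app" suffices here
      return demandMb; // pretend we allocated a container of this size
    }
  }

  public static void main(String[] args) {
    Assignable root = new ParentQueue(List.of(
        new ParentQueue(List.of(new LeafQueue(0), new LeafQueue(1024))),
        new LeafQueue(2048)));
    // prints 1024: the first leaf had no demand, the second got a container
    System.out.println(root.assignContainer("node-1"));
  }
}
```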
Optimization 1: optimize the queue comparator
Our focus here is the sorting. In Hadoop 2.8.4 the comparator in FairSharePolicy orders schedulables by weight, resource demand, and memory share. It calls getResourceUsage() repeatedly, producing a lot of duplicate computation, and that value is computed dynamically on every call, which is expensive:
```java
@Override
public int compare(Schedulable s1, Schedulable s2) {
  double minShareRatio1, minShareRatio2;
  double useToWeightRatio1, useToWeightRatio2;
  Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
      s1.getMinShare(), s1.getDemand());
  Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
      s2.getMinShare(), s2.getDemand());
  boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s1.getResourceUsage(), minShare1);
  boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s2.getResourceUsage(), minShare2);
  minShareRatio1 = (double) s1.getResourceUsage().getMemorySize()
      / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
  minShareRatio2 = (double) s2.getResourceUsage().getMemorySize()
      / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
  useToWeightRatio1 = s1.getResourceUsage().getMemorySize() /
      s1.getWeights().getWeight(ResourceType.MEMORY);
  useToWeightRatio2 = s2.getResourceUsage().getMemorySize() /
      s2.getWeights().getWeight(ResourceType.MEMORY);
  int res = 0;
  if (s1Needy && !s2Needy)
    res = -1;
  else if (s2Needy && !s1Needy)
    res = 1;
  else if (s1Needy && s2Needy)
    res = (int) Math.signum(minShareRatio1 - minShareRatio2);
  else
    // Neither schedulable is needy
    res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
  if (res == 0) {
    // Apps are tied in fairness ratio. Break the tie by submit time and job
    // name to get a deterministic ordering, which is useful for unit tests.
    res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
    if (res == 0)
      res = s1.getName().compareTo(s2.getName());
  }
  return res;
}
```
After the fix (Hadoop 2.9.0, YARN-5969), the comparator looks like this:
```java
@Override
public int compare(Schedulable s1, Schedulable s2) {
  int res = compareDemand(s1, s2);

  // Pre-compute resource usages to avoid duplicate calculation
  Resource resourceUsage1 = s1.getResourceUsage();
  Resource resourceUsage2 = s2.getResourceUsage();

  if (res == 0) {
    res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
  }

  if (res == 0) {
    res = compareFairShareUsage(s1, s2, resourceUsage1, resourceUsage2);
  }

  // Break the tie by submit time
  if (res == 0) {
    res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
  }

  // Break the tie by job name
  if (res == 0) {
    res = s1.getName().compareTo(s2.getName());
  }

  return res;
}

private int compareDemand(Schedulable s1, Schedulable s2) {
  int res = 0;
  Resource demand1 = s1.getDemand();
  Resource demand2 = s2.getDemand();
  if (demand1.equals(Resources.none()) && Resources.greaterThan(
      RESOURCE_CALCULATOR, null, demand2, Resources.none())) {
    res = 1;
  } else if (demand2.equals(Resources.none()) && Resources.greaterThan(
      RESOURCE_CALCULATOR, null, demand1, Resources.none())) {
    res = -1;
  }
  return res;
}

private int compareMinShareUsage(Schedulable s1, Schedulable s2,
    Resource resourceUsage1, Resource resourceUsage2) {
  int res;
  Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
      s1.getMinShare(), s1.getDemand());
  Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
      s2.getMinShare(), s2.getDemand());
  boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      resourceUsage1, minShare1);
  boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      resourceUsage2, minShare2);

  if (s1Needy && !s2Needy) {
    res = -1;
  } else if (s2Needy && !s1Needy) {
    res = 1;
  } else if (s1Needy && s2Needy) {
    double minShareRatio1 = (double) resourceUsage1.getMemorySize() /
        Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE)
            .getMemorySize();
    double minShareRatio2 = (double) resourceUsage2.getMemorySize() /
        Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE)
            .getMemorySize();
    res = (int) Math.signum(minShareRatio1 - minShareRatio2);
  } else {
    res = 0;
  }
  return res;
}

/**
 * To simplify computation, use weights instead of fair shares to calculate
 * fair share usage.
 */
private int compareFairShareUsage(Schedulable s1, Schedulable s2,
    Resource resourceUsage1, Resource resourceUsage2) {
  double weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
  double weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
  double useToWeightRatio1;
  double useToWeightRatio2;
  if (weight1 > 0.0 && weight2 > 0.0) {
    useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1;
    useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2;
  } else { // Either weight1 or weight2 equals to 0
    if (weight1 == weight2) {
      // If they have same weight, just compare usage
      useToWeightRatio1 = resourceUsage1.getMemorySize();
      useToWeightRatio2 = resourceUsage2.getMemorySize();
    } else {
      // By setting useToWeightRatios to negative weights, we give the
      // zero-weight one less priority, so the non-zero weight one will
      // be given slots.
      useToWeightRatio1 = -weight1;
      useToWeightRatio2 = -weight2;
    }
  }
  return (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
}
```
On a test cluster we compared the queue-sorting time before and after the change.

The screenshots compare things rather crudely; bear with me ^-^

The upper red box is the new version, the lower one the old. Even without a formal stress test, the comparison is convincing because the scheduling workload was identical; on a large cluster, where this method runs tens of millions or even hundreds of millions of times per second, the optimization becomes significant.

In a production stress test with 1000 queues, 1500 pending and 600 running applications, scheduling throughput doubled, which is a clear improvement.
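As a stand-alone illustration of why avoiding repeated getResourceUsage() calls pays off (separate from the cluster screenshots), here is a toy microbenchmark. FakeSchedulable and all of its numbers are invented for the demo and are not Hadoop code; the patch applies the same idea at a smaller scale, reading each usage once per compare() instead of several times:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

// Simulates a usage getter that is recomputed on every call, as in the
// old FairSharePolicy comparator.
public class ComparatorBench {
  static volatile long blackhole; // keep the JIT from deleting the fake work

  static class FakeSchedulable {
    final long usage;
    final double weight;
    FakeSchedulable(long usage, double weight) {
      this.usage = usage;
      this.weight = weight;
    }
    long getResourceUsage() { // deliberately expensive, like the real getter
      long x = usage;
      for (int i = 0; i < 64; i++) {
        x = x * 31 + i;
      }
      blackhole += x;
      return usage;
    }
  }

  public static void main(String[] args) {
    Random rnd = new Random(42);
    List<FakeSchedulable> apps = new ArrayList<>();
    for (int i = 0; i < 200_000; i++) {
      apps.add(new FakeSchedulable(rnd.nextInt(1 << 20), 1 + rnd.nextInt(4)));
    }

    // Old style: the expensive getter runs inside every comparison.
    Comparator<FakeSchedulable> oldCmp = (a, b) -> Double.compare(
        a.getResourceUsage() / a.weight, b.getResourceUsage() / b.weight);
    long t0 = System.nanoTime();
    new ArrayList<>(apps).sort(oldCmp);
    long t1 = System.nanoTime();

    // New style: fetch usage once per element, then sort on the cached key.
    long t2 = System.nanoTime();
    List<double[]> keyed = new ArrayList<>(apps.size());
    for (FakeSchedulable a : apps) {
      keyed.add(new double[] {a.getResourceUsage() / a.weight});
    }
    keyed.sort(Comparator.comparingDouble(k -> k[0]));
    long t3 = System.nanoTime();

    System.out.printf("getter-per-compare: %d ms, cached-key: %d ms%n",
        (t1 - t0) / 1_000_000, (t3 - t2) / 1_000_000);
  }
}
```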
Optimization 2: optimize the YARN scheduling logic
The idea: on our large cluster, resource utilization was poor, so we enabled continuous scheduling to raise it. In practice utilization did go up, but the cluster's scheduling throughput stayed weak; containers were not being allocated or released any faster.

The cause: heartbeat scheduling and continuous scheduling both funnel into the same synchronized attemptScheduling method, so they contend for the lock and both allocation and release slow down. In addition, the sort-and-assign pass over the queues becomes extremely slow when the cluster has a huge number of pending applications.
The plan:

1. Enable continuous scheduling and disable heartbeat scheduling (as the nodeUpdate snippet above shows, with continuous scheduling on, the heartbeat path only schedules when a heartbeat reports completed containers).
2. Run continuous scheduling over nodes in batches, indirectly reducing the time lost to queue sorting.
3. Release locks that are not strictly needed, to free up performance.

Let's get to it.
Enable YARN continuous scheduling with the following configuration:

```xml
<property>
  <name>yarn.scheduler.fair.continuous-scheduling-enabled</name>
  <value>true</value>
  <description>whether to enable continuous scheduling</description>
</property>
```
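The 5 ms cadence mentioned below has its own knob; a sketch of that companion setting (property name and default as documented for the FairScheduler; verify against your release):

```xml
<property>
  <!-- how long the continuous-scheduling thread sleeps between rounds -->
  <name>yarn.scheduler.fair.continuous-scheduling-sleep-ms</name>
  <value>5</value>
</property>
```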
Continuous scheduling executes the method below once every 5 ms (the default sleep interval), iterating over the nodes:
```java
void continuousSchedulingAttempt() throws InterruptedException {
  long start = getClock().getTime();
  List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
  // Sort the nodes by space available on them, so that we offer
  // containers on emptier nodes first, facilitating an even spread. This
  // requires holding the scheduler lock, so that the space available on a
  // node doesn't change during the sort.
  synchronized (this) {
    // sort all nodes by available resources
    Collections.sort(nodeIdList, nodeAvailableResourceComparator);
  }

  // iterate all nodes
  for (NodeId nodeId : nodeIdList) {
    FSSchedulerNode node = getFSSchedulerNode(nodeId);
    try {
      // does the node still have at least the minimum allocation free?
      if (node != null && Resources.fitsIn(minimumAllocation,
          node.getAvailableResource())) {
        attemptScheduling(node); // run attemptScheduling for this node
      }
    } catch (Throwable ex) {
      LOG.error("Error while attempting scheduling for node " + node +
          ": " + ex.toString(), ex);
      if ((ex instanceof YarnRuntimeException) &&
          (ex.getCause() instanceof InterruptedException)) {
        // AsyncDispatcher translates InterruptedException to
        // YarnRuntimeException with cause InterruptedException.
        // Need to throw InterruptedException to stop schedulingThread.
        throw (InterruptedException)ex.getCause();
      }
    }
  }
  // ... (duration bookkeeping omitted)
```
Now let's look at the attemptScheduling method:
```java
@VisibleForTesting
synchronized void attemptScheduling(FSSchedulerNode node) {
  if (rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers()) {
    return;
  }

  final NodeId nodeID = node.getNodeID();
  if (!nodes.containsKey(nodeID)) { // sanity check: node still registered
    // The node might have just been removed while this thread was waiting
    // on the synchronized lock before it entered this synchronized method
    LOG.info("Skipping scheduling as the node " + nodeID +
        " has been removed");
    return;
  }

  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations
  boolean validReservation = false;
  FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
  if (reservedAppSchedulable != null) {
    validReservation = reservedAppSchedulable.assignReservedContainer(node);
  }
  if (!validReservation) { // no valid reservation on this node
    // No reservation, schedule at queue which is farthest below fair share
    int assignedContainers = 0;
    Resource assignedResource = Resources.clone(Resources.none());
    // by default use at most 50% of the node's available resources per round
    Resource maxResourcesToAssign =
        Resources.multiply(node.getAvailableResource(), 0.5f);
    while (node.getReservedContainer() == null) {
      boolean assignedContainer = false;
      // core call: walk the queue tree from root down to an app and try to
      // place a container on this node
      Resource assignment = queueMgr.getRootQueue().assignContainer(node);
      if (!assignment.equals(Resources.none())) { // something was assigned
        assignedContainers++; // one more container assigned
        assignedContainer = true;
        Resources.addTo(assignedResource, assignment);
      }
      if (!assignedContainer) { break; } // nothing assigned, stop
      // stop once the configured per-round container or resource cap is hit
      if (!shouldContinueAssigning(assignedContainers,
          maxResourcesToAssign, assignedResource)) {
        break;
      }
    }
  }
  updateRootQueueMetrics();
}
```
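For completeness, the 0.5f cap and shouldContinueAssigning above are governed by the assign-multiple family of settings; a hedged sketch (property names from the FairScheduler documentation; defaults differ across releases, so verify against yours):

```xml
<!-- allow several containers per node per scheduling round -->
<property>
  <name>yarn.scheduler.fair.assignmultiple</name>
  <value>true</value>
</property>
<!-- hard cap on containers assigned per round; -1 means unlimited -->
<property>
  <name>yarn.scheduler.fair.max.assign</name>
  <value>-1</value>
</property>
<!-- when on, caps each round at half the node's available resources,
     which is where the 0.5f above comes from -->
<property>
  <name>yarn.scheduler.fair.dynamic.max.assign</name>
  <value>true</value>
</property>
```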
We modified the code above as follows: continuous scheduling now handles nodes in batches of five, cutting the sorting overhead incurred per node during assignment.
```java
void continuousSchedulingAttempt() throws InterruptedException {
  long start = getClock().getTime();
  List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
  // Sort the nodes by space available on them, so that we offer
  // containers on emptier nodes first, facilitating an even spread. This
  // requires holding the scheduler lock, so that the space available on a
  // node doesn't change during the sort.
  synchronized (this) {
    Collections.sort(nodeIdList, nodeAvailableResourceComparator);
  }

  // split the sorted nodes into batches of batchNodeAssigon
  ArrayList<ArrayList<NodeId>> newNodeList =
      inBatchesNodes(nodeIdList, batchNodeAssigon);
  for (ArrayList<NodeId> nodeList : newNodeList) {
    // check every node in the batch
    ArrayList<FSSchedulerNode> fsSchedulerNodeList = new ArrayList<>();
    try {
      for (NodeId nodeId : nodeList) {
        FSSchedulerNode node = getFSSchedulerNode(nodeId);
        if (node != null && Resources.fitsIn(minimumAllocation,
            node.getAvailableResource())) {
          fsSchedulerNodeList.add(node);
        }
      }
      attemptSchedulings(fsSchedulerNodeList); // schedule the whole batch
    } catch (Exception e) {
      LOG.error("Processing attemptSchedulings error for nodes "
          + fsSchedulerNodeList, e);
      // on failure, fall back to the original per-node path
      fsSchedulerNodeList.stream().filter(Objects::nonNull).forEach(node -> {
        try {
          attemptScheduling(node);
        } catch (Throwable ex) {
          LOG.error("Error while attempting scheduling for node "
              + nodeList + ": " + ex.toString(), ex);
        }
      });
    }
  }

  long duration = getClock().getTime() - start;
  fsOpDurations.addContinuousSchedulingRunDuration(duration);
}
```
```java
/**
 * Split the given node list into batches of the given size.
 */
private ArrayList<ArrayList<NodeId>> inBatchesNodes(List<NodeId> list,
    int size) {
  int listSize = list.size();
  // number of batches needed
  int count = (listSize % size == 0 ? listSize / size : listSize / size + 1);
  ArrayList<ArrayList<NodeId>> returnList = new ArrayList<>(count);
  for (int i = 0; i < listSize; i += size) {
    if (i + size > listSize) {
      size = listSize - i; // last, possibly smaller, batch
    }
    ArrayList<NodeId> newList = new ArrayList<>(size);
    for (int j = i; j < i + size; j++) {
      newList.add(list.get(j));
    }
    returnList.add(newList);
  }
  return returnList;
}
```
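A quick demonstration of what the batching produces; this is a genericized copy of the same logic (BatchDemo and the Integer stand-ins are ours) so it runs without Hadoop on the classpath:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchDemo {
  static <T> List<List<T>> inBatches(List<T> list, int size) {
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < list.size(); i += size) {
      // subList+copy is the compact idiom for the same inner loop
      batches.add(new ArrayList<>(
          list.subList(i, Math.min(i + size, list.size()))));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Integer> nodes = new ArrayList<>();
    for (int i = 1; i <= 12; i++) {
      nodes.add(i);
    }
    // With a batch size of 5 (batchNodeAssigon in the patch) this prints
    // [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12]]
    System.out.println(inBatches(nodes, 5));
  }
}
```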
A new method is added to the Schedulable interface:
```java
/**
 * Assign containers on this list of nodes if possible, and return the
 * resources assigned.
 */
public List<Resource> assignContainers(List<FSSchedulerNode> nodes);
```
```java
@VisibleForTesting
protected void attemptSchedulings(
    ArrayList<FSSchedulerNode> fsSchedulerNodeList) {
  if (rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers()) {
    return;
  }
  // collect the nodes that pass all checks
  List<FSSchedulerNode> fsSchedulerNodes = new ArrayList<>();
  fsSchedulerNodeList.stream().forEach(node -> {
    final NodeId nodeID = node.getNodeID();
    if (nodes.containsKey(nodeID)) {
      // Assign new containers...
      // 1. Check for reserved applications
      // 2. Schedule if there are no reservations
      boolean validReservation = false;
      FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
      if (reservedAppSchedulable != null) {
        validReservation = reservedAppSchedulable.assignReservedContainer(node);
      }
      if (!validReservation) { // passed the reservation check
        if (node.getReservedContainer() == null) { // no reserved container
          fsSchedulerNodes.add(node);
        }
      }
    } else {
      LOG.info("Skipping scheduling as the node " + nodeID +
          " has been removed");
    }
  });

  if (fsSchedulerNodes.isEmpty()) {
    LOG.error("Handle fsSchedulerNodes empty and return");
    return;
  }
  LOG.info("Eligible nodes: " + fsSchedulerNodeList.size());
  // batch call: hand the whole node list to the queue tree
  List<Resource> resources =
      queueMgr.getRootQueue().assignContainers(fsSchedulerNodes);
  fsOpDurations.addDistributiveContainer(resources.size());
  LOG.info("Containers assigned in this round: " + resources.size());
  updateRootQueueMetrics();
}
```
The implementation added in the FSParentQueue class:
```java
@Override
public List<Resource> assignContainers(List<FSSchedulerNode> nodes) {
  List<Resource> assignedsNeed = new ArrayList<>();
  ArrayList<FSSchedulerNode> fsSchedulerNodes = new ArrayList<>();
  for (FSSchedulerNode node : nodes) {
    if (assignContainerPreCheck(node)) {
      fsSchedulerNodes.add(node);
    }
  }
  if (fsSchedulerNodes.isEmpty()) {
    LOG.info("Nodes is empty, skip this assign around");
    return assignedsNeed;
  }

  // Hold the write lock when sorting childQueues
  writeLock.lock();
  try {
    Collections.sort(childQueues, policy.getComparator()); // sorting, again :)
  } finally {
    writeLock.unlock();
  }

  /*
   * We are releasing the lock between the sort and iteration of the
   * "sorted" list. There could be changes to the list here:
   * 1. Add a child queue to the end of the list, this doesn't affect
   * container assignment.
   * 2. Remove a child queue, this is probably good to take care of so we
   * don't assign to a queue that is going to be removed shortly.
   */
  readLock.lock();
  try {
    for (FSQueue child : childQueues) {
      // again pass the whole node list down the tree
      List<Resource> assigneds = child.assignContainers(fsSchedulerNodes);
      if (!assigneds.isEmpty()) {
        for (Resource assign : assigneds) {
          assignedsNeed.add(assign);
        }
        break;
      }
    }
  } finally {
    readLock.unlock();
  }
  return assignedsNeed;
}
```
Applications are finally handled at the FSLeafQueue leaves (first version):
```java
@Override
public List<Resource> assignContainers(List<FSSchedulerNode> nodes) {
  Resource assigned = Resources.none();
  List<Resource> assigneds = new ArrayList<>();
  ArrayList<FSSchedulerNode> fsSchedulerNodes = new ArrayList<>();
  for (FSSchedulerNode node : nodes) {
    if (assignContainerPreCheck(node)) {
      fsSchedulerNodes.add(node);
    }
  }
  if (fsSchedulerNodes.isEmpty()) {
    LOG.info("Nodes is empty, skip this assign around");
    return assigneds;
  }

  // Apps that have resource demands.
  TreeSet<FSAppAttempt> pendingForResourceApps =
      new TreeSet<FSAppAttempt>(policy.getComparator());
  readLock.lock();
  try {
    // sort all runnable apps (running or pending)
    for (FSAppAttempt app : runnableApps) {
      Resource pending = app.getAppAttemptResourceUsage().getPending();
      if (!pending.equals(Resources.none())) {
        // only apps that still demand resources join the sorted set
        pendingForResourceApps.add(app);
      }
    }
  } finally {
    readLock.unlock();
  }

  int count = 0; // containers assigned on the current node
  Set<String> repeatApp = new HashSet<>(); // one allocation per app per round
  for (FSSchedulerNode node : fsSchedulerNodes) { // iterate nodes
    count = 0;
    for (FSAppAttempt sched : pendingForResourceApps) { // iterate apps
      // One node just allocate for one app once
      if (repeatApp.contains(sched.getId())) { // skip apps already served
        continue;
      }
      if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
        continue; // the node is blacklisted for this app
      }
      if (node.getReservedContainer() == null &&
          Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
        // node has no reservation and still has resources: try to assign
        assigned = sched.assignContainer(node);
        if (!assigned.equals(Resources.none())) { // a container was placed
          count++;
          repeatApp.add(sched.getId());
          assigneds.add(assigned);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Assigned container in queue:" + getName() + " " +
                "container:" + assigned);
          }
        }
      }
      if (count >= maxNodeContainerAssign) {
        // reached the per-node cap, move on to the next node
        break;
      }
    }
  }
  return assigneds;
}
```
After several rounds of fixes, the second version looks like this:
```java
@Override
public int assignContainers(List<FSSchedulerNode> nodes) {
  int result = 0;
  getMetrics().assignCount.incr();
  Resource assigned = Resources.none();
  ArrayList<FSSchedulerNode> fsSchedulerNodes = new ArrayList<>();
  long start = System.currentTimeMillis();
  for (FSSchedulerNode node : nodes) {
    if (assignContainerPreCheck(node)) {
      fsSchedulerNodes.add(node);
    }
  }
  long preCheckTime = System.currentTimeMillis();
  getMetrics().preCheckTime.incr((preCheckTime - start));
  if (fsSchedulerNodes.isEmpty()) {
    LOG.info("Nodes is empty, skip this assign around");
    return result;
  }

  long runAppSortStart = System.currentTimeMillis();
  // Apps that have resource demands.
  TreeSet<FSAppAttempt> pendingForResourceApps =
      new TreeSet<FSAppAttempt>(policy.getComparator());
  readLock.lock();
  try {
    for (FSAppAttempt app : runnableApps) {
      Resource pending = app.getAppAttemptResourceUsage().getPending();
      if (!pending.equals(Resources.none())) {
        pendingForResourceApps.add(app);
      }
    }
  } finally {
    readLock.unlock();
  }
  long runAppSortEnd = System.currentTimeMillis();
  getMetrics().runAppSortTime.incr((runAppSortEnd - runAppSortStart));

  int count = 0;
  long appAssignStart, appAssignEnd = 0;
  Set<Integer> repeatApp = new HashSet<>();
  for (FSSchedulerNode node : fsSchedulerNodes) {
    count = 0;
    for (FSAppAttempt sched : pendingForResourceApps) {
      // One node just allocate for one app once
      if (repeatApp.contains(sched.getApplicationId().hashCode())) {
        continue;
      }
      if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
        continue;
      }
      appAssignStart = System.currentTimeMillis();
      if (node.getReservedContainer() == null &&
          Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
        assigned = sched.assignContainer(node);
        if (!assigned.equals(Resources.none())) {
          repeatApp.add(sched.getApplicationId().hashCode());
          count++;
          result++;
          if (LOG.isDebugEnabled()) {
            LOG.debug("Queue " + getName() + " assigned container in queue:" +
                getName() + " container:" + assigned);
          }
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Queue " + getName() + " app " + sched.getId() +
                " skip node " + node.getNodeName());
          }
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Queue " + getName() + " node " + node.getNodeName() +
              " has resource " + node.getAvailableResource() +
              " left, node reserved is " +
              (node.getReservedContainer() == null));
        }
      }
      appAssignEnd = System.currentTimeMillis();
      getMetrics().appContainerAssignTime.incr((appAssignEnd - appAssignStart));
      if (appAssignEnd - appAssignStart > 2) {
        LOG.info("Queue " + getName() + " app assign container use " +
            (appAssignEnd - appAssignStart) + " ms");
      }
      if (count == maxNodeContainerAssign) {
        break;
      }
    }
  }
  return result;
}
```
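The getMetrics() counters used above (assignCount, preCheckTime, runAppSortTime, appContainerAssignTime) are custom additions that the post does not show being registered. A hedged sketch of one way to declare them with Hadoop's metrics2 library; the field names mirror the calls above, but the wiring is our assumption, not the author's code:

```java
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

// Hypothetical additions to FSQueueMetrics; each @Metric field is registered
// automatically by the metrics2 framework when the metrics source is built.
public class FSQueueMetricsAdditions {
  @Metric("Containers assigned by batched scheduling")
  MutableCounterLong assignCount;

  @Metric("Cumulative assignContainerPreCheck time (ms)")
  MutableCounterLong preCheckTime;

  @Metric("Cumulative runnable-app sort time (ms)")
  MutableCounterLong runAppSortTime;

  @Metric("Cumulative per-app container assign time (ms)")
  MutableCounterLong appContainerAssignTime;
}
```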
With this round of optimization done, scheduling throughput improved roughly fourfold over the previous state, and the container backlog in production was effectively resolved.

The nodeUpdate latency before and after the optimization is compared below: