The previous section analyzed how a Job is submitted from JobClient to JobTracker: through the RPC mechanism, JobTracker receives the Job ID and the job's directory on HDFS, constructs a JobInProgress object, and places it in a queue; another thread takes the JobInProgress object off the queue and hands it to a thread pool, which executes JobTracker's initJob method on it. We will analyze this step by step.
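Before reading the code, it may help to see the hand-off pattern in isolation. Below is a minimal, generic sketch of the queue-plus-thread-pool mechanism just described; this is not the actual Hadoop code (in Hadoop 1.x this role is played by EagerTaskInitializationListener), and the class and method names are our own illustration:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: jobs are queued on submission, and a dispatcher
// thread drains the queue, handing each job to a worker pool that runs
// JobTracker.initJob().
class JobInitDispatcher {
  private final BlockingQueue<JobInProgress> jobInitQueue =
      new LinkedBlockingQueue<JobInProgress>();
  private final ExecutorService threadPool = Executors.newFixedThreadPool(4);

  // Producer side: invoked when a job is submitted (the jobAdded path).
  void enqueue(JobInProgress job) {
    jobInitQueue.offer(job);
  }

  // Consumer side: runs in its own thread, blocking until jobs arrive.
  void dispatchLoop(final JobTracker jobTracker) throws InterruptedException {
    while (true) {
      final JobInProgress job = jobInitQueue.take();
      threadPool.execute(new Runnable() {
        public void run() {
          jobTracker.initJob(job);   // the method analyzed below
        }
      });
    }
  }
}

With that picture in mind, here is JobTracker's initJob: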
public void initJob(JobInProgress job) {
  if (null == job) {
    LOG.info("Init on null job is not valid");
    return;
  }
  try {
    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
    LOG.info("Initializing " + job.getJobID());
    job.initTasks();
    // Inform the listeners if the job state has changed
    // Note : that the job will be in PREP state.
    JobStatus newStatus = (JobStatus)job.getStatus().clone();
    if (prevStatus.getRunState() != newStatus.getRunState()) {
      JobStatusChangeEvent event =
        new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED,
            prevStatus, newStatus);
      synchronized (JobTracker.this) {
        updateJobInProgressListeners(event);
      }
    }
  } catch (KillInterruptedException kie) {
    // If job was killed during initialization, job state will be KILLED
    LOG.error("Job initialization interrupted:\n" +
        StringUtils.stringifyException(kie));
    killJob(job);
  } catch (Throwable t) {
    String failureInfo =
      "Job initialization failed:\n" + StringUtils.stringifyException(t);
    // If the job initialization is failed, job state will be FAILED
    LOG.error(failureInfo);
    job.getStatus().setFailureInfo(failureInfo);
    failJob(job);
  }
}
As we can see, job.initTasks() runs first to initialize the Map and Reduce tasks; afterwards, all registered listeners are notified of the state change:
synchronized (JobTracker.this) {
  updateJobInProgressListeners(event);
}
The completion of Map/Reduce task initialization is treated as an event, and the following code delivers the notification:
// Update the listeners about the job
// Assuming JobTracker is locked on entry.
private void updateJobInProgressListeners(JobChangeEvent event) {
  for (JobInProgressListener listener : jobInProgressListeners) {
    listener.jobUpdated(event);
  }
}
Note that jobAdded was used when the Job was put into the queue, whereas jobUpdated is used here. We will look at what happens after jobUpdated later; for now, let's examine the job initialization process that takes place between jobAdded and jobUpdated, which falls into several stages.
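For reference, the callback surface these notifications travel through is the abstract class JobInProgressListener, which schedulers extend. Its shape in Hadoop 1.x is roughly as follows (the comments are ours, and the exact signatures should be treated as approximate):

import java.io.IOException;

// Roughly the callback surface of Hadoop 1.x's JobInProgressListener.
public abstract class JobInProgressListener {
  /** Called when a job is added to the JobTracker, i.e. the jobAdded path. */
  public abstract void jobAdded(JobInProgress job) throws IOException;

  /** Called when a job is removed (retired) from the JobTracker. */
  public abstract void jobRemoved(JobInProgress job);

  /** Called on state changes, e.g. once initTasks() has finished. */
  public abstract void jobUpdated(JobChangeEvent event);
}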
The first step is to obtain the split information, which JobClient has already uploaded to HDFS.
1. Reading the split information:
//
// read input splits and create a map per a split
//
TaskSplitMetaInfo[] splits = createSplits(jobId);
if (numMapTasks != splits.length) {
  throw new IOException("Number of maps in JobConf doesn't match number of " +
      "recieved splits for job " + jobId + "! " +
      "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
}
numMapTasks = splits.length;
The code of the createSplits method is:
TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
    throws IOException {
  TaskSplitMetaInfo[] allTaskSplitMetaInfo =
    SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, jobtracker.getConf(),
        jobSubmitDir);
  return allTaskSplitMetaInfo;
}
That is, it reads the job.splitmetainfo file to obtain the split information:
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
    JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir)
    throws IOException {
  long maxMetaInfoSize =
    conf.getLong("mapreduce.jobtracker.split.metainfo.maxsize", 10000000L);
  Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
  FileStatus fStatus = fs.getFileStatus(metaSplitFile);
  if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
    throw new IOException("Split metadata size exceeded " +
        maxMetaInfoSize + ". Aborting job " + jobId);
  }
  FSDataInputStream in = fs.open(metaSplitFile);
  byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
  in.readFully(header);
  if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
    throw new IOException("Invalid header on split file");
  }
  int vers = WritableUtils.readVInt(in);
  if (vers != JobSplit.META_SPLIT_VERSION) {
    in.close();
    throw new IOException("Unsupported split version " + vers);
  }
  int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
  JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
    new JobSplit.TaskSplitMetaInfo[numSplits];
  final int maxLocations =
    conf.getInt(JobSplitWriter.MAX_SPLIT_LOCATIONS, Integer.MAX_VALUE);
  for (int i = 0; i < numSplits; i++) {
    JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
    splitMetaInfo.readFields(in);
    final int numLocations = splitMetaInfo.getLocations().length;
    if (numLocations > maxLocations) {
      throw new IOException("Max block location exceeded for split: #" + i +
          " splitsize: " + numLocations + " maxsize: " + maxLocations);
    }
    JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
        JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(),
        splitMetaInfo.getStartOffset());
    allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
        splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
  }
  in.close();
  return allSplitMetaInfo;
}
The code that actually reads the file starts with:
FSDataInputStream in = fs.open(metaSplitFile);
byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
in.readFully(header);
This first reads the header of the job.splitmetainfo file, which is in fact the string "META-SPL", as specified by the following class:
public class JobSplit {
  static final int META_SPLIT_VERSION = 1;
  static final byte[] META_SPLIT_FILE_HEADER;
  static {
    try {
      META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
    } catch (UnsupportedEncodingException u) {
      throw new RuntimeException(u);
    }
  }
  .......
After the file header comes the version information:
int vers = WritableUtils.readVInt(in);
if (vers != JobSplit.META_SPLIT_VERSION) {
  in.close();
  throw new IOException("Unsupported split version " + vers);
}
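WritableUtils.readVInt here decodes Hadoop's variable-length integer format, in which values in [-112, 127] occupy a single byte and larger values carry a length-prefix byte. A simplified sketch of the decoding logic, mirroring WritableUtils (for illustration, not the exact source):

import java.io.DataInput;
import java.io.IOException;

// Simplified sketch of WritableUtils-style VInt/VLong decoding.
static long readVLong(DataInput in) throws IOException {
  byte firstByte = in.readByte();
  int len = decodeVIntSize(firstByte);
  if (len == 1) {
    return firstByte;                       // values in [-112, 127]: one byte
  }
  long value = 0;
  for (int i = 0; i < len - 1; i++) {       // payload bytes, big-endian
    value = (value << 8) | (in.readByte() & 0xFF);
  }
  return isNegativeVInt(firstByte) ? ~value : value;
}

// Total encoded size implied by the first byte.
static int decodeVIntSize(byte b) {
  if (b >= -112) return 1;                  // single-byte value
  if (b < -120) return -119 - b;            // header of a negative value
  return -111 - b;                          // header of a positive value
}

static boolean isNegativeVInt(byte b) {
  return b < -120 || (b >= -112 && b < 0);
}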
After the version check (the expected version is 1), the number of splits is read:
int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
  new JobSplit.TaskSplitMetaInfo[numSplits];
and a JobSplit.TaskSplitMetaInfo array of that size is created. Then, for each split, the locations and other fields are read in a loop:
for (int i = 0; i < numSplits; i++) {
  JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
  splitMetaInfo.readFields(in);
  final int numLocations = splitMetaInfo.getLocations().length;
  if (numLocations > maxLocations) {
    throw new IOException("Max block location exceeded for split: #" + i +
        " splitsize: " + numLocations + " maxsize: " + maxLocations);
  }
  JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
      JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(),
      splitMetaInfo.getStartOffset());
  allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
      splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
}
In the loop above, splitMetaInfo.readFields(in) reads the location information:
public void readFields(DataInput in) throws IOException {
  int len = WritableUtils.readVInt(in);
  locations = new String[len];
  for (int i = 0; i < locations.length; i++) {
    locations[i] = Text.readString(in);
  }
  startOffset = WritableUtils.readVLong(in);
  inputDataLength = WritableUtils.readVLong(in);
}
The so-called locations simply identify the servers on which this split's data resides. Once the locations, the split's input data length, and the rest have been obtained, everything is recorded in a JobSplit.TaskSplitMetaInfo object:
JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
    JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(),
    splitMetaInfo.getStartOffset());
allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
    splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
Finally, the allSplitMetaInfo array is returned.
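Putting the reads together, the on-disk layout of job.splitmetainfo can be reconstructed as follows (our own summary of the code above):

  "META-SPL"                  8-byte file header (UTF-8)
  version                     VInt, currently 1
  numSplits                   VInt
  then, repeated numSplits times:
    numLocations              VInt
    locations[numLocations]   Text strings (hostnames holding the split)
    startOffset               VLong, offset of the serialized split in job.split
    inputDataLength           VLong, input bytes covered by the split

Note that the metadata file only points at the real split data: TaskSplitIndex pairs the path of job.split with startOffset, which is where the actual InputSplit will later be deserialized from.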
2. Creating one TaskInProgress object per Map task:
The length of the returned array is the number of splits, which in turn determines the number of Map tasks. The hostnames recorded in the splits are then validated:
numMapTasks = splits.length;
// Sanity check the locations so we don't create/initialize unnecessary tasks
for (TaskSplitMetaInfo split : splits) {
NetUtils.verifyHostnames(split.getLocations());
}
Next, the corresponding counters are updated in the instrumentation classes:
jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);
Then the TaskInProgress objects are created, one for each Map task:
maps = new TaskInProgress[numMapTasks];
for (int i = 0; i < numMapTasks; ++i) {
  inputLength += splits[i].getInputDataLength();
  maps[i] = new TaskInProgress(jobId, jobFile, splits[i],
      jobtracker, conf, this, i, numSlotsPerMap);
}
TaskInProgress records all information related to running a single Map or Reduce task, much as JobInProgress does for a job. TaskInProgress has two constructors, one for Map tasks and one for Reduce tasks. The Map version is:
/**
 * Constructor for MapTask
 */
public TaskInProgress(JobID jobid, String jobFile,
                      TaskSplitMetaInfo split,
                      JobTracker jobtracker, JobConf conf,
                      JobInProgress job, int partition,
                      int numSlotsRequired) {
  this.jobFile = jobFile;
  this.splitInfo = split;
  this.jobtracker = jobtracker;
  this.job = job;
  this.conf = conf;
  this.partition = partition;
  this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
  this.numSlotsRequired = numSlotsRequired;
  setMaxTaskAttempts();
  init(jobid);
}
Here splitInfo records the information of the current split, partition is the index of this Map task, and numSlotsRequired is 1.
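The constructor also calls setMaxTaskAttempts(), which caches the retry limit for the task. A sketch of what it boils down to (the configuration keys and the default of 4 attempts are from Hadoop 1.x, but treat the exact body as approximate):

// Approximate body of setMaxTaskAttempts() in Hadoop 1.x.
private void setMaxTaskAttempts() {
  if (isMapTask()) {
    this.maxTaskAttempts = conf.getInt("mapred.map.max.attempts", 4);
  } else {
    this.maxTaskAttempts = conf.getInt("mapred.reduce.max.attempts", 4);
  }
}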
The TaskInProgress objects just created are then placed in a cache:
if (numMapTasks > 0) {
  nonRunningMapCache = createCache(splits, maxLevel);
}
nonRunningMapCache caches, for Map tasks that have not yet started, the hosts they would prefer to run on: its keys are Node objects (a server, or at higher levels a rack) and its values are lists of TaskInProgress objects, as declared below. In effect, the servers holding each split are resolved and cached for the scheduler to use later:
Map<Node, List<TaskInProgress>> nonRunningMapCache;
The code of the createCache method is:
private Map<Node, List<TaskInProgress>> createCache(
    TaskSplitMetaInfo[] splits, int maxLevel) throws UnknownHostException {
  Map<Node, List<TaskInProgress>> cache =
    new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);

  Set<String> uniqueHosts = new TreeSet<String>();
  for (int i = 0; i < splits.length; i++) {
    String[] splitLocations = splits[i].getLocations();
    if (splitLocations == null || splitLocations.length == 0) {
      nonLocalMaps.add(maps[i]);
      continue;
    }

    for (String host : splitLocations) {
      Node node = jobtracker.resolveAndAddToTopology(host);
      uniqueHosts.add(host);
      LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
      for (int j = 0; j < maxLevel; j++) {
        List<TaskInProgress> hostMaps = cache.get(node);
        if (hostMaps == null) {
          hostMaps = new ArrayList<TaskInProgress>();
          cache.put(node, hostMaps);
          hostMaps.add(maps[i]);
        }
        //check whether the hostMaps already contains an entry for a TIP
        //This will be true for nodes that are racks and multiple nodes in
        //the rack contain the input for a tip. Note that if it already
        //exists in the hostMaps, it must be the last element there since
        //we process one TIP at a time sequentially in the split-size order
        if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
          hostMaps.add(maps[i]);
        }
        node = node.getParent();
      }
    }
  }

  // Calibrate the localityWaitFactor - Do not override user intent!
  if (localityWaitFactor == DEFAULT_LOCALITY_WAIT_FACTOR) {
    int jobNodes = uniqueHosts.size();
    int clusterNodes = jobtracker.getNumberOfUniqueHosts();
    if (clusterNodes > 0) {
      localityWaitFactor =
        Math.min((float)jobNodes/clusterNodes, localityWaitFactor);
    }
    LOG.info(jobId + " LOCALITY_WAIT_FACTOR=" + localityWaitFactor);
  }

  return cache;
}
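To see why the same TaskInProgress can end up cached under several keys, consider a hypothetical split whose data lives on hosts h1 and h2, both in rack /r1, with maxLevel = 2:

// Hypothetical walk-through of the inner maxLevel loop above:
//
//   processing h1: cache[/r1/h1] = [tip]   level 0: the host node itself
//                  cache[/r1]    = [tip]   level 1: its parent (the rack)
//   processing h2: cache[/r1/h2] = [tip]   level 0: the host node itself
//                  cache[/r1]    already ends with tip -> not added twice
//
// The scheduler can later probe the cache with a TaskTracker's host node
// for a node-local task, then with node.getParent() for a rack-local one.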
3. Creating one TaskInProgress object per Reduce task:
The code is essentially the same as for Map tasks:
//
// Create reduce tasks
//
this.reduces = new TaskInProgress[numReduceTasks];
for (int i = 0; i < numReduceTasks; i++) {
  reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i,
      jobtracker, conf, this, numSlotsPerReduce);
  nonRunningReduces.add(reduces[i]);
}
4. Calculating the minimum number of Map tasks that must complete before Reduce tasks are started:
By the MapReduce model, Map computation runs first and its intermediate results are then passed to the Reduce phase, so the Map tasks must get going first. If the Reduce tasks were started together with the Map tasks, they would simply sit waiting, tying up machine resources, and the shuffle phase is already fairly long. Therefore, by default, reducers are scheduled only once 5% of all Map tasks have completed:
// Calculate the minimum number of maps to be complete before
// we should start scheduling reduces
completedMapsForReduceSlowstart =
  (int)Math.ceil(
      (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                     DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
       numMapTasks));

// ... use the same for estimating the total output of all maps
resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART confirms that the default is 5%:
private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
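A quick worked example with hypothetical numbers:

// Hypothetical 1000-map job with the default slowstart factor of 0.05:
int numMapTasks = 1000;
int completedMapsForReduceSlowstart =
    (int) Math.ceil(0.05f * numMapTasks);   // ceil(50.0) = 50
// Reduce tasks become schedulable only after 50 Map tasks have completed.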
5. Creating the cleanup tasks, one Map and one Reduce:
// create cleanup two cleanup tips, one map and one reduce.
cleanup = new TaskInProgress[2];

// cleanup map tip. This map doesn't use any splits. Just assign an empty
// split.
TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
    jobtracker, conf, this, numMapTasks, 1);
cleanup[0].setJobCleanupTask();

// cleanup reduce tip.
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
    numReduceTasks, jobtracker, conf, this, 1);
cleanup[1].setJobCleanupTask();
6. Creating the setup tasks, one Map and one Reduce:
// create two setup tips, one map and one reduce.
setup = new TaskInProgress[2];

// setup map tip. This map doesn't use any split. Just assign an empty
// split.
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
    jobtracker, conf, this, numMapTasks + 1, 1);
setup[0].setJobSetupTask();

// setup reduce tip.
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
    numReduceTasks + 1, jobtracker, conf, this, 1);
setup[1].setJobSetupTask();
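It is worth lining up the partition numbers that the regular, cleanup, and setup tasks receive, as derived from the constructor calls above:

  regular map TIPs        partitions 0 .. numMapTasks - 1
  cleanup[0] (map)        partition numMapTasks
  setup[0]   (map)        partition numMapTasks + 1
  regular reduce TIPs     partitions 0 .. numReduceTasks - 1
  cleanup[1] (reduce)     partition numReduceTasks
  setup[1]   (reduce)     partition numReduceTasks + 1

Each auxiliary task thus gets a partition id just past the regular range, so task ids never collide.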
7. Finishing Map/Reduce task initialization:
synchronized (jobInitKillStatus) {
  jobInitKillStatus.initDone = true;

  // set this before the throw to make sure cleanup works properly
  tasksInited = true;

  if (jobInitKillStatus.killed) {
    throw new KillInterruptedException("Job " + jobId + " killed in init");
  }
}
Once initialization finishes, listeners are notified via jobUpdated. There are three kinds of job update events:
static enum EventType {RUN_STATE_CHANGED, START_TIME_CHANGED, PRIORITY_CHANGED}
Initialization completion falls under RUN_STATE_CHANGED. As the code shows, a run-state change triggers no action here unless the job has finished:
public synchronized void jobUpdated(JobChangeEvent event) {
  JobInProgress job = event.getJobInProgress();
  if (event instanceof JobStatusChangeEvent) {
    // Check if the ordering of the job has changed
    // For now priority and start-time can change the job ordering
    JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
    JobSchedulingInfo oldInfo =
      new JobSchedulingInfo(statusEvent.getOldStatus());
    if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED
        || statusEvent.getEventType() == EventType.START_TIME_CHANGED) {
      // Make a priority change
      reorderJobs(job, oldInfo);
    } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
      // Check if the job is complete
      int runState = statusEvent.getNewStatus().getRunState();
      if (runState == JobStatus.SUCCEEDED ||
          runState == JobStatus.FAILED ||
          runState == JobStatus.KILLED) {
        jobCompleted(oldInfo);
      }
    }
  }
}
because at this point the job has not yet completed. We can see that once a job's initialization is done, the thread pool moves on to initialize other jobs, and the initialized job waits for TaskTrackers to come and fetch its tasks.
The heartbeat between TaskTracker and JobTracker, and how tasks are actually handed out, are fairly involved and are left for later posts.
Postscript
Judging from the overall flow chart: building on the previous post's analysis of steps 1, 2, 3, and 4, this post analyzed steps 5 and 6, namely job initialization: fetching the resource data from HDFS and determining the number of Map and Reduce tasks. Steps 7, 8, 9, 10 and the subsequent operations will be analyzed in later posts.