JobClient
JobClient is the client used to submit jobs. When an instance is created, the constructor does the following:
public JobClient(JobConf conf) throws IOException {
  setConf(conf);
  init(conf);
}

public void init(JobConf conf) throws IOException {
  String tracker = conf.get("mapred.job.tracker", "local");
  tasklogtimeout = conf.getInt(
      TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
  this.ugi = UserGroupInformation.getCurrentUser();
  // Choose the kind of JobTracker to connect to, based on the configuration
  if ("local".equals(tracker)) {
    // Local mode: run the job in-process with a single map task
    conf.setNumMapTasks(1);
    this.jobSubmitClient = new LocalJobRunner(conf);
  } else {
    // Otherwise talk to the remote JobTracker through an RPC proxy
    this.jobSubmitClient =
        createRPCProxy(JobTracker.getAddress(conf), conf);
  }
}
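JobClient.init() picks its backend from a single setting. If `mapred.job.tracker` is `local` (also the default when unset), the job runs in-process through LocalJobRunner. Any other value is treated as the JobTracker address and wrapped in an RPC proxy.

Below is a minimal sketch of that decision. It makes these assumptions:
- The hypothetical `SubmissionBackend`, `LocalBackend` and `RemoteBackend` types stand in for JobSubmissionProtocol, LocalJobRunner and the RPC proxy.
- A plain `Map` stands in for JobConf.
- `mapred.map.tasks` is the key that setNumMapTasks() writes.

```java
import java.util.HashMap;
import java.util.Map;

public class BackendSelector {
    // Hypothetical stand-in for JobSubmissionProtocol
    interface SubmissionBackend {
        String describe();
    }

    // Hypothetical stand-in for LocalJobRunner: runs the job in-process
    static final class LocalBackend implements SubmissionBackend {
        public String describe() {
            return "local";
        }
    }

    // Hypothetical stand-in for the RPC proxy to a remote JobTracker
    static final class RemoteBackend implements SubmissionBackend {
        private final String address;

        RemoteBackend(String address) {
            this.address = address;
        }

        public String describe() {
            return "rpc:" + address;
        }
    }

    // Same decision as JobClient.init(): "local" (or unset) means in-process, otherwise host:port
    static SubmissionBackend select(Map<String, String> conf) {
        String tracker = conf.getOrDefault("mapred.job.tracker", "local");
        if ("local".equals(tracker)) {
            // Assumed equivalent of conf.setNumMapTasks(1)
            conf.put("mapred.map.tasks", "1");
            return new LocalBackend();
        }
        return new RemoteBackend(tracker);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(select(conf).describe());   // prints "local"
        conf.put("mapred.job.tracker", "jt.example.com:9001");
        System.out.println(select(conf).describe());   // prints "rpc:jt.example.com:9001"
    }
}
```

Both backends sit behind one interface. That lets the rest of JobClient call jobSubmitClient.getNewJobId() or submitJob() without caring whether the job runs locally or on a cluster.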
Once the instance exists, a job is submitted to the JobTracker with submitJob(), which delegates to submitJobInternal():
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                                IOException {
  try {
    return submitJobInternal(job);
  } catch (InterruptedException ie) {
    throw new IOException("interrupted", ie);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException("class not found", cnfe);
  }
}

public RunningJob submitJobInternal(final JobConf job)
    throws FileNotFoundException, ClassNotFoundException,
           InterruptedException, IOException {
  /*
   * configure the command line options correctly on the submitting dfs
   */
  return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
    public RunningJob run() throws FileNotFoundException,
        ClassNotFoundException, InterruptedException, IOException {
      JobConf jobCopy = job;
      // Get the JobTracker staging directory: the parent directory under
      // which each job's resources (job.xml, job.jar, etc.) are submitted
      Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
          jobCopy);
      // Obtain a new JobID
      JobID jobId = jobSubmitClient.getNewJobId();
      Path submitJobDir = new Path(jobStagingArea, jobId.toString());
      jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
      JobStatus status = null;
      try {
        populateTokenCache(jobCopy, jobCopy.getCredentials());
        // Copy all resource files into this job's staging directory
        copyAndConfigureFiles(jobCopy, submitJobDir);

        // get delegation token for the dir
        TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
            new Path[] {submitJobDir}, jobCopy);

        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
        int reduces = jobCopy.getNumReduceTasks();
        InetAddress ip = InetAddress.getLocalHost();
        if (ip != null) {
          job.setJobSubmitHostAddress(ip.getHostAddress());
          job.setJobSubmitHostName(ip.getHostName());
        }
        JobContext context = new JobContext(jobCopy, jobId);
        jobCopy = (JobConf) context.getConfiguration();

        // Check the output specification (e.g. the job's output directory),
        // using the new or old API depending on which one writes the output
        if (reduces == 0 ? jobCopy.getUseNewMapper()
                         : jobCopy.getUseNewReducer()) {
          org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy);
          output.checkOutputSpecs(context);
        } else {
          // "fs" here cannot be the local variable declared below; presumably
          // it is a JobClient field not shown in this excerpt
          jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
        }

        // Create the splits for the job
        FileSystem fs = submitJobDir.getFileSystem(jobCopy);
        LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
        // Split the input and write the split files to the staging directory;
        // the number of splits becomes the number of map tasks
        int maps = writeSplits(context, submitJobDir);
        jobCopy.setNumMapTasks(maps);

        // write "queue admins of the queue to which job is being submitted"
        // to job file.
        String queue = jobCopy.getQueueName();
        AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
        jobCopy.set(QueueManager.toFullPropertyName(queue,
            QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

        // Write job file to JobTracker's fs
        FSDataOutputStream out =
            FileSystem.create(fs, submitJobFile,
                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
        try {
          jobCopy.writeXml(out);
        } finally {
          out.close();
        }

        //
        // Now, actually submit the job (using the submit name)
        //
        printTokens(jobId, jobCopy.getCredentials());
        status = jobSubmitClient.submitJob(
            jobId, submitJobDir.toString(), jobCopy.getCredentials());
        JobProfile prof = jobSubmitClient.getJobProfile(jobId);
        if (status != null && prof != null) {
          return new NetworkedJob(status, prof, jobSubmitClient);
        } else {
          throw new IOException("Could not launch job");
        }
      } finally {
        if (status == null) {
          // Submission failed: remove everything staged for this job
          LOG.info("Cleaning up the staging area " + submitJobDir);
          if (fs != null && submitJobDir != null)
            fs.delete(submitJobDir, true);
        }
      }
    }
  });
}
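This method calls submitJob() on jobSubmitClient. Depending on the mode, that is:
- JobTracker.submitJob() over RPC, or
- LocalJobRunner.submitJob() in local mode.

The call returns a JobStatus. submitJobInternal() then wraps that status and the job's JobProfile in a NetworkedJob and returns it as a RunningJob.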
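One part of submitJobInternal() is easy to miss: the cleanup contract. Everything is staged under `<staging area>/<job id>`. The `finally` block deletes that directory whenever `status` is still null, that is, whenever submission threw or returned no status.

Below is a hedged sketch of that pattern on the local file system, using java.nio. `StagingSubmit`, `Submitter` and the `job.xml` content are illustrative only. Splits, jars and tokens are left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class StagingSubmit {
    // Hypothetical stand-in for jobSubmitClient.submitJob(...): returns a status, or null
    interface Submitter {
        String submit(String jobId, Path jobDir) throws IOException;
    }

    static String submitJob(Path stagingArea, String jobId, String jobXml, Submitter submitter)
            throws IOException {
        // One directory per job under the staging area, like new Path(jobStagingArea, jobId.toString())
        Path submitJobDir = stagingArea.resolve(jobId);
        String status = null;
        try {
            Files.createDirectories(submitJobDir);
            // Stage the job configuration (job.xml); jars, splits and tokens are omitted
            Files.write(submitJobDir.resolve("job.xml"), jobXml.getBytes(StandardCharsets.UTF_8));
            status = submitter.submit(jobId, submitJobDir);
            if (status == null) {
                throw new IOException("Could not launch job");
            }
            return status;
        } finally {
            // Submission failed or threw: remove everything staged for this job
            if (status == null) {
                deleteRecursively(submitJobDir);
            }
        }
    }

    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse lexicographic order visits children before their parent directory
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        Path staging = Files.createTempDirectory("staging");
        System.out.println(submitJob(staging, "job_0001", "<configuration/>", (id, dir) -> "RUNNING"));
        try {
            submitJob(staging, "job_0002", "<configuration/>", (id, dir) -> null);
        } catch (IOException e) {
            System.out.println(e.getMessage() + "; staged dir still exists: "
                    + Files.exists(staging.resolve("job_0002")));
        }
    }
}
```

Keying the cleanup on a null status, rather than on a caught exception, covers both failure modes with one check. This mirrors the original.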
JobTracker
JobTracker runs in its own JVM and must already be running before it can accept job submissions. Its entry point is main():
public static void main(String argv[])
    throws IOException, InterruptedException {
  StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
  try {
    if (argv.length == 0) {
      // Normal startup: build the JobTracker, then start serving requests
      JobTracker tracker = startTracker(new JobConf());
      tracker.offerService();
    } else {
      if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
        dumpConfiguration(new PrintWriter(System.out));
      } else {
        System.out.println("usage: JobTracker [-dumpConfiguration]");
        System.exit(-1);
      }
    }
  } catch (Throwable e) {
    LOG.fatal(StringUtils.stringifyException(e));
    System.exit(-1);
  }
}
main() does two things:
It instantiates a JobTracker and starts it (startTracker()).
It begins serving requests (offerService()).
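First, the instantiation of JobTracker. main() calls the one-argument startTracker(conf), which delegates to this two-argument overload with a generated identifier: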
public static JobTracker startTracker(JobConf conf, String identifier)
    throws IOException, InterruptedException {
  DefaultMetricsSystem.initialize("JobTracker");
  JobTracker result = null;
  while (true) {
    try {
      result = new JobTracker(conf, identifier);
      result.taskScheduler.setTaskTrackerManager(result);
      break;
    } catch (VersionMismatch e) {
      throw e;
    } catch (BindException e) {
      throw e;
    } catch (UnknownHostException e) {
      throw e;
    } catch (AccessControlException ace) {
      // in case of jobtracker not having right access
      // bail out
      throw ace;
    } catch (IOException e) {
      // Any other IOException: log it and retry after a one-second pause
      LOG.warn("Error starting tracker: " +
          StringUtils.stringifyException(e));
    }
    Thread.sleep(1000);
  }
  if (result != null) {
    JobEndNotifier.startNotifier();
    MBeans.register("JobTracker", "JobTrackerInfo", result);
  }
  return result;
}
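startTracker() retries construction in a loop, and it treats two kinds of error differently:
- Errors that a retry cannot fix are rethrown at once: version mismatch, port already bound, unknown host, access denied.
- Any other IOException is logged and retried after one second.

Below is a minimal generic sketch of that loop. `RetryStart` and `Factory` are hypothetical names, and only the two java.net exceptions are modelled.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.UnknownHostException;

public class RetryStart {
    // Hypothetical factory standing in for "new JobTracker(conf, identifier)"
    interface Factory<T> {
        T create() throws IOException;
    }

    static <T> T startWithRetry(Factory<T> factory, long sleepMillis)
            throws IOException, InterruptedException {
        while (true) {
            try {
                return factory.create();
            } catch (BindException | UnknownHostException e) {
                // Retrying cannot fix a port that is already bound or an unresolvable host
                throw e;
            } catch (IOException e) {
                // Anything else is treated as transient: log it and try again
                System.err.println("Error starting: " + e.getMessage());
            }
            Thread.sleep(sleepMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String tracker = startWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new IOException("file system not ready yet");
            }
            return "tracker-started";
        }, 10);
        System.out.println(tracker + " after " + attempts[0] + " attempts");
    }
}
```

The catch order matters. BindException and UnknownHostException are both subclasses of IOException, so they must be caught before the generic handler or they would be retried forever.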
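Instantiation does the following:
Reads the various settings from the configuration files.
Instantiates the TaskScheduler; the default is JobQueueTaskScheduler (configurable via mapred.jobtracker.taskScheduler).
Starts interTrackerServer, the internal RPC server used to communicate with TaskTrackers.
Starts the HTTP server, infoServer.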
JobQueueTaskScheduler is the default scheduler for jobs and tasks, and it schedules them from a FIFO queue. offerService() is where the JobQueueTaskScheduler gets started.
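"FIFO" here means jobs are considered in queue order each time a TaskTracker asks for work. As far as the Hadoop 1.x sources go, the queue behind JobQueueTaskScheduler orders jobs by priority first, then by start time, then by job ID. Treat that ordering as an assumption to check against your version.

The sketch below models only the ordering. It uses hypothetical `Job` and `Priority` types; the enum order is assumed to mirror Hadoop's JobPriority.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class FifoQueue {
    // Assumed to mirror the order of Hadoop's JobPriority enum (most urgent first)
    enum Priority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }

    // Hypothetical, minimal job record
    static final class Job {
        final String id;
        final Priority priority;
        final long startTime;

        Job(String id, Priority priority, long startTime) {
            this.id = id;
            this.priority = priority;
            this.startTime = startTime;
        }
    }

    // Priority first, then start time, then job id as a final tie-breaker
    static final Comparator<Job> ORDER = Comparator
            .comparing((Job j) -> j.priority)
            .thenComparingLong(j -> j.startTime)
            .thenComparing(j -> j.id);

    private final TreeSet<Job> queue = new TreeSet<>(ORDER);

    void add(Job job) {
        queue.add(job);
    }

    // The order in which jobs would be offered tasks when a TaskTracker asks for work
    List<String> order() {
        List<String> ids = new ArrayList<>();
        for (Job j : queue) {
            ids.add(j.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        FifoQueue q = new FifoQueue();
        q.add(new Job("job_3", Priority.NORMAL, 300));
        q.add(new Job("job_1", Priority.NORMAL, 100));
        q.add(new Job("job_2", Priority.HIGH, 200));
        System.out.println(q.order());   // prints "[job_2, job_1, job_3]"
    }
}
```

With all priorities equal, this reduces to plain submission order, which is the FIFO behaviour described above.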
TaskTracker
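TaskTracker also runs in its own JVM. On startup it calls run(), and the call chain is:

run() ==> offerService() ==> transmitHeartBeat() ==> JobTracker.heartbeat()

The last step is an RPC call to the JobTracker. transmitHeartBeat() returns a HeartbeatResponse, and the TaskTracker acts on the actions it carries: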
TaskTrackerAction[] actions = heartbeatResponse.getActions();
if (actions != null) {
  for (TaskTrackerAction action : actions) {
    if (action instanceof LaunchTaskAction) {
      // Queue the task so that it gets launched
      addToTaskQueue((LaunchTaskAction) action);
    } else if (action instanceof CommitTaskAction) {
      // Record, once per task attempt, that it may commit its output
      CommitTaskAction commitAction = (CommitTaskAction) action;
      if (!commitResponses.contains(commitAction.getTaskID())) {
        LOG.info("Received commit task action for " +
            commitAction.getTaskID());
        commitResponses.add(commitAction.getTaskID());
      }
    } else {
      // Any other action (e.g. killing a task or job) goes to the cleanup queue
      tasksToCleanup.put(action);
    }
  }
}
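Depending on the TaskTrackerAction returned, the TaskTracker does one of the following:
- A LaunchTaskAction launches a task.
- A CommitTaskAction lets a task commit its output.
- Any other action goes into tasksToCleanup.

Once a task has been queued for launch, TaskLauncher takes over the rest of the work; that will be covered next time.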
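The heartbeat loop and the action dispatch fit together as follows: send a heartbeat, hand each returned action to the right queue, wait, repeat.

Below is a self-contained sketch with hypothetical miniature types (`MiniTaskTracker`, `Action`, `LaunchTask`, `CommitTask`, `KillTask`, `JobTrackerStub`). A scripted stub replaces the JobTracker RPC. The real loop does more than this, for example it takes the next heartbeat interval from the response, and none of that is modelled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

public class MiniTaskTracker {
    // Hypothetical miniature versions of the TaskTrackerAction subclasses
    static abstract class Action {
        final String taskId;
        Action(String taskId) { this.taskId = taskId; }
    }
    static final class LaunchTask extends Action { LaunchTask(String id) { super(id); } }
    static final class CommitTask extends Action { CommitTask(String id) { super(id); } }
    static final class KillTask extends Action { KillTask(String id) { super(id); } }

    // Hypothetical stand-in for the JobTracker's heartbeat() RPC
    interface JobTrackerStub {
        List<Action> heartbeat(String trackerName);
    }

    final List<String> taskQueue = new ArrayList<>();            // what addToTaskQueue() would feed
    final Set<String> commitResponses = new LinkedHashSet<>();   // task attempts allowed to commit
    final LinkedBlockingQueue<Action> tasksToCleanup = new LinkedBlockingQueue<>();

    // Rough shape of offerService(): heartbeat, dispatch the actions, sleep, repeat
    void offerService(JobTrackerStub jt, int rounds, long intervalMillis) throws InterruptedException {
        for (int i = 0; i < rounds; i++) {   // the real loop runs until the tracker shuts down
            dispatch(jt.heartbeat("tracker_host1"));
            Thread.sleep(intervalMillis);
        }
    }

    void dispatch(List<Action> actions) {
        if (actions == null) {
            return;
        }
        for (Action action : actions) {
            if (action instanceof LaunchTask) {
                taskQueue.add(action.taskId);
            } else if (action instanceof CommitTask) {
                // A Set ignores duplicates, replacing the contains()/add() pair
                commitResponses.add(action.taskId);
            } else {
                tasksToCleanup.add(action);   // e.g. kill actions, handled later outside this loop
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Scripted responses: one list of actions per heartbeat
        List<List<Action>> script = Arrays.asList(
                Arrays.<Action>asList(new LaunchTask("attempt_m_0")),
                Arrays.<Action>asList(new CommitTask("attempt_r_0"), new CommitTask("attempt_r_0")),
                Arrays.<Action>asList(new KillTask("attempt_m_1")));
        int[] call = {0};
        MiniTaskTracker tt = new MiniTaskTracker();
        tt.offerService(name -> script.get(call[0]++), script.size(), 10);
        System.out.println(tt.taskQueue + " " + tt.commitResponses + " " + tt.tasksToCleanup.size());
        // prints "[attempt_m_0] [attempt_r_0] 1"
    }
}
```

Using a Set for commit approvals is a small deviation from the original's contains()/add() on a list. The observable effect is the same: each task ID is recorded once.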
A rough sketch is attached: