1. Entry point of a standard MapReduce job:

    // The argument true tells the client to poll and print the progress of the Job and its Tasks
    System.exit(job.waitForCompletion(true) ? 0 : 1);
2. Internal implementation of job.waitForCompletion(true):

    public boolean waitForCompletion(boolean verbose)
        throws IOException, InterruptedException, ClassNotFoundException {
      if (state == JobState.DEFINE) {
        submit(); // the core of this method is submit()
      }
      if (verbose) {
        // depending on the argument, print the job's progress while it runs
        monitorAndPrintJob();
      } else {
        // get the completion poll interval from the client
        int completionPollIntervalMillis =
            Job.getCompletionPollInterval(cluster.getConf());
        while (!isComplete()) {
          try {
            Thread.sleep(completionPollIntervalMillis);
          } catch (InterruptedException ie) {
          }
        }
      }
      return isSuccessful();
    }
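The non-verbose branch above is a plain poll-until-done loop. A minimal self-contained sketch of that pattern (the FakeJob class and the poll interval are illustrative stand-ins, not part of Hadoop):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PollLoop {
    // Hypothetical stand-in for Job.isComplete(): reports done after three polls.
    static class FakeJob {
        private final AtomicInteger polls = new AtomicInteger();
        boolean isComplete() { return polls.incrementAndGet() >= 3; }
        boolean isSuccessful() { return true; }
    }

    // Mirrors the non-verbose branch: sleep, re-check, repeat until complete.
    static boolean waitForCompletion(FakeJob job, long pollIntervalMillis)
            throws InterruptedException {
        while (!job.isComplete()) {
            Thread.sleep(pollIntervalMillis);
        }
        return job.isSuccessful();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitForCompletion(new FakeJob(), 10L));
    }
}
```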
3. Internal implementation of Job.submit():

    public void submit()
        throws IOException, InterruptedException, ClassNotFoundException {
      ensureState(JobState.DEFINE);
      setUseNewAPI(); // use the new MapReduce API
      // connect() creates the client-side proxy object Cluster (a member of Job),
      // which is used for RPC communication with the server side
      connect();
      final JobSubmitter submitter =
          getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
      status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run()
            throws IOException, InterruptedException, ClassNotFoundException {
          // submit the job
          return submitter.submitJobInternal(Job.this, cluster);
        }
      });
      state = JobState.RUNNING; // set the JobState to RUNNING
      LOG.info("The url to track the job: " + getTrackingURL());
    }
3.1.1. Internal implementation of connect():

    private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
      if (cluster == null) {
        cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
          public Cluster run()
              throws IOException, InterruptedException, ClassNotFoundException {
            // return a Cluster object and store it as a member variable of Job,
            // i.e. Job holds a reference to the Cluster
            return new Cluster(getConfiguration());
          }
        });
      }
    }
3.1.2. Implementation of new Cluster():

    public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
        throws IOException {
      this.conf = conf;
      this.ugi = UserGroupInformation.getCurrentUser();
      initialize(jobTrackAddr, conf); // the key work happens inside this method
    }
3.1.3. How the client-side proxy object Cluster is initialized:

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        // ClientProtocol is the RPC protocol between the client and the server.
        // Per Hadoop's RPC mechanism, the protocol interface must contain a
        // versionID field.
        ClientProtocol clientProtocol = null;
        try {
          if (jobTrackAddr == null) {
            // the provider creates the YARNRunner object
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }
          if (clientProtocol != null) {
            // initialize Cluster's internal member variables
            clientProtocolProvider = provider;
            client = clientProtocol; // the client proxy object held by Cluster
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          } else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }
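The loop above implements a simple "first provider that returns non-null wins" selection. A self-contained sketch of that pattern (the provider names and framework strings are illustrative; the real code iterates a ServiceLoader of ClientProtocolProvider instances):

```java
import java.util.List;
import java.util.function.Function;

public class ProviderPick {
    interface Protocol { String name(); }

    // Hypothetical provider: applies only in local mode, else returns null.
    static Protocol localProvider(String framework) {
        if (!"local".equals(framework)) return null;
        return () -> "LocalJobRunner";
    }

    // Hypothetical provider: applies only in yarn mode, else returns null.
    static Protocol yarnProvider(String framework) {
        if (!"yarn".equals(framework)) return null;
        return () -> "YARNRunner";
    }

    // Try each provider in order; the first non-null result is picked,
    // just like Cluster.initialize() picks its ClientProtocolProvider.
    static Protocol pick(List<Function<String, Protocol>> providers, String framework) {
        for (Function<String, Protocol> provider : providers) {
            Protocol p = provider.apply(framework);
            if (p != null) {
                return p;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Protocol picked = pick(
            List.of(ProviderPick::localProvider, ProviderPick::yarnProvider), "yarn");
        System.out.println(picked.name());
    }
}
```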
3.1.4. The versionID field contained in the ClientProtocol interface:
    // Version 37: More efficient serialization format for framework counters
    public static final long versionID = 37L;
3.1.5. provider.create() has two implementations that build the client proxy object: LocalClientProtocolProvider (local mode, not covered here) and YarnClientProtocolProvider (YARN mode).

    public ClientProtocol create(Configuration conf) throws IOException {
      if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
        return new YARNRunner(conf); // instantiate the client proxy object YARNRunner
      }
      return null;
    }
3.1.6. Implementation of new YARNRunner():

ResourceMgrDelegate is effectively a proxy class for the ResourceManager. It implements the YarnClient interface and, through an ApplicationClientProtocol proxy, talks directly to the RM to submit jobs, kill jobs, query job status, and so on. ResourceMgrDelegate also uses YarnConfiguration to read the properties in configuration files such as yarn-site.xml and core-site.xml.

    public YARNRunner(Configuration conf) {
      this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
    }

    public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
        ClientCache clientCache) {
      this.conf = conf;
      try {
        this.resMgrDelegate = resMgrDelegate;
        this.clientCache = clientCache;
        this.defaultFileContext = FileContext.getFileContext(this.conf);
      } catch (UnsupportedFileSystemException ufe) {
        throw new RuntimeException("Error in instantiating YarnClient", ufe);
      }
    }
3.2.1. Implementation of submitJobInternal() in the JobSubmitter class:

    JobStatus submitJobInternal(Job job, Cluster cluster)
        throws ClassNotFoundException, InterruptedException, IOException {
      // validate the job, e.g. throw if the output path already exists
      checkSpecs(job);

      // get the prefix of the path that stores job resources (the job jar,
      // job.xml, split files, etc.); default /tmp/hadoop-yarn/staging/<user>/.staging,
      // configurable via yarn.app.mapreduce.am.staging-dir
      Path jobStagingArea =
          JobSubmissionFiles.getStagingDir(cluster, job.getConfiguration());
      // the job configuration, including parameters set on the command line
      Configuration conf = job.getConfiguration();

      // record the client's host name and IP
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        submitHostAddress = ip.getHostAddress();
        submitHostName = ip.getHostName();
        conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
      }

      // request a new JobID from YARN's ResourceManager over RPC
      JobID jobId = submitClient.getNewJobID();
      job.setJobID(jobId);

      // join the staging prefix and the JobID into the full path for the job's files
      Path submitJobDir = new Path(jobStagingArea, jobId.toString());
      JobStatus status = null;
      try {
        conf.set(MRJobConfig.USER_NAME,
            UserGroupInformation.getCurrentUser().getShortUserName());
        conf.set("hadoop.http.filter.initializers",
            "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
        conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
        LOG.debug("Configuring job " + jobId + " with " + submitJobDir
            + " as the submit dir");
        // get delegation tokens for the dir
        TokenCache.obtainTokensForNamenodes(job.getCredentials(),
            new Path[] { submitJobDir }, conf);
        populateTokenCache(conf, job.getCredentials());

        // generate a secret to authenticate shuffle transfers
        if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
          KeyGenerator keyGen;
          try {
            keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
            keyGen.init(SHUFFLE_KEY_LENGTH);
          } catch (NoSuchAlgorithmException e) {
            throw new IOException("Error generating shuffle secret key", e);
          }
          SecretKey shuffleKey = keyGen.generateKey();
          TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
              job.getCredentials());
        }

        // copy the required files to the cluster; by default they are written with
        // a replication factor of 10 (mapreduce.client.submit.file.replication)
        copyAndConfigureFiles(job, submitJobDir);
        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

        // create the splits for the job
        LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
        // compute the number of map tasks and the input split metadata (covered later)
        int maps = writeSplits(job, submitJobDir);
        conf.setInt(MRJobConfig.NUM_MAPS, maps);
        LOG.info("number of splits:" + maps);

        // write "queue admins of the queue to which job is being submitted"
        // to the job file (set the scheduling queue name)
        String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
        AccessControlList acl = submitClient.getQueueAdmins(queue);
        conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()),
            acl.getAclString());

        // remove jobtoken referrals before copying the jobconf to HDFS,
        // as the tasks don't need this setting; it may even break them,
        // since the referral would point to a different job
        TokenCache.cleanUpTokenReferral(conf);

        if (conf.getBoolean(MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
            MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
          // add HDFS tracking ids
          ArrayList<String> trackingIds = new ArrayList<String>();
          for (Token<? extends TokenIdentifier> t :
              job.getCredentials().getAllTokens()) {
            trackingIds.add(t.decodeIdentifier().getTrackingId());
          }
          conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
              trackingIds.toArray(new String[trackingIds.size()]));
        }

        // write the job file (job.xml) to the submit dir
        writeConf(conf, submitJobFile);

        // now, actually submit the job (using the submit name)
        printTokens(jobId, job.getCredentials());
        // the real submission happens in submitJob(), analyzed in 3.2.2
        status = submitClient.submitJob(jobId, submitJobDir.toString(),
            job.getCredentials());
        if (status != null) {
          return status;
        } else {
          throw new IOException("Could not launch job");
        }
      } finally {
        if (status == null) {
          LOG.info("Cleaning up the staging area " + submitJobDir);
          if (jtFs != null && submitJobDir != null)
            jtFs.delete(submitJobDir, true);
        }
      }
    }
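The shuffle-secret step inside submitJobInternal() uses the JDK's javax.crypto API. A self-contained sketch of generating such a key ("HmacSHA1" and the 64-bit length stand in for the SHUFFLE_KEYGEN_ALGORITHM and SHUFFLE_KEY_LENGTH constants; treat the exact values as assumptions):

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

public class ShuffleKeyDemo {
    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Generate a random HMAC key; the algorithm/length here are assumed
        // stand-ins for the constants used for the shuffle secret.
        KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA1");
        keyGen.init(64); // key size in bits
        SecretKey shuffleKey = keyGen.generateKey();
        // the encoded bytes are what would be stored in the job's Credentials
        System.out.println(shuffleKey.getEncoded().length * 8);
    }
}
```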
3.2.2. Implementation of submitClient.submitJob():

submitJob() is an abstract method of the ClientProtocol interface (the RPC protocol). Following the RPC mechanism, when the client proxy object submitClient calls submitJob(), the method executes on the server side. It has two implementations: LocalJobRunner (local mode, skipped here) and YARNRunner (YARN mode).

    public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
        throws IOException, InterruptedException {
      addHistoryToken(ts);
      // build the information needed to launch the MapReduce ApplicationMaster
      ApplicationSubmissionContext appContext =
          createApplicationSubmissionContext(conf, jobSubmitDir, ts);
      // submit the job to the RM and get back an ApplicationId
      try {
        ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
        ApplicationReport appMaster =
            resMgrDelegate.getApplicationReport(applicationId);
        String diagnostics = (appMaster == null
            ? "application report is null" : appMaster.getDiagnostics());
        if (appMaster == null
            || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
            || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
          throw new IOException("Failed to run job : " + diagnostics);
        }
        // finally, return the job's current status
        return clientCache.getClient(jobId).getJobStatus(jobId);
      } catch (YarnException e) {
        throw new IOException(e);
      }
    }
Summary:

1. Why was YARN created?
The Hadoop 1.0 ecosystem was built almost entirely around MapReduce, and its poor scalability, low resource utilization, and reliability problems became increasingly painful. YARN was created to address these issues, and the Hadoop 2.0 ecosystem is built around YARN; frameworks such as Storm and Spark can also run on top of it.
2. What does the Configuration class do?
Configuration is a common class used across all Hadoop modules. It loads the various configuration files on the classpath and reads and writes the options in them.
3. What does the GenericOptionsParser class do?
4. How are command-line parameters written into the conf variable?
5. Which method picks up the passed-in arguments?
GenericOptionsParser automatically sets command-line parameters into the conf variable. Its constructor internally calls parseGeneralOptions() to parse the passed-in arguments.

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
6. How do you set the number of reducers on the command line?
The rule for command-line configuration is -D followed by a MapReduce configuration option (e.g. -D mapreduce.job.reduces=2); other generic options such as -fs are also supported.
7. What are the default numbers of map and reduce tasks?
By default the number of reduce tasks is 1. The number of map tasks also defaults to 1, but in practice it is determined by the number of input splits, as computed in writeSplits() above.
8. What does setJarByClass() do?
setJarByClass() first checks whether the job is already running, then locates the jar file that contains the given class and assigns the jar's path to the mapreduce.job.jar property. It finds the jar by asking the classloader for the class's resource and iterating over the matching classpath entries; see the findContainingJar() method in the ClassUtil class for the details.
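A simplified, self-contained sketch of that lookup idea (the helper names are illustrative, not Hadoop's): convert a class name to its resource path, and when the classloader returns a jar: URL for it, strip the URL down to the jar path.

```java
import java.net.URL;

public class FindJar {
    // Convert a class to the resource path the classloader uses,
    // e.g. java.lang.String -> "java/lang/String.class"
    static String classToResource(Class<?> clazz) {
        return clazz.getName().replace('.', '/') + ".class";
    }

    // If the class was loaded from a jar, its resource URL looks like
    // "jar:file:/path/to/lib.jar!/pkg/Cls.class"; extract the jar location.
    static String jarFromUrl(String url) {
        if (url.startsWith("jar:file:")) {
            int bang = url.indexOf('!');
            return url.substring("jar:".length(), bang);
        }
        return null; // not loaded from a jar (e.g. a directory or the JDK image)
    }

    public static void main(String[] args) {
        System.out.println(classToResource(String.class));
        System.out.println(jarFromUrl("jar:file:/tmp/app.jar!/demo/Main.class"));
        // In real code the URL would come from the classloader, e.g.:
        URL u = FindJar.class.getClassLoader()
            .getResource(classToResource(FindJar.class));
        // whether u is a jar:, jrt: or file: URL depends on the JDK and classpath
    }
}
```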
9. Which parameter makes the job print its current progress to the console?
Pass true to job.waitForCompletion(true) to have the current progress printed to the console.
10. Which configuration setting makes job submission create a YARNRunner object?
If the mapreduce.framework.name property is set to "yarn" in the configuration files, a YARNRunner object is created to submit the job.
11. Which class reads the configuration properties in yarn-site.xml, core-site.xml, and the other configuration files?
YarnConfiguration; as noted in 3.1.6, ResourceMgrDelegate creates a YarnConfiguration to read these files.
12. Which method in the JobSubmitter class submits the job to the cluster?
The submitJobInternal() method of the JobSubmitter class.
13. What role does DistributedCache play in MapReduce?
After the job files are uploaded to HDFS, they are also registered with the DistributedCache. When a compute node receives its first task for a job, the DistributedCache automatically downloads the job files into a local directory on that node, decompressing archives such as .zip, .jar, and .tar, and then the task starts. For subsequent tasks of the same job on that node, the DistributedCache does not download the files again; the tasks run directly. For jobs with many tasks, this design avoids downloading the same job's files to a node multiple times and greatly improves efficiency.
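That download-once-per-node behaviour is essentially memoization keyed by job. A minimal sketch of the idea (the class, the fake download step, and the paths are illustrative, not Hadoop's implementation):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class NodeCache {
    private final Map<String, String> localized = new ConcurrentHashMap<>();
    final AtomicInteger downloads = new AtomicInteger(); // counts real fetches

    // Hypothetical stand-in for "fetch job files from HDFS to a local dir".
    private String download(String jobId) {
        downloads.incrementAndGet();
        return "/tmp/cache/" + jobId; // pretend local path
    }

    // The first task of a job triggers the download; later tasks of the
    // same job reuse the already-localized copy.
    String localize(String jobId) {
        return localized.computeIfAbsent(jobId, this::download);
    }

    public static void main(String[] args) {
        NodeCache node = new NodeCache();
        node.localize("job_001");
        node.localize("job_001"); // cache hit, no second download
        node.localize("job_002");
        System.out.println(node.downloads.get());
    }
}
```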
14. Is the split of each input file a physical or a logical division, and what is the difference?
A logical division. The division into blocks at storage time is a physical division.
15. What factors determine the split size?
16. How is a split computed?
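The source leaves these last two questions unanswered. As a hedged sketch: in FileInputFormat the split size is derived from the block size and the configured minimum and maximum split sizes, roughly Math.max(minSize, Math.min(maxSize, blockSize)), and a file is chopped into splits of that size. The following self-contained illustration approximates that logic (the SPLIT_SLOP value of 1.1 mirrors the tolerance for a slightly larger final split; treat the details as an approximation of the real implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    static final double SPLIT_SLOP = 1.1; // allow the last split to be up to 10% larger

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Chop a file of the given length into {offset, length} split ranges.
    static List<long[]> getSplits(long fileLen, long blockSize,
                                  long minSize, long maxSize) {
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        List<long[]> splits = new ArrayList<>();
        long remaining = fileLen;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] { fileLen - remaining, splitSize });
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits.add(new long[] { fileLen - remaining, remaining });
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // a 300 MB file with 128 MB blocks -> splits of 128, 128 and 44 MB
        List<long[]> splits = getSplits(300 * mb, 128 * mb, 1L, Long.MAX_VALUE);
        System.out.println(splits.size());
        System.out.println(splits.get(2)[1] / mb);
    }
}
```

Raising mapreduce.input.fileinputformat.split.minsize above the block size yields fewer, larger splits (and thus fewer map tasks); lowering split.maxsize below it yields more, smaller splits.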