Let's analyze the job submission flow using a WordCount example:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author: LUGH1
 * @date: 2019-4-8
 * @description:
 */
public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.88.130:9000");
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WdMapper.class);
        job.setReducerClass(WdReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/test/word.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/test/output"));
        boolean result = job.waitForCompletion(true);
        System.out.println("good job");   // print before exiting; code after System.exit() would never run
        System.exit(result ? 0 : 1);
    }
}

class WdMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split(" ");
        for (String word : split) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

class WdReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable i : values) {
            count += i.get();
        }
        context.write(key, new IntWritable(count));
    }
}
The code above is a simple WordCount job, so I won't walk through every line. Start with the main method: it obtains a Job object, applies a series of settings, and finally calls waitForCompletion.
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // ... setup code omitted ...
    boolean result = job.waitForCompletion(true); // submit the job via the waitForCompletion() method provided by the Job class
    System.exit(result ? 0 : 1);
}
Next, let's look at the Job class that provides waitForCompletion (the class is large, so only the parts we need are shown):
public class Job extends JobContextImpl implements JobContext {
    private static final Log LOG = LogFactory.getLog(Job.class);
    public static enum JobState {DEFINE, RUNNING};            // the two possible job states
    private static final long MAX_JOBSTATUS_AGE = 1000 * 2;   // refresh the status at most every 2000 ms
    public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
    public static final String COMPLETION_POLL_INTERVAL_KEY = "mapreduce.client.completion.pollinterval";
    static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
    public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY = "mapreduce.client.progressmonitor.pollinterval";
    static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
    public static final String USED_GENERIC_PARSER = "mapreduce.client.genericoptionsparser.used";
    public static final String SUBMIT_REPLICATION = "mapreduce.client.submit.file.replication";
    public static final int DEFAULT_SUBMIT_REPLICATION = 10;

    public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

    static {
        ConfigUtil.loadResources();  // load configuration files
    }

    private JobState state = JobState.DEFINE;  // when the class is loaded, the default state is DEFINE
    private JobStatus status;
    private long statustime;
    private Cluster cluster;
    private ReservationId reservationId;

    boolean waitForCompletion(boolean verbose)
    submit()
    setUseNewAPI()
    connect()
    getJobSubmitter(FileSystem fs, ClientProtocol submitClient)
    isUber()                        // whether to run in "uber" (carpool) mode, i.e. MapTask and ReduceTask on the same node
    setPartitionerClass()           // mapper output may be distributed to multiple reducers by a Partitioner according to some rule
    setMapSpeculativeExecution()    // whether speculative (backup) map tasks may be launched
    setReduceSpeculativeExecution() // whether speculative (backup) reduce tasks may be launched
    setCacheFiles()
}
The Job class contains many static fields and a static block. In Java, static members and static initializer blocks are executed when the class is loaded, so by the time we call Job job = Job.getInstance(conf) in main, these statics have already run. They set a few parameters, such as the default DEFINE state for the job, and load configuration files. The loading method looks like this:
public static void loadResources() {
addDeprecatedKeys();
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("yarn-site.xml");
}
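To make the layering concrete, here is a small, hypothetical sketch (not part of the Hadoop source) that registers the same default resources by hand, the way loadResources() does. Resources added later override earlier ones, which is why values in mapred-site.xml take precedence over mapred-default.xml; the exact value printed depends on what is on your classpath.

import org.apache.hadoop.conf.Configuration;

public class ConfigLayeringDemo {
    public static void main(String[] args) {
        // Register default resources by hand, mirroring what ConfigUtil.loadResources() does.
        Configuration.addDefaultResource("mapred-default.xml");
        Configuration.addDefaultResource("mapred-site.xml");
        Configuration conf = new Configuration();
        // Resources added later win, so a cluster's mapred-site.xml overrides the
        // shipped default; on a YARN cluster this typically prints "yarn".
        System.out.println(conf.get("mapreduce.framework.name", "local"));
    }
}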
Loading the configuration files here just means loading Hadoop's own configuration files, so by the time we call waitForCompletion everything is already loaded. Next, let's look at waitForCompletion:
// the Job class in org.apache.hadoop.mapreduce
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
if (state == JobState.DEFINE) { // only a job in the DEFINE state may be submitted, which prevents duplicate submission
submit(); // submit the job
}
if (verbose) { // after submission, monitor the job until it finishes
monitorAndPrintJob(); // periodically report the job's progress
} else { // otherwise, just poll periodically to check whether the job has completed
// get the completion poll interval from the client.
int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
From the point of view of the submission flow, this method could hardly be simpler: it is essentially a call to Job.submit(), preceded by a check that the job is still in the DEFINE state, which guarantees that the same job cannot be submitted more than once. As noted above, JobState has only two values, DEFINE and RUNNING. The Job() constructor sets the state to DEFINE when the Job object is created, and once the job has been submitted successfully the state is switched to RUNNING, closing the door on any further submission.
Under normal circumstances Job.submit() returns quickly, because its only task is to hand the job over; it does not wait for the job to run or finish. Job.waitForCompletion(), on the other hand, does not return until the job has completed. While waiting, if the verbose parameter is true it periodically reports the job's progress; otherwise it just periodically checks whether the job has finished.
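To see the DEFINE-to-RUNNING gate from the client side, here is a minimal, illustrative sketch (not from the Hadoop source). It assumes the Job has been fully configured like the WordCount example above; the second submit() is rejected by ensureState(JobState.DEFINE), which in my understanding throws an IllegalStateException.

import org.apache.hadoop.mapreduce.Job;

public class DoubleSubmitDemo {
    // Assumes "job" has been configured exactly like the WordCount example above.
    static void submitTwice(Job job) throws Exception {
        job.submit();              // state goes DEFINE -> RUNNING
        try {
            job.submit();          // second submission of the same Job object
        } catch (IllegalStateException e) {
            System.err.println("Already submitted: " + e.getMessage());
        }
    }
}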
So far the submission flow is:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() ]
Next, let's take a look at the submit method:
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);  // make sure the job is still in the DEFINE state
    setUseNewAPI();                // decide from the configuration whether to use the new API
    connect();                     // connect to the cluster, i.e. create the Cluster object "cluster"
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());  // obtain a JobSubmitter instance
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {      // ugi.doAs handles access control
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            return submitter.submitJobInternal(Job.this, cluster);      // this is where the job is actually submitted
        }
    });
    state = JobState.RUNNING;      // switch the job state to RUNNING
    LOG.info("The url to track the job: " + getTrackingURL());
}
Let's first look at the connect method:
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {  // if there is no Cluster instance yet, create one
        cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
                return new Cluster(getConfiguration());  // create the Cluster
            }
        });
    }
}
So the purpose of connect() is simply to make sure this node has a Cluster object, creating one if it does not exist yet. Let's look at the Cluster class (partial listing):
public class Cluster {
    @InterfaceStability.Evolving
    public static enum JobTrackerStatus {INITIALIZING, RUNNING};  // job tracker status

    private ClientProtocolProvider clientProtocolProvider;  // YarnClientProtocolProvider on a cluster, LocalClientProtocolProvider in local mode
    private ClientProtocol client;      // on a cluster, this is the channel and protocol for talking to the outside world
    private UserGroupInformation ugi;   // used for access control
    private Configuration conf;         // configuration
    private FileSystem fs = null;       // file system
    private Path sysDir = null;         // system directory
    private Path stagingAreaDir = null;
    private Path jobHistoryDir = null;  // job history directory
    private static final Log LOG = LogFactory.getLog(Cluster.class);

    // ServiceLoader<ClientProtocolProvider> is a ServiceLoader for the ClientProtocolProvider class,
    // created here via ServiceLoader.load(). ServiceLoader implements Iterable and provides an
    // iterator() method, so it can be used in a for loop; its load() method loads classes
    // through the ClassLoader.
    private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
        ServiceLoader.load(ClientProtocolProvider.class);

    static {
        ConfigUtil.loadResources();  // load configuration files
    }

    // constructor
    public Cluster(Configuration conf) throws IOException {
        this(null, conf);
    }

    // constructor
    public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
        this.conf = conf;
        this.ugi = UserGroupInformation.getCurrentUser();
        initialize(jobTrackAddr, conf);  // call the initialize method
    }

    // the goal is to create the ClientProtocolProvider and the ClientProtocol
    private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
        synchronized (frameworkLoader) {  // lock so that multiple threads cannot enter this block at the same time
            for (ClientProtocolProvider provider : frameworkLoader) {  // iterate over frameworkLoader to get each provider
                LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
                ClientProtocol clientProtocol = null;
                try {
                    if (jobTrackAddr == null) {
                        // create the clientProtocol through the provider's create method
                        clientProtocol = provider.create(conf);
                    } else {
                        clientProtocol = provider.create(jobTrackAddr, conf);
                    }
                    if (clientProtocol != null) {
                        clientProtocolProvider = provider;
                        client = clientProtocol;  // the ClientProtocol object has been created: a YARNRunner or a LocalJobRunner
                        LOG.debug("Picked " + provider.getClass().getName()
                            + " as the ClientProtocolProvider");
                        break;  // success, so leave the loop
                    } else {    // failure: log it
                        LOG.debug("Cannot pick " + provider.getClass().getName()
                            + " as the ClientProtocolProvider - returned null protocol");
                    }
                } catch (Exception e) {
                    LOG.info("Failed to use " + provider.getClass().getName()
                        + " due to error: ", e);
                }
            }
        }
        if (null == clientProtocolProvider || null == client) {  // check that both objects were actually created
            throw new IOException(
                "Cannot initialize Cluster. Please check your configuration for "
                + MRConfig.FRAMEWORK_NAME + " and the correspond server addresses.");
        }
    }
}
So the Job.connect() method just makes sure a Cluster instance exists, creating one via the Cluster constructor if it does not. Before the constructor runs, the class's static members are initialized: ConfigUtil.loadResources() loads configuration and the static frameworkLoader field is assigned. The constructor then always ends up calling Cluster.initialize(). As for ClientProtocolProvider and ClientProtocol: the user submits a job to the RM node and asks the RM to arrange for it to run, so the RM acts as the service provider and the user is its client. The two sides therefore need a protocol that spells out how they interact and how the service is provided. In the Hadoop code this "protocol" is elevated all the way to the level of the computing framework; even the choice of whether to use YARN falls under it. That is exactly the role ClientProtocol plays, and ClientProtocolProvider, as its name suggests, is the provider of the ClientProtocol, acting somewhat like a factory.
As for ServiceLoader<ClientProtocolProvider>, it is what loads the ClientProtocolProvider implementations.
Let's first look at the ClientProtocolProvider class. It is clearly an abstract class, which means only concrete classes that extend it can be instantiated:
public abstract class ClientProtocolProvider {
    public abstract ClientProtocol create(Configuration conf) throws IOException;
    public abstract ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException;
    public abstract void close(ClientProtocol clientProtocol) throws IOException;
}
Next, let's look at its two subclasses, YarnClientProtocolProvider and LocalClientProtocolProvider:
package org.apache.hadoop.mapred;

public class YarnClientProtocolProvider extends ClientProtocolProvider {

    @Override
    public ClientProtocol create(Configuration conf) throws IOException {
        if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
            return new YARNRunner(conf);  // YARNRunner implements the ClientProtocol interface
        }
        return null;
    }

    @Override
    public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
        return create(conf);
    }

    @Override
    public void close(ClientProtocol clientProtocol) throws IOException {
        if (clientProtocol instanceof YARNRunner) {
            ((YARNRunner) clientProtocol).close();
        }
    }
}
package org.apache.hadoop.mapred;

public class LocalClientProtocolProvider extends ClientProtocolProvider {

    @Override
    public ClientProtocol create(Configuration conf) throws IOException {
        String framework = conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
        if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
            return null;
        }
        conf.setInt(JobContext.NUM_MAPS, 1);  // the number of map tasks is 1
        return new LocalJobRunner(conf);      // LocalJobRunner implements the ClientProtocol interface
    }

    @Override
    public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
        return null; // LocalJobRunner doesn't use a socket
    }

    @Override
    public void close(ClientProtocol clientProtocol) {
        // no clean up required
    }
}
Now let's come back to the Cluster.initialize() method.
ServiceLoader implements the Iterable interface and provides an iterator() method, so it can be used in a for loop; its load() method loads classes through the ClassLoader. It also knows how to parse the provider configuration files on the classpath, so once frameworkLoader has been loaded as a ServiceLoader object, its internal LinkedHashMap holds the two provider entries described above, and iterator() can visit them one after the other.
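To make the discovery mechanism concrete, here is an illustrative sketch (not Hadoop source). It assumes the usual package name org.apache.hadoop.mapreduce.protocol for ClientProtocolProvider and assumes, as is standard for ServiceLoader, that each provider jar ships a META-INF/services file named after the service interface whose lines list the implementation classes; the two Hadoop provider classes are the ones shown above.

import java.util.ServiceLoader;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

public class ProviderDiscoveryDemo {
    public static void main(String[] args) {
        // ServiceLoader.load() reads the META-INF/services entries lazily and
        // instantiates each listed class as the for-each loop advances; this is
        // the same iteration Cluster.initialize() performs over frameworkLoader.
        ServiceLoader<ClientProtocolProvider> loader =
                ServiceLoader.load(ClientProtocolProvider.class);
        for (ClientProtocolProvider provider : loader) {
            System.out.println("Found provider: " + provider.getClass().getName());
        }
    }
}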
Then, the Cluster constructor calls initialize(), whose purpose is to create the ClientProtocolProvider and the ClientProtocol.
But ClientProtocolProvider is an abstract class, which means only concrete classes that extend it can be instantiated. There are exactly two such classes in the Hadoop source code: LocalClientProtocolProvider and YarnClientProtocolProvider.
As you would expect, the ClientProtocol supplied by these two providers differs as well. ClientProtocol is in fact an interface, and it too has exactly two implementations, LocalJobRunner and YARNRunner; only one of them can actually be in use at a time.
The for loop in initialize() is the iterator()-based loop over the ServiceLoader described above, which in practice means a loop over the two ClientProtocolProviders. Its goal is to create the ClientProtocol the user asked for via ClientProtocolProvider.create(), which will be either a LocalJobRunner or a YARNRunner. As soon as one creation succeeds the loop can stop, since only one choice is possible; if both attempts fail, the program cannot continue, because it has no way to have the RM provide compute service. Whether creation succeeds depends on the configuration settings mentioned earlier. Because ClientProtocolProvider is abstract, the classes actually tried in turn are LocalClientProtocolProvider and YarnClientProtocolProvider. If the first iteration happens to try the former, the job flow is:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> LocalClientProtocolProvider.create()]
If it is the latter, the flow is:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create()]
Here we assume the job is submitted in YARN mode, so the second flow applies.
YarnClientProtocolProvider.create() ultimately returns a new YARNRunner(conf) object.
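The choice is driven entirely by the "mapreduce.framework.name" setting (MRConfig.FRAMEWORK_NAME). Below is an illustrative sketch, not Hadoop source, that calls the two providers shown above directly; actually running it requires the Hadoop client jars and valid YARN settings on the classpath, so treat it as a demonstration of the selection logic rather than a ready-made program.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.LocalClientProtocolProvider;
import org.apache.hadoop.mapred.YarnClientProtocolProvider;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;

public class ProviderSelectionDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "yarn"); // normally set in mapred-site.xml

        // With "yarn", YarnClientProtocolProvider returns a YARNRunner while the
        // local provider returns null, so Cluster.initialize() picks YARN.
        ClientProtocol yarn = new YarnClientProtocolProvider().create(conf);
        ClientProtocol local = new LocalClientProtocolProvider().create(conf);
        System.out.println("yarn provider  -> " + yarn);
        System.out.println("local provider -> " + local);
    }
}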
Now back to Job.submit(). At this point connect() has finished, and the next step is the call to getJobSubmitter(). This method creates a JobSubmitter object, and Job.submit() then calls its submitJobInternal() method to complete the submission. The two arguments used to construct the JobSubmitter are the two arguments passed to getJobSubmitter(): cluster.getFileSystem() and cluster.getClient(). cluster.getClient() returns the YARNRunner or LocalJobRunner, while cluster.getFileSystem() returns the URL of the file system on the RM node for YARNRunner, or a directory on this node with the relative path "mapred/system" for LocalJobRunner.
Next, a look at the JobSubmitter class (partial listing):
package org.apache.hadoop.mapreduce;

class JobSubmitter {
    protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
    private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";  // algorithm used for the shuffle secret key
    private static final int SHUFFLE_KEY_LENGTH = 64;
    private FileSystem jtFs;
    private ClientProtocol submitClient;
    private String submitHostName;
    private String submitHostAddress;

    JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) throws IOException {
        this.submitClient = submitClient;  // a YARNRunner when running on a cluster
        this.jtFs = submitFs;
    }

    compareFs(FileSystem srcFs, FileSystem destFs)  // compare whether two file systems are the same
    getPathURI()
    checkSpecs()
    copyRemoteFiles()
    copyAndConfigureFiles()
    copyJar(Path originalJarPath, Path submitJarFile, short replication)
    addMRFrameworkToDistributedCache()
    submitJobInternal(Job job, Cluster cluster)  // submit the job to the cluster
    writeNewSplits(JobContext job, Path jobSubmitDir)
    getJobSubmitter(FileSystem fs, ClientProtocol submitClient)  // under the hood this just calls the JobSubmitter constructor
}
Now let's look at the submitJobInternal method:
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  // validate the jobs output specs (check the output format and related settings)
  checkSpecs(job);

  Configuration conf = job.getConfiguration();      // get the configuration
  addMRFrameworkToDistributedCache(conf);           // add the MR framework to the distributed cache

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);  // get the staging directory path

  // configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();      // IP address of this node (the submitting host)
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();        // this node's IP address as a string
    submitHostName = ip.getHostName();              // this node's host name
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);        // record them in conf
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();         // obtain a new, unique job ID
  job.setJobID(jobId);                              // set the job's ID
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());  // the job's temporary subdirectory is named after the job ID
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());  // the user name
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");  // filter initializer for the HTTP interface
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());  // the job submission directory
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir (prepare the tokens needed for access control)
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);         // tokens needed for talking to the NameNode
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    // (the secret protects the data flowing between mappers and reducers)
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate"
          + "data spill is enabled");
    }

    copyAndConfigureFiles(job, submitJobDir);  // copy the jar and other files to HDFS; by default 10 replicas are kept, spread over different nodes

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);  // path of the job configuration file

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);  // compute the number of map tasks (how this number is decided deserves a separate post)
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted" to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);  // the default scheduling queue is named "default"
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());  // set the ACL permissions

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);  // clean up the token referral cache

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids (only if tracking is enabled)
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());  // collect the tracking IDs of all tokens
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));  // record the tracking IDs
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);  // write the contents of conf into an .xml file

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    // submit the job through YARNRunner.submitJob() or LocalJobRunner.submitJob()
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;  // return the status
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);  // delete the temporary directory
    }
  }
}
From submitJobInternal we can see that two kinds of resources and information must accompany the job submission:
One kind is handed to the resource manager (RM) itself, for use when it registers and schedules the job.
The other kind is not used by the RM directly but by the nodes that actually perform the computation. The first kind includes the submitting node's IP address, host name, user name, the job ID, information about the MapReduce input files, and the tokens provided for the submission; this bundle is handed to the RM and constitutes job submission in the narrow sense, the main thread of the flow. The second kind includes the executable jar, external libraries, and so on that the job needs at run time; if the input files are local, they belong here too. These resources do not need to be handed to the RM, which has no use for them itself, but they must be copied or moved into the global HDFS file system so that the nodes that actually run the tasks can fetch them.
To upload these resources and this information, a directory must be created for the job in HDFS. HDFS has a directory dedicated to job submission, called the staging directory, so the code first obtains its path from the cluster via JobSubmissionFiles.getStagingDir(). It then creates a temporary subdirectory inside the staging directory named after the job ID; this is submitJobDir in the code. From then on, everything related to this job is uploaded into that subdirectory.
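As a small sketch of what that layout looks like, the helper below mirrors the same two calls submitJobInternal() makes; it assumes an already initialized Cluster, Configuration, and JobID, and the printed path shown in the comment reflects typical default settings rather than a guaranteed value.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;

public class StagingDirDemo {
    // Mirrors the two path computations in submitJobInternal().
    static void printStagingLayout(Cluster cluster, Configuration conf, JobID jobId)
            throws Exception {
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        // With default settings this usually looks like
        //   /tmp/hadoop-yarn/staging/<user>/.staging/job_<timestamp>_<seq>
        System.out.println("staging area  : " + jobStagingArea);
        System.out.println("job submit dir: " + submitJobDir);
    }
}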
This method also sets the number of map tasks, the execution queue, and so on, and finally calls the submitJob method of the YARNRunner (or LocalJobRunner) object created in connect(). At this point the job has been handed to the RM, and the flow is:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.submitJobInternal() -> YARNRunner.submitJob()]
The story continues in the follow-up post, hadoop2.7之作業提交詳解(下) (Hadoop 2.7 job submission explained, part 2).