我們會定義Job,我們會定義map和reduce程序。那么,這個Job到底是怎么提交的?提交到哪去了?它到底和集群怎么進行交互的呢?
這篇文章將從頭講起。
開發hadoop的程序時,一共有三大塊,也就是Driver、map、reduce,在Driver中,我們要定義Configuration,定義Job,在mian方法最后,往往會以這么一段代碼結尾:
if (!job.waitForCompletion(true)) return;
而這句的作用,就是提交了我們的Job。進入代碼里(其實就是Job類)我們可以看到具體實現:
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
//這句是重點,提交。那么從代碼里看出這個似乎是異步提交啊,否則后面的監測怎么執行呢?我們拭目以待
submit();
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
//從配置里取得輪訓的間隔時間,來分析當前job是否執行完畢
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
依然在Job.class里,這個方法主要動作有二,一是找到集群,二是講Job提交到集群
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
//連接集群/master
connect();
//構造提交器
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
//提交
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
我們繼續往下看,看下提交的時候都做了什么?
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
// 檢查輸出目錄合法性(已存在?沒指定?),這就是為什么每次提交作業,總是這個 錯比較靠前的報出來
checkSpecs(job);
Configuration conf = job.getConfiguration();
// 將框架提交到集群緩存(具體左右還未知?)
addMRFrameworkToDistributedCache(conf);
// 獲得登錄區,用以存放作業執行過程中用到的文件,默認位置/tmp/hadoop-yarn/staging/root/.staging
// ,可通過yarn.app.mapreduce.am.staging-dir修改
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// configure the command line options correctly on the submitting dfs
// 這是獲取和設置提交job機器的地址和主機名
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
}
// 取得當前Job的ID(后面詳細關注此處)
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// 作業提交目錄
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME, UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf);
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
int keyLen = CryptoUtils.isShuffleEncrypted(conf)
? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
: SHUFFLE_KEY_LENGTH;
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(keyLen);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials());
}
// 從本地copy文件到hdfs,比如我們提交的wordcount.jar
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job,其實也就是確定了map的數量
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t : job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
// Write job file to submit dir
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
//提交!!!!!!!!
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
那么這個最終提交用到的submitClient是哪來的?他是怎么定義的?
它是上文提到的,連接集群的時候創建的。這個集群定義了很多信息:客戶端信息、用戶組信息、文件系統信息,配置信息,歷史job目錄,系統目錄等。其中客戶端信息,提供了初始化方法,如下:
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
//初始化是重點
initialize(jobTrackAddr, conf);
}
具體看下初始化過程:
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug("Trying ClientProtocolProvider : "
+ provider.getClass().getName());
//根據配置,創建客戶端協議提供者
ClientProtocol clientProtocol = null;
try {
if (jobTrackAddr == null) {
//提供者返回的是一個具體的協議
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
clientProtocolProvider = provider;
//看到沒?協議是什么?協議其實就是個類,里面封裝了一些約定好的屬性,以及操作這些屬性的方法。實例化為對象后,就是一個可用於通信的客戶端
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
else {
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: " + e.getMessage());
}
}
}
if (null == clientProtocolProvider || null == client) {
throw new IOException(
"Cannot initialize Cluster. Please check your configuration for "
+ MRConfig.FRAMEWORK_NAME
+ " and the correspond server addresses.");
}
}
創建客戶端協議提供者,用java.util.ServiceLoader,目前包含兩個具體實現,LocalClientProtocolProvider(本地作業) YarnClientProtocolProvider(Yarn作業),此處會根據mapreduce.framework.name的配置選擇使用哪個創建相應的客戶端。
而YarnClientProtocolProvider的本質是創建了一個YarnRunner對象
public ClientProtocol create(Configuration conf) throws IOException {
if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
return new YARNRunner(conf);
}
return null;
}
YarnRunner對象是干什么的?根據注釋解釋,是讓當前JobClient在yarn上運行的。提供一些提交Job啊,殺死Job之類的方法。它實現了ClientProtocol接口,上面講的提交的最后一步,其實最終就是調用了YarnRunner的submitJob方法。
它里面封裝了ResourceMgrDelegate委托,委托的方法正是YarnClient類里的提交方法submitApplication。這樣,當前作業(Application)提交過程,走到了YarnClient階段。
總結:Job目前提交到了YarnClient實例中。那么YarnClient接下來怎么處理呢?
