上一小節(http://www.cnblogs.com/lxf20061900/p/3643581.html)講到Job. submit()方法中的:
info = jobClient.submitJobInternal(conf)方法用來上傳資源提交Job的,這一節就講講這個方法。
一、首先jobClient在構造函數中會構造了和JobTracker通信的對象jobSubmitClient,jobSubmitClient是JobSubmissionProtocol類型的動態代理類。JobSubmissionProtocol協議是JobClient與JobTracker通信專用協議。代碼如下:
private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, addr, UserGroupInformation.getCurrentUser(), conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class)); }
getProxy方法的關鍵是Invoker類,Invoker類實現了 InvocationHandler接口,主要有兩個成員變量,remoteId是Client.ConnectionId類型,保存連接地址和用戶的 ticket,客戶端連接服務器由<remoteAddress,protocol,ticket>唯一標識。
Invoker類的invoke方法最重要的操作是:ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId)。Invocation實現了Writable接口,並封裝了method和args,使得可以通過RPC傳輸;Client.call方法將Writable參數封裝到一個Call中,並且連接JobTracker后將封裝后call發送過去,同步等待call執行完畢,返回value。
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); Connection connection = getConnection(remoteId, call); connection.sendParam(call); // send the parameter boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // wait for the result } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { // set the interrupt flag now that we are done waiting Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // local exception // use the connection because it will reflect an ip change, unlike // the remoteId throw wrapException(connection.getRemoteAddress(), call.error); } } else { return call.value; } } }
上面的第四行代碼用於建立同JobTracker的連接。而Client.getConnection方法中connection.setupIOstreams()才是真正建立連接的地方,其中的socket是通過默認的SocketFactory .createSocket(),而這個默認的SocketFactory是org.apache.hadoop.net. StandardSocketFactory。
二、jobClient.submitJobInternal(conf)初始化staging目錄(這是job提交的根目錄):Path jobStagingArea=JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy),這個方法最終會調用jobTracker.getStagingAreaDirInternal()方法,代碼如下:
private String getStagingAreaDirInternal(String user) throws IOException { final Path stagingRootDir = new Path(conf.get("mapreduce.jobtracker.staging.root.dir", "/tmp/hadoop/mapred/staging")); final FileSystem fs = stagingRootDir.getFileSystem(conf); return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString(); }
三、從JobTracker獲取JobID。JobID jobId = jobSubmitClient.getNewJobId()。最終調用的是JobTracker.getNewJobId()方法。然后執行Path submitJobDir = new Path(jobStagingArea, jobId.toString());獲得該job提交的路徑,也就是在stagingDir目錄下建一個以jobId為文件名的目錄,可以查看配置文件中的"mapreduce.job.dir"來查看此完整目錄。有了 submitJobDir之后就可以將job運行所需的全部文件上傳到對應的目錄下了,具體是調用 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)這個方法。
四、copyAndConfigureFiles(jobCopy, submitJobDir)實現上傳文件,包括-tmpfiles(外部文件)、tmpjars(第三方jar包)、tmparchives(一些歸檔文件)以及job.jar拷貝到HDFS中,這個方法最終調用jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication);這個方法實現文件上傳。而前三種文件(tmpfiles(外部文件)、tmpjars(第三方jar包)、tmparchives(一些歸檔文件))的實際上傳過程在copyRemoteFiles方法中,通過FileUtil.copy完成拷貝,這三種文件都是先分割文件列表后分別上傳(每一類文件可以有多個)。然后是:
// First we check whether the cached archives and files are legal. TrackerDistributedCacheManager.validate(job); // set the timestamps of the archives and files TrackerDistributedCacheManager.determineTimestamps(job); // set the public/private visibility of the archives and files TrackerDistributedCacheManager.determineCacheVisibilities(job); // get DelegationTokens for cache files TrackerDistributedCacheManager.getDelegationTokens(job,job.getCredentials());
上面的代碼是進行一些cached archives and files的校驗和保存其時間戳和權限內容
Job.jar通過fs.copyFromLocalFile方法拷貝到HDFS中。而job.jar(這是打包后的作業)文件則是直接通過fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);上傳完成。我們在提交作業的時候會在本地先打包成jar文件然后將配置文件中的"mapred.jar"設置為本地jar包路徑,當在這里拷貝到HDFS中后在重新將"mapred.jar"設置為HDFS對應job.jar包的路徑。
同時這四個文件都會設置replication個副本,防止熱點出現。
五、然后就會根據我們設置的outputFormat類執行output.checkOutputSpecs(context),進行輸出路徑的檢驗,主要是保證輸出路徑不存在,存在會拋出異常。
六、對輸入文件進行分片操作了,int maps = writeSplits(context, submitJobDir)。writeSplits方法會根據是否使用了新API選擇不同的方法寫:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { maps = writeNewSplits(job, jobSubmitDir); } else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps; }
使用了新API后,會調用writeNewSplits(job, jobSubmitDir)方法,這個方法代碼如下:
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);//默認是TextInputFormat List<InputSplit> splits = input.getSplits(job); T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first,大文件優先處理 Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); return array.length;//這是mapper的數量 }
可以看出該方法首先獲取splits數組信息后,排序,將會優先處理大文件。JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)方法會將split信息和SplitMetaInfo都寫入HDFS中,其代碼如下:
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, Configuration conf, FileSystem fs, T[] splits) throws IOException, InterruptedException { FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf); SplitMetaInfo[] info = writeNewSplits(conf, splits, out); out.close(); writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info); }
如上writeNewSplits會將信息寫入job.split文件,然后返回SplitMetaInfo數組信息,再通過writeJobSplitMetaInfo方法將SplitMetaInfo信息寫入job.splitmetainfo中。
七、然后將配置文件寫入:jobCopy.writeXml(out);//寫"job.xml"。
八、通過 jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials())提交job,最終調用的是JobTracker.submitJob。
九、返回一個NetworkedJob(status, prof, jobSubmitClient)對象,它實現了RunningJob接口。這個對象可以在JobClient端(比如eclipse,不斷的打印運行日志)。
ps:
一、hadoop版本是1.0.0;
二、上述文件的提交目錄可以在web ui中打開相應作業的配置文件查找"mapreduce.job.dir",就可以看到文件的上傳目錄。比如:hdfs://XXXX:8020/user/hadoop/.staging/job_201403141637_0160
下一節關注上述的步驟八。
錯誤之處還望大伙指點
參考:
http://www.kankanews.com/ICkengine/archives/87415.shtml