mapreduce job提交流程源碼級分析(二)(原創)


上一小節(http://www.cnblogs.com/lxf20061900/p/3643581.html)講到Job. submit()方法中的:

info = jobClient.submitJobInternal(conf)方法用來上傳資源提交Job的,這一節就講講這個方法。

一、首先jobClient在構造函數中會構造了和JobTracker通信的對象jobSubmitClient,jobSubmitClient是JobSubmissionProtocol類型的動態代理類。JobSubmissionProtocol協議是JobClient與JobTracker通信專用協議。代碼如下:

 private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
      Configuration conf) throws IOException {
    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
        JobSubmissionProtocol.versionID, addr, 
        UserGroupInformation.getCurrentUser(), conf,
        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
  }

   getProxy方法的關鍵是Invoker類,Invoker類實現了 InvocationHandler接口,主要有兩個成員變量,remoteId是Client.ConnectionId類型,保存連接地址和用戶的 ticket,客戶端連接服務器由<remoteAddress,protocol,ticket>唯一標識。

Invoker類的invoke方法最重要的操作是:ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId)。Invocation實現了Writable接口,並封裝了method和args,使得可以通過RPC傳輸;Client.call方法將Writable參數封裝到一個Call中,並且連接JobTracker后將封裝后call發送過去,同步等待call執行完畢,返回value。

 public Writable call(Writable param, ConnectionId remoteId)  
                       throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(remoteId, call);
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          // use the connection because it will reflect an ip change, unlike
          // the remoteId
          throw wrapException(connection.getRemoteAddress(), call.error);
        }
      } else {
        return call.value;
      }
    }
  }

   上面的第四行代碼用於建立同JobTracker的連接。而Client.getConnection方法中connection.setupIOstreams()才是真正建立連接的地方,其中的socket是通過默認的SocketFactory .createSocket(),而這個默認的SocketFactory是org.apache.hadoop.net. StandardSocketFactory。

二、jobClient.submitJobInternal(conf)初始化staging目錄(這是job提交的根目錄):Path jobStagingArea=JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy),這個方法最終會調用jobTracker.getStagingAreaDirInternal()方法,代碼如下:

  private String getStagingAreaDirInternal(String user) throws IOException {
    final Path stagingRootDir =
      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
            "/tmp/hadoop/mapred/staging"));
    final FileSystem fs = stagingRootDir.getFileSystem(conf);
    return fs.makeQualified(new Path(stagingRootDir,
                              user+"/.staging")).toString();
  }

三、從JobTracker獲取JobID。JobID jobId = jobSubmitClient.getNewJobId()。最終調用的是JobTracker.getNewJobId()方法。然后執行Path submitJobDir = new Path(jobStagingArea, jobId.toString());獲得該job提交的路徑,也就是在stagingDir目錄下建一個以jobId為文件名的目錄,可以查看配置文件中的"mapreduce.job.dir"來查看此完整目錄。有了 submitJobDir之后就可以將job運行所需的全部文件上傳到對應的目錄下了,具體是調用 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)這個方法。

四、copyAndConfigureFiles(jobCopy, submitJobDir)實現上傳文件,包括-tmpfiles(外部文件)、tmpjars(第三方jar包)、tmparchives(一些歸檔文件)以及job.jar拷貝到HDFS中,這個方法最終調用jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication);這個方法實現文件上傳。而前三種文件(tmpfiles(外部文件)、tmpjars(第三方jar包)、tmparchives(一些歸檔文件))的實際上傳過程在copyRemoteFiles方法中,通過FileUtil.copy完成拷貝,這三種文件都是先分割文件列表后分別上傳(每一類文件可以有多個)。然后是:

   // First we check whether the cached archives and files are legal.
    TrackerDistributedCacheManager.validate(job);
    //  set the timestamps of the archives and files
    TrackerDistributedCacheManager.determineTimestamps(job);
    //  set the public/private visibility of the archives and files
    TrackerDistributedCacheManager.determineCacheVisibilities(job);
    // get DelegationTokens for cache files
    TrackerDistributedCacheManager.getDelegationTokens(job,job.getCredentials());

上面的代碼是進行一些cached archives and files的校驗和保存其時間戳和權限內容

  Job.jar通過fs.copyFromLocalFile方法拷貝到HDFS中。而job.jar(這是打包后的作業)文件則是直接通過fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);上傳完成。我們在提交作業的時候會在本地先打包成jar文件然后將配置文件中的"mapred.jar"設置為本地jar包路徑,當在這里拷貝到HDFS中后在重新將"mapred.jar"設置為HDFS對應job.jar包的路徑。

  同時這四個文件都會設置replication個副本,防止熱點出現。

五、然后就會根據我們設置的outputFormat類執行output.checkOutputSpecs(context),進行輸出路徑的檢驗,主要是保證輸出路徑不存在,存在會拋出異常。

六、對輸入文件進行分片操作了,int maps = writeSplits(context, submitJobDir)。writeSplits方法會根據是否使用了新API選擇不同的方法寫:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
使用了新API后,會調用writeNewSplits(job, jobSubmitDir)方法,這個方法代碼如下:
 private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);//默認是TextInputFormat

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first,大文件優先處理
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;//這是mapper的數量
  }
 
        

可以看出該方法首先獲取splits數組信息后,排序,將會優先處理大文件。JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)方法會將split信息和SplitMetaInfo都寫入HDFS中,其代碼如下:

public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, 
      Configuration conf, FileSystem fs, T[] splits) 
  throws IOException, InterruptedException {
    FSDataOutputStream out = createFile(fs, 
        JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
    out.close();
    writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), 
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
        info);
  }

如上writeNewSplits會將信息寫入job.split文件,然后返回SplitMetaInfo數組信息,再通過writeJobSplitMetaInfo方法SplitMetaInfo信息寫入job.splitmetainfo中。

七、然后將配置文件寫入:jobCopy.writeXml(out);//寫"job.xml"。

八、通過 jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials())提交job,最終調用的是JobTracker.submitJob。

九、返回一個NetworkedJob(status, prof, jobSubmitClient)對象,它實現了RunningJob接口。這個對象可以在JobClient端(比如eclipse,不斷的打印運行日志)。

 

ps:

一、hadoop版本是1.0.0;

二、上述文件的提交目錄可以在web ui中打開相應作業的配置文件查找"mapreduce.job.dir",就可以看到文件的上傳目錄。比如:hdfs://XXXX:8020/user/hadoop/.staging/job_201403141637_0160

 

下一節關注上述的步驟八。

錯誤之處還望大伙指點

 

參考:

http://www.kankanews.com/ICkengine/archives/87415.shtml


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM