Hadoop基礎---shuffle機制(進一步理解Hadoop機制)


一:MapReduce框架 (結合YARN框架)

補充:MapReduce框架知道我們寫的map-reduce程序的運行邏輯。我們寫的map-reduce中並沒有管理層的任務運行分配邏輯,該邏輯被封裝在MapReduce框架里面,被封裝為MRAppMaster類,該類用於管理整個map-reduce的運行邏輯。(map-reduce程序的管理者)

MRAppMaster由YARN框架啟動(動態啟動,隨機選取)

(一)框架流程圖

注:MRAppMaster和yarnChild(包括map task和reduce task)都是動態產生的。

注意:yarn框架只做資源的管理,如果要運行一個程序,則會為該程序分配節點、內存、cpu等資源,至於該程序如何運行,yarn框架不進行管理。故也不會知道mapreduce的運行邏輯 。同樣因為這樣的松耦合,yarn框架的使用范圍更加廣泛,可以兼容其他運行程序。

補充:MapReduce框架知道我們寫的map-reduce程序的運行邏輯。我們寫的map-reduce中並沒有管理層的任務運行分配邏輯,該邏輯被封裝在MapReduce框架里面,被封裝為MRAppMaster類,該類用於管理整個map-reduce的運行邏輯。(map-reduce程序的管理者)

重點:步驟6中,由NodeManager主動發送心跳包,去ResourceManager檢測是否有job任務,只當該NodeManager(即DataNode)有相關資源時,才會領取該job

MRAppMaster由YARN框架啟動(動態啟動,隨機選取)

二:map task並發機制---split切片

1.若是一個block對應一個map任務,則如是文件夾下有眾多小文件(即眾多block),若是map進程過多,則效率太低

2.若是一個block過大,則使用一個map進程,則效率也會太低

因此,將block物理層,抽象為split切片邏輯層,可以更好的實現map任務並發數量控制

三:提交任務時獲取切片split信息源碼分析

job.waitForCompletion(true)

(一) job.class中方法waitForCompletion

  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    return isSuccessful();
  }

(二)job.class中方法submit

  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
   }

(三)jobsummitter.class中submitJobInternal方法

  JobStatus submitJobInternal(Job job, Cluster cluster){
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);  //從ResourceManager中獲取資源配置存放路徑
    JobID jobId = submitClient.getNewJobID();  //獲取jobid,用於創建目錄
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());  //生成完整路徑
    JobStatus status = null;
    try {
       copyAndConfigureFiles(job, submitJobDir);  //提交jar包和配置文件到資源hdfs配置路徑
    int maps = writeSplits(job, submitJobDir);  //獲取切片信息,返回要啟動的map任務數量 // Write job file to submit dir
      writeConf(conf, submitJobFile);  //寫描述文件xml到hdfs配置路徑
    }
  }

(四)jobsummitter.class中writeSplits方法,返回要啟動的map任務數量

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    maps = writeNewSplits(job, jobSubmitDir); return maps;
  }

(五)jobsummitter.class中writeNewSplits方法

  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
 InputFormat<?, ?> input =    //通過反射獲取InputFormat實例---默認從TextInputFormat中獲取 ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);    //從實例中獲取切片信息(有多個),放在list
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

(六)FileInputFormat.class中getSplits方法,獲取切片信息《重點》

  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits  生成切片
    List<InputSplit> splits = new ArrayList<InputSplit>();  //list存放切片信息
    List<FileStatus> files = listStatus(job);  //獲取:該job,所需要的輸入數據所在的目錄下的文件列表 for (FileStatus file: files) {  //遍歷所有的文件
      Path path = file.getPath();  //獲取文件完整路徑 hdfs://hadoopH1:9000/wc/input/wcdata.txt long length = file.getLen();  //獲取文件大小 160 if (length != 0) {  //處理有內容的文件
        BlockLocation[] blkLocations;  //獲取文件block信息---包括偏移量起止,主機名信息 if (file instanceof LocatedFileStatus) {  
          blkLocations = ((LocatedFileStatus) file).getBlockLocations(); 
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {  //查看該文件是否可以被切片,某些文件不允許切片 long blockSize = file.getBlockSize();  //獲取文件塊默認大小128M long splitSize = computeSplitSize(blockSize, minSize, maxSize);  //計算切片大小,詳細見(七),返回大小128M long bytesRemaining = length;  //剩余未處理(未切片)字節數為160字節 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  //循環進行切片,若是剩余字節/切片大小>SPLIT_SLOP則進行切片,其中SPLIT_SLOP為1.1 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {  //單獨對最后部分的剩余字節進行切片處理  此處bytesRemaining大小160字節,直接到此處切片 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    return splits;
  }

1.getFormatMinSplitSIze方法獲取該格式最小切片字節數

  protected long getFormatMinSplitSize() {
    return 1;
  }

2.getMinSplitSize獲取切片最小值

  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

3.getMaxSplitSize獲取切片最大值

  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }

4.List<FileStatus> files = listStatus(job);  //獲取:該job,所需要的輸入數據所在的目錄下的文件列表

        //指定要處理的輸入數據存放的路徑
        FileInputFormat.setInputPaths(wcjob, new Path("hdfs://hadoopH1:9000/wc/input"));        

[
  LocatedFileStatus{
    path
=hdfs://hadoopH1:9000/wc/input/wcdata.txt;
    isDirectory=false;
    length=160;
    replication=1;
    blocksize=134217728;
    modification_time=1582019683334;
    access_time=1582336696735;
    owner=hadoop;
    group=supergroup;
    permission=rw-r--r--;
    isSymlink=false}
]

5.getBlockLocations獲取文件偏移量信息

        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }

6.切片處理(重點),規划切片信息

          long blockSize = file.getBlockSize();  //獲取文件塊默認大小128M
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);  //計算切片大小,詳細見(七),返回大小128M

          long bytesRemaining = length;  //剩余未處理(未切片)字節數為160字節
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  //循環進行切片,若是剩余字節/切片大小>SPLIT_SLOP則進行切片,其中SPLIT_SLOP為1.1
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {  //單獨對最后部分的剩余字節進行切片處理  此處bytesRemaining大小160字節,直接到此處切片
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  //傳入block位置,和切片偏移量,獲取該block當前的索引值
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  //
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }

splits信息:

[hdfs://hadoopH1:9000/wc/input/wcdata.txt:0+160]  //列表 每個元素《切片》后面是切片的起始地址和終止地址

7.getBlockIndex根據切片偏移量,獲取block索引(重點)

  protected int getBlockIndex(BlockLocation[] blkLocations, 
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {  //獲取該文件偏移量所在的文件塊block中,----邏輯轉物理 // is the offset inside this block? if ((blkLocations[i].getOffset() <= offset) && (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ return i; } }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset + 
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

8.makeSplit創建切片

            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  
                       blkLocations[blkIndex].getHosts(),    
                       blkLocations[blkIndex].getCachedHosts()));

傳入信息:

文件路徑 hdfs://hadoopH1:9000/wc/input/wcdata.txt  

切片偏移量 0

剩余字節數 160

當前文件塊所在的主機名 [hadoopH1] 
獲取管理塊的緩存副本的主機列表(主機名) []  因為我們偽分布設置副本為1,所以為空
 

(七)FileInputFormat.class中computeSplitSize方法,計算切片大小

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));  //其中blockSize大小默認128M maxSize為long類型最大值,minSize為1
  }

四:Shuffle機制

(一)map task 

1.每個map有一個環形內存緩沖區,用於存儲任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閾值0.8(io.sort.spill.percent),一個后台線程把內容寫到磁盤的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件

2.寫磁盤前,要進行分組,排序。存放在內存緩存、溢出寫文件、合並文件中的數據全部是分組、排序后的數據。

3.等map task最后記錄寫完,合並全部溢出寫文件為一個分區且排序的文件

(二)reduce task 

1. reducer通過http方式得到輸出文件的分區。每個reduce task處理一個分組數據

2.TaskTracker為分區文件運行reduce任務。復制階段把Map輸出復制到reducer的內存或者磁盤。一個Map任務完成,reduce就開始復制輸出。

3.map和reduce階段使用歸並方法對各個階段數據進行合並排序操作

五:基於MRappMaster(監控調度機制)實現的shuffle機制

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM