Big Data Learning (5): MapReduce Split and Partitioner


How splits, partitions, sorting, and grouping (Group) relate to one another in MapReduce:

(figure from the original post: split → partition → sort → group relationship diagram)

Split Size

Before a file stored in HDFS can be processed by the Map phase, it has to be cut into multiple splits so they can be handed to different MapTasks. The number of splits equals the number of MapTasks launched. By default, the split size is the HDFS blockSize.

The Map phase splits the input file using the following logic (FileInputFormat#computeSplitSize):

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

blockSize: defaults to 128 MB (dfs.blocksize)

minSize: defaults to 1 byte (mapreduce.input.fileinputformat.split.minsize)

maxSize: defaults to Long.MAX_VALUE (mapreduce.input.fileinputformat.split.maxsize)

From this we can see how the two configurable values (minSize and maxSize) relate to blockSize:

When blockSize lies between minSize and maxSize, blockSize wins.

When maxSize is smaller than blockSize, maxSize wins.

When minSize is larger than blockSize, minSize wins.

 

In the other extreme case, when maxSize is smaller than minSize, minSize wins; in other words, minSize takes priority over maxSize. The worked example below runs through all four cases.
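
A quick worked sketch of the formula with a 128 MB block (the class name is just for illustration):

public class SplitSizeExample {

    // same formula as FileInputFormat#computeSplitSize
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long blockSize = 128 * mb;

        // defaults: minSize = 1, maxSize = Long.MAX_VALUE -> split = blockSize (128 MB)
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE) / mb);       // 128

        // maxSize (64 MB) < blockSize -> split = maxSize (64 MB)
        System.out.println(computeSplitSize(blockSize, 1L, 64 * mb) / mb);              // 64

        // minSize (256 MB) > blockSize -> split = minSize (256 MB)
        System.out.println(computeSplitSize(blockSize, 256 * mb, Long.MAX_VALUE) / mb); // 256

        // maxSize (64 MB) < minSize (256 MB) -> minSize still wins (256 MB)
        System.out.println(computeSplitSize(blockSize, 256 * mb, 64 * mb) / mb);        // 256
    }
}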

In practice it is best not to touch maxSize; adjusting minSize (making it larger than blockSize) is enough to set the split size.

In short, minSize and maxSize let you move the split size freely above or below blockSize.

When to Adjust the Split Size

First, understand that the HDFS block size is a parameter of how HDFS stores a file, whereas the split size here serves the processing logic. The split size directly determines the number of MapTasks, so you can tune it according to your actual workload, for example as in the sketch below.
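
A minimal sketch of raising the minimum split size to 256 MB in the job driver (the class name and values are illustrative; the property name and the FileInputFormat helpers come from the standard org.apache.hadoop.mapreduce API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfigSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // option 1: set the property directly (256 MB minimum split -> splits larger than one block)
        conf.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024);

        Job job = Job.getInstance(conf, "split-size-demo");

        // option 2: the equivalent FileInputFormat helper
        FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
        // FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // only to force splits smaller than a block
    }
}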

Partitioning

In the Reduce phase, you can route the Map output into different result files according to your needs (for example, archiving by some dimension, similar to a GROUP BY in a database). The partition setup must be used together with the number of ReduceTasks: to get results in 5 partitions, you must configure 5 ReduceTasks. By default, records are distributed by the key's hash (see the sketch below); to control the distribution yourself, write a custom Partitioner.
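
For reference, when no partitioner is configured, Hadoop uses HashPartitioner; its logic is essentially the following (rewritten here as an illustrative class so the name does not clash with the real one):

import org.apache.hadoop.mapreduce.Partitioner;

// essentially the logic of org.apache.hadoop.mapreduce.lib.partition.HashPartitioner:
// records with the same key hash always land in the same ReduceTask
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // mask the sign bit so the result is non-negative, then spread keys across reducers
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}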

A custom Partitioner:

public class URLResponseTimePartitioner extends Partitioner<Text, LongWritable>{

    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        String accessPath = key.toString();
        // requests for *.do pages go to partition 0; all other paths go to partition 1
        if(accessPath.endsWith(".do")) {
            return 0;
        }
        return 1;
    }

}

Then set the partitioner on the job:

        job.setPartitionerClass(URLResponseTimePartitioner.class);
        // URLResponseTimePartitioner returns 0 or 1, so the number of reduce tasks must be 2
        job.setNumReduceTasks(2);

The two partitions produce two final output files:

[root@centos01 ~]# hadoop fs -ls /access/log/response-time
17/12/19 14:53:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   2 root supergroup          0 2017-12-19 14:49 /access/log/response-time/_SUCCESS
-rw-r--r--   2 root supergroup       7769 2017-12-19 14:49 /access/log/response-time/part-r-00000
-rw-r--r--   2 root supergroup      18183 2017-12-19 14:49 /access/log/response-time/part-r-00001

part-r-00000 holds the statistics for the .do paths, while part-r-00001 holds the statistics for all other access paths.

[root@centos01 ~]# hadoop fs -cat /access/log/response-time/part-r-00001 |more
17/12/19 14:55:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
//MyAdmin/scripts/setup.php     3857
//css/console.css       356
//css/result_html.css   628
//images/male.png       268
//js/tooltipster/css/plugins/tooltipster/sideTip/themes/tooltipster-sideTip-borderless.min.css  1806
//js/tooltipster/css/tooltipster.bundle.min.css 6495
//myadmin/scripts/setup.php     3857
//phpMyAdmin/scripts/setup.php  3857
//phpmyadmin/scripts/setup.php  3857
//pma/scripts/setup.php 3857
/404/search_children.js 3827
/Dashboard.action       3877
/Homepage.action        3877
/My97DatePicker/WdatePicker.js  9371
/My97DatePicker/calendar.js     22044
/My97DatePicker/lang/zh-cn.js   1089
/My97DatePicker/skin/WdatePicker.css    158
/My97DatePicker/skin/default/datepicker.css     3486
/My97DatePicker/skin/default/img.gif    475

 

Sorting

To have the final results sorted by some attribute, you achieve it through the ordering of the keys emitted in the Map phase.

For example, to sort the average response time statistics above in descending order, the implementation is as follows.

The key piece is the bean used as the output key (OUTKey). It implements the WritableComparable interface, so the output records are ordered by the result of its compareTo method.

Since this class is used as the key, its equals method matters and needs to be overridden to match the actual semantics; the logic here treats two keys as equal when their urls are equal. (Keys with the same url never actually occur, because the earlier response-time statistics already grouped by url, but the logic is still worth keeping straight.)

Note that sorting does not depend on the key's equals!

public class URLResponseTime implements WritableComparable<URLResponseTime> {

    String url;
    long avgResponseTime;

    public void write(DataOutput out) throws IOException {
        out.writeUTF(url);
        out.writeLong(avgResponseTime);
    }

    public void readFields(DataInput in) throws IOException {
        this.url = in.readUTF();
        this.avgResponseTime = in.readLong();
    }

    public int compareTo(URLResponseTime urt) {
        // larger average response time sorts first, i.e. descending order
        return this.avgResponseTime > urt.avgResponseTime ? -1 : 1;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public long getAvgResponseTime() {
        return avgResponseTime;
    }

    public void setAvgResponseTime(long avgResponseTime) {
        this.avgResponseTime = avgResponseTime;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((url == null) ? 0 : url.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        // two keys are considered equal when their urls are equal
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        URLResponseTime other = (URLResponseTime) obj;
        if (url == null) {
            if (other.url != null)
                return false;
        } else if (!url.equals(other.url))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return url;
    }
}

 

The rest is simple: the Map and Reduce stages just perform a plain write and read with no further processing, and the sorting falls out of how the Hadoop MapReduce framework itself works:

public class URLResponseTimeSortMapper extends Mapper<LongWritable,Text,URLResponseTime,LongWritable>{

    // reuse member instances to avoid creating new objects on every map() call
    URLResponseTime key = new URLResponseTime();
    LongWritable value = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // each input line is "<url>\t<avgResponseTime>" from the previous job's output
        String line = value.toString();
        String[] logs = line.split("\t");
        String url = logs[0];
        String responseTimeStr = logs[1];

        long responseTime = Long.parseLong(responseTimeStr);

        this.key.setUrl(url);
        this.key.setAvgResponseTime(responseTime);
        this.value.set(responseTime);
        context.write(this.key, this.value);
    }

}

 

public class URLResponseTimeSortReducer extends Reducer<URLResponseTime, LongWritable, URLResponseTime, LongWritable> {

    @Override
    protected void reduce(URLResponseTime key, Iterable<LongWritable> values,
            Context ctx) throws IOException, InterruptedException {
        // keys arrive already sorted by avgResponseTime (descending); just write them through
        ctx.write(key, values.iterator().next());
    }

}
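
For completeness, a minimal driver sketch that wires the two classes together (the job name, driver class name, and HDFS paths here are illustrative, not from the original post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class URLResponseTimeSortJob {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "url-response-time-sort");
        job.setJarByClass(URLResponseTimeSortJob.class);

        job.setMapperClass(URLResponseTimeSortMapper.class);
        job.setReducerClass(URLResponseTimeSortReducer.class);

        // URLResponseTime is the map output key, so the shuffle sorts records by its compareTo
        job.setMapOutputKeyClass(URLResponseTime.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(URLResponseTime.class);
        job.setOutputValueClass(LongWritable.class);

        // input: the output of the previous averaging job; output: a new directory (paths illustrative)
        FileInputFormat.setInputPaths(job, new Path("/access/log/response-time"));
        FileOutputFormat.setOutputPath(job, new Path("/access/log/response-time-sorted"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}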

Reference:

Hadoop Wiki, HowManyMapsAndReduces: https://wiki.apache.org/hadoop/HowManyMapsAndReduces

 

