How splits, partitions, sorting, and grouping (Group) in MapReduce relate to one another:
Split size
For a file stored in HDFS, before it can be processed by Map it has to be cut into multiple chunks so that they can be handed to different MapTasks. The number of splits equals the number of MapTasks launched. By default, the split size is the HDFS blockSize.
The Map-side splitting of the input file computes the split size with the following logic:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
blockSize: defaults to 128 MB (dfs.blocksize)
minSize: defaults to 1 byte (mapreduce.input.fileinputformat.split.minsize)
maxSize: defaults to Long.MAX_VALUE (mapreduce.input.fileinputformat.split.maxsize)
From this, the relationship between the two configurable values (minSize and maxSize) and blockSize is as follows (each case can be checked with the small sketch below):
When blockSize lies between minSize and maxSize, blockSize wins.
When maxSize is smaller than blockSize, maxSize wins.
When minSize is larger than blockSize, minSize wins.
In the extreme case where maxSize is smaller than minSize, minSize wins; in other words, minSize has higher priority than maxSize.
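As a quick check, these cases can be plugged straight into the computeSplitSize formula above. A minimal standalone sketch, using arbitrary example sizes rather than values from this article:

public class SplitSizeDemo {

    // Same logic as computeSplitSize above, reproduced for a local check.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // 128 MB

        // blockSize between minSize and maxSize -> blockSize wins (128 MB)
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));

        // maxSize smaller than blockSize -> maxSize wins (64 MB)
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));

        // minSize larger than blockSize -> minSize wins (256 MB)
        System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE));

        // maxSize smaller than minSize -> minSize still wins (256 MB)
        System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, 64L * 1024 * 1024));
    }
}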
In practice it is best not to touch maxSize; you can control the split size simply by adjusting minSize (raising it above blockSize makes the splits larger).
In short, minSize and maxSize let you tune the split size freely above or below blockSize.
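Concretely, this usually means setting the properties on the job before submitting it. A minimal sketch, assuming the new-API FileInputFormat; the class name, job name, and the 256 MB value are illustrative assumptions:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfigDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-size-demo");

        // Raise the minimum split size above the 128 MB block size so each split
        // (and therefore each MapTask) covers more data, e.g. 256 MB per split.
        FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);

        // Equivalent to setting the property directly:
        // conf.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024);

        // maxSize is normally left alone, but lowering it yields more, smaller splits:
        // FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
    }
}

For instance, a 1 GB file stored as eight 128 MB blocks gets 8 splits (and 8 MapTasks) by default; with minSize raised to 256 MB it gets only 4.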
When to adjust the split size
First, understand that the HDFS block size is a parameter of how HDFS stores the file, while the split size discussed here serves the processing logic. The split size directly determines the number of MapTasks, so you can tune it to match your actual workload.
Partitioning
In the Reduce phase you can, according to actual needs (for example, archiving by some dimension, similar to a GROUP BY in a database), reduce the mapped data into different output files. The partition setting must be used together with the number of ReduceTasks: for example, to get results split into 5 partitions, you must also configure 5 ReduceTasks.
A custom Partitioner:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class URLResponseTimePartitioner extends Partitioner<Text, LongWritable> {

    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        String accessPath = key.toString();
        // Send .do requests to partition 0, everything else to partition 1.
        if (accessPath.endsWith(".do")) {
            return 0;
        }
        return 1;
    }
}
Then set the partitioner on the job:
job.setPartitionerClass(URLResponseTimePartitioner.class);
// URLResponseTimePartitioner returns 0 or 1, so the number of reduce tasks must be 2
job.setNumReduceTasks(2);
The two partitions produce two final result files:
[root@centos01 ~]# hadoop fs -ls /access/log/response-time
17/12/19 14:53:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   2 root supergroup          0 2017-12-19 14:49 /access/log/response-time/_SUCCESS
-rw-r--r--   2 root supergroup       7769 2017-12-19 14:49 /access/log/response-time/part-r-00000
-rw-r--r--   2 root supergroup      18183 2017-12-19 14:49 /access/log/response-time/part-r-00001
part-r-00000 holds the statistics for the .do paths, while part-r-00001 holds the statistics for all other access paths.
[root@centos01 ~]# hadoop fs -cat /access/log/response-time/part-r-00001 | more
17/12/19 14:55:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
//MyAdmin/scripts/setup.php     3857
//css/console.css       356
//css/result_html.css   628
//images/male.png       268
//js/tooltipster/css/plugins/tooltipster/sideTip/themes/tooltipster-sideTip-borderless.min.css  1806
//js/tooltipster/css/tooltipster.bundle.min.css 6495
//myadmin/scripts/setup.php     3857
//phpMyAdmin/scripts/setup.php  3857
//phpmyadmin/scripts/setup.php  3857
//pma/scripts/setup.php 3857
/404/search_children.js 3827
/Dashboard.action       3877
/Homepage.action        3877
/My97DatePicker/WdatePicker.js  9371
/My97DatePicker/calendar.js     22044
/My97DatePicker/lang/zh-cn.js   1089
/My97DatePicker/skin/WdatePicker.css    158
/My97DatePicker/skin/default/datepicker.css     3486
/My97DatePicker/skin/default/img.gif    475
Sorting
To get the final result ordered by some attribute, you do it in the Map stage, through the ordering of the keys.
For example, to sort the average response time statistics above in descending order, the implementation looks like this:
The key point is the bean used as the output key (OUTKey). It implements the Comparable interface (via WritableComparable), so the output ends up ordered by the result of its compareTo.
Since this class serves as the Key, its equals method is important and needs to be overridden according to the actual situation. The logic here is that two objects with the same url represent the same Key. (In fact duplicate Keys never occur, because the earlier responseTime statistics already grouped the data by url, but it is still worth being aware of this logic.)
Note that sorting does not rely on the key's equals method; it is driven by compareTo.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class URLResponseTime implements WritableComparable<URLResponseTime> {

    String url;
    long avgResponseTime;

    public void write(DataOutput out) throws IOException {
        out.writeUTF(url);
        out.writeLong(avgResponseTime);
    }

    public void readFields(DataInput in) throws IOException {
        this.url = in.readUTF();
        this.avgResponseTime = in.readLong();
    }

    // Descending order by average response time.
    public int compareTo(URLResponseTime urt) {
        return this.avgResponseTime > urt.avgResponseTime ? -1 : 1;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public long getAvgResponseTime() {
        return avgResponseTime;
    }

    public void setAvgResponseTime(long avgResponseTime) {
        this.avgResponseTime = avgResponseTime;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((url == null) ? 0 : url.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        URLResponseTime other = (URLResponseTime) obj;
        if (url == null) {
            if (other.url != null)
                return false;
        } else if (!url.equals(other.url))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return url;
    }
}
After that it is straightforward: the Map and Reduce simply write and read the records, with no extra processing; the sorting comes from the MapReduce framework itself:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class URLResponseTimeSortMapper extends Mapper<LongWritable, Text, URLResponseTime, LongWritable> {

    // Reuse member instances instead of creating new objects on every map() call.
    URLResponseTime key = new URLResponseTime();
    LongWritable value = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] logs = line.split("\t");
        String url = logs[0];
        String responseTimeStr = logs[1];
        long responseTime = Long.parseLong(responseTimeStr);

        this.key.setUrl(url);
        this.key.setAvgResponseTime(responseTime);
        this.value.set(responseTime);

        context.write(this.key, this.value);
    }
}
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class URLResponseTimeSortReducer
        extends Reducer<URLResponseTime, LongWritable, URLResponseTime, LongWritable> {

    @Override
    protected void reduce(URLResponseTime key, Iterable<LongWritable> values, Context ctx)
            throws IOException, InterruptedException {
        ctx.write(key, values.iterator().next());
    }
}
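The code above stops at the Mapper and Reducer; for completeness, here is a minimal driver sketch showing how the pieces might be wired together (the class name and the input/output paths are assumptions, not taken from the text above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class URLResponseTimeSortJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "url-response-time-sort");
        job.setJarByClass(URLResponseTimeSortJob.class);

        job.setMapperClass(URLResponseTimeSortMapper.class);
        job.setReducerClass(URLResponseTimeSortReducer.class);

        // The URLResponseTime bean is both the map output key and the final output key;
        // the shuffle sorts by its compareTo, which produces the descending order.
        job.setMapOutputKeyClass(URLResponseTime.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(URLResponseTime.class);
        job.setOutputValueClass(LongWritable.class);

        // Input is the earlier average-response-time output; both paths are examples only.
        FileInputFormat.addInputPath(job, new Path("/access/log/response-time"));
        FileOutputFormat.setOutputPath(job, new Path("/access/log/response-time-sorted"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}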