Mapreduce-Partition分析


Partition所處的位置


Partition位置

Partition位置

Partition主要作用就是將map的結果發送到相應的reduce。這就對partition有兩個要求:

1)均衡負載,盡量的將工作均勻的分配給不同的reduce。

2)效率,分配速度一定要快。

Mapreduce提供的Partitioner


Mapreduce默認的partitioner是HashPartitioner。除了這個mapreduce還提供了3種partitioner。如下圖所示:

patition類結構


1. Partitioner是partitioner的基類,如果需要定制partitioner也需要繼承該類。

2. HashPartitioner是mapreduce的默認partitioner。計算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到當前的目的reducer。

3. BinaryPatitioner繼承於Partitioner< BinaryComparable ,V>,是Partitioner的偏特化子類。該類提供leftOffset和rightOffset,在計算which reducer時僅對鍵值K的[rightOffset,leftOffset]這個區間取hash。

Which reducer=(hash & Integer.MAX_VALUE) % numReduceTasks

4. KeyFieldBasedPartitioner也是基於hash的個partitioner。和BinaryPatitioner不同,它提供了多個區間用於計算hash。當區間數為0時KeyFieldBasedPartitioner退化成HashPartitioner。

5. TotalOrderPartitioner這個類可以實現輸出的全排序。不同於以上3個partitioner,這個類並不是基於hash的。在下一節里詳細的介紹totalorderpartitioner。

TotalOrderPartitioner


每一個reducer的輸出在默認的情況下都是有順序的,但是reducer之間在輸入是無序的情況下也是無序的。如果要實現輸出是全排序的那就會用到TotalOrderPartitioner。

要使用TotalOrderPartitioner,得給TotalOrderPartitioner提供一個partition file。這個文件要求Key (這些key就是所謂的划分)的數量和當前reducer的數量-1相同並且是從小到大排列。對於為什么要用到這樣一個文件,以及這個文件的具體細節待會 還會提到。

TotalOrderPartitioner對不同Key的數據類型提供了兩種方案:

1) 對於非BinaryComparable(參考附錄A)類型的Key,TotalOrderPartitioner采用二分發查找當前的K所在的index。

例如reducer的數量為5,partition file 提供的4個划分為【2,4,6,8】。如果當前的一個key value pair 是<4,”good”>利用二分法查找到index=1,index+1=2那么這個key value pair將會發送到第二個reducer。如果一個key value pair為<4.5, “good”>那么二分法查找將返回-3,同樣對-3加1然后取反就是這個key value pair 將要去的reducer。

對於一些數值型的數據來說,利用二分法查找復雜度是o(log (reducer count)),速度比較快。

2) 對於BinaryComparable類型的Key(也可以直接理解為字符串)。字符串按照字典順序也是可以進行排序的。這樣的話也可以給定一些划分,讓不同的字符串key分配到不同的reducer里。這里的處理和數值類型的比較相近。

例如reducer的數量為5,partition file 提供了4個划分為【“abc”, “bce”, “eaa”, ”fhc”】那么“ab”這個字符串將會被分配到第一個reducer里,因為它小於第一個划分“abc”。

但是不同於數值型的數據,字符串的查找和比較不能按照數值型數據的比較方法。mapreducer采用的Tire tree的字符串查找方法。查找的時間復雜度o(m),m為樹的深度,空間復雜度o(255^m-1)。是一個典型的空間換時間的案例。

Tire Tree


Tire tree的構建

假設樹的最大深度為3,划分為【aaad ,aaaf, aaaeh,abbx 】

tairtree結構

tairtree結構


Mapreduce里的Tire tree主要有兩種節點組成:
1) Innertirenode
Innertirenode在mapreduce中是包含了255個字符的一個比較長的串。上圖中的例子只包含了26個英文字母。
2) 葉子節點{unslipttirenode, singesplittirenode, leaftirenode}
Unslipttirenode 是不包含划分的葉子節點。
Singlesplittirenode 是只包含了一個划分點的葉子節點。
Leafnode是包含了多個划分點的葉子節點。(這種情況比較少見,達到樹的最大深度才出現這種情況。在實際操作過程中比較少見)

Tire tree的搜索過程

接上面的例子:
1)假如當前 key value pair 這時會找到圖中的leafnode,在leafnode內部使用二分法繼續查找找到返回 aad在 划分數組中的索引。找不到會返回一個和它最接近的划分的索引。
2)假如找到singlenode,如果和singlenode的划分相同或小返回他的索引,比singlenode的划分大則返回索引+1。
3)假如找到nosplitnode則返回前面的索引。如將會返回abbx的在划分數組中的索引。

TotalOrderPartitioner的疑問

上面介紹了partitioner有兩個要求,一個是速度另外一個是均衡負載。使用tire tree提高了搜素的速度,但是我們怎么才能找到這樣的partition file 呢?讓所有的划分剛好就能實現均衡負載。

InputSampler
輸入采樣類,可以對輸入目錄下的數據進行采樣。提供了3種采樣方法。

采樣類結構圖

采樣類結構圖

采樣方式對比表:

類名稱

采樣方式

構造方法

效率

特點

SplitSampler<K,V>

對前n個記錄進行采樣

采樣總數,划分數

最高

 

RandomSampler<K,V>

遍歷所有數據,隨機采樣

采樣頻率,采樣總數,划分數

最低

 

IntervalSampler<K,V>

固定間隔采樣

采樣頻率,划分數

對有序的數據十分適用

writePartitionFile這個方法很關鍵,這個方法就是根據采樣類提供的樣本,首先進行排序,然后選定(隨機的方法)和reducer 數目-1的樣本寫入到partition file。這樣經過采樣的數據生成的划分,在每個划分區間里的key value pair 就近似相同了,這樣就能完成均衡負載的作用。

TotalOrderPartitioner實例


?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
         implements Tool
{
     @Override
     public int run(String[] args) throws Exception
     {
         JobConf conf = JobBuilder.parseInputAndOutput( this , getConf(), args);
         if (conf == null ) {
             return - 1 ;
         }
         conf.setInputFormat(SequenceFileInputFormat. class );
         conf.setOutputKeyClass(IntWritable. class );
         conf.setOutputFormat(SequenceFileOutputFormat. class );
         SequenceFileOutputFormat.setCompressOutput(conf, true );
         SequenceFileOutputFormat
                 .setOutputCompressorClass(conf, GzipCodec. class );
         SequenceFileOutputFormat.setOutputCompressionType(conf,
                 CompressionType.BLOCK);
         conf.setPartitionerClass(TotalOrderPartitioner. class );
         InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
                 0.1 , 10000 , 10 );
         Path input = FileInputFormat.getInputPaths(conf)[ 0 ];
         input = input.makeQualified(input.getFileSystem(conf));
         Path partitionFile = new Path(input, "_partitions" );
         TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
         InputSampler.writePartitionFile(conf, sampler);
         // Add to DistributedCache
         URI partitionUri = new URI(partitionFile.toString() + "#_partitions" );
         DistributedCache.addCacheFile(partitionUri, conf);
         DistributedCache.createSymlink(conf);
         JobClient.runJob(conf);
         return 0 ;
     }
 
     public static void main(String[] args) throws Exception {
         int exitCode = ToolRunner.run(
                 new SortByTemperatureUsingTotalOrderPartitioner(), args);
         System.exit(exitCode);
     }
}

示例程序引用於:http://www.cnblogs.com/funnydavid/archive/2010/11/24/1886974.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM