Hadoop學習筆記(7)
——高級編程
從前面的學習中,我們了解到了MapReduce整個過程需要經過以下幾個步驟:
1.輸入(input):將輸入數據分成一個個split,並將split進一步拆成<key, value>。
2.映射(map):根據輸入的<key, value>進生處理,
3.合並(combiner):合並中間相兩同的key值。
4.分區(Partition):將<key, value>分成N分,分別送到下一環節。
5.化簡(Reduce):將中間結果合並,得到最終結果
6.輸出(output):負責輸入最終結果。
其中第3、4步又成洗牌(shuffle)過程。
從前面HelloWorld示例中,我們看到,我們只去個性化了Map和Reduce函數,那其他函數呢,是否可以個性化?答案當然是肯定的。下面我們就對每個環節的個性化進行介紹。
自定義輸入格式
輸入格式(InputFormat)用於描述整個MapReduce作業的數據輸入規范。先對輸入的文件進行格式規范檢查,如輸入路徑,后綴等檢查;然后對數據文件進行輸入分塊(split);再對數據塊逐一讀出;最后轉換成Map所需要的<key, value>健值對。
系統中提供豐富的預置輸入格式。最常用的以下兩種:
TextInputFormat:系統默認的數據輸入格式。將文件分塊,並逐行讀入,每一行記錄行成一對<key, value>。其中,key值為當前行在整個文件中的偏移量,value值為這一行的文本內容。
KeyValueTextInputFormat:這是另一個常用的數據輸入格式,讀入的文本文件內容要求是以<key, value>形式。讀出的結果也就直接形成<key, value>送入map函數中。
如果選擇輸入格式呢?那就只要在job函數中調用
-
job.setInputFormatClass(TextInputFormat.class);
在Hello中我們沒有設定,系統默認選擇了TextInputFormat。
一般情況夠用了,但某些情況下,還是無法滿足用戶的需求,所以還是需要個性化。個性化則按下面的方式進行:
如果數據我們是來源於文件,則可以繼承FileInputFormat:
-
public class MyInputFormat extends FileInputFormat<Text,Text> {
-
@Override
-
public RecordReader<Text, Text> createRecordReader(InputSplit split,
-
TaskAttemptContext context) throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
}
如果數據我們是來源於非文件,如關系數據,則繼承
-
public class MyInputFormat extends InputFormat<Text,Text> {
-
-
@Override
-
public RecordReader<Text, Text> createRecordReader(InputSplit arg0,
-
TaskAttemptContext arg1) throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
@Override
-
public List<InputSplit> getSplits(JobContext arg0) throws IOException,
-
InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
}
這里比較清晰了,下面個函數為拆分成split,上面個函數跟據split輸出成Key,value。
自定義map處理
這個好理解,我們的HelloWorld程序中就自定義了map處理函數。然后在job中指定了我們的處理類:
-
job.setMapperClass(TokenizerMapper.class);
能不能沒有map呢? 可以的,如果沒有map,也就是這與上面的這個setMapperClass,則系統自動指定一個null,這時處理是將輸入的<key,value>值,不作任何修改,直接送到下一環節中。
個性化代碼如下:
-
public static class TokenizerMapper
-
extends Mapper<Object, Text, Text, IntWritable>{
-
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
-
context.write(key, value);
-
}
-
}
自定義合並Combiner
自定義合並Combiner類,主要目的是減少Map階段輸出中間結果的數據量,降低數據的網絡傳輸開銷。
Combine過程,實際跟Reduce過程相似,只是執行不同,Reduce是在Reducer環節運行,而Combine是緊跟着Map之后,在同一台機器上預先將結時進行一輪合並,以減少送到Reducer的數據量。所以在HelloWorld時,可以看到,Combiner和Reducer用的是同一個類:
-
job.setCombinerClass(IntSumReducer.class);
-
job.setReducerClass(IntSumReducer.class);
如何個性化呢,這個跟Reducer差不多了:
-
public static class MyCombiner
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
-
context.write(key, new IntWritable(1));
-
}
-
}
自定義分區Partitioner
在MapReduce程序中,Partitioner決定着Map節點的輸出將被分區到哪個Reduce節點。而默認的Partitioner是HashPartitioner,它根據每條數據記錄的主健值進行Hash操作,獲得一個非負整數的Hash碼,然后用當前作業的Reduce節點數取模運算,有N個結點的話,就會平均分配置到N個節點上,一個隔一個依次。大多情況下這個平均分配是夠用了,但也會有一些特殊情況,比如某個文件的,不能被拆開到兩個結點中,這樣就需要個性化了。
個性化方式如下:
-
public static class MyPartitioner
-
extends HashPartitioner<K,V> {
-
-
public void getPartition(K key, V value,int numReduceTasks) {
-
-
super.getPartition(key,value,numReduceTasks);
-
}
-
}
方式其實就是在執行之前可以改變一下key,來欺騙這個hash表。
自定義化簡(Reducer)
這一塊是將Map送來的結果進行化簡處理,並形成最終的輸出值。與前面map一樣,在HelloWorld中我們就見到過了。通過下面代碼可以設置其值:
-
job.setReducerClass(IntSumReducer.class);
同樣,也可以這樣類可以不設置,如果不設置的話,就是把前面送來的值,直接送向輸出格式器中。
如果要個性化,則如下:
-
public static class IntSumReducer
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
context.write(key, result);
-
}
-
}
自定義輸出格式
數據輸出格式(OutPutFormat)用於描述MapReduce作業的數據輸出規范。Hadoop提供了豐富的內置數據輸出格式。最常的數據輸出格式是TextOutputFormat,也是系統默認的數據輸出格式,將結果以"key+\t+value"的形式逐行輸出到文本文件中。還有其它的,如:DBOutputFormat,FileOutputFormat,FilterOutputFormat,IndexUpdataOutputFormat,LazyOutputFormat,MapFileOutputFormat,等等。
如果要個性化,則按下面方式進行:
-
public class MyOutputFormat extends OutputFormat<Text,Text> {
-
-
@Override
-
public void checkOutputSpecs(JobContext arg0) throws IOException,
-
InterruptedException {
-
// TODO Auto-generated method stub
-
-
}
-
-
@Override
-
public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)
-
throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
@Override
-
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext arg0)
-
throws IOException, InterruptedException {
-
// TODO Auto-generated method stub
-
return null;
-
}
-
-
}
復合健——用戶自定義類型。
從前面的整個過程中可以看到,都是采用key-value的方式進行傳入傳出,而這些類型大多是單一的字符串,和整型。如果我的key中需要包含多個信息怎么辦?用字符串直接拼接么? 太不方便了,最好能夠自己定義一個類,作為這個key,這樣就方便了。
如果定義一個類作為key 或value的類型? 有什么要求?就是這個類型必須要繼承WritableComparable<T>這個類,所以如果要自定義一個類型則可以這么實現:
-
public class MyType implements WritableComparable<MyType> {
-
-
private float x,y;
-
public float GetX(){return x;}
-
public float GetY(){return y;}
-
-
@Override
-
public void readFields(DataInput in) throws IOException {
-
x = in.readFloat();
-
y = in.readFloat();
-
}
-
-
@Override
-
public void write(DataOutput out) throws IOException {
-
out.writeFloat(x);
-
out.writeFloat(y);
-
}
-
-
@Override
-
public int compareTo(MyType arg0) {
-
//輸入:-1(小於) 0(等於) 1(大於)
-
return 0;
-
}
-
}
這個示例中,我們添加了兩個float變量:x,y 。 這個信息能過int 和out按次序進行輸入輸出。最后,再實現一個比較函數即可。
Job任務的創建
-
Job job = new Job(conf, "word count");
-
job.setJarByClass(WordCount.class);
-
job.setInputFormatClass(MyInputFormat.class);
-
job.setMapperClass(TokenizerMapper.class);
-
job.setCombinerClass(IntSumReducer.class);
-
job.setPartitionerClass(MyPartitioner.class);
-
job.setReducerClass(IntSumReducer.class);
-
job.setOutputFormatClass(TextOutputFormat.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(IntWritable.class);
-
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
任務創建比較容易,其實就是new一個實例,然后把上面描述的過程類設置好,然后加上第2行中,jar包的主類,第10、11行的輸入輸出路徑。這樣就完事了。
Job任務的執行
單個任務的執行,沒有什么問題,可以用這個:
-
job.waitForCompletion(true);
但多個任務呢? 多個任務的話,就會形成其組織方式,有串行,有並行,有無關,有組合的,如下圖:
圖中,Job2和Job3將會等Job1執行完了再執行,且可以同時開始,而Job4必須等Job2和Job3同時結束后才結束。
這個組合,就可以采用這樣的代碼來實現:
-
Configuration conf = new Configuration();
-
Job job1 = new Job(conf, "job1");
-
//.. config Job1
-
Job job2 = new Job(conf, "job2");
-
//.. config Job2
-
Job job3 = new Job(conf, "job3");
-
//.. config Job3
-
Job job4 = new Job(conf, "job4");
-
//.. config Job4
-
-
//添加依賴關系
-
job2.addDependingJob(job1);
-
job3.addDependingJob(job1);
-
job4.addDependingJob(job2);
-
job4.addDependingJob(job3);
-
-
JobControl jc = new JobControl("jbo name");
-
jc.addJob(job1);
-
jc.addJob(job2);
-
jc.addJob(job3);
-
jc.addJob(job4);
-
jc.run();
總述
現在回頭看看,其實整個hadoop編程,也就是這幾塊內容了,要實現某個功能,我們就往上面這些步驟上套,然后聯起來執行,達到我們的目的。