Mapreduce實例——MapReduce自定義輸入格式


目的

1.了解Hadoop自帶的幾種輸入格式

2.准確理解MapReduce自定義輸入格式的設計原理

3.熟練掌握MapReduce自定義輸入格式程序代碼編寫

4.培養自己編寫MapReduce自定義輸入格式程序代碼解決實際問題

原理

1.輸入格式:InputFormat類定義了如何分割和讀取輸入文件,它提供有下面的幾個功能:

1)選擇作為輸入的文件或對象;

2定義把文件划分到任務的InputSplits

3)為RecordReader讀取文件提供了一個工廠方法;

Hadoop自帶了好幾個輸入格式。其中有一個抽象類叫FileInputFormat,所有操作文件的InputFormat類都是從它那里繼承功能和屬性。當開啟Hadoop作業時,FileInputFormat會得到一個路徑參數,這個路徑內包含了所需要處理的文件,FileInputFormat會讀取這個文件夾內的所有文件(譯注:默認不包括子文件夾內的),然后它會把這些文件拆分成一個或多個的InputSplit。你可以通過JobConf對象的setInputFormat()方法來設定應用到你的作業輸入文件上的輸入格式。下表給出了一些

默認的輸入格式是TextInputFormat,它把輸入文件每一行作為單獨的一個記錄,但不做解析處理。這對那些沒有被格式化的數據或是基於行的記錄來說是很有用的,比如日志文件。更有趣的一個輸入格式是KeyValueInputFormat,這個格式也是把輸入文件每一行作為單獨的一個記錄。然而不同的是TextInputFormat把整個文件行當做值數據,KeyValueInputFormat則是通過搜尋tab字符來把行拆分為鍵值對。這在把一個MapReduce的作業輸出作為下一個作業的輸入時顯得特別有用,因為默認輸出格式(下面有更詳細的描述)正是按KeyValueInputFormat格式輸出數據。最后來講講SequenceFileInputFormat,它會讀取特殊的特定於Hadoop的二進制文件,這些文件包含了很多能讓Hadoopmapper快速讀取數據的特性。Sequence文件是塊壓縮的並提供了對幾種數據類型(不僅僅是文本類型)直接的序列化與反序列化操作。Squence文件可以作為MapReduce任務的輸出數據,並且用它做一個MapReduce作業到另一個作業的中間數據是很高效的。

輸入塊(InputSplit):一個輸入塊描述了構成MapReduce程序中單個map任務的一個單元。把一個MapReduce程序應用到一個數據集上,即是指一個作業,會由幾個(也可能幾百個)任務組成。Map任務可能會讀取整個文件,但一般是讀取文件的一部分。默認情況下,FileInputFormat及其子類會以64MB(與HDFSBlock默認大小相同,譯注:Hadoop建議Split大小與此相同)為基數來拆分文件。你可以在hadoop-site.xml(譯注:0.20.*以后是在mapred-default.xml里)文件內設定mapred.min.split.size參數來控制具體划分大小,或者在具體MapReduce作業的JobConf對象中重寫這個參數。通過以塊形式處理文件,我們可以讓多個map任務並行的操作一個文件。如果文件非常大的話,這個特性可以通過並行處理大幅的提升性能。更重要的是,因為多個塊(Block)組成的文件可能會分散在集群內的好幾個節點上(譯注:事實上就是這樣),這樣就可以把任務調度在不同的節點上;因此所有的單個塊都是本地處理的,而不是把數據從一個節點傳輸到另外一個節點。當然,日志文件可以以明智的塊處理方式進行處理,但是有些文件格式不支持塊處理方式。針對這種情況,你可以寫一個自定義的InputFormat,這樣你就可以控制你文件是如何被拆分(或不拆分)成文件塊的。

輸入格式定義了組成mapping階段的map任務列表,每一個任務對應一個輸入塊。接着根據輸入文件塊所在的物理地址,這些任務會被分派到對應的系統節點上,可能會有多個map任務被分派到同一個節點上。任務分派好后,節點開始運行任務,嘗試去最大並行化執行。節點上的最大任務並行數由mapred.tasktracker.map.tasks.maximum參數控制。

記錄讀取器(RecordReader):InputSplit定義了如何切分工作,但是沒有描述如何去訪問它。 RecordReader類則是實際的用來加載數據並把數據轉換為適合mapper讀取的鍵值對。RecordReader實例是由輸入格式定義的,默認的輸入格式,TextInputFormat,提供了一個LineRecordReader,這個類的會把輸入文件的每一行作為一個新的值,關聯到每一行的鍵則是該行在文件中的字節偏移量。RecordReader會在輸入塊上被重復的調用直到整個輸入塊被處理完畢,每一次調用RecordReader都會調用Mappermap()方法。

2.當面對一些特殊的<key,value>鍵值對時,如key是由一個文件名和記錄位置組成的鍵值時,這時hadoop本身提供的TextInputFormatCombineInputFormatNLineInputFormat等肯定是無法滿足我們的需求的,所以這里需要重寫自己的輸入分隔。MapReduce定義了接口InputFormat,它提供了兩個方法,getSplits()createRecordRead(),其中getSplits()負責對輸入文件進行切割,切割之后便是一個個split,比如hadoop默認提供的幾個InputFormat都是對大於BlockSize的文件進行切割,小於它的不切割,我們這里可以直接按照這種特性。而createRecordRead()則負責將一個split按照一定格式切割成一個個<K,V>對,以便MapReducemap時調用。所以,我們的關鍵就是去定義這個<K,V>的切割。就要求開發人員繼承FileInputFormat,用於實現一種新的輸入格式,同時還需要繼承RecordReader,用於實現基於新輸入格式KeyValue值的讀取方法。

FileInputFormat實現了InputFormat這個接口,實現了只對大於BlockSize的文件進行切割,並且保留了createRecordRead()這個方法讓我們自己去實現。所以我們可以寫一個類FileKeyInputFormatextends這個FileInputFormat類,然后Override這個createRecordRead()方法。

參考TextInputFormat發現,它也是繼承FileInputFormat,然后重寫了createRecordRead(),在createRecordRead()里面callLineRecordReader,由它來實現輸入分隔。好吧,重點就來到了,那就是自己寫一個類似於LineRecordReaderFileKeyRecordReader類,然后給我們的FileKeyInputForma來調用。LineRecordReader 繼承 RecordReader時,重寫了它的六個方法,分別是initialize()getCurrentKey()getCurrentValue()getProgress()Close()nextKeyValue(),這里也一樣需要重寫這幾個方法。

環境

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

內容

當面對一些特殊的<key,value>鍵值對時,要求開發人員繼承FileInputFormat,用於實現一種新的輸入格式,同時還需要繼承RecordReader,用於實現基於新輸入格式KeyValue值的讀取方法。假定key是由一個文件名和記錄位置組成的鍵值時,進行自定義輸入格式,需實現一個FileKeyInputFormat類和一個FileKeyRecordReader類。並通過cat1文件數據進行測驗。

cat1包含(cat_idcat_name)四個字段,內容以"\t"分割,數據內容如下:

cat1文件

  1. 52001   有機蔬果    601  
  2. 52002   有機肉類水產  602  
  3. 52003   有機糧油干貨  603  
  4. 52004   有機沖飲    604  
  5. 52005   其它  605  

結果數據

  1. (cat1@0)        :52001  有機蔬果    601  
  2. (cat1@104)  :52005  其它  605  
  3. (cat1@23)   :52002  有機肉類水產  602  
  4. (cat1@52)   :52003  有機糧油干貨  603  
  5. (cat1@81)   :52004  有機沖飲    604  

實驗步驟

1.切換到/apps/hadoop/sbin目錄下,開啟Hadoop

  1. cd /apps/hadoop/sbin  
  2. ./start-all.sh  

2.Linux本地新建/data/mapreduce11目錄。

  1. mkdir -p /data/mapreduce11  

3.Linux中切換到/data/mapreduc11目錄下,用wget命令從http://192.168.1.100:60000/allfiles/mapreduce11/cat1網址上下載文本文件cat1

  1. cd /data/mapreduce11  
  2. wget http://192.168.1.100:60000/allfiles/mapreduce11/cat1  

然后在當前目錄下用wget命令從http://192.168.1.100:60000/allfiles/mapreduce11/hadoop2lib.tar.gz網址上下載項目用到的依賴包。

  1. wget http://192.168.1.100:60000/allfiles/mapreduce11/hadoop2lib.tar.gz  

hadoop2lib.tar.gz解壓到當前目錄下。

  1. tar zxvf hadoop2lib.tar.gz  

4.首先在HDFS上新建/mymapreduce11/in目錄,然后將Linux本地/data/mapreduce11目錄下的cat1文件導入到HDFS/mymapreduce11/in目錄中。

  1. hadoop fs -mkdir -p /mymapreduce11/in  
  2. hadoop fs -put /data/mapreduce11/cat1 /mymapreduce11/in  

5.新建Java Project項目,項目名為mapreduce11

mapreduce11的項目下新建package包,包名為mapreduce

mapredcue包下新建FileKeyInputFormat類。

mapredcue包下新建FileKeyRecordReader類。

mapredcue包下新建FileKeyMR類。

6.添加項目所需依賴的jar包,右鍵單擊項目名,新建一個文件夾hadoop2lib,用於存放項目所需的jar包。

/data/mapreduce11目錄下,hadoop2lib目錄中的jar包,拷貝到eclipsemapreduce11項目的hadoop2lib目錄下。

選中所有項目hadoop2lib目錄下所有jar包,並添加到Build Path中。

7.編寫程序代碼,並描述其設計思路。

FileKeyInputFormat類代碼:

  1. package mapreduce;  
  2. import java.io.IOException;  
  3. import java.util.List;  
  4. import org.apache.hadoop.fs.FileStatus;  
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.InputSplit;  
  8. import org.apache.hadoop.mapreduce.JobContext;  
  9. import org.apache.hadoop.mapreduce.RecordReader;  
  10. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  12. public class FileKeyInputFormat extends FileInputFormat<Text,Text>{  
  13.     public FileKeyInputFormat(){}  
  14.     public RecordReader<Text,Text> createRecordReader(InputSplit split,TaskAttemptContext tac)  
  15.         throws IOException,InterruptedException{  
  16.         FileKeyRecordReader fkrr=new FileKeyRecordReader();  
  17.          try {  
  18.             fkrr.initialize(split,tac);  
  19.         } catch (Exception e) {  
  20.             e.printStackTrace();  
  21.         }  
  22.         return fkrr;  
  23.     }  
  24.     protected long computeSplitSize(long blockSize,long minSize,long maxSize){  
  25.         return super.computeSplitSize(blockSize,minSize,maxSize);  
  26.     }  
  27.     public List<InputSplit> getSplits(JobContext arg0)throws IOException{  
  28.     return super.getSplits(arg0);  
  29.     }  
  30.     protected boolean isSplitable(JobContext context,Path filename){  
  31.     return true;  
  32.     }  
  33.     protected List<FileStatus> listStatus(JobContext arg0)throws IOException{  
  34.         return super.listStatus(arg0);  
  35.         }  
  36.         }  

MapReduce定義了接口InputFormat,它提供了兩個方法,getSplits()createRecordRead(),其中getSplits()負責對輸入文件進行切割,切割之后便是一個個split,比如hadoop默認提供的幾個InputFormat都是對大於BlockSize的文件進行切割,小於它的不切割,我們這里可以直接按照這種特性。而createRecordRead()則負責將一個split按照一定格式切割成一個個<K,V>對,以便MapReducemap時調用。所以,我們的關鍵就是去定義這個<K,V>的切割。就要求開發人員繼承FileInputFormat,用於實現一種新的輸入格式,同時還需要繼承RecordReader,用於實現基於新輸入格式KeyValue值的讀取方法。

FileKeyRecordReader類代碼:

  1. package mapreduce;  
  2. import java.io.IOException;  
  3. import java.io.InterruptedIOException;  
  4. import org.apache.hadoop.io.LongWritable;  
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.InputSplit;  
  7. import org.apache.hadoop.mapreduce.RecordReader;  
  8. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  9. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  10. import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;  
  11. public class FileKeyRecordReader extends RecordReader<Text,Text> {  
  12.     public FileKeyRecordReader(){  
  13.     }  
  14.         String fn;  
  15.         LineRecordReader lrr=new LineRecordReader();  
  16.         public void initialize(InputSplit arg0,TaskAttemptContext arg1)  
  17.             throws IOException,InterruptedException{  
  18.             lrr.initialize(arg0, arg1);  
  19.             this.fn=((FileSplit)arg0).getPath().getName();  
  20.         }  
  21.         public void close()throws IOException{  
  22.             lrr.close();  
  23.         }  
  24.         public Text getCurrentKey()throws IOException,InterruptedException{  
  25.             System.out.println("CurrentKey");  
  26.             LongWritable lw=lrr.getCurrentKey();  
  27.             Text key =new Text("("+fn+"@"+lw+")");  
  28.             System.out.println("key--"+key);  
  29.             return key;  
  30.         }  
  31.         public Text getCurrentValue()throws IOException,InterruptedException{  
  32.             return lrr.getCurrentValue();  
  33.         }  
  34.         public float getProgress()throws IOException,InterruptedException{  
  35.             return 0;  
  36.         }  
  37.         public boolean nextKeyValue() throws IOException,InterruptedIOException{  
  38.             return lrr.nextKeyValue();  
  39.         }  
  40. }  

繼承RecordReader抽象類,並重寫initialize()getCurrentKey()getCurrentValue() getProgress() nextKeyValue()initialize()方法。

測試代碼:

  1. package mapreduce;  
  2. import java.io.IOException;  
  3. import java.util.Iterator;  
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.Job;  
  8. import org.apache.hadoop.mapreduce.Mapper;  
  9. import org.apache.hadoop.mapreduce.Reducer;  
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  13. public class FileKeyMR{  
  14.     public static class Map extends Mapper<Object,Text,Text,Text>{  
  15.         public void map(Text key,Text value,Context context) throws IOException, InterruptedException{  
  16.             String line=value.toString();  
  17.             System.out.println(line);  
  18.             String str[]=line.split("\t");  
  19.             for(String st:str){  
  20.                 context.write(key,new Text(st));  
  21.             }  
  22.             System.out.println(line);  
  23.         }  
  24.     }  
  25.     public static class Reduce extends Reducer<Text,Text,Text,Text>{  
  26.         public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{  
  27.     String s=":";  
  28.     for(Text val:values){  
  29.     s+=val;  
  30.     }  
  31.     context.write(key,new Text(s));  
  32.     }  
  33.     }  
  34.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{  
  35.     Configuration conf=new Configuration();  
  36.     Job job=new Job(conf,"FileKeyMR");  
  37.     job.setJarByClass(FileKeyMR.class);  
  38.     job.setMapperClass(Map.class);  
  39.     job.setReducerClass(Reduce.class);  
  40.     job.setInputFormatClass(FileKeyInputFormat.class);  
  41.     job.setOutputKeyClass(Text.class);  
  42.     job.setOutputValueClass(Text.class);  
  43.     FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/mymapreduce11/in/cat1"));  
  44.     FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/mymapreduce11/out"));  
  45.     System.exit(job.waitForCompletion(true)?0:1);  
  46.     }  
  47.     }  

Map部分:把map函數輸入的key直接復制給輸出的key,輸入的value通過split()方法以"\t"鍵解切,將解切的字段放到一個數組中,用增強版for循環遍歷數組,並將里面的元素賦值給輸出value,把<key,value>逐一進行輸出。Reduce部分:map函數輸出的<key,value>鍵值對先經過shuffle,把key相同的所有value值放到一個迭代器中形成values,然后把<key,values>鍵值對傳遞給reduce函數,reduce函數接收了<key,values>之后,先定義String s=":",然后將輸入的key直接復制給輸出的key,將輸入valuesfor循環遍歷加到s中,最后把s賦值給輸出的value,並直接輸出<key,value>。在主函數里job.setInputFormatClass(FileKeyInputFormat.class)

8.FileKeyMR類文件中,右鍵並點擊=>Run As=>Run on Hadoop選項,將MapReduce任務提交到Hadoop中。

9.待執行完畢后,進入命令模式,在HDFS/mymapreduce11/out中查看實驗結果。

  1. hadoop fs -ls /mymapreduce11/out  
  2. hadoop fs -cat /mymapreduce11/out/part-r-00000  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM