五、MapReduce進階編程
目錄:
1.篩選日志文件並生成序列化文件
2.Hadoop Java API讀取序列化日志文件
3.優化日志文件統計程序
4.Eclipse提交日志文件統計程序
5.小結
6.實訓
7.小練習
任務背景:網站運營方又提出來新的需求,為了比較今年與去年同期的用戶訪問數據,要求分別統計出2016年1月與2月的用戶訪問次數,並輸出到不同的目錄中。在本章中,將引入一些高級的編程技巧,使得整體編程更加高效實用。
第一個任務:在大數據文件分析處理中,尤其是在處理邏輯比較復雜的情況下,要使用多個MapReduce程序來連續進行處理,就需要在HDFS上保存大量的中間結果。如何提高中間結果的存取效率,對於整個數據處理流程是很有意義的。Hadoop序列化具有緊湊、快速、可擴展以及互操作的特點,非常適合MapReduce任務的輸入與輸出格式。首要任務就是從原數據中篩選出1月與2月的數據,以序列化文件的格式存儲,為后續的數據處理任務做准備。
第二個任務:簡要了解javaAPI的基本操作和應用。通過JavaAPI對HDFS中的文件進行操作,它不但能夠輕松處理各類常規的文件操作,而且還提供了多種文件類型接口,能夠輕松處理文本、鍵值對、序列化等多種文件格式。
第三個任務:在實際任務中,數據結構與邏輯更為復雜,鍵或值可能是由多個元素組成的,那么就需要用戶根據情況自定義鍵值對的類型。Map端的輸出結果是經過網絡傳輸到Reduce端的。當Map端的輸出數據量特別大時,網絡傳輸可能成為影響處理效率的一大因素。為了提高整體處理效率,Hadoop提供了用於優化組件Combiner與Partitioner,可以幫助數據在傳到Reducer之前進行一系列的合並和分區處理。另外,Hadoop提供了執行MapReduce程序過程中的計數功能,用戶也可以根據需要進行個性化的計數設置。現在要編程實現2016年1月與2月的用戶訪問次數統計,並在編程過程中使用自定義的鍵值對類型、組件Combiner與Partitioner、自定義計數器等模塊,有利於對Hadoop編程有更加深刻的認識。
第四個任務:學會MapReduce任務在工作環境中的實際提交流程,就是在Eclipse中直接向Hadoop提交MapReduce任務,而且顯示執行過程中的輸出日志。
1.篩選日志文件並生成序列化文件
任務:以序列化文件的格式輸出篩選的數據。
1.1 MapReduce輸入格式
(1)Hadoop自帶了多個輸入格式,其中一個抽象類為FileInputFormat,所有操作文件的InputFormat類都是從它那里繼承方法和屬性。當啟動hadoop時,FileInputFormat會得到一個路徑參數,這個路徑包含了所需要處理的文件,FileInputFormat會讀取這個文件夾內的所有文件,然后會把這些文件拆分為一個或多個InputSplit。下圖為InputFormat的類繼承結構:
其中TextInputFormat是默認的inputformat。
(2)Hadoop的MapReduce不僅可以處理文本信息,還可以處理二進制格式的數據,二進制格式也成為序列化格式,Hadoop的序列化有以下特點:
①緊湊:高效使用存儲空間 ②快速:讀取數據的額外開銷少
③可擴展:可透明的讀取舊格式的數據 ④互操作:可以使用不同的語言讀/寫永久存儲的數據
處理序列化數據需要使用SequenceFileInputFormat來作為MapReduce的輸入格式。
(3)常用的inputformat的輸入格式:
輸入格式 |
描述 |
鍵類型 |
值類型 |
TextInputFormat |
默認格式,讀取文件的行 |
行的字節偏移量(LongWriable) |
行的內容(Text) |
SequenceFileInputFormat |
Hadoop定義的高性能二進制格式 |
用戶自定義 |
|
KeyValueInputFormat |
把行解析為鍵值對 |
第一個tab字符前的所有字符 |
行剩下的所有內容(Text) |
設置MapReduce輸入格式,可在驅動類中使用Job對象的setInputFormat()方法。例如,我們要讀取社交網站2016年用戶登錄的信息,要設定輸入對象為TextInputFormat,可以在驅動類中設置以下代碼:
job.setInputFormat(TextInputFormat.class)
由於TextInputFormat是默認的輸入格式,所以當輸入格式是TextInputFormat時,驅動類可以不設置輸入格式。但是要使用其他的輸入格式,就要在驅動類中設置輸入格式。
1.2 MapReduce輸出格式
(1)針對上一小節介紹的輸入格式,Hadoop都有對應的輸出格式。輸出格式實際上是輸入格式的逆過程,即把鍵值對寫入HDFS的文件塊內。下圖為OutputFormat類的繼承構造:
默認的輸出格式是TextOutputFormat,它把每一條記錄寫為文本行。它的鍵和值可以是任意類型,因為TextOutputFormat調用toString()方法把他們轉換為字符串。每個鍵值對由制表符進行分割,當然也可以通過設定mapreduce.output.textoutputformat.separator屬性來改變默認的分隔符。
(2)下表列出了常用的輸出格式類:
輸出格式 |
描述 |
TextOutputFormat |
默認的輸出格式,以“key \t value”的方式輸出行 |
SequenceFileOutputFormat |
輸出二進制文件,適合作為子MapReduce作業的輸入 |
NullOutputFormat |
忽略收到的數據,即不做輸出 |
如果作為后續MapReduce任務的輸入,那么序列化輸入是一種好的輸入格式,因為它的格式緊湊,很容易被壓縮。本節任務需要將篩選出來的數據以序列化的格式輸出,只需要在驅動類中添加以下代碼:
job.setOutputFormat(SequenceFileOutputFormat.class)
1.3 任務實現(完成前述第一個任務)
(1)實現步驟:
①以文本格式讀取文件 ②在map函數里判斷讀取進來的數據是否是1月或2月的數據。若是,則將該條數據輸出;若不是,則不輸出
③以序列化的格式輸出數據。本人無只需要Mapper類就可以完成,即Map端的輸出可以直接輸出到HDFS,因此本任務不必設置Reducer類,即在驅動類中設置Reducer的個數為0。
(2)①代碼:
package test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;;
public class SelectData {
public static class SelectDataMapper extends Mapper<LongWritable,Text,Text,Text>{
protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,Text>.Context context)
throws IOException,InterruptedException{
String[] val=value.toString().split(",");
//過濾選取1月份和2月份的數據
if(val[1].contains("2016-01") || val[1].contains("2016-02")) {
context.write(new Text(val[0]), new Text(val[1]));
}
}
}
public static void main(String[]args) throws IOException,
ClassNotFoundException,InterruptedException{
Configuration conf=new Configuration();
Job job=Job.getInstance(conf,"selectdata");
job.setJarByClass(SelectData.class);
job.setMapperClass(SelectDataMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);//設置輸入格式
job.setOutputFormatClass(SequenceFileOutputFormat.class);//設置輸出格式
job.setNumReduceTasks(0);//設置Reducer的任務數為0
FileInputFormat.addInputPath(job, new Path(args[0]));
FileSystem.get(conf).delete(new Path(args[1]),true);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.err.println(job.waitForCompletion(true)?-1:1);
}
}
②執行並查看結果:
hadoop jar selectdata.jar test.SelectData /user/dftest/user_login.txt /user/dftest/Selectdata
③記事本打開是(記事本對數據進行了解析):
由 sublime打開是二進制文件:
序列化輸出完成。
2.Hadoop Java API讀取序列化日志文件
本節將使用Hadoop Java API的方式讀取該序列化文件,並將讀取的數據保存到本地文件系統中,查看內容是否為1月和2月的用戶登錄信息。
2.1 FileSystem API 管理文件
(1)FileSystem是一個通用的文件管理系統API,使用它的第一步是需要先獲取它的一個實例,下面給出了幾個獲取FileSystem實例的靜態方法:
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri,Configuration conf) throws IOException
public static FileSystem get(URI uri,Configuration conf,String user) throws IOException
(2)Configuration 對象封裝了客戶端或服務器端的配置信息,下面說明以上三個方法:
①第一個方法返回了一個默認的文件系統,是在core-site.xml中通過fs.defaultFS來指定的,如果在core-site.xml中沒有設置,則返回本地的文件系統。
②第二個方法是通過uri來指定要返回的文件系統。如果uri是以hdfs標識開頭,那么久返回一個HDFS文件系統;如果uri中沒有相應的標識,則返回本地文件系統。
③第三個方法返回文件系統的原理與②相同,但它同時又限定了該文件系統的用戶,在這方面是很重要的。
通過查看FileSystem的API可以找到FileSystem類的相關方法。
(3)舉例:
修飾符和類型 |
方法 |
abstract FileStatus[] |
listStatus(Path f) |
FileStatus[] |
listStatus(Path[] files) |
FileStatus[] |
listStatus(Path[] files,PathFilter filter) |
FileStatus[] |
listStatus(Path f,PathFilter filter) |
以上方法返回的是一個文件列表。
①列舉文件夾示例代碼:
//獲取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS","172.16.29.76:8020");
//獲取文件系統
FileSystem fs = FileSystem.get(conf);
//指定要查看的文件目錄
Path path = new Path("/user/dftest");
//獲取文件列表
FileStatus[] filesatus = fs.listStatus(path);
//遍歷文件列表
for (FileStatus file: filestatus) {
//判斷是否為文件夾
if( file.isDirectory() ){
System.out.printlin( file.getPath().toString() );
}
}
//關閉文件系統
fs.close();
首先要設置Configuration來獲取集群配置,然后指定集群內的hdfs文件目錄
②與列舉目錄下的文件夾方式類似,可以使用同樣的方法去遍歷一個文件夾下面的所有文件。代碼如下:
//獲取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS","172.16.29.76:8020");
//獲取文件系統
FileSystem fs = FileSystem.get(conf);
//指定要查看的文件目錄
Path path = new Path("/user/dftest");
//獲取文件列表
FileStatus[] filesatus = fs.listStatus(path);
//遍歷文件列表
for (FileStatus file: filestatus) {
//判斷是否為文件夾
if( file.isFile() ){
System.out.printlin( file.getPath().toString() );
}
}
//關閉文件系統
fs.close();
(4)用FileSystem API 創建目錄
修飾符和類型 |
方法 |
static boolean |
mkdirs(FileSystem fs,Path dir,FsPermission permission) |
boolean |
mkdirs(Path f) |
abstract boolean |
mkdirs(Path f,FsPermission permission) |
相關參數:fs:文件系統對象 dir:要創建的目錄名稱 permission:為該目錄設置的權限
任務:使用mkdirs(Path f)方法在HDFS上創建目錄/user/dftest/loginmessage
代碼示例如下:
//獲取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS","172.16.29.76:8020");
//獲取文件系統
FileSystem fs = FileSystem.get(conf);
//聲明要創建的文件目錄
Path path = new Path("/user/dftest/loginmessage");
//調用mkdirs函數創建目錄
fs.mkdirs(path);
//關閉文件系統
fs.close();
2.2 FileSystem API 操作文件
(1)刪除文件:
這里主要介紹delete方法
修飾符和類型 |
方法 |
boolean |
delete(Path f) |
abstract boolean |
delete(Path f,boolean recursive) |
相關參數:f 要刪除的文件路徑 recursive 如果路徑是一個目錄且不為空,要把recursive設置為true,否則會報出異常。在如果是文件的話,此值true或false均可。
使用delete(Path f,boolean recursive)刪除HDFS上的/user/dftest/user_login.txt文件,具體實現代碼如下:
//獲取配置
Configuration conf = new Configuration();
conf.set("fs.defaultFS","172.16.29.76:8020");
//獲取文件系統
FileSystem fs = FileSystem.get(conf);
//聲明要刪除的文件或目錄
Path path = new Path("/user/dftest/user_login.txt");
//調用mkdirs函數創建目錄
fs.delete(path,true);
//關閉文件系統
fs.close();
(2)上傳與下載文件:(示例代碼用的方法是藍色標注的方法,結構與上面的代碼類似,只是注明參數即可)
①本地上傳文件:
修飾符和類型 |
方法 |
void |
copyFromLocalFile(boolean delSrc,boolean overwrite,Path[] srcs,Path dst) |
void |
copyFromLocalFile(boolean delSrc,boolean overwrite,Path srcs,Path dst) |
void |
copyFromLocalFile(boolean delSrc,Path srcs,Path dst) |
void |
copyFromLocalFile(Path srcs,Path dst) |
②下載到本地:
修飾符和類型 |
方法 |
void |
copyToLocalFile(boolean delSrc,Path src,Path dst) |
void |
copyToLocalFile(boolean delSrc,Path src,Path dst,boolean useRawLocalFileSystem) |
void |
copyToLocalFile(Path src,Path dst) |
相關參數 :delSrc:是否刪除源文件 overwrite:是否覆蓋已經存在的文件 srcs:存儲源文件目錄的數組
dst:目標路徑 src:源文件路徑 useRawLocalFileSystem:是否使用原始文件系統作為本地文件系統
2.3 FileSystem API 讀寫數據
(1)查看文件內容
Hadoop Java API 提供了一個獲取指定文件的數據字節流的方法——open()。該方法返回的是FSDataInputStream對象。
修飾符和類型 |
方法 |
FSDataInputStream |
open(Path f) |
abstract FSDataInputStream |
open(Path f,int bufferSize) |
相關參數:f :要打開的文件 bufferSize:要使用的緩沖區大小
示例:讀取HDFS上的文件/user/dftest/loginmessage/user_login.txt的內容,具體實現代碼如下:
//獲取配置
Configuration conf = new Configuration();
//獲取文件系統
FileSystem fs = FileSystem.get(conf);
//聲明要查看的文件路徑
Path path = new Path("/user/dftest/loginmessage/user_login.txt");
//獲取指定文件的數據字節流
FSDataInputStream is = fs.open(path);
//讀取文件內容並打印出來
BufferedReader br=new BufferedReader(new InputStreamReader(is,"utf-8"));
String line="";
while((line=br.readline())!=null){
System.out.println(line);
}
//關閉數據字節流
br.close();
is.close();
//關閉文件系統
fs.close();
(2)寫入數據
與讀取數據類似,寫入數據的實現可以理解為讀取數據的逆向過程。向HDFS寫入數據首先需要創建一個文件,FileSystem類提供了多種方法來創建文件。最常用的就是調用以創建文件的Path對象為參數的create(Path f)方法。除了創建一個新文件來寫入數據,還可以用append()方法向一個已存在的文件添加數據。
示例:在/user/dftest/loginmessage/user_login.txt目錄下創建文件new_user_login.txt,讀取該目錄下的user_login.txt文件並寫入新文件new_user_login.txt中,具體實現代碼如下:
//獲取配置
Configuration conf = new Configuration();
//獲取文件系統
FileSystem fs = FileSystem.get(conf);
//聲明要查看的文件路徑
Path path = new Path("/user/dftest/loginmessage/user_login.txt");
//創建新文件
Path newpath=new Path("/user/root/loginmessage/new_user_login.txt");
fs.delet(newpath);
FSDataOutputStream os=fs.create(newPath);
//獲取指定文件的數據字節流
FSDataInputStream is = fs.open(path);
//讀取文件內容並寫入到新文件
BufferedReader br=new BufferedReader(new InputStreamReader(is,"utf-8"));
BufferedWriter bw=new BufferedWriter(new OutputStreamWriter(os,"utf-8"));
String line="";
while((line=br.readline())!=null){
bw.write(line);
bw.newLine();
}
//關閉數據字節流
bw.close();
os.close();
br.close();
is.close();
//關閉文件系統
fs.close();
2.4 任務實現(完成前述第二個任務)
(1) Hadoop Java API提供了讀取HDFS上的文件的方法,當然也可以讀取序列化文件。不同於普通問件的讀取方法,讀取序列化文件需要獲取到SequenceFile.Reader對象。下表給出了該對象提供的幾個重要方法。
方法 |
描述 |
getKeyClassName() |
返回序列化文件中的鍵類型 |
getValueClassName() |
返回序列化文件中的值類型 |
next(Writable key) |
讀取該文件中的鍵,如果存在下一個鍵,則返回true,讀到文件末尾則返回false |
next(Writable key,Writable value) |
讀取文件中的鍵和值,如果存在下一個鍵值,則返回true,讀到文件末尾返回false |
toString() |
返回文件的路徑名稱 |
(2) 讀取序列化文件
main函數部分代碼如下:
//獲取配置
Configuration conf = new Configuration();
//獲取文件系統
FileSystem fs = FileSystem.get(conf);
//獲取SequenceFile.Reader對象
SequenceFile.Reader reader=new SequenceFile.Reader(fs,new Path("/user/dftest/Selectdata/part-m-00000"),conf);
//獲取序列化文件中使用的鍵值類型
Text key = new Text();
Text value=new Text();
//讀取的數據寫入selectdata.txt文件
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("C:\\Users\\Admin\\desktop\\selectdata.txt",true)));
while(reader.next(key,value)){
out.write(key.toString()+"\t"+value.toString()+"\r\n");
}
out.close();
reader.close();
讀取結果如下(截取的部分數據):
3.優化日志文件統計程序
任務1中篩選了1.2月份的用戶登錄信息,並生成了序列化文件,本節任務就是使用MapReduce讀取該序列化文件,統計在2016年1.2月份每天的登錄次數,並且要求最終的輸出結果根據月份分別保存到兩個不同的文件中。同時要求分別統計輸入記錄中1月份和2月份的記錄數以及輸出結果中1月份和2月份的記錄數。
3.1 自定義鍵值類型
在Hadoop中,mapper和reducer處理的都是鍵值對記錄。Hadoop提供了很多鍵值對類型,如Text、IntWritable、LongWritable等。
下圖為Hadoop內置的數據類型:
下面對常用的集中數據類型進行詳細解析:
類型 |
解釋 |
BooleanWritable |
標准布爾型,相當於java數據里面的boolean,當<key,value>中key或者value為布爾型時使用 |
ByteWritable |
單字節型,相當於Java數據類型里面的byte,當<key,value>中的key或者value為單字節類型時使用 |
DoubleWritable |
雙精度浮點型,相當於Java數據類型里面的double,當<key,value>中的key或者value為double類型時使用 |
FloatWritable |
單精度浮點型,相當於Java數據類型里面的float,當<key,value>中的key或者value為單精度浮點類型時使用 |
IntWritable |
整型,相當於Java數據類型里面的int,當<key,value>中的key或者value為整型時使用 |
LongWritable |
長整型,相當於Java數據類型里面的Long,當<key,value>中的key或者value為長整型時使用 |
Text |
使用UTF-8格式存儲文本,在java數據中主要針對String類型 |
NullWritable |
空值,當<key,value>中的key或value為空時使用 |
Hadoop內置的數據類型可以滿足絕大多數需求。但有時,用戶需要一些特殊的鍵值類型來滿足業務需求,即自定義鍵值類型。
①自定義值類型必須實現Writable接口,接口Writable是一個簡單高效的基於基本I/O的序列化接口對象,包含兩個方法,即write(DataOutput out)與readFields(DataInput in),其功能分別是將數據寫入指定的流中和從指定的流中讀取數據。下表為對這兩個方法的描述。自定義值類型必須實現這兩個方法。
返回類型 |
方向和描述 |
void |
readFields(DataInput in),從in中反序列化該對象的字段 |
void |
write(DataOuput out),將該對象的字段序列化到out中 |
②自定義鍵類型必須實現WritableComparable接口。WritableComparable接口自身又實現了Wriable接口,所以Hadoop中的鍵也可以作為值使用,但是實現Writable接口不能作為鍵使用。WritableComparable接口中不僅有readFields(DataInput in)方法和write(DataOutput out)方法,還提供了一個compareTo(To)方法,該方法提供了3種返回類型,如下表:
返回類型 |
解釋 |
負整數 |
表示小於被比較對象 |
0 |
表示等於被比較對象 |
正整數 |
表示大於被比較對象 |
無論是自定義鍵類型還是自定義值類型,自定義的類默認繼承object類,而object類默認有一個toString方法,該方法返回的是對象的內存地址。但有時候用戶想要看到的是該對象的具體內容,而不是內存地址,這個時候就需要重寫toString方法。重寫方法只需要返回自己想要的字符串即可。
③下面具體介紹如何完整地自定義一個鍵值類型:
本節任務是統計用戶每天登錄的次數,輸入的數據包含兩列信息,用戶名和登錄日期。Mapper輸出的鍵是用戶名及登錄日期,輸出的值是1;Reducer輸出的鍵也是登錄名和日期,輸出的值是每個用戶每天登錄的次數。由此可以看出,Mapper和Reducer輸出的值類型使用Hadoop內置的IntWritable類型即可,而鍵類型可以自己定義。定義一個MemberLogTime類,該類實現接口WritableComparable<MemberLogTime>,類中聲明了兩個對象,分別為用戶名member_name和登錄時間logTime。同時,該類實現了readFields(DataInput in)方法、Write(DataInput out)方法和compareTo(MemberLogTime o)。其中,compareTo(MemberLogTime o)方法是根據用戶名進行排序的。最后還要重寫toString方法,該方法返回用戶名和登錄時間的字符串格式。
自定義鍵類型代碼如下所示:
package essential;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class MemberLogTime implements WritableComparable<MemberLogTime> {
private String member_name;
private String logTime;
public MemberLogTime(){
}
public MemberLogTime(String member_name,String logTime){
this.member_name=member_name;
this.logTime=logTime;
}
public String getLogTime() {
return logTime;
}
public void setLogTime(String logTime) {
this.logTime = logTime;
}
public String getMember_name() {
return member_name;
}
public void setMember_name(String member_name) {
this.member_name = member_name;
}
/*關鍵是對下面三個方法進行重寫實現
*
* */
@Override
public void readFields(DataInput dataInput) throws IOException {
this.member_name=dataInput.readUTF();
this.logTime=dataInput.readUTF();
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(member_name);
dataOutput.writeUTF(logTime);
}
@Override
public int compareTo(MemberLogTime o) {
return this.getMember_name().compareTo(o.getMember_name());
}
public String toString(){
return this.member_name+","+this.logTime;
}
}
3.2 初步探索Combiner
為了提高MapReduce作業的工作效率,Hadoop允許用戶聲明一個Combiner。Combiner是運行在Map端的一個“迷你Reducer”過程,它只處理單台機器生成的數據。
(1)聲明的Combiner繼承的是Reducer,其方法原理和Reduce的實現原理基本相同。不同的是,Combiner操作發生在Map端,或者說Combiner運行在每一個運行Map任務的節點上。它會接收特定節點上的Map輸出作為輸入,對Map輸入的數據先做一次合並,再把輸出結果發送到Reducer。需要注意的是,combiner不影響程序的處理邏輯,只會影響處理效率。
所有節點的Mapper輸出都會傳送到Reducer,當數據集很大時,Reduce端也會接收大量的數據,這樣無疑會增加Reducer的負擔,影響Reducer的工作效率。當加入Combiner時,每個節點的Mapper輸出現在Combiner上進行整合,Combiner先對Mapper輸出進行計算,然后將計算結果傳輸給Reducer,這樣Reducer端接收到的數據量就會大大減少,提高效率。
下圖為有無Combiner的對比圖:
值得一提的是:並非所有的MapReduce程序都可以加入Combiner。僅當Reducer輸入的鍵值對類型與Reducer輸出的鍵值對類型一樣,並且計算邏輯不影響最終的結果時才可以在MapReduce程序中加入Combiner。例如,統計求和或者求最大值時可以使用Combiner,但是類似計算平均值時就不能使用Combiner。
(2)Combiner繼承的是Reducer,所以聲明Combiner類的繼承必須繼承Reducer,在Combiner類里面重寫reduce方法。下面代碼展示了統計社交網站2016年1月和2月用戶每天登錄該網站次數的Combiner代碼。
package essential;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class LogCountCombiner extends Reducer <MemberLogTime, IntWritable,MemberLogTime,IntWritable>{
protected void reduce(MemberLogTime key,Iterable<IntWritable> value,
Reducer<MemberLogTime,IntWritable,MemberLogTime,IntWritable>.Context context)
throws IOException, InterruptedException {
int sum=0;
for(IntWritable iw:value){
sum+=iw.get();
}
context.write(key,new IntWritable(sum));
}
}
除了聲明Combiner類外,還需要在驅動類里面配置Combiner類,如下所示:
job.setCombinerClass(LogCountCombiner.class);
有時候甚至不需要特意聲明一個Combiner類。當Combiner與Reducer實現邏輯相同時,可以不用聲明Combiner類,在驅動類里面添加以下代碼即可:
job.setCombinerClass(LogCountReducer.class);
3.3 淺析Partitioner
(1)下面先給出MapReduce的執行過程
數據首先上傳到HDFS並且被分成文件塊,接着MapReduce框架根據輸入的文件計算輸入分片,每個輸入分片對應一個Map任務。Map在讀取分片數據之前,InputFormat會將分片中的每條記錄解析成鍵值對格式供Map讀取。Map的輸出結果可能會先傳送到Combiner進行合並,而Combiner的輸出結果會被Partitioner均勻地分配到每個Reducer上,Reducer的輸出結果又會通過OutputFormat解析成特定的格式存儲到HDFS上。在這個過程中,Combiner和Partitioner並非必須的,Combiner和Partitioner的使用需要根據實際業務需求來定。
(2)下面對Partitioner進行詳細介紹:
Partitioner組件的功能是讓Map對key進行分區,從而將不同的key分發到不同的Reducer中進行處理。分區階段發生在Map階段之后,Reduce階段之前,分區的數量等於Reducer的個數。Reducer的個數可以在驅動類里面通過job.setNumReduceTasks設置。在使用多個Reducer的情況下,需要一些方法來確保Mapper輸出的鍵值對發送到正確的Reducer中。
Hadoop自帶了一個默認的分區實現,即HashPartitioner。HashPartitioner的實現很簡單,它繼承了Partitioner<K2,V2>,並且重寫了getPartition方法,該方法有三個參數,分別為Mapper輸出的值value以及Reducer的個數numReduceTasks,默認numReduceTasks是1。HashPartitioner的getPartition方法實現是根據key的hash值除以2的31次方后取余數,用該余數再次除以Reducer的數量,再取余數,得到的結果就是這個key對應的Partition的編號。
一般情況下,MapReduce程序都會使用默認的HashPartitioner分區,但有時候用戶會有一些特殊的需求,例如,統計某社交網站2016年1月和2月用戶每天登錄的次數,要求1月份的輸出結果放到一個文件里,2月份的輸出結果放在一個文件里。這個時候就需要自定義Partition來實現這個要求。
(3)下面開始實現自定義Partition
自定義Partitioner需要繼承Partitioner<K2,V2>並且重寫getPartition方法。如果最終的結果是要輸出到多個文件中,只需要讓getPartition方法按照一定的規則返回0,1,2,3等即可。如下代碼所示,自定義Partitioner實現將社交網站用戶每天登陸次數的統計結果根據不同的月份分發到不同的輸出文件里。在getPartition的實現方法里,分別使用0、1與numPartitions相除取余。在本例中,可以取numPartitions為2,這樣可以剛好把1、2月份分開。使用Partitioner還需要在驅動類里設置Partitioner類及Reducer個數,代碼如下:
package essential;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {
@Override
public int getPartition(MemberLogTime key, IntWritable value, int numberPartitions) {
String date=key.getLogTime();
if(date.contains("2016-01")){
return 0/numberPartitions;
}else {
return 1/numberPartitions;
}
}
}
在main函數實現設置Partitioner類和設置Reducer個數
job.setPartitionerClass(LogCountPartitioner.class);
job.setNumReduceTask(2);//這樣就可以控制Reducer輸出的文件及數據分區了
3.4 自定義計數器
在Hadoop的運行日志中可以獲取到Map和Reduce的任務數、運行Map任務花費的時間、運行Reduce任務花費的時間,以及Map、Combiner和Reduce的輸入輸出記錄等。這些信息都是Hadoop自帶的計數器統計出來的。
(1)概述:計數器是Hadoop框架使用的一種對統計信息收集的手段,主要應用於對數據的控制及收集統計信息。計數器可以幫助程序設計人員手機某一類特定的信息數據。一般情況下,Hadoop將計數器分為五大類,如下表所示:
計數器 |
屬性名 |
MapReduce任務計數器 |
org.apache.hadoop.mapreduce.TaskCounter |
文件系統計數器 |
org.apache.hadoop.mapreduce.FileSystemCounter |
輸入文件計數器 |
org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
輸出文件計數器 |
org.apache.hadoop.mapreduce.lib.input.FileOutputFormatCounter |
作業計數器 |
org.apache.hadoop.mapreduce.JobCounter |
任務計數器主要用於收集任務在運行時的任務信息,任務計數器可以被部署在各個節點上,並且統一傳送到主節點進行匯集,如果一個任務最終失敗,那么所有的計數器記錄都會被重置,即所有的計數清零。只有當任務成功以后,計數器才會被記錄。
(2)自定義計數器:
自定義計數器有兩種類型:
①通過java枚舉(enum)類型來定義,一個作業可以定義的枚舉類型數量不限,各個枚舉類型包含的字段數量也不限。枚舉類型的名稱即為組的名稱,枚舉類型的字段就是計數器的名稱。
②使用動態計數器。
(3)實現自定義計數器:
本節任務使用的是社交網站1月份和2月份用戶的登錄信息,但是1月份和2月份的數據各有多少不知道,如果想要知道每個月份的數據記錄數,可以通過自定義計數器來實現。首先在Mapper類中定義枚舉類型,代碼如下:
enum LogCounter{
January,
February
}
接着在map函數里調用Context類的getCounter方法,說明使用了枚舉類型中的哪個計數器,還需要調用increment()方法進行計數的添加,代碼如下:
if(logTime.contains("2016-01")){
context.getCounter(LogCounter.January).increment(1);
}else if(logTime.contains("2016-02")){
context.getCounter(LogCounter.February).increment(1);
}
注意:計數器不是只能在Mapper中添加,也可以在Reducer中添加,如果要統計每個月的輸出結果記錄數,則需要在Reducer中添加上面代碼。
(4)另一種自定義計數器的方式是使用動態計數器。除了使用getCounter方法獲取枚舉中值的方式外,Context類中還有一個重載方法getCounter(String groupName,String countName),能夠對當前計數器進行動態計數。例如對統計一月份和二月份用戶每天登錄次數的輸出結果進行計數,則可以在reduce函數中添加代碼:
if(key.getLogTime().contains("2016-01")){
context.getCounter("OutputCounter","JanuaryResult").increment(1);
}else if(key.getLogTime().contains("2016-02")){
context.getCounter("OutputCounter","FebruaryResult").increment(1);
}
3.5 任務實現(完成前述第三個任務)
經過前面的討論探索,現在開始實現本節任務:通過MapReduce編程實現用戶在2016年1月份和2月份每天登錄次數的統計。實現該任務的具體步驟及代碼如下:
自定義Mapper類:
package essential;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
//自定義實現Mapper類
//為了提高效率,在驅動類里面配置Combiner類
public class LogCountMapper extends Mapper<Text,Text,MemberLogTime, IntWritable> {
private MemberLogTime mt=new MemberLogTime();
private IntWritable one=new IntWritable(1);
enum LogCounter{
January,
February
}
protected void map(Text key,Text value,Mapper<Text,Text,MemberLogTime,IntWritable>.Context context) throws IOException, InterruptedException {
String member_name=key.toString();
String logTime=value.toString();
if(logTime.contains("2016-01")){
context.getCounter(LogCounter.January).increment(1);
}else if(logTime.contains("2016-02")){
context.getCounter(LogCounter.February).increment(1);
}
mt.setMember_name(member_name);
mt.setLogTime(logTime);
context.write(mt,one);
}
}
自定義Reducer類:
package essential;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//自定義實現Reducer類
public class LogCountReducer extends Reducer<MemberLogTime, IntWritable,MemberLogTime,IntWritable> {
protected void reduce(MemberLogTime key,Iterable<IntWritable> value,Reducer<MemberLogTime, IntWritable,MemberLogTime,IntWritable>.Context context) throws IOException, InterruptedException {
if(key.getLogTime().contains("2016-01")){//此處就是使用了計數器
context.getCounter("OutPutCounter","JanuaryResult").increment(1);
}else if(key.getLogTime().contains("2016-02")){
context.getCounter("OutPutCounter","FebruaryResult").increment(1);
}
//下面才是reduce函數的主體
int sum=0;
for(IntWritable iw:value){
sum+=iw.get();
}
context.write(key,new IntWritable(sum));
}
}
驅動類:
package essential;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Properties;
//編輯查找1、2月數據統計的驅動類
public class LogCount {
public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Properties properties = System.getProperties();
properties.setProperty("HADOOP_USER_NAME","root");
//計數器類在前面的自定義Mapper和Reducer已經寫入
Job job = Job.getInstance(conf,"LogCount");
job.setJarByClass(LogCount.class);
job.setMapperClass(LogCountMapper.class);//設置自定義Mapper類
job.setMapOutputKeyClass(MemberLogTime.class);
job.setMapOutputValueClass(IntWritable.class);
job.setCombinerClass(LogCountCombiner.class);//設置自定義Combiner類
job.setReducerClass(LogCountReducer.class);//設置自定義Reducer類
job.setOutputKeyClass(MemberLogTime.class);
job.setOutputValueClass(IntWritable.class);
job.setPartitionerClass(LogCountPartitioner.class);//設置自定義分區類
job.setNumReduceTasks(2);
//job.setOutputFormatClass(GbkOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileSystem.get(conf).delete(new Path(args[1]),true);//防止文件目錄重復
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
4.Eclipse提交日志文件統計程序
打包並提交任務到Hadoop集群,比在集成開發工具上更具有高效性,大文件開發可使用。
5.小結
本章是MapReduce編程進階,介紹的內容包括MapReduce的輸入及輸出格式、Hadoop Java API、自定義鍵值類型、Combiner組件、自定義計數器以及Eclipse提交MapReduce任務。其中,自定義鍵值類型、Combiner組件和Partitioner組件對程序的優化起到了舉足輕重的作用,在一定程度上可以提高程序運行效率。
6.實訓
實訓目的:掌握MapReduce的Combiner的使用,掌握自定義數據類型,掌握自定義計數器,掌握MapReduce參數傳遞,掌握ToolRunner的使用和提交MapReduce任務
實訓1.統計全球每年的最高氣溫和最低氣溫
1.訓練要點:掌握Combiner的使用、掌握自定義數據類型
2.需求說明:
將壓縮文件上傳到linux本地目錄,在該目錄下解壓所有文件,文件數據格式如下:
其中,YEARMODA對應年月日,TEMP對應溫度,並且每列數據的分隔符空格數是不同的,這個在預處理數據時要注意。
創建一個文件temperaturedata.txt,在數據文件所在目錄執行命令 sed -i '1d' 來刪除所有文件的首行字段,然后執行cat * >>data.txt將所有的數據輸入到data.txt中
(1)統計全球每年的最高氣溫和最低氣溫。
(2)MapReduce輸出結果包含年份、最高氣溫、最低氣溫,並按照最高氣溫降序排序。如果最高氣溫相同,則按照最低氣溫升序排序。
(3)使用自定義數據類型。
(4)結合Combiner和自定義數據類型完成全球每年最高氣溫和最低氣溫的統計。
3.實現思路及步驟:
目的是選出每年的最高氣溫和最低氣溫。(因為是選擇最大最小值,可以在Combiner階段就開始選擇最大最小值,以提高效率)
輸出結果為包含年份+最高氣溫+最低氣溫,並按照最高氣溫降序排序。如果最高氣溫相同,則按照最低氣溫升序排序。
(1)自定義數據類型
相當於給此次任務設置一個實體類,作為基本數據類型。
自定義數據類型(重寫值類型方法)YearMaxTandMinT繼承Writable接口,在這個類里面定義出相關屬性:year、Maxtemp、MinTemp以及set、get方法。
實現構造函數。
(2)自定義Mapper
命名為MaxTandMinTMapper,其主要功能是作映射,將year作為key,temp作為value輸出。
(3)自定義Combiner
命名為MaxTandMinTCombiner,其主要功能是提前處理Map的一些數據,獲取出年度最高和最低的溫度,然后作為值輸出。
(4)自定義Reducer
命名為MaxTandMinTReducer,其主要功能為對年份進行排序,排序依據如上
4.實現代碼:
(1)自定義實體類:
package essential.Temperature;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//對值類型進行重寫,實現Writable類
public class YearMaxTandMinT implements Writable {
private String year;
private double Maxtemp;
private double MinTemp;
public YearMaxTandMinT(){
}
public YearMaxTandMinT(String year,double maxtemp,double mintemp){
this.year=year;
this.Maxtemp=maxtemp;
this.MinTemp=mintemp;
}
public String getYear() {
return year;
}
public void setYear(String year) {
this.year = year;
}
public double getMaxtemp() {
return Maxtemp;
}
public void setMaxtemp(double maxtemp) {
Maxtemp = maxtemp;
}
public double getMinTemp() {
return MinTemp;
}
public void setMinTemp(double minTemp) {
MinTemp = minTemp;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(year);
dataOutput.writeDouble(Maxtemp);
dataOutput.writeDouble(MinTemp);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.Maxtemp=dataInput.readDouble();
this.MinTemp=dataInput.readDouble();
this.year=dataInput.readUTF();
}
public String toString(){
return "Year:"+year+" ;The MaxTemperature is "+Maxtemp+" and the MinTemperature is "+MinTemp;
}
public int Compare(YearMaxTandMinT otherymm){//設置比較兩對象之間的大小關系,前者與后者比較,前者較大返回1,后者較大返回-1;用於比較年份時使用
if(Maxtemp>otherymm.getMaxtemp()){
return 1;
}else if(Maxtemp==otherymm.getMaxtemp()){
if(MinTemp<otherymm.getMinTemp()){
return 1;
}else if(MinTemp==otherymm.getMinTemp()){
return 0;
} else{
return -1;
}
}else{
return -1;
}
}
}
(2)自定義Mapper:
package essential.Temperature;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
//對Mapper進行自定義
public class MaxTandMinTMapper extends Mapper<LongWritable,Text,Text, DoubleWritable> {
//map處理邏輯:主要功能是作映射,將year作為key,temp作為value輸出。
public void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text, DoubleWritable>.Context context) throws IOException, InterruptedException {
String[] datas=value.toString().split("\\s+");//多空格切割字符串
String year=datas[2].substring(0,4);
double tempdata=Double.parseDouble(datas[3]);
context.write(new Text(year),new DoubleWritable(tempdata));
}
}
(3)自定義Combiner:
package essential.Temperature;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//此處的Combiner旨在完成排序篩選工作,選出氣溫的最大值和最小值,
//此處我我想錯了,我想的是一步到位,其實應該是和Mapper的輸出應該一致才對,輸出的還是應該為年份+溫度,只不過是一次性寫兩次,Combiner的鍵值對輸入類型和輸出類型應該是一致的
public class MaxTandMinTCombiner extends Reducer<Text, DoubleWritable,Text,DoubleWritable> {
public void reduce(Text key,Iterable<DoubleWritable> tempdatas,Reducer<Text, DoubleWritable,Text,DoubleWritable>.Context context) throws IOException, InterruptedException {
YearMaxTandMinT ymm=new YearMaxTandMinT();
double maxtemp=0;//初始化溫度值
double mintemp=999;
for(DoubleWritable tempdata:tempdatas){//遍歷shuffle階段組合的數組,對key值對應的最大最小值進行更新操作
if(tempdata.get()>maxtemp){
maxtemp=tempdata.get();
}else if(tempdata.get()<mintemp){
mintemp=tempdata.get();
}
}
//ymm.setMaxtemp(maxtemp);
//ymm.setMinTemp(mintemp);
context.write(key,new DoubleWritable(maxtemp));
context.write(key,new DoubleWritable(mintemp));
}
}
(4)自定義Reducer:
package essential.Temperature;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.LinkedList;
//Reducer是對有Combiner部分分類好的數據進行最后的整合處理,最終要將每一年的年份和最高最低溫度進行整理
//並對所有年份進行比較,比較的標准為,先以最高溫度為標准進行比較,如果最高溫度相同,則以最低溫度較高者排在前面
public class MaxTandMinTReducer extends Reducer<Text, DoubleWritable, NullWritable,Text> {
private LinkedList<YearMaxTandMinT> ymmlists=new LinkedList<YearMaxTandMinT>();//用於存儲Reduce整合出來的ym
public void reduce(Text key,Iterable<DoubleWritable> temps,Reducer<Text,DoubleWritable, NullWritable,Text>.Context context) throws IOException, InterruptedException {
//先對相同的key的最高溫和最低溫進行整合
System.out.println("壓根沒有執行我嗎");
YearMaxTandMinT ym=new YearMaxTandMinT();
ym.setYear(key.toString());
double yearmintemp=999;
double yearmaxtemp=0;
for(DoubleWritable temp:temps){
if(temp.get()<yearmintemp){
yearmintemp= temp.get();
}
if(temp.get()>yearmaxtemp){
yearmaxtemp=temp.get();
}
}
ym.setMinTemp(yearmintemp);
ym.setMaxtemp(yearmaxtemp);
//到這里就實現了年份+最高溫+最低溫的匹配
//然后進行年份的排序,在這里需要引入鏈表(使用單向鏈表也可以),因為要存儲的數據很少,所以這種方式是可以的。
//鏈表內數據先以最高溫度為標准進行比較,如果最高溫度相同,則以最低溫度較高者排在前面
if(ymmlists.size()==0){//如果得到的是第一條數據直接加入表格
ymmlists.add(ym);
}else {
if(ym.Compare(ymmlists.getFirst())>0){//表示該元素為現有列表中最大
ymmlists.add(0,ym);
}if(ym.Compare(ymmlists.getLast())<0){//表示該元素在現有列表中最小
ymmlists.add(ym);
}else{//表示在最大和最小之間
for(int index=0;index<ymmlists.size();index++){//實現在指定位置插入元素
//ymmlists.add(1,aaa);
int otherindex=index+1;
if(ym.Compare(ymmlists.get(index))<0 && ym.Compare(ymmlists.get(otherindex))>0){
ymmlists.add(otherindex,ym);
break;
}
}
}
}
String printstring="";
for(YearMaxTandMinT yt:ymmlists){
printstring=printstring+yt.toString()+"\n";
System.out.println(printstring);
}
context.write(NullWritable.get(),new Text(printstring));
}
}
(5)驅動類編寫:
package essential.Temperature;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Properties;
//實現MapReduce任務,編寫驅動類
public class MaxTandMinT {
public static void main(String [] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Properties properties = System.getProperties();
properties.setProperty("HADOOP_USER_NAME","root");
//計數器類在前面的自定義Mapper和Reducer已經寫入
Job job = Job.getInstance(conf,"MaxTandMinT");
job.setJarByClass(MaxTandMinT.class);
job.setMapperClass(MaxTandMinTMapper.class);//設置自定義Mapper類
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setCombinerClass(MaxTandMinTCombiner.class);//設置自定義Combiner類
job.setReducerClass(MaxTandMinTReducer.class);//設置自定義Reducer類
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
//job.setPartitionerClass(LogCountPartitioner.class);//設置自定義分區類
job.setNumReduceTasks(1);
//job.setOutputFormatClass(GbkOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileSystem.get(conf).delete(new Path(args[1]),true);//防止文件目錄重復
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
5.實現結果:
實訓2.篩選15~25℃之間的數據
訓練要點:掌握MapReduce參數的傳遞、掌握自定義計數器、掌握ToolRunner的使用和提交MapReduce任務。
7.小練習
(1)下面屬於Hadoop內置數據類型的是:D
A.IntegerWritable B.StringWritable C.ListWritable D.MapWritable
(2)關於自定義數據類型,下列說法正確的是:D
A.自定義數據類型必須繼承Writable接口
B.自定義鍵類型需要繼承Writable接口
C.自定義值類型需要繼承WritableComparable接口
D.自定義數據類型必須實現readFileds(DataInput datainput)方法
(3)設置MapReduce參數傳遞的正確方式是:基於MapReduce的API
conf.set("argName",args[n])傳遞
(4)在Mapper類的setup函數中,下列( )方式可以用來獲取參數值。
context.getConfiguration.get("argName")
既然Hadoop的配置類Configuration里面有根據屬性名稱獲取參數值的方法,即返回類型為String類型的get(String name)方法。在編寫MapReduce程序的時候,可以在setup方法中通過上下文對象Context中的getConfiguaration()方法來獲取配置對象Configuaration,再調用Configuration里面的get(String name)方法獲取這些參數值。
(5)MapReduce的輸入默認格式為TextInputFormat,輸出默認格式為TextOutputFormat
全部基本知識到這里基本就結束了,最后一章是一個網站項目,主要的學習目標是學習KNN算法,並使用MapReduce實現KNN算法。