HDFS
HDFS提供一套Java API來操作HDFS,包括文件的建立、修改、刪除、權限管理等,下面對幾個常用的API進行介紹,詳細的API接口請參見API文檔,可以在${HADOOP_HOME}/share/doc/api/index.html中找到。
IOUtils類
Hadoop IO的基礎類,提供一組靜態方法來控制HadoopIO。通過IOUtils類,可以使用java.net.URL類來訪問HDFS,同時也可以在標准輸入流和輸出流之間復制數據。需要注意的是,為了是java.net.URL能夠識別HDFS的URL方案(hdfs://namenode:port/)需要在使用前設置URL的流處理工廠類為org.apache.hadoop.fs. FsUrlStreamHandlerFactory,詳見2.3代碼示例。
FileSystem類
FileSystem類是一個抽象文件系統API,在使用該API前需要先確定目標文件系統對應的實現。通過FileSystem的HDFS實現類可以實現對HDFS的一般文件操作,包括讀、寫、刪、追加等。下面對FileSystem類中比較重要的幾個方法進行介紹。
get方法
FileSystem類是一個抽象類,實際使用時需要通過靜態get方法來獲得目標文件系統對應的FileSystem實現。FileSystem類提供了多種get方法,如下所示:
static FileSystem get(Configuration conf)
static FileSystem get(URI uri, Configuration conf)
static FileSystem get(URI uri, Configuration conf, String user)
第一個方法獲取配置文件中設置的文件系統(通過fs.defaultFS設置)的FileSystem實例,默認為本地文件系統;第二個方法根據URI方案獲取相應的FileSystem實例;第三個方法則根據URI方案和配置文件獲取相應的FileSystem實例,該實例具有給定用戶的權限。
open方法
FileSystem實例通過open方法獲取HDFS文件的輸入流,該方法返回FSDataInputStream對象,該對象支持隨機訪問。FileSystem類提供了兩種open方法,如下所示:
FSDataInputStream open(Path f)
abstract FSDataInputStream open(Path f, int bufferSize)
第一個方法使用默認的緩沖區大小(4K),這個值有些偏少,可以通過第二個方法指定合理的緩沖區大小。
可以通過FSDataInputStream來讀取文件,FSDataInputStream繼承自java.io.DataInputStream,使用FSDataInputStream讀取HDFS文件和Java讀取本地文件流的方式沒有區別。FSDataInputStream還提供seek方法實現文件的隨機讀寫,seek方法是一個開銷較大的操作,在批處理應用中應盡可能避免使用。
create方法
FileSystem使用create方法創建文件,該方法返回FSDataOutputStream對象,通過該對象可實現對HDFS文件的寫入操作。FileSystem提供多種create方法,如下所示:
static FSDataOutputStream create(FileSystem fs, Path file, FsPermission permission)
FSDataOutputStream create(Path f)
FSDataOutputStream create(Path f, boolean overwrite)
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize)
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
short replication, long blockSize)
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress)
abstract FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize,
short replication, long blockSize,
Progressable progress)
FSDataOutputStream create(Path f, Progressable progress)
FSDataOutputStream create(Path f, short replication)
FSDataOutputStream create(Path f, short replication, Progressable progress)
如果指定的路徑不存在,create方法會自動為指定文件創建父目錄。
FSDataOutputStream對象繼承自java.io.DataOutputStream,可以像寫本地文件流一樣向HDFS文件寫入數據。使用方法參見2.3.2 FileSystem類示例。
mkdirs方法
FileSystem的mkdirs方法可以一次性建立全部不存在的目錄。FileSystem提供多種mkdirs方法,如下所示:
static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
abstract boolean mkdirs(Path f, FsPermission permission)
boolean mkdirs(Path f)
exists方法
FileSystem使用exists方法判斷文件或目錄是否存在,該方法聲明如下:
boolean exists(Path f)
listStatus方法
FileSystem的listStatus方法返回指定目錄或文件的元數據列表,文件或目錄的元數據由FileStatus類封裝,包含了文件長度、塊大小、備份數、修改時間、所有者及其權限等信息。FileSystem提供多種listStatus方法,如下所示:
abstract FileStatus[] listStatus(Path f)
FileStatus[] listStatus(Path[] files)
FileStatus[] listStatus(Path[] files, PathFilter filter)
FileStatus[] listStatus(Path f, PathFilter filter)
IOUtils類示例
下面示例給出了IOUtils類的一個示例,該例通過IOUtils的copyBytes方法將讀取到的HDFS文件通過標准輸出打印。
public class IOUtilsDemo |
FileSystem類示例
下面示例給出了FileSystem的例子,該例子顯示了如何使用FileSystem類來與HDFS交互。
public class HDFSFileOperator { |
MapReduce
MapReduce是一個分布式計算框架,計算模型如圖4-1所示。在該計算模型中,一個作業由兩類任務組成:Map和Reduce。Map任務負責將原始數據轉換成Key/Value形式的鍵值對集合,Reduce任務將Map轉換的數據進行處理,抽取需要的數據輸出。
- 雖然MRv2使用全新的YARN框架,可支持多種分布式計算框架,但是MRv2在對MapReduce框架的支持上兼容MRv1的接口,因此,MRv1的應用程序代碼無需修改就可以在MRv2上運行,只需使用MRv2包進行重新編譯。
- 關於MapReduce的詳細介紹,請參閱:http://hadoop.apache.org/docs/stable/mapred_tutorial.html
如4.2節所述,一個完整的MapReduce作業首先需要將原始數據切片,然后將切片交給map任務處理,之后將map任務處理的結果傳給Reduce處理,最后,將Reduce處理結果輸出。這個過程中一共分四部分:切片、map、reduce和輸出,與之對應的類分別為InputFormat、OutputFormat、Mapper、Reducer。一個MapReduce作業由Job類進行抽象,客戶端配置Job類后調用該類的submit方法將作業提交給MapReduce集群執行。
4.3.1 InputFormat類
InputFormat描述了一個MapReduce任務的輸入切片信息,MapReduce作業通過該類實現如下功能:
- 檢查作業的輸入設置是否有效
- 將作業切片,每個切片用InputSplit類封裝,提交給Mapper類處理;
- 通過實現RecordReader類來讀取切片中的Key/Value對,該Key/Value對是Mapper類處理的最小單位
MapReduce框架預置了很多有用的InputFormat實現,如表4-1所示:
表4-1 InputFormat類族(主要)
類 |
說明 |
AutoInputFormat |
一個可以自動檢測文件類型的FileInputFormat類,該類目前支持文本文件和HDFS序列文件。 |
CombineFileInputFormat |
一個用於處理大量小文件的FileInputFormat類,CombineFileInputFormat將多個文件打包到一個輸入切片中,每個Mapper類可能得到來自多個文件的數據。 |
CompositeInputFormat |
一個可以實現多數據源Join的InputFormat. |
DBInputFormat |
一個面向關系型數據庫的InputFormat實現,該類使用JDBC從關系型數據庫中讀取數據。需要注意的是,MapReduce的高並發性可能給源數據庫造成巨大壓力,甚至導致數據庫崩潰,因此,最好在少量數據集中使用。 |
FileInputFormat |
FileInputFormat是所有文件類型輸入數據的InputFormat基類。 |
KeyValueTextInputFormat |
讀取文本文件的InputFormat,如果行被分隔符分隔為兩部分,則第一部分作為Key,第二部分作為Value,如果沒有分隔符,則整行作為Key,Value為空。 |
NLineInputFormat |
該InputFormat將將文件數據按指定的行數進行切片。 |
SequenceFileAsBinaryInputFormat |
讀取HDFS序列文件的FileInputFormat,該類將序列文件中的鍵值對都按BytesWritable類型進行解析。 |
SequenceFileAsTextInputFormat |
讀取HDFS序列文件的FileInputFormat,該類將序列文件中的鍵值對都按Text類型進行解析。 |
SequenceFileInputFormat |
讀取HDFS序列文件的FileInputFormat |
TeraInputFormat |
該類將每行的前10個字符作為Key,其余作為Value。該類屬於FileInputFormat,按文件進行切片。 |
TextInputFormat |
該類是MapReduce默認的InputFormat,該類將每個文件作為一個切片輸入給Mapper,每個切片按行划分,Key為該行第一個字符在文件中的偏移量。 |
4.3.2 OutputFormat類
OutputFormat類描述了一個MapReduce作業如何輸出結果數據。MapReduce通過該類實現下述功能:
- 驗證輸出設置的有效性,如設置的輸出目錄是否存在;
- 實現RecordWriter類,通過RecordWriter來指定數據輸出格式。
MapReduce框架同樣預置了許多有用的OutputFormat實現,如表4-2所示:
類 |
說明 |
DBOutputFormat |
將輸出數據寫入關系型數據的OutputFormat |
FileOutputFormat |
將輸出數據寫入文件的OutputFormat基類 |
MapFileOutputFormat |
將輸出結果寫入MapFile格式的文件中的FileOutputFormat實現 |
MultipleOutputFormat |
一個將輸出結果寫入多個文件的FileOutputFormat實現 |
MultipleSequenceFileOutputFormat |
一個將輸出結果寫入多個文件HDFS序列文件的FileOutputFormat實現 |
MultipleTextOutputFormat |
一個將輸出結果寫入多個文件文本文件的FileOutputFormat實現 |
NullOutputFormat |
一個特殊的OutputFormat,不輸出任何東西 |
SequenceFileAsBinaryOutputFormat |
將輸出結果寫入HDFS序列文件格式的文件中的FileOutputFormat實現,key和value都設置為BytesWritable格式 |
SequenceFileOutputFormat |
將輸出結果寫入HDFS序列文件格式的文件中的FileOutputFormat實現 |
TeraOutputFormat |
將輸出數據輸出到文本文件,每行以"\r\n"結尾 |
TextOutputFormat |
該類是MapReduce默認的OutputFormat,按文本類型解析所有的Key和Value。 |
4.3.3 Mapper類
Mapper類將輸入的一組Key/Value鍵值對數據映射為中間Key/Value鍵值對,輸出的鍵值對與輸入的鍵值對數據類型可以不一樣,輸出鍵值對應與Reducer輸入一致。Mapper的輸出會被存儲在本地磁盤中,按Reducer進行分組,哪些數據被分給那個Reducer由Partitioner類控制。Mapper類中的map方法實現如下:
/** |
KEYIN、VALUEIN為輸入切片的Key/Value類型,Context為MapReduce作業上下文,可以通過該類獲得作業的配置信息,設置計數器、輸出中間結果等操作。一般情況下,開發者只需要繼承Mapper類,在該類的map方法中實現自己的業務邏輯即可。如果需要定制map任務流程,如初始化一些環境等,可以重載Mapper類的setup、cleanup、run方法來創建一個完全自定義的map任務。
4.3.4 Reducer類
Reducer類將map輸出的屬於它的中間數據拉取到Reducer節點進行處理,處理結果根據作業配置的OutputFormat進行輸出。Reducer的個數由可由Job類的setNumReduceTasks方法設定。Reducer類主要完成三個過程:Shuffle、Sort和Reduce。Shuffle過程中,Reducer將存儲在map端屬於該Reducer的中間數據通過http協議拉取到本地節點。Sort過程按Key對拉取的數據進行歸並排序,相同key的數據按value排序。Reduce階段會調用reduce方法處理拉取的數據,並通過Context將數據輸出到指定目標中。通常開發者只需重載Reducer類的reduce方法來實現自己的業務邏輯。Reducer類的reduce方法實現如下:
/** |
類似的,通過重載Reducer的setup、cleanup和run方法,可以創建一個完全自定義的reduce任務。
4.3.5 Job類
Job類是從客戶端角度對MapReduce作業進行抽象的類,該類包括了作業定義接口和控制接口,通過該類可以定義、提交和監控MapReduce作業。Job類提供的主要方法如下所示:
void killJob()
void setCombinerClass(Class<? extends Reducer> cls)
void setInputFormatClass(Class<? extends InputFormat> cls)
void setMapOutputKeyClass(Class<?> theClass)
void setMapOutputValueClass(Class<?> theClass)
void setMapperClass(Class<? extends Mapper> cls)
void setNumReduceTasks(int tasks)
void setOutputFormatClass(Class<? extends OutputFormat> cls)
void setOutputKeyClass(Class<?> theClass)
void setOutputValueClass(Class<?> theClass)
void setPartitionerClass(Class<? extends Partitioner> cls)
void setReducerClass(Class<? extends Reducer> cls)
void submit()
boolean waitForCompletion(boolean verbose)
通常,一個完整的MapReduce作業需要設置如下幾個參數:
- InputFormat類及其參數
- OutputFormat類及其參數
- Mapper類
- Reducer類
復雜一些的MapReduce作業可能需要設定Partioner類和Combiner類。設定Combiner類可以是MapReduce框架在Mapper節點處理存儲在本地的中間數據,從而減少Reducer的Shuffle過程需要拉取的數據量,一般Combiner與Reducer設置為同一個類。