Hadoop開發


HDFS

HDFS提供一套Java API來操作HDFS,包括文件的建立、修改、刪除、權限管理等,下面對幾個常用的API進行介紹,詳細的API接口請參見API文檔,可以在${HADOOP_HOME}/share/doc/api/index.html中找到。

IOUtils類

Hadoop IO的基礎類,提供一組靜態方法來控制HadoopIO。通過IOUtils類,可以使用java.net.URL類來訪問HDFS,同時也可以在標准輸入流和輸出流之間復制數據。需要注意的是,為了是java.net.URL能夠識別HDFS的URL方案(hdfs://namenode:port/)需要在使用前設置URL的流處理工廠類為org.apache.hadoop.fs. FsUrlStreamHandlerFactory,詳見2.3代碼示例。

FileSystem類

FileSystem類是一個抽象文件系統API,在使用該API前需要先確定目標文件系統對應的實現。通過FileSystem的HDFS實現類可以實現對HDFS的一般文件操作,包括讀、寫、刪、追加等。下面對FileSystem類中比較重要的幾個方法進行介紹。
get方法
FileSystem類是一個抽象類,實際使用時需要通過靜態get方法來獲得目標文件系統對應的FileSystem實現。FileSystem類提供了多種get方法,如下所示:
static FileSystem get(Configuration conf) 
static FileSystem get(URI uri, Configuration conf) 
static FileSystem get(URI uri, Configuration conf, String user)
第一個方法獲取配置文件中設置的文件系統(通過fs.defaultFS設置)的FileSystem實例,默認為本地文件系統;第二個方法根據URI方案獲取相應的FileSystem實例;第三個方法則根據URI方案和配置文件獲取相應的FileSystem實例,該實例具有給定用戶的權限。
open方法
FileSystem實例通過open方法獲取HDFS文件的輸入流,該方法返回FSDataInputStream對象,該對象支持隨機訪問。FileSystem類提供了兩種open方法,如下所示:
FSDataInputStream open(Path f) 
abstract FSDataInputStream open(Path f, int bufferSize)
第一個方法使用默認的緩沖區大小(4K),這個值有些偏少,可以通過第二個方法指定合理的緩沖區大小。
可以通過FSDataInputStream來讀取文件,FSDataInputStream繼承自java.io.DataInputStream,使用FSDataInputStream讀取HDFS文件和Java讀取本地文件流的方式沒有區別。FSDataInputStream還提供seek方法實現文件的隨機讀寫,seek方法是一個開銷較大的操作,在批處理應用中應盡可能避免使用。
create方法
FileSystem使用create方法創建文件,該方法返回FSDataOutputStream對象,通過該對象可實現對HDFS文件的寫入操作。FileSystem提供多種create方法,如下所示:
static FSDataOutputStream create(FileSystem fs, Path file, FsPermission permission)
FSDataOutputStream create(Path f) 
FSDataOutputStream create(Path f, boolean overwrite) 
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) 
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, 
short replication, long blockSize) 
FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress) 
abstract FSDataOutputStream create(Path f, FsPermission permission, 
boolean overwrite, int bufferSize, 
short replication, long blockSize,
Progressable progress) 
FSDataOutputStream create(Path f, Progressable progress) 
FSDataOutputStream create(Path f, short replication) 
FSDataOutputStream create(Path f, short replication, Progressable progress)
如果指定的路徑不存在,create方法會自動為指定文件創建父目錄。
FSDataOutputStream對象繼承自java.io.DataOutputStream,可以像寫本地文件流一樣向HDFS文件寫入數據。使用方法參見2.3.2 FileSystem類示例。 
mkdirs方法
FileSystem的mkdirs方法可以一次性建立全部不存在的目錄。FileSystem提供多種mkdirs方法,如下所示:
static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission) 
abstract boolean mkdirs(Path f, FsPermission permission) 
boolean mkdirs(Path f)
exists方法
FileSystem使用exists方法判斷文件或目錄是否存在,該方法聲明如下:
boolean exists(Path f)
listStatus方法
FileSystem的listStatus方法返回指定目錄或文件的元數據列表,文件或目錄的元數據由FileStatus類封裝,包含了文件長度、塊大小、備份數、修改時間、所有者及其權限等信息。FileSystem提供多種listStatus方法,如下所示:
abstract FileStatus[] listStatus(Path f) 
FileStatus[] listStatus(Path[] files) 
FileStatus[] listStatus(Path[] files, PathFilter filter) 
FileStatus[] listStatus(Path f, PathFilter filter)

IOUtils類示例

下面示例給出了IOUtils類的一個示例,該例通過IOUtils的copyBytes方法將讀取到的HDFS文件通過標准輸出打印。

public class IOUtilsDemo 

static 

// JVM只能調用一次setURLStreamHandlerFactory方法,所以在只能在靜態方法中使用了, 
// 若工程中其他類庫之前以調用了該方法,那將無法再使用該方法從 hadoop中得到數據 
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); 


public static void main(String[] args) throws Exception 

InputStream in = null
try
in = new URL(arg[0]).openStream(); 
IOUtils.copyBytes(in, System.out, 4091, false); 
}finally
IOUtils.closeStream(in); 


}

FileSystem類示例

下面示例給出了FileSystem的例子,該例子顯示了如何使用FileSystem類來與HDFS交互。

 

public class HDFSFileOperator { 

long count = 0; 

public byte[] getBytesFromURI(Configuration conf, String uri) throws IOException { 

byte[] data = null
InputStream in = null
ByteArrayOutputStream out = new ByteArrayOutputStream(); 

FileSystem fs = FileSystem.get(URI.create(uri), conf); 

try

in = fs.open(new Path(uri)); 
IOUtils.copyBytes(in, out, 4096, false); 
data = out.toByteArray(); 
}finally { 

out.close(); 
IOUtils.closeStream(in); 


return data; 


public void printFileOnHDFS(Configuration conf, String uri) throws IOException { 

FileSystem fs = FileSystem.get(URI.create(uri), conf); 

if(fs.exists(new Path(uri)) == falsereturn

InputStream in = null

try

in = fs.open(new Path(uri)); 
IOUtils.copyBytes(in, System.out, 4096, false); 

}finally { 

IOUtils.closeStream(in); 




public String seekFileOnHDFS(Configuration conf, String uri, long startPos) throws IOException { 

FSDataInputStream in = null
ByteArrayOutputStream out = new ByteArrayOutputStream(); 

FileSystem fs = FileSystem.get(URI.create(uri), conf); 
fs.setVerifyChecksum(false); 

if(fs.exists(new Path(uri)) == falsereturn ""; 

try
in = fs.open(new Path(uri)); 
in.seek(startPos); 

IOUtils.copyBytes(in, out, 4096, false); 

return out.toString(); 

}finally { 

IOUtils.closeStream(in); 




public void writeToHDFS(Configuration conf, String uri, String text) throws IOException { 

FileSystem fs = FileSystem.get(conf); 
OutputStream out; 
if(fs.exists(new Path(uri))){ 

out = fs.append(new Path(uri)); 
}else { 

out = fs.create(new Path(uri), false, 4096); 

out.write(text.getBytes()); 
out.close(); 


public void copyToHDFS(Configuration conf, String localPath, String HDFSPath, boolean showProgress) throws IOException{ 

FileSystem fs = FileSystem.get(URI.create(HDFSPath), conf); 
InputStream in = new BufferedInputStream(new FileInputStream(localPath)); 
OutputStream out = null

if(showProgress) { 
out = fs.create(new Path(HDFSPath) 
true 
, 4096 
new Progressable() { 
public void progress() { 
System.out.print("."); 

} ); 
}else 

out = fs.create(new Path(HDFSPath) 
true 
, 4096); 


IOUtils.copyBytes(in, out, 4096, true); 


public void deleteFileOnHDFS(Configuration conf, String uri) throws IOException { 

FileSystem fs = FileSystem.get(URI.create(uri), conf); 
if(fs.exists(new Path(uri)) == falsereturn
fs.delete(new Path(uri), false); 

}

  
 

MapReduce

MapReduce是一個分布式計算框架,計算模型如圖4-1所示。在該計算模型中,一個作業由兩類任務組成:Map和Reduce。Map任務負責將原始數據轉換成Key/Value形式的鍵值對集合,Reduce任務將Map轉換的數據進行處理,抽取需要的數據輸出。

圖4-1 MapReduce計算模型
在設計MapReduce任務時,首先,要對業務需求進行抽象,將業務分解成一個或多個MapReduce作業。如果業務流程可以分解成一個線性的作業鏈的應用場合,可以使用MapReduce客戶端來控制作業集的有序執行。對於復雜的業務邏輯,可能涉及到作業有向無環圖(DAG)的轉換,這種轉換通常是十分費力的。Hive、Pig和Cascading可以使用更高級的語言來實現這種轉換,開發復雜的MapReduce應用,可以考慮是使用這些工具進行。Apache Oozie是一個以服務器形式運行的工作流服務,可以控制多個MapReduce作業按設計流程運行。在Oozie中,工作流是一個Action節點和控制流節點組成的DAG。CDH v1.0不包含Hive、Pig、Cascading和Oozie組件,但是兼容Hive0.9.0、Pig0.10.0、Oozie3.2.0及以上版本。
注意:

    • 雖然MRv2使用全新的YARN框架,可支持多種分布式計算框架,但是MRv2在對MapReduce框架的支持上兼容MRv1的接口,因此,MRv1的應用程序代碼無需修改就可以在MRv2上運行,只需使用MRv2包進行重新編譯。
    • 關於MapReduce的詳細介紹,請參閱:http://hadoop.apache.org/docs/stable/mapred_tutorial.html

如4.2節所述,一個完整的MapReduce作業首先需要將原始數據切片,然后將切片交給map任務處理,之后將map任務處理的結果傳給Reduce處理,最后,將Reduce處理結果輸出。這個過程中一共分四部分:切片、map、reduce和輸出,與之對應的類分別為InputFormat、OutputFormat、Mapper、Reducer。一個MapReduce作業由Job類進行抽象,客戶端配置Job類后調用該類的submit方法將作業提交給MapReduce集群執行。

4.3.1 InputFormat類

InputFormat描述了一個MapReduce任務的輸入切片信息,MapReduce作業通過該類實現如下功能:

  1. 檢查作業的輸入設置是否有效
  2. 將作業切片,每個切片用InputSplit類封裝,提交給Mapper類處理;
  3. 通過實現RecordReader類來讀取切片中的Key/Value對,該Key/Value對是Mapper類處理的最小單位

MapReduce框架預置了很多有用的InputFormat實現,如表4-1所示:
表4-1 InputFormat類族(主要)

說明

AutoInputFormat

一個可以自動檢測文件類型的FileInputFormat類,該類目前支持文本文件和HDFS序列文件。

CombineFileInputFormat

一個用於處理大量小文件的FileInputFormat類,CombineFileInputFormat將多個文件打包到一個輸入切片中,每個Mapper類可能得到來自多個文件的數據。

CompositeInputFormat

一個可以實現多數據源Join的InputFormat.

DBInputFormat

一個面向關系型數據庫的InputFormat實現,該類使用JDBC從關系型數據庫中讀取數據。需要注意的是,MapReduce的高並發性可能給源數據庫造成巨大壓力,甚至導致數據庫崩潰,因此,最好在少量數據集中使用。

FileInputFormat

FileInputFormat是所有文件類型輸入數據的InputFormat基類。

KeyValueTextInputFormat

讀取文本文件的InputFormat,如果行被分隔符分隔為兩部分,則第一部分作為Key,第二部分作為Value,如果沒有分隔符,則整行作為Key,Value為空。

NLineInputFormat

該InputFormat將將文件數據按指定的行數進行切片。

SequenceFileAsBinaryInputFormat

讀取HDFS序列文件的FileInputFormat,該類將序列文件中的鍵值對都按BytesWritable類型進行解析。

SequenceFileAsTextInputFormat

讀取HDFS序列文件的FileInputFormat,該類將序列文件中的鍵值對都按Text類型進行解析。

SequenceFileInputFormat

讀取HDFS序列文件的FileInputFormat

TeraInputFormat

該類將每行的前10個字符作為Key,其余作為Value。該類屬於FileInputFormat,按文件進行切片。

TextInputFormat

該類是MapReduce默認的InputFormat,該類將每個文件作為一個切片輸入給Mapper,每個切片按行划分,Key為該行第一個字符在文件中的偏移量。

4.3.2 OutputFormat類

OutputFormat類描述了一個MapReduce作業如何輸出結果數據。MapReduce通過該類實現下述功能:

  1. 驗證輸出設置的有效性,如設置的輸出目錄是否存在;
  2. 實現RecordWriter類,通過RecordWriter來指定數據輸出格式。

MapReduce框架同樣預置了許多有用的OutputFormat實現,如表4-2所示:

說明

DBOutputFormat

將輸出數據寫入關系型數據的OutputFormat

FileOutputFormat

將輸出數據寫入文件的OutputFormat基類

MapFileOutputFormat

將輸出結果寫入MapFile格式的文件中的FileOutputFormat實現

MultipleOutputFormat

一個將輸出結果寫入多個文件的FileOutputFormat實現

MultipleSequenceFileOutputFormat

一個將輸出結果寫入多個文件HDFS序列文件的FileOutputFormat實現

MultipleTextOutputFormat

一個將輸出結果寫入多個文件文本文件的FileOutputFormat實現

NullOutputFormat

一個特殊的OutputFormat,不輸出任何東西

SequenceFileAsBinaryOutputFormat

將輸出結果寫入HDFS序列文件格式的文件中的FileOutputFormat實現,key和value都設置為BytesWritable格式

SequenceFileOutputFormat

將輸出結果寫入HDFS序列文件格式的文件中的FileOutputFormat實現

TeraOutputFormat

將輸出數據輸出到文本文件,每行以"\r\n"結尾

TextOutputFormat

該類是MapReduce默認的OutputFormat,按文本類型解析所有的Key和Value。

4.3.3 Mapper類

Mapper類將輸入的一組Key/Value鍵值對數據映射為中間Key/Value鍵值對,輸出的鍵值對與輸入的鍵值對數據類型可以不一樣,輸出鍵值對應與Reducer輸入一致。Mapper的輸出會被存儲在本地磁盤中,按Reducer進行分組,哪些數據被分給那個Reducer由Partitioner類控制。Mapper類中的map方法實現如下:

/** 
* Called once for each key/value pair in the input split. Most applications 
* should override this, but the default is the identity function. 
*/ 
@SuppressWarnings("unchecked") 
protected void map(KEYIN key, VALUEIN value, 
Context context) throws IOException, InterruptedException { 
context.write((KEYOUT) key, (VALUEOUT) value); 
}

KEYIN、VALUEIN為輸入切片的Key/Value類型,Context為MapReduce作業上下文,可以通過該類獲得作業的配置信息,設置計數器、輸出中間結果等操作。一般情況下,開發者只需要繼承Mapper類,在該類的map方法中實現自己的業務邏輯即可。如果需要定制map任務流程,如初始化一些環境等,可以重載Mapper類的setup、cleanup、run方法來創建一個完全自定義的map任務。

4.3.4 Reducer類

Reducer類將map輸出的屬於它的中間數據拉取到Reducer節點進行處理,處理結果根據作業配置的OutputFormat進行輸出。Reducer的個數由可由Job類的setNumReduceTasks方法設定。Reducer類主要完成三個過程:Shuffle、Sort和Reduce。Shuffle過程中,Reducer將存儲在map端屬於該Reducer的中間數據通過http協議拉取到本地節點。Sort過程按Key對拉取的數據進行歸並排序,相同key的數據按value排序。Reduce階段會調用reduce方法處理拉取的數據,並通過Context將數據輸出到指定目標中。通常開發者只需重載Reducer類的reduce方法來實現自己的業務邏輯。Reducer類的reduce方法實現如下:

/** 
* This method is called once for each key. Most applications will define 
* their reduce class by overriding this method. The default implementation 
* is an identity function. 
*/ 
@SuppressWarnings("unchecked") 
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context 
throws IOException, InterruptedException { 
for(VALUEIN value: values) { 
context.write((KEYOUT) key, (VALUEOUT) value); 

}

類似的,通過重載Reducer的setup、cleanup和run方法,可以創建一個完全自定義的reduce任務。

4.3.5 Job類

Job類是從客戶端角度對MapReduce作業進行抽象的類,該類包括了作業定義接口和控制接口,通過該類可以定義、提交和監控MapReduce作業。Job類提供的主要方法如下所示:
void killJob()
void setCombinerClass(Class<? extends Reducer> cls) 
void setInputFormatClass(Class<? extends InputFormat> cls)
void setMapOutputKeyClass(Class<?> theClass) 
void setMapOutputValueClass(Class<?> theClass) 
void setMapperClass(Class<? extends Mapper> cls)
void setNumReduceTasks(int tasks) 
void setOutputFormatClass(Class<? extends OutputFormat> cls) 
void setOutputKeyClass(Class<?> theClass) 
void setOutputValueClass(Class<?> theClass) 
void setPartitionerClass(Class<? extends Partitioner> cls) 
void setReducerClass(Class<? extends Reducer> cls) 
void submit() 
boolean waitForCompletion(boolean verbose)
通常,一個完整的MapReduce作業需要設置如下幾個參數:

  • InputFormat類及其參數
  • OutputFormat類及其參數
  • Mapper類
  • Reducer類

復雜一些的MapReduce作業可能需要設定Partioner類和Combiner類。設定Combiner類可以是MapReduce框架在Mapper節點處理存儲在本地的中間數據,從而減少Reducer的Shuffle過程需要拉取的數據量,一般Combiner與Reducer設置為同一個類。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM