大數據技術之Hadoop(Map-Reduce)
一 MapReduce入門
1.1 MapReduce定義
Mapreduce是一個分布式運算程序的編程框架,是用戶開發“基於hadoop的數據分析應用”的核心框架。
Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,並發運行在一個hadoop集群上。
1.2 MapReduce優缺點
1.2.1 優點
1)MapReduce 易於編程。它簡單的實現一些接口,就可以完成一個分布式程序,這個分布式程序可以分布到大量廉價的PC機器上運行。也就是說你寫一個分布式程序,跟寫一個簡單的串行程序是一模一樣的。就是因為這個特點使得MapReduce編程變得非常流行。
2)良好的擴展性。當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴展它的計算能力。
3)高容錯性。MapReduce設計的初衷就是使程序能夠部署在廉價的PC機器上,這就要求它具有很高的容錯性。比如其中一台機器掛了,它可以把上面的計算任務轉移到另外一個節點上運行,不至於這個任務運行失敗,而且這個過程不需要人工參與,而完全是由 Hadoop內部完成的。
4)適合PB級以上海量數據的離線處理,說明它適合離線處理而不適合在線處理。比如像毫秒級別的返回一個結果,MapReduce很難做到。
1.2.2 缺點
MapReduce不擅長做實時計算、流式計算、DAG(有向圖)計算。
1)實時計算。MapReduce無法像Mysql一樣,在毫秒或者秒級內返回結果。
2)流式計算。流式計算的輸入數據是動態的,而MapReduce的輸入數據集是靜態的,不能動態變化。這是因為MapReduce自身的設計特點決定了數據源必須是靜態的。
3)DAG(有向無環圖)計算。多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出。在這種情況下,MapReduce並不是不能做,而是使用后,每個MapReduce作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導致性能非常的低下。
1.3 MapReduce核心思想
1)分布式的運算程序往往需要分成至少2個階段。
2)第一個階段的maptask並發實例,完全並行運行,互不相干。
3)第二個階段的reduce task並發實例互不相干,但是他們的數據依賴於上一個階段的所有maptask並發實例的輸出。
4)MapReduce編程模型只能包含一個map階段和一個reduce階段,如果用戶的業務邏輯非常復雜,那就只能多個mapreduce程序,串行運行。
1.4 MapReduce進程
一個完整的mapreduce程序在分布式運行時有三類實例進程:
1)MrAppMaster:負責整個程序的過程調度及狀態協調。
2)MapTask:負責map階段的整個數據處理流程。
3)ReduceTask:負責reduce階段的整個數據處理流程。
1.5 MapReduce編程規范
用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
1)Mapper階段
(1)用戶自定義的Mapper要繼承自己的父類
(2)Mapper的輸入數據是KV對的形式(KV的類型可自定義)
(3)Mapper中的業務邏輯寫在map()方法中
(4)Mapper的輸出數據是KV對的形式(KV的類型可自定義)
(5)map()方法(maptask進程)對每一個<K,V>調用一次
2)Reducer階段
(1)用戶自定義的Reducer要繼承自己的父類
(2)Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
(3)Reducer的業務邏輯寫在reduce()方法中
(4)Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
3)Driver階段
整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象
二 Hadoop序列化
2.1 為什么要序列化?
一般來說,“活的”對象只生存在內存里,關機斷電就沒有了。而且“活的”對象只能由本地的進程使用,不能被發送到網絡上的另外一台計算機。 然而序列化可以存儲“活的”對象,可以將“活的”對象發送到遠程計算機。
2.2 什么是序列化?
序列化就是把內存中的對象,轉換成字節序列(或其他數據傳輸協議)以便於存儲(持久化)和網絡傳輸。
反序列化就是將收到字節序列(或其他數據傳輸協議)或者是硬盤的持久化數據,轉換成內存中的對象。
2.3 為什么不用Java的序列化?
Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化后,會附帶很多額外的信息(各種校驗信息,header,繼承體系等),不便於在網絡中高效傳輸。所以,hadoop自己開發了一套序列化機制(Writable),精簡、高效。
2.4 為什么序列化對Hadoop很重要?
因為Hadoop在集群之間進行通訊或者RPC調用的時候,需要序列化,而且要求序列化要快,且體積要小,占用帶寬要小。所以必須理解Hadoop的序列化機制。
序列化和反序列化在分布式數據處理領域經常出現:進程通信和永久存儲。然而Hadoop中各個節點的通信是通過遠程調用(RPC)實現的,那么RPC序列化要求具有以下特點:
1)緊湊:緊湊的格式能讓我們充分利用網絡帶寬,而帶寬是數據中心最稀缺的資
2)快速:進程通信形成了分布式系統的骨架,所以需要盡量減少序列化和反序列化的性能開銷,這是基本的;
3)可擴展:協議為了滿足新的需求變化,所以控制客戶端和服務器過程中,需要直接引進相應的協議,這些是新協議,原序列化方式能支持新的協議報文;
4)互操作:能支持不同語言寫的客戶端和服務端進行交互;
2.5 常用數據序列化類型
常用的數據類型對應的hadoop數據序列化類型
Java類型 |
Hadoop Writable類型 |
boolean |
BooleanWritable |
byte |
ByteWritable |
int |
IntWritable |
float |
FloatWritable |
long |
LongWritable |
double |
DoubleWritable |
string |
Text |
map |
MapWritable |
array |
ArrayWritable |
2.6 自定義bean對象實現序列化接口(Writable)
1)自定義bean對象要想序列化傳輸,必須實現序列化接口,需要注意以下7項。
(1)必須實現Writable接口
(2)反序列化時,需要反射調用空參構造函數,所以必須有空參構造
public FlowBean() { super(); } |
(3)重寫序列化方法
@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } |
(4)重寫反序列化方法
@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } |
(5)注意反序列化的順序和序列化的順序完全一致
(6)要想把結果顯示在文件中,需要重寫toString(),可用”\t”分開,方便后續用。
(7)如果需要將自定義的bean放在key中傳輸,則還需要實現comparable接口,因為mapreduce框中的shuffle過程一定會對key進行排序。
@Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } |
三 MapReduce框架原理
3.1 MapReduce工作流程
1)流程示意圖
2)流程詳解
上面的流程是整個mapreduce最全工作流程,但是shuffle過程只是從第7步開始到第16步結束,具體shuffle過程詳解,如下:
1)maptask收集我們的map()方法輸出的kv對,放到內存緩沖區中
2)從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件
3)多個溢出文件會被合並成大的溢出文件
4)在溢出過程中,及合並的過程中,都要調用partitioner進行分區和針對key進行排序
5)reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據
6)reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合並(歸並排序)
7)合並成大文件后,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
3)注意
Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快。
緩沖區的大小可以通過參數調整,參數:io.sort.mb 默認100M。
3.2 InputFormat數據輸入
3.2.1 切片計算公式
(1)找到你數據存儲的目錄。
(2)開始遍歷處理(規划切片)目錄下的每一個文件
(3)遍歷第一個文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt);
b)計算切片大小computeSplitSize方法 Math.max(minSize,Math.min(maxSize,blocksize))=blocksize=128M
以上源碼在FileInputFormat得280行
c)默認情況下,切片大小=blocksize
d)開始切,形成第1個切片:ss.txt—0:128M 第2個切片ss.txt—128:256M 第3個切片ss.txt—256M:300M(每次切片時,都要判斷切完剩下的部分是否大於塊的1.1倍,不大於1.1倍就划分一塊切片)
1.1倍解釋源碼解釋:在FileInputFormat這個類得第48行,有個1.1倍得`SPLIT_SLOP `得屬性參數;注會有精度不准確得情況:Double類型
e)將切片信息寫到一個切片規划文件中
f)整個切片的核心過程在getSplit()方法中完成。
g)數據切片只是在邏輯上對輸入數據進行分片,並不會再磁盤上將其切分成分片進行存儲。InputSplit只記錄了分片的元數據信息,比如起始位置、長度以及所在的節點列表等。
h)注意:block是HDFS物理上存儲的數據,切片是對數據邏輯上的划分。
(4)提交切片規划文件到yarn上,yarn上的MrAppMaster就可以根據切片規划文件計算開啟maptask個數。
3.2.2 FileInputFormat切片機制
1)FileInputFormat中默認的切片機制:
(1)簡單地按照文件的內容長度進行切片
(2)切片大小,默認等於block大小
(3)切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
比如待處理數據有兩個文件:
file1.txt 320M file2.txt 10M |
經過FileInputFormat的切片機制運算后,形成的切片信息如下:
file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M |
2)FileInputFormat切片大小的參數配置
通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由這幾個值來運算決定
mapreduce.input.fileinputformat.split.minsize=1 默認值為1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue
因此,默認情況下,切片大小=blocksize。
maxsize(切片最大值):參數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個參數的值。
minsize(切片最小值):參數調的比blockSize大,則可以讓切片變得比blocksize還大。
3)獲取切片信息API
// 根據文件類型獲取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 獲取切片的文件名稱 String name = inputSplit.getPath().getName(); |
3.2.3 CombineTextInputFormat切片機制
關於大量小文件的優化策略
1)默認情況下TextInputformat對任務的切片機制是按文件規划切片,不管文件多小,都會是一個單獨的切片,都會交給一個maptask,這樣如果有大量小文件,就會產生大量的maptask,處理效率極其低下。
2)優化策略
(1)最好的辦法,在數據處理系統的最前端(預處理/采集),將小文件先合並成大文件,再上傳到HDFS做后續分析。
(2)補救措施:如果已經是大量小文件在HDFS中了,可以使用另一種InputFormat來做切片(CombineTextInputFormat),它的切片邏輯跟TextFileInputFormat不同:它可以將多個小文件從邏輯上規划到一個切片中,這樣,多個小文件就可以交給一個maptask。
(3)優先滿足最小切片大小,不超過最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
3)具體實現步驟
// 如果不設置InputFormat,它默認用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m |
4)案例實操
3.2.4 InputFormat接口實現類
MapReduce任務的輸入文件一般是存儲在HDFS里面。輸入的文件格式包括:基於行的日志文件、二進制格式文件等。這些文件一般會很大,達到數十GB,甚至更大。那么MapReduce是如何讀取這些數據的呢?下面我們首先學習InputFormat接口。
InputFormat常見的接口實現類包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等。
1)TextInputFormat
TextInputFormat是默認的InputFormat。每條記錄是一行輸入。鍵是LongWritable類型,存儲該行在整個文件中的字節偏移量。值是這行的內容,不包括任何行終止符(換行符和回車符)。
以下是一個示例,比如,一個分片包含了如下4條文本記錄。
Rich learning form Intelligent learning engine Learning more convenient From the real demand for more close to the enterprise |
每條記錄表示為以下鍵/值對:
(0,Rich learning form) (19,Intelligent learning engine) (47,Learning more convenient) (72,From the real demand for more close to the enterprise) |
很明顯,鍵並不是行號。一般情況下,很難取得行號,因為文件按字節而不是按行切分為分片。
2)KeyValueTextInputFormat(僅知道即可)
每一行均為一條記錄,被分隔符分割為key,value。可以通過在驅動類中設置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");來設定分隔符。默認分隔符是tab(\t)。
以下是一個示例,輸入是一個包含4條記錄的分片。其中——>表示一個(水平方向的)制表符。
line1 ——>Rich learning form line2 ——>Intelligent learning engine line3 ——>Learning more convenient line4 ——>From the real demand for more close to the enterprise |
每條記錄表示為以下鍵/值對:
(line1,Rich learning form) (line2,Intelligent learning engine) (line3,Learning more convenient) (line4,From the real demand for more close to the enterprise) |
此時的鍵是每行排在制表符之前的Text序列。
3)NLineInputFormat(僅知道即可)
如果使用NlineInputFormat,代表每個map進程處理的InputSplit不再按block塊去划分,而是按NlineInputFormat指定的行數N來划分。即輸入文件的總行數/N=切片數,如果不整除,切片數=商+1。
以下是一個示例,仍然以上面的4行輸入為例。
Rich learning form Intelligent learning engine Learning more convenient From the real demand for more close to the enterprise |
例如,如果N是2,則每個輸入分片包含兩行。開啟2個maptask。
(0,Rich learning form) (19,Intelligent learning engine) |
另一個 mapper 則收到后兩行:
(47,Learning more convenient) (72,From the real demand for more close to the enterprise) |
這里的鍵和值與TextInputFormat生成的一樣。
3.2.5 自定義InputFormat
1)概述
(1)自定義一個類繼承FileInputFormat。
(2)改寫RecordReader,實現一次讀取一個完整文件封裝為KV。
(3)在輸出時使用SequenceFileOutPutFormat輸出合並文件。
3.3 MapTask工作機制
3.3.1 並行度決定機制
1)問題引出
maptask的並行度決定map階段的任務處理並發度,進而影響到整個job的處理速度。那么,mapTask並行任務是否越多越好呢?
2)MapTask並行度決定機制
一個job的map階段MapTask並行度(個數),由客戶端提交job時的切片個數決定。
3.3.2 MapTask工作機制
(1)Read階段:Map Task通過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,並產生一系列新的key/value。
(3)Collect收集階段:在用戶編寫map()函數中,當數據處理完成后,一般會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner),並寫入一個環形內存緩沖區中。
(4)Spill階段:即“溢寫”,當環形緩沖區滿后,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。需要注意的是,將數據寫入本地磁盤之前,先要對數據進行一次本地排序,並在必要時對數據進行合並、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號partition進行排序,然后按照key進行排序。這樣,經過排序后,數據以分區為單位聚集在一起,且同一分區內所有數據按照key有序。
步驟2:按照分區編號由小到大依次將每個分區中的數據寫入任務工作目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。如果用戶設置了Combiner,則寫入文件之前,對每個分區中的數據進行一次聚集操作。
步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮后數據大小。如果當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。
(5)Combine階段:當所有數據處理完成后,MapTask對所有臨時文件進行一次合並,以確保最終只會生成一個數據文件。
當所有數據處理完后,MapTask會將所有臨時文件合並成一個大文件,並保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。
在進行文件合並過程中,MapTask以分區為單位進行合並。對於某個分區,它將采用多輪遞歸合並的方式。每輪合並io.sort.factor(默認100)個文件,並將產生的文件重新加入待合並列表中,對文件排序后,重復以上過程,直到最終得到一個大文件。
讓每個MapTask最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。
3.4 Shuffle機制
3.4.1 Shuffle機制
Mapreduce確保每個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出作為輸入傳給reducer)稱為shuffle。
3.4.2 Partition分區
0)問題引出:要求將統計結果按照條件輸出到不同文件中(分區)。比如:將統計結果按照手機歸屬地不同省份輸出到不同文件中(分區)
1)默認partition分區
public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } } |
默認分區是根據key的hashCode對reduceTasks個數取模得到的。用戶沒法控制哪個key存儲到哪個分區。
2)自定義Partitioner步驟
(1)自定義類繼承Partitioner,重寫getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 獲取電話號碼的前三位 String preNum = key.toString().substring(0, 3);
int partition = 4;
// 2 判斷是哪個省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } } |
(2)在job驅動中,設置自定義partitioner:
job.setPartitionerClass(CustomPartitioner.class); |
(3)自定義partition后,要根據自定義partitioner的邏輯設置相應數量的reduce task
job.setNumReduceTasks(5); |
3)注意:
如果reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
如果1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;
如果reduceTask的數量=1,則不管mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;
例如:假設自定義分區數為5,則
(1)job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件
(2)job.setNumReduceTasks(2);會報錯
(3)job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件
3.4.3 WritableComparable排序
排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均會對數據(按照key)進行排序。該操作屬於Hadoop的默認行為。任何應用程序中的數據均會被排序,而不管邏輯上是否需要。默認排序是按照字典順序排序,且實現該排序的方法是快速排序。
對於Map Task,它會將處理的結果暫時放到一個緩沖區中,當緩沖區使用率達到一定閾值后,再對緩沖區中的數據進行一次排序,並將這些有序數據寫到磁盤上,而當數據處理完畢后,它會對磁盤上所有文件進行一次合並,以將這些文件合並成一個大的有序文件。
對於Reduce Task,它從每個Map Task上遠程拷貝相應的數據文件,如果文件大小超過一定閾值,則放到磁盤上,否則放到內存中。如果磁盤上文件數目達到一定閾值,則進行一次合並以生成一個更大文件;如果內存中文件大小或者數目超過一定閾值,則進行一次合並后將數據寫到磁盤上。當所有數據拷貝完畢后,Reduce Task統一對內存和磁盤上的所有數據進行一次合並。
每個階段的默認排序
1)排序的分類:
(1)部分排序:
MapReduce根據輸入記錄的鍵對數據集排序。保證輸出的每個文件內部排序。
(2)全排序:
如何用Hadoop產生一個全局排序的文件?最簡單的方法是使用一個分區。但該方法在處理大型文件時效率極低,因為一台機器必須處理所有輸出文件,從而完全喪失了MapReduce所提供的並行架構。
替代方案:首先創建一系列排好序的文件;其次,串聯這些文件;最后,生成一個全局排序的文件。主要思路是使用一個分區來描述輸出的全局排序。例如:可以為上述文件創建3個分區,在第一分區中,記錄的單詞首字母a-g,第二分區記錄單詞首字母h-n, 第三分區記錄單詞首字母o-z。
(3)輔助排序:(GroupingComparator分組)
Mapreduce框架在記錄到達reducer之前按鍵對記錄排序,但鍵所對應的值並沒有被排序。甚至在不同的執行輪次中,這些值的排序也不固定,因為它們來自不同的map任務且這些map任務在不同輪次中完成時間各不相同。一般來說,大多數MapReduce程序會避免讓reduce函數依賴於值的排序。但是,有時也需要通過特定的方法對鍵進行排序和分組等以實現對值的排序。
(4)二次排序:
在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序。
2)自定義排序WritableComparable
(1)原理分析
bean對象實現WritableComparable接口重寫compareTo方法,就可以實現排序
@Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } |
3.4.4 GroupingComparator分組(輔助排序)
1)對reduce階段的數據根據某一個或幾個字段進行分組。
3.4.5 Combiner合並
1)combiner是MR程序中Mapper和Reducer之外的一種組件。
2)combiner組件的父類就是Reducer。
3)combiner和reducer的區別在於運行的位置:
Combiner是在每一個maptask所在的節點運行;
Reducer是接收全局所有Mapper的輸出結果;
4)combiner的意義就是對每一個maptask的輸出進行局部匯總,以減小網絡傳輸量。
5)combiner能夠應用的前提是不能影響最終的業務邏輯,而且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來。
Mapper
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4
Reducer
(3+5+7+2+6)/5=23/5 不等於 (5+4)/2=9/2
6)自定義Combiner實現步驟:
(1)自定義一個combiner繼承Reducer,重寫reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 1 匯總操作 int count = 0; for(IntWritable v :values){ count = v.get(); } // 2 寫出 context.write(key, new IntWritable(count)); } } |
(2)在job驅動類中設置:
job.setCombinerClass(WordcountCombiner.class); |
MapReduce實戰
1.1 WordCount案例
1.1.1 需求1:統計一堆文件中單詞出現的個數
0)需求:在一堆給定的文本文件中統計輸出每一個單詞出現的總次數
1)數據准備:
2)分析
按照mapreduce編程規范,分別編寫Mapper,Reducer,Driver。
3)編寫程序
(1)編寫mapper類
package com.itstar.mapreduce; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text(); IntWritable v = new IntWritable(1);
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取一行 String line = value.toString();
// 2 切割 String[] words = line.split(" ");
// 3 輸出 for (String word : words) {
k.set(word); context.write(k, v); } } } |
(2)編寫reducer類
package com.itstar.mapreduce.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
// 1 累加求和 int sum = 0; for (IntWritable count : value) { sum += count.get(); }
// 2 輸出 context.write(key, new IntWritable(sum)); } } |
(3)編寫驅動類
package com.itstar.mapreduce.wordcount; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String[] args=new String{“輸入路徑”,”輸出路徑”};
// 1 獲取配置信息 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration);
// 2 設置jar加載路徑 job.setJarByClass(WordcountDriver.class);
// 3 設置map和Reduce類 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class);
// 4 設置map輸出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class);
// 5 設置Reduce輸出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
// 6 設置輸入和輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交 boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1); } } |
4)集群上測試
(1)將程序打成jar包,然后拷貝到hadoop集群中。
(2)啟動hadoop集群
(3)執行wordcount程序
[itstar@hadoop102 software]$ hadoop jar wc.jar com.itstar.wordcount.WordcountDriver /user/itstar/input /user/itstar/output1
5)本地測試
(1)在windows環境上配置HADOOP_HOME環境變量。
(2)在idea上運行程序
(3)注意:如果idea打印不出日志,在控制台上只顯示
1.log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell). 2.log4j:WARN Please initialize the log4j system properly. 3.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. |
需要在項目的src目錄下,新建一個文件,命名為“log4j.properties”,在文件中填入
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n |
1.1.2 需求2:把單詞按照ASCII碼奇偶分區(Partitioner)
0)分析
1)自定義分區
package com.itstar.mapreduce.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;
public class WordCountPartitioner extends Partitioner<Text, IntWritable>{
@Override public int getPartition(Text key, IntWritable value, int numPartitions) {
// 1 獲取單詞key String firWord = key.toString().substring(0, 1); int result = Integer.valueOf(firWord);
// 2 根據奇數偶數分區 if (result % 2 == 0) { return 0; }else { return 1; } } } |
2)在驅動中配置加載分區,設置reducetask個數
job.setPartitionerClass(WordCountPartitioner.class); job.setNumReduceTasks(2); |
1.1.3 需求3:對每一個maptask的輸出局部匯總(Combiner)
0)需求:統計過程中對每一個maptask的輸出進行局部匯總,以減小網絡傳輸量即采用Combiner功能。
1)數據准備:
方案一
1)增加一個WordcountCombiner類繼承Reducer
package com.itstar.mr.combiner; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 1 匯總 int count = 0; for(IntWritable v :values){ count += v.get(); } // 2 寫出 context.write(key, new IntWritable(count)); } } |
2)在WordcountDriver驅動類中指定combiner
// 9 指定需要使用combiner,以及用哪個類作為combiner的邏輯 job.setCombinerClass(WordcountCombiner.class); |
方案二
1)將WordcountReducer作為combiner在WordcountDriver驅動類中指定
// 指定需要使用combiner,以及用哪個類作為combiner的邏輯 job.setCombinerClass(WordcountReducer.class); |
運行程序
1.1.4 需求4:大量小文件的切片優化(CombineTextInputFormat)
0)在分布式的架構中,分布式文件系統HDFS,和分布式運算程序編程框架mapreduce。
HDFS:不怕大文件,怕很多小文件
mapreduce :怕數據傾斜
那么mapreduce是如果解決多個小文件的問題呢?
mapreduce關於大量小文件的優化策略
(1) 默認情況下,TextInputFormat對任務的切片機制是按照文件規划切片,不管有多少個小文件,都會是單獨的切片,都會交給一個maptask,這樣,如果有大量的小文件
就會產生大量的maptask,處理效率極端底下
(2)優化策略
最好的方法:在數據處理的最前端(預處理、采集),就將小文件合並成大文件,在上傳到HDFS做后續的分析
補救措施:如果已經是大量的小文件在HDFS中了,可以使用另一種inputformat來做切片(CombineFileInputformat),它的切片邏輯跟TextInputformat
注:CombineTextInputFormat是CombineFileInputformat的子類
不同:
它可以將多個小文件從邏輯上規划到一個切片中,這樣,多個小文件就可以交給一個maptask了
//如果不設置InputFormat,它默認的用的是TextInputFormat.class
/*CombineTextInputFormat為系統自帶的組件類
* setMinInputSplitSize 中的2048是表示n個小文件之和不能大於2048
* setMaxInputSplitSize 中的4096是 當滿足setMinInputSplitSize中的2048情況下 在滿足n+1個小文件之和不能大於4096
*/
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMinInputSplitSize(job, 2048);
CombineTextInputFormat.setMaxInputSplitSize(job, 4096);
1)輸入數據:准備5個小文件
2)實現過程
(1)不做任何處理,運行需求1中的wordcount程序,觀察切片個數為5
(2)在WordcountDriver中增加如下代碼,運行程序,並觀察運行的切片個數為1
// 如果不設置InputFormat,它默認用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4*1024*1024);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2*1024*1024);// 2m |
注:在看number of splits時,和最大值(MaxSplitSize)有關、總體規律就是和低於最大值是一片、高於最大值1.5倍+,則為兩片;高於最大值2倍以上則向下取整,比如文件大小65MB,切片最大值為4MB,那么切片為14個.總體來說,切片差值不超過1個,不影響整體性能