一 MapReduce入門
1.1 MapReduce定義
Mapreduce是一個分布式運算程序的編程框架,是用戶開發“基於hadoop的數據分析應用”的核心框架;
Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,並發運行在一個hadoop集群上。
1.2 MapReduce優缺點
1.2.1 優點
1)MapReduce 易於編程。它簡單的實現一些接口,就可以完成一個分布式程序,這個分布式程序可以分布到大量廉價的 PC 機器運行。也就是說你寫一個分布式程序,跟寫一個簡單的串行程序是一模一樣的。 就是因為這個特點使得 MapReduce 編程變得非常流行。
2)良好的擴展性。當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴展它的計算能力。
3)高容錯性。MapReduce 設計的初衷就是使程序能夠部署在廉價的 PC 機器上,這就要求它具有很高的容錯性。比如其中一台機器掛了,它可以把上面的計算任務轉移到另外一個節點上面上運行,不至於這個任務運行失敗,而且這個過程不需要人工參與,而完全是由 Hadoop 內部完成的。
4)適合 PB 級以上海量數據的離線處理。這里加紅字體離線處理,說明它適合離線處理而不適合在線處理。比如像毫秒級別的返回一個結果,MapReduce 很難做到。
1.2.2 缺點
MapReduce不擅長做實時計算、流式計算、DAG(有向圖)計算。
1)實時計算。MapReduce 無法像 Mysql 一樣,在毫秒或者秒級內返回結果。
2)流式計算。流式計算的輸入數據時動態的,而 MapReduce 的輸入數據集是靜態的,不能動態變化。這是因為 MapReduce 自身的設計特點決定了數據源必須是靜態的。
3)DAG(有向圖)計算。多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出。在這種情況下,MapReduce 並不是不能做,而是使用后,每個MapReduce 作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導致性能非常的低下。
1.3 MapReduce核心思想
1.4 MapReduce進程
一個完整的mapreduce程序在分布式運行時有三類實例進程:
1)MrAppMaster:負責整個程序的過程調度及狀態協調。
2)MapTask:負責map階段的整個數據處理流程。
3)ReduceTask:負責reduce階段的整個數據處理流程。
1.5 MapReduce編程規范
用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
1)Mapper階段
(1)用戶自定義的Mapper要繼承自己的父類
(2)Mapper的輸入數據是KV對的形式(KV的類型可自定義)
(3)Mapper中的業務邏輯寫在map()方法中
(4)Mapper的輸出數據是KV對的形式(KV的類型可自定義)
(5)map()方法(maptask進程)對每一個<K,V>調用一次
2)Reducer階段
(1)用戶自定義的Reducer要繼承自己的父類
(2)Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
(3)Reducer的業務邏輯寫在reduce()方法中
(4)Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
3)Driver階段
整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象
1.6 MapReduce程序運行流程分析
1)在MapReduce程序讀取文件的輸入目錄上存放相應的文件。
2)客戶端程序在submit()方法執行前,獲取待處理的數據信息,然后根據集群中參數的配置形成一個任務分配規划。
3)客戶端提交job.split、jar包、job.xml等文件給yarn,yarn中的resourcemanager啟動MRAppMaster。
4)MRAppMaster啟動后根據本次job的描述信息,計算出需要的maptask實例數量,然后向集群申請機器啟動相應數量的maptask進程。
5)maptask利用客戶指定的inputformat來讀取數據,形成輸入KV對。
6)maptask將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算
7)map()運算完畢后將KV對收集到maptask緩存。
8)maptask緩存中的KV對按照K分區排序后不斷寫到磁盤文件
9)MRAppMaster監控到所有maptask進程任務完成之后,會根據客戶指定的參數啟動相應數量的reducetask進程,並告知reducetask進程要處理的數據分區。
10)Reducetask進程啟動之后,根據MRAppMaster告知的待處理數據所在位置,從若干台maptask運行所在機器上獲取到若干個maptask輸出結果文件,並在本地進行重新歸並排序,然后按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算。
11)Reducetask運算完畢后,調用客戶指定的outputformat將結果數據輸出到外部存儲。
二 Hadoop序列化
2.1 為什么要序列化?
一般來說,“活的”對象只生存在內存里,關機斷電就沒有了。而且“活的”對象只能由本地的進程使用,不能被發送到網絡上的另外一台計算機。 然而序列化可以存儲“活的”對象,可以將“活的”對象發送到遠程計算機。
2.2 什么是序列化?
序列化就是把內存中的對象,轉換成字節序列(或其他數據傳輸協議)以便於存儲(持久化)和網絡傳輸。反序列化就是將收到字節序列(或其他數據傳輸協議)或者是硬盤的持久化數據,轉換成內存中的對象。
2.3 為什么不用Java的序列化?
Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化后,會附帶很多額外的信息(各種校驗信息,header,繼承體系等),不便於在網絡中高效傳輸。所以,hadoop自己開發了一套序列化機制(Writable),精簡、高效。
2.4 為什么序列化對Hadoop很重要?
因為Hadoop在集群之間進行通訊或者RPC調用的時候,需要序列化,而且要求序列化要快,且體積要小,占用帶寬要小。所以必須理解Hadoop的序列化機制。序列化和反序列化在分布式數據處理領域經常出現:進程通信和永久存儲。然而Hadoop中各個節點的通信是通過遠程調用(RPC)實現的,那么 RPC序列化要求具有以下特點:
1)緊湊:緊湊的格式能讓我們能充分利用網絡帶寬,而帶寬是數據中心最稀缺的資
2)快速:進程通信形成了分布式系統的骨架,所以需要盡量減少序列化和反序列化的性能開銷,這是基本的;
3)可擴展:協議為了滿足新的需求變化,所以控制客戶端和服務器過程中,需要直接引進相應的協議,這些是新協議,原序列化方式能支持新的協議報文;
4)互操作:能支持不同語言寫的客戶端和服務端進行交互;
2.5 常用數據序列化類型
2.6 自定義bean對象實現序列化接口(Writable)
1)自定義bean對象要想序列化傳輸,必須實現序列化接口,需要注意以下7項。
// 1 必須實現Writable接口 public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; //2 反序列化時,需要反射調用空參構造函數,所以必須有 public FlowBean() { super(); } /** * 3重寫序列化方法 * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 4 重寫反序列化方法 * 5 注意反序列化的順序和序列化的順序完全一致 * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } // 6要想把結果顯示在文件中,需要重寫toString(),且用”\t”分開,方便后續用 @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } //7 如果需要將自定義的bean放在key中傳輸,則還需要實現comparable接口,因為mapreduce框中的shuffle過程一定會對key進行排序 @Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; } }
(1)必須實現Writable接口
(2)反序列化時,需要反射調用空參構造函數,所以必須有空參構造
(3)重寫序列化方法
(4)重寫反序列化方法
(5)注意反序列化的順序和序列化的順序完全一致
(6)要想把結果顯示在文件中,需要重寫toString(),且用”\t”分開,方便后續用
(7)如果需要將自定義的bean放在key中傳輸,則還需要實現comparable接口,因為mapreduce框中的shuffle過程一定會對key進行排序
2)案例實操
詳見統計每一個手機號耗費的總上行流量、下行流量、總流量(序列化)。
三 MapReduce框架原理
3.1 MapReduce工作流程
1)流程示意圖
2)流程詳解
上面的流程是整個mapreduce最全工作流程,但是shuffle過程只是從第7步開始到第16步結束,具體shuffle過程詳解,如下:
1)maptask收集我們的map()方法輸出的kv對,放到內存緩沖區中
2)從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件
3)多個溢出文件會被合並成大的溢出文件
4)在溢出過程中,及合並的過程中,都要調用partitioner進行分區和針對key進行排序
5)reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據
6)reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合並(歸並排序)
7)合並成大文件后,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
3)注意
Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快。
緩沖區的大小可以通過參數調整,參數:io.sort.mb 默認100M
3.2 InputFormat數據輸入
3.2.1 InputFormat接口實現類
InputFormat 常見的接口實現類包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等。
1)TextInputFormat
TextInputFormat 是默認的 InputFormat。每條記錄是一行輸入。鍵是LongWritable 類型,存儲該行在整個文件中的字節偏移量。 值是這行的內容,不包括任何行終止符(換行符合回車符),它被打包成一個 Text 對象。
以下是一個示例,比如,一個分片包含了如下4條文本記錄。
Rich learning form Intelligent learning engine Learning more convenient From the real demand for more close to the enterprise |
每條記錄表示為以下鍵/值對:
(0,Rich learning form) (19,Intelligent learning engine) (47,Learning more convenient) (72,From the real demand for more close to the enterprise) |
很明顯,鍵並不是行號。一般情況下,很難取得行號,因為文件按字節而不是按行切分為分片。
2)KeyValueTextInputFormat
每一行均為一條記錄, 被分隔符(缺省是tab(\t))分割為key(Text),value(Text)。可以通過 mapreduce.input.keyvaluelinerecordreader.key.value,separator屬性(或者舊版本 API 中的 key.value.separator.in.input.line)來設定分隔符。 它的默認值是一個制表符。
以下是一個示例,輸入是一個包含4條記錄的分片。其中——>表示一個(水平方向的)制表符。
line1 ——>Rich learning form line2 ——>Intelligent learning engine line3 ——>Learning more convenient line4 ——>From the real demand for more close to the enterprise |
每條記錄表示為以下鍵/值對:
(line1,Rich learning form) (line2,Intelligent learning engine) (line3,Learning more convenient) (line4,From the real demand for more close to the enterprise) |
此時的鍵是每行排在制表符之前的 Text 序列。
3)NLineInputFormat
通過 TextInputFormat 和 KeyValueTextInputFormat,每個 Mapper 收到的輸入行數不同。行數取決於輸入分片的大小和行的長度。 如果希望 Mapper 收到固定行數的輸入,需要將 NLineInputFormat 作為 InputFormat。與 TextInputFormat 一樣, 鍵是文件中行的字節偏移量,值是行本身。N 是每個 Mapper 收到的輸入行數。N 設置為1(默認值)時,每個 Mapper 正好收到一行輸入。 mapreduce.input.lineinputformat.linespermap 屬性(在舊版本 API 中的 mapred.line.input.format.linespermap 屬性)實現 N 值的設定。
以下是一個示例,仍然以上面的4行輸入為例。
Rich learning form Intelligent learning engine Learning more convenient From the real demand for more close to the enterprise |
例如,如果 N 是2,則每個輸入分片包含兩行。一個 mapper 收到前兩行鍵值對:
(0,Rich learning form) (19,Intelligent learning engine) |
另一個 mapper 則收到后兩行:
(47,Learning more convenient) (72,From the real demand for more close to the enterprise) |
這里的鍵和值與TextInputFormat生成的一樣。
3.2.2 自定義InputFormat
1)概述
(1)自定義一個類繼承FileInputFormat
(2)改寫RecordReader,實現一次讀取一個完整文件封裝為KV
(3)在輸出時使用SequenceFileOutPutFormat輸出合並文件
2)案例實操
詳見小文件處理(自定義InputFormat)。
3.2.3 FileInputFormat切片機制
1)job提交流程源碼詳解
waitForCompletion() submit(); // 1建立連接 connect(); // 1)創建提交job的代理 new Cluster(getConfiguration()); // (1)判斷是本地yarn還是遠程 initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster) // 1)創建給集群提交數據的Stag路徑 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)獲取jobid ,並創建job路徑 JobID jobId = submitClient.getNewJobID(); // 3)拷貝jar包到集群 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)計算切片,生成切片規划文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向Stag路徑寫xml配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交job,返回提交狀態 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
2)FileInputFormat源碼解析(input.getSplits(job))
(1)找到你數據存儲的目錄。
(2)開始遍歷處理(規划切片)目錄下的每一個文件
(3)遍歷第一個文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt);
b)計算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)默認情況下,切片大小=blocksize
d)開始切,形成第1個切片:ss.txt—0:128M 第2個切片ss.txt—128:256M 第3個切片ss.txt—256M:300M(每次切片時,都要判斷切完剩下的部分是否大於塊的1.1倍,不大於1.1倍就划分一塊切片)
e)將切片信息寫到一個切片規划文件中
f)整個切片的核心過程在getSplit()方法中完成。
g)數據切片只是在邏輯上對輸入數據進行分片,並不會再磁盤上將其切分成分片進行存儲。InputSplit只記錄了分片的元數據信息,比如起始位置、長度以及所在的節點列表等。
h)注意:block是HDFS上物理上存儲的存儲的數據,切片是對數據邏輯上的划分。
(4)提交切片規划文件到yarn上,yarn上的MrAppMaster就可以根據切片規划文件計算開啟maptask個數。
3)FileInputFormat中默認的切片機制:
(1)簡單地按照文件的內容長度進行切片
(2)切片大小,默認等於block大小
(3)切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
比如待處理數據有兩個文件: file1.txt 320M file2.txt 10M 經過FileInputFormat的切片機制運算后,形成的切片信息如下: file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M
4)FileInputFormat切片大小的參數配置
(1)通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由這幾個值來運算決定
mapreduce.input.fileinputformat.split.minsize=1 默認值為1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue
因此,默認情況下,切片大小=blocksize。
maxsize(切片最大值):參數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個參數的值。
minsize(切片最小值):參數調的比blockSize大,則可以讓切片變得比blocksize還大。
5)獲取切片信息API
// 根據文件類型獲取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 獲取切片的文件名稱 String name = inputSplit.getPath().getName();
3.2.4 CombineTextInputFormat切片機制
關於大量小文件的優化策略
1)默認情況下TextInputformat對任務的切片機制是按文件規划切片,不管文件多小,都會是一個單獨的切片,都會交給一個maptask,這樣如果有大量小文件,就會產生大量的maptask,處理效率極其低下。
2)優化策略
(1)最好的辦法,在數據處理系統的最前端(預處理/采集),將小文件先合並成大文件,再上傳到HDFS做后續分析。
(2)補救措施:如果已經是大量小文件在HDFS中了,可以使用另一種InputFormat來做切片(CombineFileInputFormat),它的切片邏輯跟TextFileInputFormat不同:它可以將多個小文件從邏輯上規划到一個切片中,這樣,多個小文件就可以交給一個maptask。
(3)優先滿足最小切片大小,不超過最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
3)具體實現步驟
// 9 如果不設置InputFormat,它默認用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
4)案例實操
詳見大量小文件的切片優化(CombineTextInputFormat)。
3.3 MapTask工作機制
3.3.1 並行度決定機制
1)問題引出
maptask的並行度決定map階段的任務處理並發度,進而影響到整個job的處理速度。那么,mapTask並行任務是否越多越好呢?
2)MapTask並行度決定機制
一個job的map階段MapTask並行度(個數),由客戶端提交job時的切片個數決定。
3.3.2 MapTask工作機制
(1)Read階段:Map Task通過用戶編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該節點主要是將解析出的key/value交給用戶編寫map()函數處理,並產生一系列新的key/value。
(3)Collect收集階段:在用戶編寫map()函數中,當數據處理完成后,一般會調用OutputCollector.collect()輸出結果。在該函數內部,它會將生成的key/value分區(調用Partitioner),並寫入一個環形內存緩沖區中。
(4)Spill階段:即“溢寫”,當環形緩沖區滿后,MapReduce會將數據寫到本地磁盤上,生成一個臨時文件。需要注意的是,將數據寫入本地磁盤之前,先要對數據進行一次本地排序,並在必要時對數據進行合並、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區內的數據進行排序,排序方式是,先按照分區編號partition進行排序,然后按照key進行排序。這樣,經過排序后,數據以分區為單位聚集在一起,且同一分區內所有數據按照key有序。
步驟2:按照分區編號由小到大依次將每個分區中的數據寫入任務工作目錄下的臨時文件output/spillN.out(N表示當前溢寫次數)中。如果用戶設置了Combiner,則寫入文件之前,對每個分區中的數據進行一次聚集操作。
步驟3:將分區數據的元信息寫到內存索引數據結構SpillRecord中,其中每個分區的元信息包括在臨時文件中的偏移量、壓縮前數據大小和壓縮后數據大小。如果當前內存索引大小超過1MB,則將內存索引寫到文件output/spillN.out.index中。
(5)Combine階段:當所有數據處理完成后,MapTask對所有臨時文件進行一次合並,以確保最終只會生成一個數據文件。當所有數據處理完后,MapTask會將所有臨時文件合並成一個大文件,並保存到文件output/file.out中,同時生成相應的索引文件output/file.out.index。在進行文件合並過程中,MapTask以分區為單位進行合並。對於某個分區,它將采用多輪遞歸合並的方式。每輪合並io.sort.factor(默認100)個文件,並將產生的文件重新加入待合並列表中,對文件排序后,重復以上過程,直到最終得到一個大文件。讓每個MapTask最終只生成一個數據文件,可避免同時打開大量文件和同時讀取大量小文件產生的隨機讀取帶來的開銷。
MapTask 並行度經驗之談
如果硬件配置為 2*12core + 64G,恰當的 map 並行度是大約每個節點 20-100 個 map,最好 每個 map 的執行時間至少一分鍾。
1、如果 job 的每個 map 或者 reduce task 的運行時間都只有 30-40 秒鍾,那么就減少該 job 的 map 或者 reduce 數,每一個 task(map|reduce)的 setup 和加入到調度器中進行調度,這個 中間的過程可能都要花費幾秒鍾,所以如果每個 task 都非常快就跑完了,就會在 task 的開 始和結束的時候浪費太多的時間。
配置 task 的 JVM 重用可以改善該問題:
mapred.job.reuse.jvm.num.tasks,默認是 1,表示一個 JVM 上最多可以順序執行的 task 數目(屬於同一個 Job)是 1。也就是說一個 task 啟一個 JVM。這個值可以在 mapred-site.xml 中進行更改,當設置成多個,就意味着這多個 task 運行在同一個 JVM 上,但不是同時執行, 是排隊順序執行
2、如果 input 的文件非常的大,比如 1TB,可以考慮將 hdfs 上的每個 blocksize 設大,比如 設成 256MB 或者 512MB
3.4 Shuffle機制
- MapReduce 中,mapper 階段處理的數據如何傳遞給 reducer 階段,是 MapReduce 框架中 最關鍵的一個流程,這個流程就叫 Shuffle
- Shuffle: 數據混洗 ——(核心機制:數據分區,排序,局部聚合,緩存,拉取,再合並 排序)
- 具體來說:就是將 MapTask 輸出的處理結果數據,按照 Partitioner 組件制定的規則分發 給 ReduceTask,並在分發的過程中,對數據按 key 進行了分區和排序
3.4.1 Shuffle機制
Mapreduce確保每個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出作為輸入傳給reducer)稱為shuffle。
3.4.2 Partition分區
0)問題引出:要求將統計結果按照條件輸出到不同文件中(分區)。比如:將統計結果按照手機歸屬地不同省份輸出到不同文件中(分區)
1)默認partition分區
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
默認分區是根據key的hashCode對reduceTasks個數取模得到的。用戶沒法控制哪個key存儲到哪個分區。
2)自定義Partitioner步驟
(1)自定義類繼承Partitioner,重寫getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 1 獲取電話號碼的前三位 String preNum = key.toString().substring(0, 3); int partition = 4; // 2 判斷是哪個省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }
(2)在job驅動中,設置自定義partitioner:
job.setPartitionerClass(CustomPartitioner.class);
(3)自定義partition后,要根據自定義partitioner的邏輯設置相應數量的reduce task
job.setNumReduceTasks(5);
3)注意:
如果reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
如果1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;
如果reduceTask的數量=1,則不管mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;
例如:假設自定義分區數為5,則
(1)job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件
(2)job.setNumReduceTasks(2);會報錯
(3)job.setNumReduceTasks(6);大於5,程序會正常運行,會產生空文件
4)案例實操
詳見將統計結果按照手機歸屬地不同省份輸出到不同文件中(Partitioner)
詳見把單詞按照ASCII碼奇偶分區(Partitioner)
3.4.3 WritableComparable排序
排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均會對數據(按照key)進行排序。該操作屬於Hadoop的默認行為。任何應用程序中的數據均會被排序,而不管邏輯上是否需要。
對於Map Task,它會將處理的結果暫時放到一個緩沖區中,當緩沖區使用率達到一定閾值后,再對緩沖區中的數據進行一次排序,並將這些有序數據寫到磁盤上,而當數據處理完畢后,它會對磁盤上所有文件進行一次合並,以將這些文件合並成一個大的有序文件。
對於Reduce Task,它從每個Map Task上遠程拷貝相應的數據文件,如果文件大小超過一定閾值,則放到磁盤上,否則放到內存中。如果磁盤上文件數目達到一定閾值,則進行一次合並以生成一個更大文件;如果內存中文件大小或者數目超過一定閾值,則進行一次合並后將數據寫到磁盤上。當所有數據拷貝完畢后,Reduce Task統一對內存和磁盤上的所有數據進行一次合並。
1)排序的分類:
(1)部分排序:
MapReduce根據輸入記錄的鍵對數據集排序。保證輸出的每個文件內部排序。
(2)全排序:
如何用Hadoop產生一個全局排序的文件?最簡單的方法是使用一個分區。但該方法在處理大型文件時效率極低,因為一台機器必須處理所有輸出文件,從而完全喪失了MapReduce所提供的並行架構。
替代方案:首先創建一系列排好序的文件;其次,串聯這些文件;最后,生成一個全局排序的文件。主要思路是使用一個分區來描述輸出的全局排序。例如:可以為上述文件創建3個分區,在第一分區中,記錄的單詞首字母a-g,第二分區記錄單詞首字母h-n, 第三分區記錄單詞首字母o-z。
(3)輔助排序:(GroupingComparator分組)
Mapreduce框架在記錄到達reducer之前按鍵對記錄排序,但鍵所對應的值並沒有被排序。甚至在不同的執行輪次中,這些值的排序也不固定,因為它們來自不同的map任務且這些map任務在不同輪次中完成時間各不相同。一般來說,大多數MapReduce程序會避免讓reduce函數依賴於值的排序。但是,有時也需要通過特定的方法對鍵進行排序和分組等以實現對值的排序。
(4)二次排序:
在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序。
2)自定義排序WritableComparable
(1)原理分析
bean對象實現WritableComparable接口重寫compareTo方法,就可以實現排序
@Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
(2)案例實操
詳將統計結果按照總流量倒序排序(排序)
詳見不同省份輸出文件內部排序(部分排序)
3.4.4 GroupingComparator分組(輔助排序)
1)對reduce階段的數據根據某一個或幾個字段進行分組。
2)案例實操
詳見求出每一個訂單中最貴的商品(GroupingComparator)
3.4.5 Combiner合並
1)combiner是MR程序中Mapper和Reducer之外的一種組件
2)combiner組件的父類就是Reducer
3)combiner和reducer的區別在於運行的位置:
Combiner是在每一個maptask所在的節點運行
Reducer是接收全局所有Mapper的輸出結果;
4)combiner的意義就是對每一個maptask的輸出進行局部匯總,以減小網絡傳輸量
5)combiner能夠應用的前提是不能影響最終的業務邏輯,而且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來
Mapper 3 5 7 ->(3+5+7)/3=5 2 6 ->(2+6)/2=4 Reducer (3+5+7+2+6)/5=23/5 不等於 (5+4)/2=9/2
6)自定義Combiner實現步驟:
(1)自定義一個combiner繼承Reducer,重寫reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable v :values){ count = v.get(); } context.write(key, new IntWritable(count)); } }
(2)在job驅動類中設置:
job.setCombinerClass(WordcountCombiner.class);
7)案例實操
詳見對每一個maptask的輸出局部匯總(Combiner)
3.5 ReduceTask工作機制
1)設置ReduceTask
reducetask的並行度同樣影響整個job的執行並發度和執行效率,但與maptask的並發數由切片數決定不同,Reducetask數量的決定是可以直接手動設置:
//默認值是1,手動設置為4 job.setNumReduceTasks(4);
2)注意
(1)reducetask=0 ,表示沒有reduce階段,輸出文件個數和map個數一致。
(2)reducetask默認值就是1,所以輸出文件個數為一個。
(3)如果數據分布不均勻,就有可能在reduce階段產生數據傾斜
(4)reducetask數量並不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局匯總結果,就只能有1個reducetask。
(5)具體多少個reducetask,需要根據集群性能而定。
(6)如果分區數不是1,但是reducetask為1,是否執行分區過程。答案是:不執行分區過程。因為在maptask的源碼中,執行分區的前提是先判斷reduceNum個數是否大於1。不大於1肯定不執行。
3)實驗:測試reducetask多少合適。
4)ReduceTask工作機制
(1)Copy階段:ReduceTask從各個MapTask上遠程拷貝一片數據,並針對某一片數據,如果其大小超過一定閾值,則寫到磁盤上,否則直接放到內存中。
(2)Merge階段:在遠程拷貝數據的同時,ReduceTask啟動了兩個后台線程對內存和磁盤上的文件進行合並,以防止內存使用過多或磁盤上文件過多。
(3)Sort階段:按照MapReduce語義,用戶編寫reduce()函數輸入數據是按key進行聚集的一組數據。為了將key相同的數據聚在一起,Hadoop采用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了局部排序,因此,ReduceTask只需對所有數據進行一次歸並排序即可。
(4)Reduce階段:reduce()函數將計算結果寫到HDFS上。
3.6 OutputFormat數據輸出
3.6.1 OutputFormat接口實現類
OutputFormat是MapReduce輸出的基類,所有實現MapReduce輸出都實現了 OutputFormat接口。下面我們介紹幾種常見的OutputFormat實現類。
1)文本輸出TextOutputFormat
默認的輸出格式是TextOutputFormat,它把每條記錄寫為文本行。它的鍵和值可以是任意類型,因為TextOutputFormat調用toString()方法把它們轉換為字符串。
2)SequenceFileOutputFormat
SequenceFileOutputFormat將它的輸出寫為一個順序文件。如果輸出需要作為后續 MapReduce任務的輸入,這便是一種好的輸出格式,因為它的格式緊湊,很容易被壓縮。
3)自定義OutputFormat
根據用戶需求,自定義實現輸出。
3.6.2 自定義OutputFormat
為了實現控制最終文件的輸出路徑,可以自定義OutputFormat。
要在一個mapreduce程序中根據數據的不同輸出兩類結果到不同目錄,這類靈活的輸出需求可以通過自定義outputformat來實現。
1)自定義OutputFormat步驟
(1)自定義一個類繼承FileOutputFormat。
(2)改寫recordwriter,具體改寫輸出數據的方法write()。
2)實操案例:
詳見修改日志內容及自定義日志輸出路徑(自定義OutputFormat)。
3.7 計數器應用
Hadoop為每個作業維護若干內置計數器,以描述多項指標。例如,某些計數器記錄已處理的字節數和記錄數,使用戶可監控已處理的輸入數據量和已產生的輸出數據量。
1)API
(1)采用枚舉的方式統計計數
enum MyCounter{MALFORORMED,NORMAL} //對枚舉定義的自定義計數器加1 context.getCounter(MyCounter.MALFORORMED).increment(1);
(2)采用計數器組、計數器名稱的方式統計
context.getCounter("counterGroup", "countera").increment(1); 組名和計數器名稱隨便起,但最好有意義。
(3)計數結果在程序運行后的控制台上查看。
2)案例實操
詳見修改日志內容及自定義日志輸出路徑(自定義OutputFormat)。
3.8 Join多種應用
3.8.1 Reduce join
1)原理:
Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。然后用連接字段作為key,其余部分和新加的標志作為value,最后進行輸出。
reduce端的主要工作:在reduce端以連接字段作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同文件的記錄(在map階段已經打標志)分開,最后進行合並就ok了
2)該方法的缺點
這里主要分析一下reduce join的一些不足。之所以會存在reduce join這種方式,是因為整體數據被分割了,每個map task只處理一部分數據而不能夠獲取到所有需要的join字段,因此我們可以充分利用mapreduce框架的特性,讓他按照join key進行分區,將所有join key相同的記錄集中起來進行處理,所以reduce join這種方式就出現了。
這種方式的缺點很明顯就是會造成map和reduce端也就是shuffle階段出現大量的數據傳輸,效率很低。
3.8.2 Map join
1)使用場景:一張表十分小、一張表很大。
2)使用方法:
在提交作業的時候先將小表文件放到該作業的DistributedCache中,然后從DistributeCache中取出該小表進行join (比如放到Hash Map等等容器中)。然后掃描大表,看大表中的每條記錄的join key/value值是否能夠在內存中找到相同join key的記錄,如果有則直接輸出結果。
3.8.3 Distributedcache分布式緩存
1)數據傾斜原因
如果是多張表的操作都是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜。
2)實操案例:
詳見 reduce端表合並(數據傾斜)
3)解決方案
在map端緩存多張表,提前處理業務邏輯,這樣增加map端業務,減少reduce端數據的壓力,盡可能的減少數據傾斜。
4)具體辦法:采用distributedcache
(1)在mapper的setup階段,將文件讀取到緩存集合中
(2)在驅動函數中加載緩存。
job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 緩存普通文件到task運行節點
5)實操案例:
詳見 map端表合並(Distributedcache)
3.9 數據清洗
1)概述
在運行核心業務Mapreduce程序之前,往往要先對數據進行清洗,清理掉不符合用戶要求的數據。清理的過程往往只需要運行mapper程序,不需要運行reduce程序。
2)實操案例
詳見 日志清洗(數據清洗)。
3.10 MapReduce開發總結
在編寫mapreduce程序時,需要考慮的幾個方面:
1)輸入數據接口:InputFormat
默認使用的實現類是:TextInputFormat
TextInputFormat的功能邏輯是:一次讀一行文本,然后將該行的起始偏移量作為key,行內容作為value返回
CombineTextInputFormat可以把多個小文件合並成一個切片處理,提高處理效率。
用戶還可以自定義InputFormat。
2)邏輯處理接口:Mapper
用戶根據業務需求實現其中三個方法:map() setup() cleanup ()
3)Partitioner分區
有默認實現 HashPartitioner,邏輯是根據key的哈希值和numReduces來返回一個分區號;key.hashCode()&Integer.MAXVALUE % numReduces
如果業務上有特別的需求,可以自定義分區。
4)Comparable排序
當我們用自定義的對象作為key來輸出時,就必須要實現WritableComparable接口,重寫其中的compareTo()方法。
部分排序:對最終輸出的沒一個文件進行內部排序。
全排序:對所有數據進行排序,通常只有一個Reduce。
二次排序:排序的條件有兩個。
5)Combiner合並
Combiner合並可以提高程序執行效率,減少io傳輸。但是使用時必須不能影響原有的業務處理結果。
6)reduce端分組:Groupingcomparator
reduceTask拿到輸入數據(一個partition的所有數據)后,首先需要對數據進行分組,其分組的默認原則是key相同,然后對每一組kv數據調用一次reduce()方法,並且將這一組kv中的第一個kv的key作為參數傳給reduce的key,將這一組數據的value的迭代器傳給reduce()的values參數。
利用上述這個機制,我們可以實現一個高效的分組取最大值的邏輯。
自定義一個bean對象用來封裝我們的數據,然后改寫其compareTo方法產生倒序排序的效果。然后自定義一個Groupingcomparator,將bean對象的分組邏輯改成按照我們的業務分組id來分組(比如訂單號)。這樣,我們要取的最大值就是reduce()方法中傳進來key。
7)邏輯處理接口:Reducer
用戶根據業務需求實現其中三個方法: reduce() setup() cleanup ()
8)輸出數據接口:OutputFormat
默認實現類是TextOutputFormat,功能邏輯是:將每一個KV對向目標文本文件中輸出為一行。
用戶還可以自定義OutputFormat。
四 MapReduce運行方式
一、本地方式運行:
1、pc環境:
1.1、將Hadoop安裝本地解壓
1.2、配置Hadoop的環境變量
添加%HADOOP_HOME%
修改%PATH% 添加%HADOOP_HOME%/bin;%HADOOP_HOME%/sbin
1.3、在解壓的Hadoop的bin目錄下 添加winutils.exe工具
2、Java工程
2.1、jdk一定要使用自己的jdk、不要使用eclipse自帶
2.2、根目錄(src目錄下),不要添加任何MapReduce的配置文件 hdfs-site.xml yarn-site.xml core-site.xml mapred-site.xml
2.3、在代碼當中,通過conf.set方式來進行指定。conf.set("fs.defaultFS", "hdfs://node21:8020");
2.4、修改Hadoop源碼
3、右鍵run執行
集群運行兩種方式:
二、
Java工程
1、根目錄(src目錄下),添加Hadoop的配置文件 hdfs-site.xml yarn-site.xml core-site.xml mapred-site.xml
2、在代碼當中,指定jar包的位置,config.set("mapred.jar", "D:\\MR\\wc.jar");
3、修改Hadoop源碼
4、將工程打jar包
5、右鍵run執行
三、
Java工程
1、根目錄(src目錄下),添加Hadoop的配置文件 hdfs-site.xml yarn-site.xml core-site.xml mapred-site.xml
2、將工程打jar包
3、手動將jar包上傳到集群當中
4、通過hadoop命令來運行。hadoop jar jar位置 mr代碼入口 (例如:hadoop jar /usr/wc.jar com.sxt.mr.WcJob)
HBase運行:
在代碼當中指定HBase所使用的ZooKeeper集群。
(注意:如果hbase搭建的是偽分布式,那么對應的ZooKeeper就是那台偽分布式的服務器)
conf.set("hbase.zookeeper.quorum", "node21,node22,node23");
System.setProperty("HADOOP_USER_NAME", "root");