一. MapTask並行度決定機制
maptask
的並行度決定
map
階段的任務處理並發度,進而影響到整個
job
的處理速度
那么,
mapTask
並行實例是否越多越好呢?其並行度又是如何決定呢?
1.1 mapTask並行度的決定機制
一個job的map階段並行度由客戶端在提交job時決定
而客戶端對map階段並行度的規划的基本邏輯為:
將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據划分成邏輯上的多個split),然后每一個split分配一個mapTask並行實例處理
(3)注意:block是HDFS上物理上存儲的存儲的數據,切片是對數據邏輯上的划分。兩者之間沒有關系。即使hdfs上是128M存儲,Mapreduce也會切片,只是默認切片也是128M。也可以非128M切片,如100M,多余的部門由框架內部處理和其他結點進行拼接切片。因為切片決定了給其分配mapTask進程數量。
(4)在HDFS上,map默認運算切片大小是128M,但如果是本地運行的話,maP默認切片大小是32M.。maptask 和reducetask其實都框架下的一個類。
這段邏輯及形成的切片規划描述文件,由FileInputFormat實現類的getSplits()方法完成,其過程如下圖:

主要有兩個方法:getsplit(),客戶端用來切片。
creatRecordReader()MR用來讀數據
1.FileInputFormat切片機制(在提交yarn前已完成,客戶端完成)
注解:FileInputFormat繼承與InputFormat類,都是mapreduce包下的類。歸其管理
其余的常用類,如TextInputFormat和CombinInputFormat都是FileInputFormat的子類。
1、切片定義在InputFormat類中的getSplit()方法
2、FileInputFormat中默認的切片機制:
1.簡單地按照文件的內容長度進行切片
2.切片大小,默認等於block大小(這樣如果有很多小文件時,就會產生很多切片,造成很多個maptask,降低系統性能)
3.切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
2.切片大小,默認等於block大小(這樣如果有很多小文件時,就會產生很多切片,造成很多個maptask,降低系統性能)
3.切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
比如待處理數據有兩個文件:
file1.txt 320M
file2.txt 10M
|
經過FileInputFormat的切片機制運算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
|
3、FileInputFormat中切片的大小的參數配置

2.FileInputFormat源碼解析(input.getSplits(job))從job.waitforComplecation開始斷點,直到提交到yarn上。
(1)找到你數據存儲的目錄。(可以是目錄,多個文件同時進行統計,然后將統計結果裝載到一個文件或者多個文件里。。也可以單獨統計分析1個文件。)
(2)開始遍歷處理(規划切片)目錄下的每一個文件
(3)遍歷第一個文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt);
b)計算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)開始切,形成第1個切片:ss.txt—0:128M第2個切片ss.txt—128:256M第3個切片ss.txt—256M:300M(
每次切片時,都要判斷切完剩下的部分是否大於塊的
1.1
倍,
不大於
1.1
倍就划分一塊切片
)當然類中方法也會判斷,如果是壓縮文件之類的,是不會切片的。報異常。
d)將切片信息寫到一個切片規划文件中
f)整個切片的核心過程在getSplit()方法中完成。(是FileInputFormat類中的方法)
(4)提交切片規划文件到yarn上,yarn上的MrAppMaster就可以根據切片規划文件計算開啟maptask個數。
3.通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個值來運算決定
FileInputFormat中默認的切片機制:
(1)簡單地按照文件的內容長度進行切片
(2)切片大小,默認等於block大小
(3)切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
(2)切片大小,默認等於block大小
(3)切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
比如待處理數據有兩個文件:
file1.txt 320M
file2.txt 10M
|
經過FileInputFormat的切片機制運算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
|
4)FileInputFormat切片大小的參數配置
(1)通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由這幾個值來運算決定
mapreduce.input.fileinputformat.split.minsize=1默認值為1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue默認值Long.MAXValue
因此,默認情況下,切片大小=blocksize。
maxsize(切片最大值):參數如果調得比blocksize小,則會讓切片變小,而且就等於配置的這個參數的值。
minsize(切片最小值):參數調的比blockSize大,則可以讓切片變得比blocksize還大。
5)獲取切片信息API :FileSplit是inputSplit的子類。
//根據文件類型獲取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
//獲取切片的文件名稱
String name = inputSplit.getPath().getName();//獲取的是被切片文件名:
|
選擇並發數的影響因素:
1- 運算節點的硬件配置
2- 運算任務的類型:CPU密集型還是IO密集型
3- 運算任務的數據量
map並行度的經驗之談
1.如果硬件配置為2*12core + 64G,恰當的map並行度是大約每個節點20-100個map,最好每個map的執行時間至少一分鍾。
2.如果job的每個map或者 reduce task的運行時間都只有30-40秒鍾,那么就減少該job的map或者reduce數,每一個task(map|reduce)的setup和加入到調度器中進行調度,這個中間的過程可能都要花費幾秒鍾,所以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間。
配置task的JVM重用可以改善該問題:1
(
mapred.job.reuse.jvm.num.tasks
,
默認是
1
,表示一個
JVM
上最多可以順序執行的
task
數目(屬於同一個
Job
)是
1
。也就是說一個
task
啟一個
JVM
)
注釋:JVM重用技術不是指同一Job的兩個或兩個以上的task可以同時運行於同一JVM上,而是排隊按順序執行
3.如果input的文件非常的大,比如1TB,可以考慮將hdfs上的每個block size設大,比如設成256MB或者512MB
2.ReduceTask並行度的決定
reducetask的並行度同樣影響整個job的執行並發度和執行效率,但與maptask的並發數由切片數決定不同,Reducetask數量的決定是可以直接手動設置,默認值為1.
//默認值是1,手動設置為4
job.setNumReduceTasks(4);
如果數據分布不均勻,就有可能在reduce階段產生數據傾斜
注意: reducetask數量並不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局匯總結果,就只能有1個reducetask
盡量不要運行太多的reduce task。對大多數job來說,最好rduce的個數最多和集群中的reduce持平,或者比集群的 reduce slots小。這個對於小集群而言,尤其重要。
1)job提交流程源碼詳解
waitForCompletion()
submit();
// 1建立連接
connect();
// 1)創建提交job的代理
new Cluster(getConfiguration());
// (1)判斷是本地yarn還是遠程
initialize(jobTrackAddr, conf);
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)創建給集群提交數據的Stag路徑
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)獲取jobid ,並創建job路徑
JobID jobId = submitClient.getNewJobID();
// 3)拷貝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)計算切片,生成切片規划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向Stag路徑寫xml配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交job,返回提交狀態
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());