1.1 Mapreduce任務流程
Mapreduce是大量數據並發處理的編程模型,主要包括下面五個實體,客戶端將作業文件復制到分布式文件系統,向資源管理器提交mapreduce作業,資源管理器向節點管理器分配容器資源,節點管理器啟動application Master,application master啟動另外一個節點管理器,向資源管理器申請容器資源,用來運行作業任務。
客戶端 |
提交mapreduce作業 |
資源管理器 |
管理分配資源 |
節點管理器 |
啟動、管理、監視集群中的container容器工作 |
Application Master |
每個程序對應一個AM,負責程序的任務調度,本身也是運行在NM的Container中 |
分布式文件系統 |
存儲作業文件 |
mapreduce流程圖
mapreduce的工作流程
(1) 客戶端調用Job實例的Submit()或者waitForCompletion()方法提交作業;
(2) 客戶端向ResourceManage請求分配一個Application ID,客戶端會對程序的輸出路徑進行檢查,如果沒有問題,進行作業輸入分片的計算。
(3) 將作業運行所需要的資源拷貝到HDFS中,包括jar包、配置文件和計算出來的輸入分片信息等;
(4) 調用ResourceManage的submitApplication方法將作業提交到ResourceManage;
(5) ResourceManage收到submitApplication方法的調用之后會命令一個NM啟動一個Container,.在該NodeManage的Container上啟動管理該作業的ApplicationMaster進程;
(6) .AM對作業進行初始化操作,並將會接收作業的處理和完成情況報告;
(7) AM從HDFS中獲得輸入數據的分片信息;根據分片信息確定要啟動的map任務數,reduce任務數則根據mapreduce.job.reduces屬性或者Job實例的setNumReduceTasks方法來決定。
(8) AM為其每個map和reduce任務向RM請求計算資源,map優先於reduce。Map需要考慮數據本地化(map作業運行在存儲map數據的節點上,避免數據傳輸)。
(9) AM在RM指定的NM上啟動Container,在Container上啟動任務(通過YarnChild進程來運行)
(10)在真正執行任務之前,從HDFS從將任務運行需要的資源拷貝到本地,包括jar包、配置文件信息和分布式緩存文件等
(11)執行map/reduce任務
1.1.1 作業提交
Job的submit方法創建一個內部的jobSummiter實例,並調用其submitJobInternal方法,提交作業后,waitForCompletion每秒輪循作業的進度,更新進度,返回執行成功失敗的結果。代碼詳細
提交作業
job.waitForCompletion(true)
進入函數封裝了submit函數
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// 創建與文件系統、Yarn的連接
connect();
// 創建JobSubmitter對象,由JobSubmitter來執行提交操作
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
繼續跟進submitJobInternal方法,檢查輸入和輸出目錄
checkSpecs(job);
詳細步驟如下
(1) 向資源管理器申請新應用ID,作為mapreduce的作業ID。
JobID jobId = submitClient.getNewJobID();
(2) 檢查輸出目錄是否指定或者是否存在,沒有則不提交,拋出異常給mapreduce程序。
(3) 計算作業的輸入分片,檢查輸入路徑是否存在。沒有則不提交作業,拋出異常給mapreduce程序。
intmaps
=writeSplits(
job
,submitJobDir
);
(4) 復制jar文件,配置文件,輸入分片到共享文件系統中作業ID命名的文件夾下
job
.setJobID(jobId
);
Path submitJobDir
=new
Path(
jobStagingArea
,jobId
.toString());
copyAndConfigureFiles(job, submitJobDir);
// 上傳共享的文件
uploadFiles(job
,files
,submitJobDir
,mapredSysPerms
,replication
,
fileSCUploadPolicies
,statCache
);
// 上傳依賴的jar包
uploadLibJars(job
,libjars
,submitJobDir
,mapredSysPerms
,replication
,
fileSCUploadPolicies
,statCache
);
// 上傳檔案
uploadArchives(job
,archives
,submitJobDir
,mapredSysPerms
,replication
,
archiveSCUploadPolicies
,statCache
);
// 上傳job jar
uploadJobJar(job
,jobJar
,submitJobDir
,replication
,statCache
);
(5) 通過調用資源管理器的submitApplication方法提交作業。
1.1.2 作業的初始化
(1) 資源管理器收到調用它的submitapplication消息后,將請求傳遞給YARN調度器,調度器分配一個容器,在容器中啟動application master進程,為每一個輸入分片創建一個map任務對象,創建mapreduce.job.reduces屬性定義的多個reuduce任務,任務id在此時分配,並監視作業進度。
(2) Applicationmaster 會去判斷運行任務的開銷,作業小,則和自己在同一個JVM上運行,例如在新容器中並發運行分配的資源的開銷大於在一個節點上串行的開銷。小作業是指少於10個mapper且只有一個reducer且輸入大小小於HDFS塊的作業。
(3) Application master調用setupJob方法設置OutputCommitter,FileOutputCommitter為默認值,創建作業的輸出目錄和任務輸出的臨時空間。
1.1.3 任務的分配
(1) 不是小作業時,applicationmaster就會為所有map任務和reduce任務請求容器,map優先級高於reduce。
(2) Map任務有數據本地化的要求,reduce則可以在任意位置執行。
(3) 每個map任務和reduce任務都分到1024M的內存和一個虛擬的內核。可以通過設置屬性mapreduce.map.memory.mb,mapreduce.map.cpu.vcores, mapreduce.reuduce. memory.mb, mapreduce.reuduce.cpu.vcores。
1.1.4 任務的執行
(1)資源管理器的調度器為任務分配一個特定節點上的容器,application與該容器的節點管理器通信,Java應用程序YarnChild將數據本地化,執行任務。yarnChild咋指定的jvm中運行,任務 缺陷崩潰不會影響節點管理器。
1.1.5 進度和狀態更新
作業執行時間幾秒到幾小時,任務狀態和進度需要實時更新。Reduce任務復制、排序、執行reduce各占1/3.
Map任務和reduce運行時,子進程通過umbilical接口和application master通訊,每隔3秒更新進度和狀態。資源管理器界面有連接進入application master界面,查看細節。
客戶端每秒輪循applicationmaster接收最新狀態,也可以通過job的getStatus方法得到JobStatus實例,包含所有狀態信息。
1.1.6 作業完成
(1)Application master 收到最后一個任務執行完成后,把作業的狀態設置為成功,job輪循狀態,通知用戶,waitforcompletion方法返回,job的統計信息和計數值也輸出到控制台。
(2)Application master也會發送HTTP通知給客戶端,需要設置屬性mapreduce.job.end-notification.url來寫地址。
(2)作業完成后,application master和任務容器清理中間輸出,OutputCommitter的ComitJob方法會被調用,歷史服務器存儲作業信息,以便日后用戶查詢。
自己開發了一個股票智能分析軟件,功能很強大,需要的點擊下面的鏈接獲取: