Job提交流程


1.【Driver.class】-- Job job = Job.getInstance(conf);
    -->【job.class】getInstance(conf)
    --> new JobConf(conf) //構建一個空集群配置對象
  說明:將默認configuration(4個配置文件)包裝成Jobconf
2.設置相關參數項:
  job.setJarByClass(AirMapper.class); --> 【MRJobConfig】JobContext.JAR:mapreduce.job.jar
  job.setJobName("Text local"); --> 【MRJobConfig】JobContext.JOB_NAME:mapreduce.job.name
  job.setMapperClass(AirMapper.class); --> 【MRJobConfig】MAP_CLASS_ATTR:mapreduce.job.map.class
  job.setReducerClass(AirReducer.class); --> 【MRJobConfig】REDUCE_CLASS_ATTR:mapreduce.job.reduce.class
  job.setOutputKeyClass(Text.class); --> 【MRJobConfig】JobContext.OUTPUT_KEY_CLASS:mapreduce.job.output.key.class
  job.setOutputValueClass(IntWritable.class); --> 【MRJobConfig】JobContext.OUTPUT_VALUE_CLASS:mapreduce.job.output.value.class
  job.setCombinerClass(AirCombiner.class); --> 【MRJobConfig】COMBINE_CLASS_ATTR:mapreduce.job.combine.class

FileInputFormat.addInputPath(job, new Path("file:///D:/airdata"));
  -->【FileInputFormat.class】 INPUT_DIR:"mapreduce.input.fileinputformat.inputdir";
FileOutputFormat.setOutputPath(job,outfile);
  -->【FileOutputFormat.class】FileOutputFormat.OUTDIR :mapreduce.output.fileoutputformat.outputdir
3.job.waitForCompletion(true)
-->【job.class】waitForCompletion() //作用:提交job至cluster,並等待完成
  -->判定當前State是否為JobState.DEFINE()定義階段,如果定義階段調用submit()
    -->【job.class】submit()
      -->【job.class】connect() //說明:通過UGI(用戶組信息對象)構建集群對象(cluster)
        -->【job.class】new Cluster(getConfiguration());
          -->【 Cluster.class】通過靜態代碼塊,加載【mapred-site.xml和yarn-site.xml】和默認的default文件
通過conf的mapreduce.framework.name的值來返回構建cluster集群對象的客戶端協議:local=LocalJobRunner;yarn=YARNRunner
    -->【job.class】JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
      說明:構建job提交器,然后執行submitter.submitJobInternal()
    -->【JobSubmitter.class】submitter.submitJobInternal()
作用:

  1.檢查作業的輸入和輸出規范
  2.計算作業的InputSplit(邏輯分片)。
  3.如果需要,設置必要的緩存信息;
  4.將作業的jar和配置復制到map-reduce系統分布文件系統上的目錄。
  5.提交job至ResourceManager,並監控其狀態
    說明:檢查;生成JobID,設置job相關參數:
  a.MRJobConfig.JOB_SUBMITHOST :mapreduce.job.submithostname
  b.MRJobConfig.JOB_SUBMITHOSTADDR :mapreduce.job.submithostaddress
  c.MRJobConfig.USER_NAME :mapreduce.job.user.name
  d.MRJobConfig.MAPREDUCE_JOB_DIR : mapreduce.job.dir 【/tmp/hadoop-centos/mapred/staging/centos1104417307/.staging/job_local1104417307_0001】
  e.MRJobConfig.NUM_MAPS : mapreduce.job.maps

-->【JobSubmitter.class】copyAndConfigureFiles(job, submitJobDir);
-->【JobResourceUploader.class】 rUploader.uploadFiles(job, jobSubmitDir);
說明:上傳jobjar,配置信息等內容;
-->【JobSubmitter.class】writeSplits(job, submitJobDir);
說明:計算split;
-->【JobSubmitter.class】 writeConf(conf, submitJobFile);
說明:上傳job.xml文件
-->【JobSubmitter.class】 submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
說明:真正提交Job
-->
-->如果State為JobState.RUNNING運行階段,輪詢job,按5000ms時間間隔
【mapreduce.client.completion.pollinterval=5000】

JOB提交流程:
  創建一個job-->resourcemanager獲得一個應用-->將運行作業所需要的資源復制到文件系統中-->提交作業-->
調度器分配一個容器-->啟動APPMaster-->初始化作業-->接受輸入分片-->若作業不適合作為uber任務運行,Appmaster向資源管理器請求容器-->
Appmaster啟動容器-->資源本地化-->運行map與reduce

 

資源調度的相關配置屬性:
------------------------------------------------
  1.mapreduce.map.memory.mb = 1024 //每個map任務的調度程序請求的內存數量
  2.mapreduce.map.cpu.vcores = 1 //每個map任務從調度程序請求的虛擬內核的數量。
  3.mapreduce.reduce.memory.mb = 1024 //每個reduce任務的調度程序請求的內存數量
  4.mapreduce.reduce.cpu.vcores = 1 //每個reduce任務從調度程序請求的虛擬內核的數量。
  5.mapreduce.tasktracker.map.tasks.maximum = 2 //Nodemanager同時運行的map任務的最大數量。
  6.mapreduce.tasktracker.reduce.tasks.maximum = 2 //Nodemanager同時運行的reduce任務的最大數量。
  7.hadoop中每個守護進程(5個)默認分配1000m內存大小;
修改【hadoop-env.sh】中,export HADOOP_HEAPSIZE=1000(單位MB)
修改ResourceManager堆大小:【yarn-env.sh】export YARN_RESOURCEMANAGER_HEAPSIZE=1000
namenode內存計算原則:參照【P291】
修改namenode:export HADOOP_NAMENODE_OPTS="-Xmx2000m -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_NAMENODE_OPTS"
修改Sencondarynamenode:export HADOOP_SECONDARYNAMENODE_OPTS="-Xmx2000m -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_SECONDARYNAMENODE_OPTS"
修改datanode:export HADOOP_DATANODE_OPTS=

  8.yarn.nodemanager.resource.memory-mb = 8192m //每個節點可用物理內存,單位MB,為容器
  9.yarn.scheduler.minimum-allocation-mb = 1024m //單個任務可申請最少內存,默認1024MB
  10.yarn.scheduler.maximum-allocation-mb = 8192m //單個任務可申請最大內存,默認8192MB
  11.yarn.scheduler.minimum-allocation-vcores = 4 //單個任務可申請最小cpu核數,默認1
  12.yarn.scheduler.maximum-allocation-vcores = 32 //單個任務可申請最大cpu核數,默認32
注意:yarn.scheduler.minimum-allocation-mb <= mapreduce.map.memory.mb <= yarn.scheduler.maximum-allocation-mb,否則拋InvalidResourceRequestException
一個nodemanager節點運行的map數量 <= yarn.nodemanager.resource.memory-mb / mapreduce.map.memory.mb = 8
一個nodemanager節點運行的reduce數量 <= yarn.nodemanager.resource.memory-mb / mapreduce.reduce.memory.mb = 8

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM