Hadoop 2 job execution: YARN scheduling and execution



YARN is the unified resource management platform of a Hadoop cluster. Its main job is centralized management and scheduling of cluster resources (this is still incomplete: for now only coarse-grained allocation of CPU and memory is supported).

Its basic idea is to split the MapReduce JobTracker into two independent services: a global ResourceManager and a per-application ApplicationMaster. The ResourceManager manages and allocates resources across the whole cluster, while an ApplicationMaster manages a single application.

Applications on YARN fall into two categories by how long they run: short-lived and long-running.

  1. Short-lived applications are typically analytics jobs. The time from submission to completion is bounded, and once a job finishes its resources are released and returned to YARN for reallocation.

  2. Long-running applications are typically services: once started, they keep running unless terminated by failure or by hand. They tend to hold some cluster resources for a long time, and their resource needs often change while they run.

Since version 2.2.0, YARN has strengthened its support for long-running applications.


After a user submits an application to YARN, YARN runs it in two phases: the first phase launches the ApplicationMaster; in the second phase the ApplicationMaster creates the application, requests resources for it, and monitors its entire run until it completes successfully.

YARN's workflow can be broken into the following steps:

  1. The user submits an application to YARN, including the ApplicationMaster program, the command to launch the ApplicationMaster, and the user program;

  2. The ResourceManager allocates the first Container for the application and contacts the corresponding NodeManager, asking it to launch the application's ApplicationMaster inside that Container;

  3. The ApplicationMaster first registers with the ResourceManager, so that the user can query the application's status directly through the ResourceManager. It then requests resources for the individual tasks and monitors them until they finish, i.e. it repeats steps 4-7;

  4. The ApplicationMaster polls the ResourceManager over an RPC protocol to request and receive resources;

  5. Once the ApplicationMaster has obtained resources, it contacts the corresponding NodeManagers and asks them to launch tasks;

  6. A NodeManager sets up the task's runtime environment (environment variables, jar files, binaries, etc.), writes the task launch command into a script, and starts the task by running that script;

  7. Each task reports its status and progress to the ApplicationMaster over an RPC protocol, so the ApplicationMaster always knows the state of every task and can restart a task when it fails.

    While the application is running, the user can query its current state from the ApplicationMaster over RPC at any time;

  8. When the application finishes, the ApplicationMaster deregisters from the ResourceManager and shuts itself down.
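The mechanism in step 6 — the NodeManager materializing the launch command into a shell script and then executing that script — can be sketched in plain Java. This is a hypothetical simplification for illustration only; the real NodeManager also localizes resources, sets up the environment, and enforces resource limits:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;

public class LaunchScriptDemo {
    // Mimics how a NodeManager writes a container launch command into a
    // script and starts the "task" by running that script (step 6 above).
    static String launchViaScript(String command) throws Exception {
        Path script = Files.createTempFile("launch_container", ".sh");
        Files.write(script, ("#!/bin/sh\n" + command + "\n").getBytes());
        script.toFile().setExecutable(true);
        Process p = new ProcessBuilder("/bin/sh", script.toString()).start();
        StringBuilder out = new StringBuilder();
        try (BufferedReader r =
                 new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null) out.append(line);
        }
        p.waitFor();
        Files.deleteIfExists(script);
        return out.toString();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a real task launch command.
        System.out.println(launchViaScript("echo container-started"));
    }
}
```

Running the script rather than the raw command lets the NodeManager bundle environment setup and redirection into one reproducible unit.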

 

 


In single-machine programming, a common way to process a large data set quickly is multithreaded parallel programming. The rough flow: the operating system starts a main thread, which handles data splitting, task assignment, and starting and tearing down the worker threads; each worker thread only computes over its own slice of the data, and once all workers have finished, the main thread exits.

By analogy, an application on YARN runs very similarly, except that it is distributed parallel programming across a cluster. Think of YARN as a cloud operating system: it launches the ApplicationMaster (the analogue of the main thread) for an application; the ApplicationMaster then handles data splitting, task assignment, launching, and monitoring, while the Tasks it starts on the various nodes (the analogue of worker threads) only perform computation. When all tasks have finished, the ApplicationMaster considers the application complete and exits.
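The single-machine side of the analogy can be made concrete with a small, self-contained example: the main thread splits the data, workers (the analogue of YARN tasks) each compute their own chunk, and the main thread aggregates and exits only after all workers are done:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelSum {
    // "Main thread" splits the data, hands each chunk to a worker thread,
    // then waits for and aggregates the partial results.
    public static long parallelSum(long[] data, int workers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        int chunk = (data.length + workers - 1) / workers;
        List<Future<Long>> parts = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            final int start = w * chunk;
            final int end = Math.min(data.length, start + chunk);
            parts.add(pool.submit(() -> {          // worker: computes only its slice
                long s = 0;
                for (int i = start; i < end; i++) s += data[i];
                return s;
            }));
        }
        long total = 0;
        for (Future<Long> f : parts) total += f.get(); // main thread blocks until all done
        pool.shutdown();
        return total;
    }

    public static void main(String[] args) throws Exception {
        long[] data = new long[1000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(parallelSum(data, 4)); // 500500
    }
}
```

On YARN the same roles are played by the ApplicationMaster and its Tasks, with RPC standing in for shared memory.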

 


Protocol-level communication in YARN

The figure above involves three RPC protocols:

  • ApplicationClientProtocol: the Client-RM protocol, mainly used for application submission;

  • ApplicationMasterProtocol: the AM-RM protocol, through which the AM registers with the RM and requests resources;

  • ContainerManagementProtocol: the AM-NM protocol, through which the AM tells the NM to launch containers.


Program-level calls in YARN

As the figure shows, the client's main job is submitting the application and monitoring its run.


Code walkthrough

Continuing from this line in JobSubmitter.java (from http://www.cnblogs.com/admln/p/hadoop2-work-excute-submit.html ):

status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

  @Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {

    addHistoryToken(ts);

    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }

One of the most important statements here is

ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

As its comment says, it constructs the information needed before the AppMaster can be started:

  public ApplicationSubmissionContext createApplicationSubmissionContext(
      Configuration jobConf,
      String jobSubmitDir, Credentials ts) throws IOException {
    ApplicationId applicationId = resMgrDelegate.getApplicationId();

    // Setup resource requirements
    Resource capability = recordFactory.newRecordInstance(Resource.class);
    capability.setMemory(
        conf.getInt(
            MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
            )
        );
    capability.setVirtualCores(
        conf.getInt(
            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
            )
        );
    LOG.debug("AppMaster capability = " + capability);

    // Setup LocalResources
    Map<String, LocalResource> localResources =
        new HashMap<String, LocalResource>();

    Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);

    URL yarnUrlForJobSubmitDir = ConverterUtils
        .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
            .resolvePath(
                defaultFileContext.makeQualified(new Path(jobSubmitDir))));
    LOG.debug("Creating setup context, jobSubmitDir url is "
        + yarnUrlForJobSubmitDir);

    localResources.put(MRJobConfig.JOB_CONF_FILE,
        createApplicationResource(defaultFileContext,
            jobConfPath, LocalResourceType.FILE));
    if (jobConf.get(MRJobConfig.JAR) != null) {
      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
      LocalResource rc = createApplicationResource(defaultFileContext,
          jobJarPath,
          LocalResourceType.PATTERN);
      String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
          JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
      rc.setPattern(pattern);
      localResources.put(MRJobConfig.JOB_JAR, rc);
    } else {
      // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
      // mapreduce jar itself which is already on the classpath.
      LOG.info("Job jar is not present. "
          + "Not adding any jar to the list of resources.");
    }

    // TODO gross hack
    for (String s : new String[] {
        MRJobConfig.JOB_SPLIT,
        MRJobConfig.JOB_SPLIT_METAINFO }) {
      localResources.put(
          MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
          createApplicationResource(defaultFileContext,
              new Path(jobSubmitDir, s), LocalResourceType.FILE));
    }

    // Setup security tokens
    DataOutputBuffer dob = new DataOutputBuffer();
    ts.writeTokenStorageToStream(dob);
    ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

    // TODO: why do we use 'conf' some places and 'jobConf' others?
    long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
    String logLevel = jobConf.get(
        MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
    MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);

    // Check for Java Lib Path usage in MAP and REDUCE configs
    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",
        MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map",
        MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce",
        MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce",
        MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);

    // Add AM admin command opts before user command opts
    // so that it can be overridden by user
    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
    vargs.add(mrAppMasterAdminOptions);

    // Add AM user command opts
    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
    vargs.add(mrAppMasterUserOptions);

    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDOUT);
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDERR);

    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    LOG.debug("Command to launch container for ApplicationMaster is : "
        + mergedCommand);

    // Setup the CLASSPATH in environment
    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
    Map<String, String> environment = new HashMap<String, String>();
    MRApps.setClasspath(environment, conf);

    // Setup the environment variables for Admin first
    MRApps.setEnvFromInputString(environment,
        conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
    // Setup the environment variables (LD_LIBRARY_PATH, etc)
    MRApps.setEnvFromInputString(environment,
        conf.get(MRJobConfig.MR_AM_ENV));

    // Parse distributed cache
    MRApps.setupDistributedCache(jobConf, localResources);

    Map<ApplicationAccessType, String> acls
        = new HashMap<ApplicationAccessType, String>(2);
    acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
        MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
    acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
        MRJobConfig.JOB_ACL_MODIFY_JOB,
        MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));

    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(localResources, environment,
          vargsFinal, null, securityTokens, acls);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext =
        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
    appContext.setApplicationId(applicationId);                // ApplicationId
    appContext.setQueue(                                       // Queue name
        jobConf.get(JobContext.QUEUE_NAME,
        YarnConfiguration.DEFAULT_QUEUE_NAME));
    appContext.setApplicationName(                             // Job name
        jobConf.get(JobContext.JOB_NAME,
        YarnConfiguration.DEFAULT_APPLICATION_NAME));
    appContext.setCancelTokensWhenComplete(
        conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
    appContext.setAMContainerSpec(amContainer);         // AM Container
    appContext.setMaxAppAttempts(
        conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
            MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
    appContext.setResource(capability);
    appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
    return appContext;
  }

Here,

ApplicationId applicationId = resMgrDelegate.getApplicationId();

corresponds to step 1 in the figure above: requesting an application ID from the ResourceManager.
The rest of the method covers memory and CPU allocation, the paths of resources (program, configuration, etc.), the command that launches the AppMaster, checking the Java environment, and so on,

all of which corresponds to step 2 in the figure: initializing the AM's configuration.
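The command-assembly part of that method can be distilled into a few lines of plain Java. This is an illustrative sketch, not Hadoop code: the paths, class name, and `<LOG_DIR>` placeholder stand in for `Environment.JAVA_HOME.$()`, `MRJobConfig.APPLICATION_MASTER_CLASS`, and `ApplicationConstants.LOG_DIR_EXPANSION_VAR`:

```java
import java.util.ArrayList;
import java.util.List;

public class AmCommandDemo {
    // Mirrors the vargs construction above: java binary, JVM opts, AM main
    // class, stdout/stderr redirection, then everything joined with spaces.
    static String buildCommand(String javaHome, String amClass, List<String> jvmOpts) {
        List<String> vargs = new ArrayList<>();
        vargs.add(javaHome + "/bin/java");
        vargs.addAll(jvmOpts);                 // admin opts first, then user opts
        vargs.add(amClass);
        vargs.add("1><LOG_DIR>/stdout");       // placeholder expanded by the NM
        vargs.add("2><LOG_DIR>/stderr");
        return String.join(" ", vargs);
    }

    public static void main(String[] args) {
        System.out.println(buildCommand("/usr/lib/jvm/java",
            "org.apache.hadoop.mapreduce.v2.app.MRAppMaster",
            List.of("-Xmx1024m")));
    }
}
```

The real code keeps the pieces in a list until the last moment so that admin options can be placed before (and thus overridden by) user options.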

The second key statement in submitJob() is

ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

which submits the AM to the RM, corresponding to step 3 in the figure.
submitApplication() is implemented in YarnClientImpl.java:

  @Override
  public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    appContext.setApplicationId(applicationId);
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);
    rmClient.submitApplication(request);

    int pollCount = 0;
    while (true) {
      YarnApplicationState state =
          getApplicationReport(applicationId).getYarnApplicationState();
      if (!state.equals(YarnApplicationState.NEW) &&
          !state.equals(YarnApplicationState.NEW_SAVING)) {
        break;
      }
      // Notify the client through the log every 10 poll, in case the client
      // is blocked here too long.
      if (++pollCount % 10 == 0) {
        LOG.info("Application submission is not finished, " +
            "submitted application " + applicationId +
            " is still in " + state);
      }
      try {
        Thread.sleep(statePollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }

    LOG.info("Submitted application " + applicationId + " to ResourceManager"
        + " at " + rmAddress);
    return applicationId;
  }

This method mainly builds a request and sends it over the relevant protocol, i.e.:

rmClient.submitApplication(request);

Client class structure:

From here on, the client does little besides querying and monitoring; everything else proceeds according to the protocol-communication figure above.

Given YARN's many protocols, interfaces, and layers of wrapping, the rest of the flow is easiest to follow at the protocol level.

(Reading the protocol code you keep running into the name "google" — these interfaces are generated by Google's Protocol Buffers.)

The protocol class between the client and the RM is ApplicationClientProtocol.

Communication between the client and the RM includes:

1. Obtaining an application ID

      public abstract void getNewApplication(
          com.google.protobuf.RpcController controller,
          org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto request,
          com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationResponseProto> done);

2. Submitting the application to the RM

      public abstract void submitApplication(
          com.google.protobuf.RpcController controller,
          org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto request,
          com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto> done);

Concretely:

  1. The client obtains an application ID from the RM via getNewApplication;

  2. The client packs the application's runtime configuration into an ApplicationSubmissionContext and submits it to the RM via submitApplication;

  3. The RM launches the AM based on the contents of the ApplicationSubmissionContext;

  4. The client obtains the application's running state from the AM or the RM and controls its execution.

After obtaining the application ID, the client wraps the application's configuration in an ApplicationSubmissionContext and submits it to the RM through submitApplication.
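The two-step client protocol — first fetch an ID, then submit a context bound to that ID — can be sketched with a toy in-memory RM. Everything here (the ToyRM class, its method names, the string context) is a hypothetical stand-in for the real protobuf-based RPC:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitFlowDemo {
    // Toy ResourceManager illustrating the client-side protocol order:
    // getNewApplication, then submitApplication with that ID.
    static class ToyRM {
        private final AtomicInteger nextId = new AtomicInteger(1);
        private final Map<Integer, String> apps = new HashMap<>();

        int getNewApplication() {                  // hands out a fresh app ID
            return nextId.getAndIncrement();
        }
        void submitApplication(int appId, String contextName) {
            apps.put(appId, contextName);          // the real RM would now launch the AM
        }
        String report(int appId) {                 // analogue of getApplicationReport
            return apps.getOrDefault(appId, "UNKNOWN");
        }
    }

    public static void main(String[] args) {
        ToyRM rm = new ToyRM();
        int appId = rm.getNewApplication();        // step 1
        rm.submitApplication(appId, "wordcount");  // step 2
        System.out.println(appId + " " + rm.report(appId)); // 1 wordcount
    }
}
```

Allocating the ID before submission lets the client name files under the job-submit directory (and retry submission) with a stable identifier.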

An ApplicationSubmissionContext mainly consists of:

  • applicationId: the application ID obtained via getNewApplication;

  • applicationName: the application name, shown in YARN's web UI;

  • applicationType: the application type, "YARN" by default;

  • priority: the application priority; the smaller the value, the higher the priority;

  • queue: the queue the application belongs to; different applications can live in different queues and use different scheduling algorithms;

  • unmanagedAM: a boolean indicating whether the AM is launched by the client (an AM can run either on the YARN platform or outside it; an AM running on YARN is launched by the RM, and the resources it uses are controlled by YARN);

  • cancelTokensWhenComplete: whether to cancel the security tokens when the application completes;

  • maxAppAttempts: the maximum number of restart attempts after the AM fails;

  • resource: the resources needed to launch the AM (virtual CPU cores/memory; the virtual core count is a normalized value);

  • amContainerSpec: the launch context of the AM container, which mainly includes:

    • tokens: the security tokens the AM holds;

    • serviceData: application-private data, a map from data name to binary blob;

    • environment: the environment variables the AM uses;

    • commands: the list of commands that launch the AM;

    • applicationACLs: the application's access control lists;

    • localResources: the local resources the AM needs at launch, mostly external files, archives, etc.


Next, the RM creates the AM, which then takes over.

The AM's main job is to request resources from the RM according to the application's needs and use them to carry out the application logic. The AM therefore has to talk both to the RM and to the NMs, which involves two protocols: the AM-RM protocol (ApplicationMasterProtocol) and the AM-NM protocol (ContainerManagementProtocol).

First, AM-RM.

The AM and RM communicate over ApplicationMasterProtocol, which provides the following methods:

  // Register the AM with the RM
  public abstract void registerApplicationMaster(
      com.google.protobuf.RpcController controller,
      org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto request,
      com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto> done);

  // Tell the RM that the application has finished
  public abstract void finishApplicationMaster(
      com.google.protobuf.RpcController controller,
      org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto request,
      com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto> done);

  // Request/return resources from the RM; doubles as the heartbeat
  public abstract void allocate(
      com.google.protobuf.RpcController controller,
      org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto request,
      com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto> done);

After the client submits an application, the RM allocates resources according to the submitted information and launches the AM. Once up, the AM registers itself with the RM by calling ApplicationMasterProtocol's registerApplicationMaster method. After registering, the AM requests resources for its tasks via the allocate method; having obtained them, it launches containers on NMs through ContainerManagementProtocol to run the tasks. When the application is done, the AM reports its final state to the RM via finishApplicationMaster and deregisters.

Note that ApplicationMasterProtocol#allocate() doubles as the AM-RM heartbeat: even if the application goes through stretches where it needs no resources at all, the AM must still call the method periodically, otherwise it will trip the RM's fault-tolerance machinery.
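Why the periodic call matters can be shown with a small liveness simulation. The class below is entirely hypothetical — `allocate` and `amConsideredAlive` stand in for the AM-side call and the RM-side expiry check, and the real intervals are configuration-driven:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class HeartbeatDemo {
    static final AtomicLong lastHeartbeat = new AtomicLong(System.currentTimeMillis());

    // Stand-in for ApplicationMasterProtocol#allocate: even an empty
    // request refreshes the RM's liveness record for this AM.
    static void allocate() {
        lastHeartbeat.set(System.currentTimeMillis());
    }

    // Stand-in for the RM-side check: an AM that has not called
    // allocate() within the expiry window is treated as dead.
    static boolean amConsideredAlive(long expiryMs) {
        return System.currentTimeMillis() - lastHeartbeat.get() < expiryMs;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        // AM heartbeats every 100 ms even though it requests nothing.
        ses.scheduleAtFixedRate(HeartbeatDemo::allocate, 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(500);
        System.out.println(amConsideredAlive(300)); // heartbeats kept it alive
        ses.shutdownNow();                          // AM stops calling allocate()
        Thread.sleep(500);
        System.out.println(amConsideredAlive(300)); // now considered expired
    }
}
```

In real YARN, an expired AM triggers the RM's AM-restart path (bounded by maxAppAttempts).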

Next, AM-NM.

The AM interacts with NMs over ContainerManagementProtocol, which covers three functions: launching containers, querying container status, and stopping containers.

  // Launch containers
  public abstract void startContainers(
      com.google.protobuf.RpcController controller,
      org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto request,
      com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto> done);

  // Query container status
  public abstract void getContainerStatuses(
      com.google.protobuf.RpcController controller,
      org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto request,
      com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto> done);

  // Stop containers
  public abstract void stopContainers(
      com.google.protobuf.RpcController controller,
      org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto request,
      com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersResponseProto> done);

The AM launches containers on an NM via ContainerManagementProtocol#startContainers(), supplying the NM with everything needed to launch them: the allocated resources, security tokens, environment variables, launch commands, and so on, all wrapped in a StartContainersRequest. On receiving the request, the NM launches the corresponding containers and returns the lists of containers that started successfully and those that failed, along with the metadata of its auxiliary services.
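The shape of that request/response exchange — a batch of launch requests in, separate succeeded/failed lists out — can be sketched with a toy NM. The class, map-based request format, and failure condition here are all illustrative, not the real protobuf types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StartContainersDemo {
    // Toy NM: takes a batch of (containerId -> launch command) requests and,
    // like StartContainersResponse, reports succeeded and failed IDs separately.
    static Map<String, List<Integer>> startContainers(Map<Integer, String> requests) {
        List<Integer> succeeded = new ArrayList<>();
        List<Integer> failed = new ArrayList<>();
        for (Map.Entry<Integer, String> e : requests.entrySet()) {
            if (e.getValue().isEmpty()) failed.add(e.getKey()); // no launch command
            else succeeded.add(e.getKey());                     // "launched"
        }
        Map<String, List<Integer>> response = new HashMap<>();
        response.put("succeeded", succeeded);
        response.put("failed", failed);
        return response;
    }

    public static void main(String[] args) {
        Map<Integer, String> reqs = new LinkedHashMap<>();
        reqs.put(1, "java Task");
        reqs.put(2, "");   // malformed request: will land in the failed list
        System.out.println(startContainers(reqs));
    }
}
```

Returning partial success lets the AM retry or re-request resources only for the containers that failed, instead of failing the whole batch.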


At this point, all that remains is the map and reduce work inside the containers on the NMs.


Much of the text and figures in this post are drawn from:

http://my.oschina.net/u/1434348/blog/193374?p=1

http://dongxicheng.org/

For study purposes only; no infringement intended.

(I suddenly find I can understand most of the articles on Dong's blog now.)


 

