YARN is the unified resource-management platform of the Hadoop stack. Its main job is cluster-wide resource management and scheduling (still incomplete at this point: it only supports coarse-grained allocation of CPU and memory).
Its basic idea is to split MapReduce's JobTracker into two independent services: a global ResourceManager and a per-application ApplicationMaster. The ResourceManager manages and allocates resources for the whole cluster, while each ApplicationMaster manages a single application.
Applications on YARN are divided by the length of their life cycle into long-running and short-running applications.
1. Short-running applications are typically analytics jobs: the time from submission to completion is bounded, and once the job finishes, the resources it occupied are released and returned to YARN for reallocation.
2. Long-running applications are typically services: once started, they keep running until terminated by failure or by hand. They usually hold some cluster resources for a long time, and their resource needs often change while they run.
Since version 2.2.0, YARN has strengthened its support for long-running applications.
After a user submits an application to YARN, YARN runs it in two phases: the first phase launches the ApplicationMaster; in the second phase the ApplicationMaster creates the application, requests resources for it, and monitors its entire run until it completes successfully.
YARN's workflow can be broken into the following steps:
1. The user submits an application to YARN, including the ApplicationMaster program, the command to launch the ApplicationMaster, the user program, and so on;
2. The ResourceManager allocates the first Container for the application and communicates with the corresponding NodeManager, asking it to launch the application's ApplicationMaster in that Container;
3. The ApplicationMaster first registers with the ResourceManager, so that the user can query the application's state directly through the ResourceManager; it then requests resources for the individual tasks and monitors them until they finish, i.e. it repeats steps 4-7;
4. The ApplicationMaster polls the ResourceManager over an RPC protocol to request and receive resources;
5. Once the ApplicationMaster has obtained resources, it communicates with the corresponding NodeManager, asking it to launch the task;
6. The NodeManager sets up the task's runtime environment (environment variables, JAR files, binaries, etc.), writes the task launch command into a script, and starts the task by running that script;
7. Each task reports its status and progress to the ApplicationMaster over an RPC protocol, so the ApplicationMaster always knows the state of every task and can restart a task when it fails.
While the application is running, the user can query its current state from the ApplicationMaster over RPC at any time;
8. When the application has finished, the ApplicationMaster deregisters from the ResourceManager and shuts itself down.
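To make the client side of these steps concrete, here is a minimal sketch written against the yarn-client API of Hadoop 2.x. It is a sketch, not production code: the AM class com.example.MyApplicationMaster, the application name, and the resource sizes are illustrative placeholders, not anything from the original post.

```java
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class MinimalSubmitter {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();

    // Step 1: ask the RM for a new application (this yields the ApplicationId)
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();

    // Describe the container that will run the AM; the command is a placeholder
    ContainerLaunchContext amContainer =
        Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(Collections.singletonList(
        "$JAVA_HOME/bin/java com.example.MyApplicationMaster"
            + " 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"));

    // Resources for the AM container itself
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(512);
    capability.setVirtualCores(1);

    appContext.setApplicationName("minimal-demo");
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(capability);
    appContext.setQueue("default");

    // Step 2: hand everything to the RM, which allocates the first Container
    // and launches the AM in it; steps 3-8 then play out between AM, RM and NMs
    ApplicationId appId = yarnClient.submitApplication(appContext);
    System.out.println("Submitted application " + appId);
  }
}
```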
In single-machine programming, the usual way to process a large data set quickly is multithreaded parallel programming. The rough flow: the operating system starts a main thread, which handles data partitioning, task assignment, and starting and tearing down worker threads, while each worker thread only computes over its own slice of the data; once all worker threads are done, the main thread exits.
By analogy, an application on YARN runs in much the same way, except that it is distributed parallel programming across a cluster. You can think of YARN as a cloud operating system: it launches the ApplicationMaster for the application (the counterpart of the main thread); the ApplicationMaster then handles data partitioning, task assignment, launching, and monitoring; and the Tasks the ApplicationMaster starts on the various nodes (the counterpart of worker threads) only do the computation. When all tasks have finished, the ApplicationMaster considers the application complete and exits.
Protocol-level communication in YARN
The figure above involves three RPC protocols:
- ApplicationClientProtocol: the Client-RM protocol, used mainly for application submission;
- ApplicationMasterProtocol: the AM-RM protocol, through which the AM registers with the RM and requests resources;
- ContainerManagementProtocol: the AM-NM protocol, through which the AM tells the NM to start containers.
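YarnClient, used in the earlier sketch, is just a convenience wrapper over the first of these protocols. As a sketch of what sits underneath (assuming a Hadoop 2.x classpath), a client can also build the ApplicationClientProtocol proxy by hand:

```java
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class RawClientProtocolDemo {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    // An RPC proxy speaking the Client-RM protocol directly
    ApplicationClientProtocol rm =
        ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
    GetNewApplicationResponse resp = rm.getNewApplication(
        Records.newRecord(GetNewApplicationRequest.class));
    System.out.println("New application id: " + resp.getApplicationId());
  }
}
```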
Program-level calls in YARN
As the figure shows, the client's main roles are submitting the application and monitoring its run.
Tracing the code
Picking up where http://www.cnblogs.com/admln/p/hadoop2-work-excute-submit.html left off, in JobSubmitter.java, at:

```java
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
```
```java
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {

  addHistoryToken(ts);

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
```
One of the most important statements here is

```java
ApplicationSubmissionContext appContext =
    createApplicationSubmissionContext(conf, jobSubmitDir, ts);
```

and the comment tells us it constructs the information needed to start the MR ApplicationMaster:
```java
public ApplicationSubmissionContext createApplicationSubmissionContext(
    Configuration jobConf,
    String jobSubmitDir, Credentials ts) throws IOException {
  ApplicationId applicationId = resMgrDelegate.getApplicationId();

  // Setup resource requirements
  Resource capability = recordFactory.newRecordInstance(Resource.class);
  capability.setMemory(
      conf.getInt(
          MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
      )
  );
  capability.setVirtualCores(
      conf.getInt(
          MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
      )
  );
  LOG.debug("AppMaster capability = " + capability);

  // Setup LocalResources
  Map<String, LocalResource> localResources =
      new HashMap<String, LocalResource>();

  Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);

  URL yarnUrlForJobSubmitDir = ConverterUtils
      .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
          .resolvePath(
              defaultFileContext.makeQualified(new Path(jobSubmitDir))));
  LOG.debug("Creating setup context, jobSubmitDir url is "
      + yarnUrlForJobSubmitDir);

  localResources.put(MRJobConfig.JOB_CONF_FILE,
      createApplicationResource(defaultFileContext,
          jobConfPath, LocalResourceType.FILE));
  if (jobConf.get(MRJobConfig.JAR) != null) {
    Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
    LocalResource rc = createApplicationResource(defaultFileContext,
        jobJarPath,
        LocalResourceType.PATTERN);
    String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
        JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
    rc.setPattern(pattern);
    localResources.put(MRJobConfig.JOB_JAR, rc);
  } else {
    // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
    // mapreduce jar itself which is already on the classpath.
    LOG.info("Job jar is not present. "
        + "Not adding any jar to the list of resources.");
  }

  // TODO gross hack
  for (String s : new String[] {
      MRJobConfig.JOB_SPLIT,
      MRJobConfig.JOB_SPLIT_METAINFO }) {
    localResources.put(
        MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
        createApplicationResource(defaultFileContext,
            new Path(jobSubmitDir, s), LocalResourceType.FILE));
  }

  // Setup security tokens
  DataOutputBuffer dob = new DataOutputBuffer();
  ts.writeTokenStorageToStream(dob);
  ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

  // Setup the command to run the AM
  List<String> vargs = new ArrayList<String>(8);
  vargs.add(Environment.JAVA_HOME.$() + "/bin/java");

  // TODO: why do we use 'conf' some places and 'jobConf' others?
  long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
  String logLevel = jobConf.get(
      MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
  MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);

  // Check for Java Lib Path usage in MAP and REDUCE configs
  warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",
      MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map",
      MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce",
      MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce",
      MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);

  // Add AM admin command opts before user command opts
  // so that it can be overridden by user
  String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
      MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
  warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
      MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
  vargs.add(mrAppMasterAdminOptions);

  // Add AM user command opts
  String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
      MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
  warnForJavaLibPath(mrAppMasterUserOptions, "app master",
      MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
  vargs.add(mrAppMasterUserOptions);

  vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
      Path.SEPARATOR + ApplicationConstants.STDOUT);
  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
      Path.SEPARATOR + ApplicationConstants.STDERR);

  Vector<String> vargsFinal = new Vector<String>(8);
  // Final command
  StringBuilder mergedCommand = new StringBuilder();
  for (CharSequence str : vargs) {
    mergedCommand.append(str).append(" ");
  }
  vargsFinal.add(mergedCommand.toString());

  LOG.debug("Command to launch container for ApplicationMaster is : "
      + mergedCommand);

  // Setup the CLASSPATH in environment
  // i.e. add { Hadoop jars, job jar, CWD } to classpath.
  Map<String, String> environment = new HashMap<String, String>();
  MRApps.setClasspath(environment, conf);

  // Setup the environment variables for Admin first
  MRApps.setEnvFromInputString(environment,
      conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
  // Setup the environment variables (LD_LIBRARY_PATH, etc)
  MRApps.setEnvFromInputString(environment,
      conf.get(MRJobConfig.MR_AM_ENV));

  // Parse distributed cache
  MRApps.setupDistributedCache(jobConf, localResources);

  Map<ApplicationAccessType, String> acls
      = new HashMap<ApplicationAccessType, String>(2);
  acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
      MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
  acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
      MRJobConfig.JOB_ACL_MODIFY_JOB,
      MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));

  // Setup ContainerLaunchContext for AM container
  ContainerLaunchContext amContainer =
      ContainerLaunchContext.newInstance(localResources, environment,
          vargsFinal, null, securityTokens, acls);

  // Set up the ApplicationSubmissionContext
  ApplicationSubmissionContext appContext =
      recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
  appContext.setApplicationId(applicationId);   // ApplicationId
  appContext.setQueue(                          // Queue name
      jobConf.get(JobContext.QUEUE_NAME,
          YarnConfiguration.DEFAULT_QUEUE_NAME));
  appContext.setApplicationName(                // Job name
      jobConf.get(JobContext.JOB_NAME,
          YarnConfiguration.DEFAULT_APPLICATION_NAME));
  appContext.setCancelTokensWhenComplete(
      conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
  appContext.setAMContainerSpec(amContainer);   // AM Container
  appContext.setMaxAppAttempts(
      conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
          MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
  appContext.setResource(capability);
  appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
  return appContext;
}
```
In it,

```java
ApplicationId applicationId = resMgrDelegate.getApplicationId();
```

corresponds to step 1 in the figure above: requesting an application ID from the ResourceManager.
The method also covers memory and CPU allocation, the paths to resources (program, configuration, etc.), the command that launches the AppMaster, checking the Java environment, and so on; all of that corresponds to step 2 in the figure: initializing the AM's configuration.
The second most important statement in submitJob() is

```java
ApplicationId applicationId =
    resMgrDelegate.submitApplication(appContext);
```

which submits the AM to the RM and corresponds to step 3 in the figure.
The submitApplication() method is implemented in YarnClientImpl.java:
```java
@Override
public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  appContext.setApplicationId(applicationId);
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  rmClient.submitApplication(request);

  int pollCount = 0;
  while (true) {
    YarnApplicationState state =
        getApplicationReport(applicationId).getYarnApplicationState();
    if (!state.equals(YarnApplicationState.NEW) &&
        !state.equals(YarnApplicationState.NEW_SAVING)) {
      break;
    }
    // Notify the client through the log every 10 poll, in case the client
    // is blocked here too long.
    if (++pollCount % 10 == 0) {
      LOG.info("Application submission is not finished, " +
          "submitted application " + applicationId +
          " is still in " + state);
    }
    try {
      Thread.sleep(statePollIntervalMillis);
    } catch (InterruptedException ie) {
    }
  }

  LOG.info("Submitted application " + applicationId + " to ResourceManager"
      + " at " + rmAddress);
  return applicationId;
}
```
This method essentially builds a request and sends it out through the relevant protocol, i.e.:

```java
rmClient.submitApplication(request);
```
Client class structure:
At this point the client has essentially nothing left to do besides querying and monitoring; everything else now proceeds according to the protocol interaction figure above.
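A minimal sketch of that query/monitor loop, assuming the yarnClient and appId from the submission sketch earlier (imports elided):

```java
static void waitForCompletion(YarnClient yarnClient, ApplicationId appId)
    throws Exception {
  // Poll the RM until the application reaches a terminal state
  ApplicationReport report = yarnClient.getApplicationReport(appId);
  YarnApplicationState state = report.getYarnApplicationState();
  while (state != YarnApplicationState.FINISHED
      && state != YarnApplicationState.FAILED
      && state != YarnApplicationState.KILLED) {
    Thread.sleep(1000);
    report = yarnClient.getApplicationReport(appId);
    state = report.getYarnApplicationState();
  }
  System.out.println("Final state: " + state
      + ", diagnostics: " + report.getDiagnostics());
}
```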
Because of YARN's many protocols, interfaces, and layers of wrapping, it is simplest to follow the rough flow at the protocol level.
(While reading the protocol code you keep running into the name "google" — protobuf is everywhere; it feels a bit like meeting the ancestors.)
The protocol class between the client and the RM is ApplicationClientProtocol.
The client-RM interactions include:
1. Getting an application ID
```java
public abstract void getNewApplication(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationRequestProto request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.GetNewApplicationResponseProto> done);
```
2. Submitting the application to the RM
```java
public abstract void submitApplication(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto> done);
```
The concrete steps:
- the client obtains an application ID from the RM via getNewApplication;
- the client packs the application's runtime configuration into an ApplicationSubmissionContext and submits the application to the RM via submitApplication;
- the RM launches the AM based on what the ApplicationSubmissionContext carries;
- the client obtains the application's running state from the AM or the RM and steers the application's run.
So, after obtaining the application ID, the client wraps all application-related configuration in an ApplicationSubmissionContext and hands it to the RM through submitApplication.
An ApplicationSubmissionContext mainly contains the following parts (a construction sketch follows the list):
- applicationId: the application ID obtained via getNewApplication;
- applicationName: the application name, shown in YARN's web UI;
- applicationType: the application type, "YARN" by default;
- priority: the application priority; the smaller the number, the higher the priority;
- queue: the queue the application belongs to; different applications can sit in different queues and be governed by different scheduling algorithms;
- unmanagedAM: a boolean saying whether the AM is launched by the client (an AM can run either on the YARN platform or outside it; an AM running on YARN is launched by the RM, and the resources it uses are controlled by YARN);
- cancelTokensWhenComplete: whether to cancel the security tokens once the application completes;
- maxAppAttempts: the maximum number of restart attempts after the AM fails;
- resource: the resources needed to launch the AM (virtual CPU cores / memory); the virtual core count is a normalized value;
- amContainerSpec: the launch context of the AM container, which mainly contains:
  - tokens: the security tokens the AM holds;
  - serviceData: application-private data, a map from data name to a binary blob;
  - environment: the environment variables for the AM;
  - commands: the list of commands that launch the AM;
  - applicationACLs: the application's access-control lists;
  - localResource: the list of local resources the AM needs at startup, mostly external files, archives, and the like.
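A sketch of filling these fields in by hand (illustrative values, imports elided; appId, localResources, environment, commands, securityTokens, and acls are assumed to have been built as in the JobSubmitter code above):

```java
ApplicationSubmissionContext ctx =
    Records.newRecord(ApplicationSubmissionContext.class);
ctx.setApplicationId(appId);             // from getNewApplication
ctx.setApplicationName("demo-app");      // shown in the web UI
ctx.setApplicationType("YARN");          // the default type
ctx.setPriority(Priority.newInstance(0));
ctx.setQueue("default");
ctx.setUnmanagedAM(false);               // let the RM launch the AM
ctx.setCancelTokensWhenComplete(true);
ctx.setMaxAppAttempts(2);
ctx.setResource(Resource.newInstance(512, 1)); // memory in MB, virtual cores

// amContainerSpec bundles local resources, environment, commands,
// tokens and ACLs into one ContainerLaunchContext
ContainerLaunchContext amSpec = ContainerLaunchContext.newInstance(
    localResources,   // Map<String, LocalResource>
    environment,      // Map<String, String>
    commands,         // List<String>: the AM launch command(s)
    null,             // serviceData: application-private binary blobs
    securityTokens,   // ByteBuffer holding the serialized tokens
    acls);            // Map<ApplicationAccessType, String>
ctx.setAMContainerSpec(amSpec);
```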
After that it is the RM's turn: it creates the AM, which then carries out its own set of actions.
The AM's main job is to request resources from the RM as the business logic demands and to use those resources to get the work done. The AM therefore has to communicate both with the RM and with the NMs, which involves two protocols: the AM-RM protocol (ApplicationMasterProtocol) and the AM-NM protocol (ContainerManagementProtocol).
First, AM-RM.
The AM and the RM communicate over ApplicationMasterProtocol, which provides the following methods:
```java
// Register the AM with the RM
public abstract void registerApplicationMaster(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto> done);

// Tell the RM that the application has finished
public abstract void finishApplicationMaster(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterRequestProto request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto> done);

// Request/return resources from/to the RM, and keep the heartbeat alive
public abstract void allocate(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto> done);
```
After the client submits the application, the RM allocates some resources based on the submission and uses them to launch the AM. Once up, the AM proactively registers with the RM by calling ApplicationMasterProtocol's registerApplicationMaster method. After registering, the AM requests resources for its tasks through the allocate method; once it has the resources, it starts containers on the NMs through ContainerManagementProtocol to run the tasks. When the application is done, the AM reports the application's final state to the RM via finishApplicationMaster and deregisters itself.
Note that ApplicationMasterProtocol#allocate() also serves as the AM-RM heartbeat: even if the application goes through a stretch where it needs no resources at all, the AM must still call this method periodically, or it will trip the RM's fault-tolerance machinery.
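Hand-rolling these protobuf calls is rarely necessary; Hadoop 2.x ships the AMRMClient wrapper. The following is a rough sketch of the register/allocate/finish lifecycle just described, not the real MRAppMaster code; conf, the resource sizes, and the progress value are illustrative assumptions (imports elided):

```java
static void runAppMaster(YarnConfiguration conf) throws Exception {
  AMRMClient<ContainerRequest> rm = AMRMClient.createAMRMClient();
  rm.init(conf);
  rm.start();

  // registerApplicationMaster: from here on, the RM considers this AM live
  rm.registerApplicationMaster("", 0, "");

  // Ask for one worker container (256 MB, 1 vcore, priority 0)
  rm.addContainerRequest(new ContainerRequest(
      Resource.newInstance(256, 1), null, null, Priority.newInstance(0)));

  // allocate() doubles as the heartbeat: keep calling it periodically even
  // when nothing new is requested, or the RM's fault tolerance kicks in
  List<Container> granted = new ArrayList<Container>();
  while (granted.isEmpty()) {
    AllocateResponse resp = rm.allocate(0.1f);
    granted.addAll(resp.getAllocatedContainers());
    Thread.sleep(1000);
  }

  // ...start tasks in the granted containers via ContainerManagementProtocol...

  // finishApplicationMaster: report the final state and deregister
  rm.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "done", "");
}
```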
Next, AM-NM.
The AM interacts with NMs over the ContainerManagementProtocol protocol, which covers three functions: starting containers, querying container status, and stopping containers.
```java
// Start containers
public abstract void startContainers(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersRequestProto request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainersResponseProto> done);

// Query container status
public abstract void getContainerStatuses(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesResponseProto> done);

// Stop containers
public abstract void stopContainers(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersRequestProto request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.yarn.proto.YarnServiceProtos.StopContainersResponseProto> done);
```
The AM starts a container on an NM via ContainerManagementProtocol#startContainers(). Through this interface the AM hands the NM everything it needs to launch the container: the allocated resources, security tokens, environment variables, launch commands, and so on, all wrapped in a StartContainersRequest. On receiving the request, the NM launches the corresponding containers and returns the lists of containers that started successfully and those that failed, along with the metadata of the auxiliary services on that node.
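And the matching AM-side sketch using the NMClient wrapper (again a sketch with imports elided; container is assumed to be one of the Containers granted by allocate() in the previous sketch, and the shell command is a placeholder):

```java
static void runTask(YarnConfiguration conf, Container container) throws Exception {
  NMClient nm = NMClient.createNMClient();
  nm.init(conf);
  nm.start();

  // startContainers(): ship the launch context (command, env, tokens) to the NM
  ContainerLaunchContext taskCtx =
      Records.newRecord(ContainerLaunchContext.class);
  taskCtx.setCommands(Collections.singletonList(
      "/bin/sh -c 'echo hello' 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"));
  nm.startContainer(container, taskCtx);

  // getContainerStatuses(): poll the container's state
  ContainerStatus status =
      nm.getContainerStatus(container.getId(), container.getNodeId());
  System.out.println("Container state: " + status.getState());

  // stopContainers(): tear the container down
  nm.stopContainer(container.getId(), container.getNodeId());
}
```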
At this point, all that remains is the map and reduce work inside the containers on the NMs.
Much of the text and figures in this post are taken from:
http://my.oschina.net/u/1434348/blog/193374?p=1
for learning purposes only, with no intent to infringe.
(I've just realized I can now understand the vast majority of the posts on Dong's blog.)