In https://www.cnblogs.com/dongxiao-yang/p/9403427.html we analyzed the code path for submitting a single Flink job to a YARN cluster. Starting with version 1.5, Flink reworked the deployment process of the whole framework (see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077). This article walks through the new mode's flow on YARN, based on the Flink 1.6.1 source code.
1. Initialization
The client-side initialization flow is largely the same as in https://www.cnblogs.com/dongxiao-yang/p/9403427.html; because of the new mode, the points that differ are:
1. final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine); now returns a YarnClusterDescriptor as the concrete object.
2. The CliFrontend.runProgram method enters the if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {...} block, and the call path is
YarnClusterDescriptor.deployJobCluster->AbstractYarnClusterDescriptor.deployInternal->startAppMaster
At this point we can see that the AM's startup class has become YarnJobClusterEntrypoint.
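To make this client-side path more concrete, below is a rough sketch, under assumptions, of what the detached new-mode branch essentially does: compile the user program into a JobGraph and hand it to the YarnClusterDescriptor together with a ClusterSpecification. The jar path, parallelism and memory/slot figures are placeholders rather than Flink defaults, and the real CliFrontend derives all of them from the command-line options.

import java.io.File;

import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class DeployJobClusterSketch {

    // clusterDescriptor stands for the YarnClusterDescriptor returned by
    // customCommandLine.createClusterDescriptor(commandLine)
    static void deploy(ClusterDescriptor<?> clusterDescriptor) throws Exception {
        Configuration configuration = new Configuration();

        // Compile the user program into a JobGraph on the client side
        PackagedProgram program = new PackagedProgram(new File("/path/to/user-job.jar"));
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, 1);

        // Resource figures are placeholders, not Flink defaults
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(1024)
            .setTaskManagerMemoryMB(1024)
            .setNumberTaskManagers(1)
            .setSlotsPerTaskManager(1)
            .createClusterSpecification();

        // Starts the YARN application whose AM entry point is YarnJobClusterEntrypoint
        clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true /* detached */);
    }
}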
2. YarnJobClusterEntrypoint
The main function of YarnJobClusterEntrypoint is the entry point of the whole AM process; at the end of the method it calls startCluster, defined in its grandparent class ClusterEntrypoint, to kick off the startup of all the cluster components.
The concrete call chain is startCluster->runCluster->startClusterComponents:
protected void startClusterComponents(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry) throws Exception {
    synchronized (lock) {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

        LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            DispatcherGateway.class,
            DispatcherId::fromUuid,
            10,
            Time.milliseconds(50L));

        LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
            rpcService,
            ResourceManagerGateway.class,
            ResourceManagerId::fromUuid,
            10,
            Time.milliseconds(50L));

        // TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
        final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
        final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));

        webMonitorEndpoint = createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            transientBlobCache,
            rpcService.getExecutor(),
            new AkkaQueryServiceRetriever(actorSystem, timeout),
            highAvailabilityServices.getWebMonitorLeaderElectionService());

        LOG.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        resourceManager = createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            this,
            clusterInformation,
            webMonitorEndpoint.getRestBaseUrl());

        jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        dispatcher = createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManager.getSelfGateway(ResourceManagerGateway.class),
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServicePath(),
            archivedExecutionGraphStore,
            this,
            webMonitorEndpoint.getRestBaseUrl(),
            historyServerArchivist);

        LOG.debug("Starting ResourceManager.");
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        LOG.debug("Starting Dispatcher.");
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
    }
}
From the code above we can see that the AM hosts two important brand-new components: the ResourceManager and the Dispatcher.
Under the FLIP-6 redesign, the new ResourceManager role is defined as follows:
The main tasks of the ResourceManager are
- Acquire new TaskManager (or slots) by starting containers, or allocating them to a job
- Giving failure notifications to JobManagers and TaskManagers
- Caching TaskManagers (containers) to be reused, releasing TaskManagers (containers) that are unused for a certain period.
In short, the ResourceManager is responsible for negotiating resource requests with the YARN cluster and for allocating the acquired TaskManager slots to the designated JobManager.

In YARN mode the concrete ResourceManager implementation is YarnResourceManager. In its initialize method it instantiates two clients, resourceManagerClient and nodeManagerClient, which wrap YARN's AMRMClientAsync and NMClient respectively and are responsible for communicating with YARN's ResourceManager and NodeManagers.
@Override
protected void initialize() throws ResourceManagerException {
    try {
        resourceManagerClient = createAndStartResourceManagerClient(
            yarnConfig,
            yarnHeartbeatIntervalMillis,
            webInterfaceUrl);
    } catch (Exception e) {
        throw new ResourceManagerException("Could not start resource manager client.", e);
    }

    nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
}
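For orientation, the sketch below shows, in terms of the plain Hadoop YARN client API, roughly what these two clients amount to. It is a simplified standalone example, not Flink's actual createAndStartResourceManagerClient / createAndStartNodeManagerClient code; the heartbeat interval, host, port and tracking URL are placeholder values.

import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnClientsSketch implements AMRMClientAsync.CallbackHandler {

    public static void main(String[] args) throws Exception {
        YarnConfiguration yarnConfig = new YarnConfiguration();
        YarnClientsSketch callbackHandler = new YarnClientsSketch();

        // Client towards the YARN ResourceManager: heartbeats, container
        // requests/releases; granted containers arrive via onContainersAllocated.
        AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient =
            AMRMClientAsync.createAMRMClientAsync(5000, callbackHandler); // 5s heartbeat, placeholder
        resourceManagerClient.init(yarnConfig);
        resourceManagerClient.start();
        // host/port/tracking URL are placeholders
        resourceManagerClient.registerApplicationMaster("localhost", 0, "http://localhost:8081");

        // Client towards the YARN NodeManagers: used to launch the
        // TaskExecutor processes in the containers YARN has granted.
        NMClient nodeManagerClient = NMClient.createNMClient();
        nodeManagerClient.init(yarnConfig);
        nodeManagerClient.start();
    }

    @Override public void onContainersAllocated(List<Container> containers) { /* start TaskExecutors here */ }
    @Override public void onContainersCompleted(List<ContainerStatus> statuses) { }
    @Override public void onShutdownRequest() { }
    @Override public void onNodesUpdated(List<NodeReport> updatedNodes) { }
    @Override public void onError(Throwable e) { }
    @Override public float getProgress() { return 0.0f; }
}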
The Dispatcher is defined as follows. It takes over the job-submission-to-cluster work previously handled by the JobManager, and in the future one dispatcher is expected to be able to submit jobs to multiple clusters.
The new design includes the concept of a Dispatcher. The dispatcher accepts job submissions from clients and starts the jobs on their behalf on a cluster manager.
The dispatcher is introduced because:
- Some cluster managers need a central job spawning and monitoring instance
- It subsumes the role of the standalone JobManager, waiting for jobs to be submitted

In the scenario of this article, the concrete Dispatcher implementation is MiniDispatcher. After dispatcher.start() is called, the whole call chain goes through
leaderElectionService.start(this)->
ZooKeeperLeaderElectionService.start->
ZooKeeperLeaderElectionService.isLeader->
Dispatcher.grantLeadership->
tryAcceptLeadershipAndRunJobs->
runJob->
createJobManagerRunner
and lands in the Dispatcher's createJobManagerRunner method (its source is quoted right after the short sketch below).
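The chain above is the usual contender/callback hand-off of leader election (the same pattern shows up again further down when the JobManagerRunner is started). A minimal, self-contained sketch of the pattern follows; the interfaces are simplified stand-ins, not Flink's real LeaderElectionService/LeaderContender signatures.

import java.util.UUID;

// Simplified stand-ins for Flink's LeaderElectionService / LeaderContender;
// NOT the real Flink interfaces, only the shape of the pattern.
interface LeaderContender {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
}

interface LeaderElectionService {
    // Registers the contender; once this process wins the election (e.g. the
    // ZooKeeper latch behind ZooKeeperLeaderElectionService), the service
    // calls grantLeadership back on the contender.
    void start(LeaderContender contender) throws Exception;
    void stop() throws Exception;
}

// The Dispatcher plays the contender role: on grantLeadership it recovers and
// runs jobs (tryAcceptLeadershipAndRunJobs -> runJob -> createJobManagerRunner).
class DispatcherSketch implements LeaderContender {
    @Override
    public void grantLeadership(UUID leaderSessionId) {
        System.out.println("Became leader with session " + leaderSessionId + ", running submitted jobs");
    }

    @Override
    public void revokeLeadership() {
        System.out.println("Leadership revoked, suspending jobs");
    }
}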
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
    final RpcService rpcService = getRpcService();

    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
        CheckedSupplier.unchecked(() ->
            jobManagerRunnerFactory.createJobManagerRunner(
                ResourceID.generate(),
                jobGraph,
                configuration,
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                blobServer,
                jobManagerSharedServices,
                new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                fatalErrorHandler)),
        rpcService.getExecutor());

    return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
}
The code above can be split into two parts. The first part goes through DefaultJobManagerRunnerFactory.createJobManagerRunner->new JobManagerRunner->new JobMaster and initializes the JobMaster object.
The second part goes through
startJobManagerRunner->
JobManagerRunner.start->
ZooKeeperLeaderElectionService.start->
ZooKeeperLeaderElectionService.isLeader->
JobManagerRunner.grantLeadership->
verifyJobSchedulingStatusAndStartJobManager->
jobMaster.start->
startJobExecution->
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    validateRunsInMainThread();

    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

        return Acknowledge.get();
    }

    setNewFencingToken(newJobMasterId);

    startJobMasterServices();

    log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());

    resetAndScheduleExecutionGraph();

    return Acknowledge.get();
}
private void startJobMasterServices() throws Exception {
    // start the slot pool make sure the slot pool now accepts messages for this leader
    slotPool.start(getFencingToken(), getAddress());

    //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
    // try to reconnect to previously known leader
    reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

    // job is ready to go, try to establish connection with resource manager
    //   - activate leader retrieval for the resource manager
    //   - on notification of the leader, the connection will be established and
    //     the slot pool will start requesting slots
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
The JobMaster first calls startJobMasterServices to connect to the Flink resource manager, start the JobManager services and register itself. It then uses resetAndScheduleExecutionGraph to perform the initial resource requests for the job. resetAndScheduleExecutionGraph first calls createAndRestoreExecutionGraph to build the ExecutionGraph of the whole job, and then goes through
scheduleExecutionGraph->
ExecutionGraph.scheduleForExecution->
scheduleEager->
ExecutionJobVertex.allocateResourcesForAll->
Execution.allocateAndAssignSlotForExecution->
ProviderAndOwner.allocateSlot->
SlotPool.allocateSlot->
allocateMultiTaskSlot
to issue the slot requests for the job's tasks. The slot request is then forwarded along
SlotPool.requestSlotFromResourceManager->
ResourceManager.requestSlot->
SlotManager.registerSlotRequest->
internalRequestSlot->
ResourceActionsImpl.allocateResource->
YarnResourceManager.startNewWorker->
which asks YARN to start a new TaskManager:
@Override
public void startNewWorker(ResourceProfile resourceProfile) {
    log.info("startNewWorker");
    // Priority for worker containers - priorities are intra-application
    //TODO: set priority according to the resource allocated
    Priority priority = Priority.newInstance(generatePriority(resourceProfile));
    int mem = resourceProfile.getMemoryInMB() < 0 ? defaultTaskManagerMemoryMB : resourceProfile.getMemoryInMB();
    int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores();
    Resource capability = Resource.newInstance(mem, vcore);
    requestYarnContainer(capability, priority);
}

private void requestYarnContainer(Resource resource, Priority priority) {
    resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));

    // make sure we transmit the request fast and receive fast news of granted allocations
    resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

    numPendingContainerRequests++;

    log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
        resource,
        numPendingContainerRequests);
}
The code above is the part where the Flink ResourceManager communicates with YARN through the YARN client to request TaskManager containers. Once YARN grants the containers, the onContainersAllocated callback below starts the TaskManager processes inside them:
@Override
public void onContainersAllocated(List<Container> containers) {
    log.info("onContainersAllocated");
    runAsync(() -> {
        for (Container container : containers) {
            log.info(
                "Received new container: {} - Remaining pending container requests: {}",
                container.getId(),
                numPendingContainerRequests);

            if (numPendingContainerRequests > 0) {
                numPendingContainerRequests--;

                final String containerIdStr = container.getId().toString();
                final ResourceID resourceId = new ResourceID(containerIdStr);

                workerNodeMap.put(resourceId, new YarnWorkerNode(container));

                try {
                    // Context information used to start a TaskExecutor Java process
                    ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
                        container.getResource(),
                        containerIdStr,
                        container.getNodeId().getHost());

                    nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
                } catch (Throwable t) {
                    log.error("Could not start TaskManager in container {}.", container.getId(), t);

                    // release the failed container
                    workerNodeMap.remove(resourceId);
                    resourceManagerClient.releaseAssignedContainer(container.getId());
                    // and ask for a new one
                    requestYarnContainer(container.getResource(), container.getPriority());
                }
            } else {
                // return the excessive containers
                log.info("Returning excess container {}.", container.getId());
                resourceManagerClient.releaseAssignedContainer(container.getId());
            }
        }

        // if we are waiting for no further containers, we can go to the
        // regular heartbeat interval
        if (numPendingContainerRequests <= 0) {
            resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
        }
    });
}
In the TaskManager container's launch context, the AM sets org.apache.flink.yarn.YarnTaskExecutorRunner as the main class to be started.
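For illustration, the sketch below shows how such a launch context could be assembled with the plain YARN API. It is a strongly simplified stand-in: the JVM options, log redirection and especially the local resources and environment that Flink's createTaskExecutorLaunchContext registers are abbreviated or omitted.

import java.util.Collections;

import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.util.Records;

public class TaskExecutorLaunchContextSketch {

    static ContainerLaunchContext createSketchLaunchContext(int taskManagerMemoryMB) {
        ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);

        // The command the NodeManager executes inside the granted container:
        // a JVM whose main class is YarnTaskExecutorRunner.
        String command = ApplicationConstants.Environment.JAVA_HOME.$$() + "/bin/java"
            + " -Xmx" + taskManagerMemoryMB + "m"
            + " org.apache.flink.yarn.YarnTaskExecutorRunner"
            + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/taskmanager.out"
            + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/taskmanager.err";
        launchContext.setCommands(Collections.singletonList(command));

        // The real createTaskExecutorLaunchContext additionally registers local
        // resources (flink jars, configuration files) and the container environment here.
        return launchContext;
    }
}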
