從源碼來看 Flink 提交作業並調度執行


從提交來一步一步分析,本文源碼基於Apache社區 1.8-release 版本

REST提交作業流程:

1.集群啟動后 通過 /jars/upload 向集群提交可執行jar文件

2.通過 /jars/:jarid/run 來啟動一個job

1.構建並提交JobGraph

我們直接找到WebSubmissionExtension這個類,在StandaloneSession 集群模式下集群初始化DispatcherRestEndpoint的時候會從WebSubmissionExtension里加載所有的Handlers(webSubmissionHandlers)

在WebSubmissionExtension中可以找到  /jars/:jarid/run 對應的Headers是JarRunHeaders,而接受http請求的是jarRunHandler。

Flink的rest服務是基於netty實現的,在jarRunHandler接受http請求后會調用handleRequest()方法來處理請求。

在handleRequest()方法的第一行如下,會從request中構造一個JarHandlerContext對象,而jobId就是JarHandlerContext對象的一個屬性。在之后的getJobGraphAsync()傳入的第一個參數就是context

在getJobGraphAsync()方法中調用context的toJobGraph()方法獲取jobGraph

protected CompletableFuture<JarRunResponseBody> handleRequest(
			@Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
			@Nonnull final DispatcherGateway gateway) throws RestHandlerException {
		final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);

...
final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(context, savepointRestoreSettings, jobName, streamGraphPlan, userLibJars);
...
}

private CompletableFuture<JobGraph> getJobGraphAsync(
			JarHandlerContext context,
			final SavepointRestoreSettings savepointRestoreSettings,
			final String jobName,
			final String plan,
			final List<URL> userLibJars) {
		return CompletableFuture.supplyAsync(() -> {
			final JobGraph jobGraph = context.toJobGraph(configuration, jobName, plan, userLibJars);
			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
			return jobGraph;
		}, executor);
	}

  

內部版本當前判斷streamGraphPlan是否存在來執行不同的createJobGraph方法,區別在於是否傳入jobId。

社區版調用PackagedProgramUtils的createJobGraph()方法的時候會把JarHandlerContext的jobId屬性傳過去,隨后通過steamPlan(streamGraph)的getJobGraph()方法把jobId傳進去,之后調用StreamingJobGraphGenerator.createJobGraph()方法傳入this(streamGraph)和jobId,在new jobGraph時傳入jobId和jobName。

JobGraph的構造方法判斷jobId和jobName是否為空,如果為空新生成一個jobId實例,jobName則使用默認值"(unnamed job)"

 JobGraph的構造方法:

public JobGraph(JobID jobId, String jobName) {
		this.jobID = jobId == null ? new JobID() : jobId;
		this.jobName = jobName == null ? "(unnamed job)" : jobName;

		try {
			setExecutionConfig(new ExecutionConfig());
		} catch (IOException e) {
			// this should never happen, since an empty execution config is always serializable
			throw new RuntimeException("bug, empty execution config is not serializable");
		}
	}

在拿到jobGraph后進行一些后續處理然后向集群提交job

CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
			// we have to enable queued scheduling because slots will be allocated lazily
			jobGraph.setAllowQueuedScheduling(true);
			return gateway.submitJob(jobGraph, timeout);
		});

集群在接受jobGraph后,有如下的代碼:

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

		final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
			.thenApply(ignored -> Acknowledge.get());

		return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
			if (throwable != null) {
				cleanUpJobData(jobGraph.getJobID(), true);

				final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
				log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
				throw new CompletionException(
					new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
			} else {
				return acknowledge;
			}
		}, getRpcService().getExecutor());
	}

在internalSubmitJob()方法中調用waitForTerminatingJobManager()第一個參數就是jobId,隨后在異步執行完成后判斷時候有異常,在沒有異常即提交成功的情況下,調用cleanUpJobData()清理client在提交過程中的數據,清理的標識也是jobId

接着看waitForTerminatingJobManager()方法

private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, FunctionWithException<JobGraph, CompletableFuture<Void>, ?> action) {
		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
			.exceptionally((Throwable throwable) -> {
				throw new CompletionException(
					new DispatcherException(
						String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobId),
						throwable)); });

		return jobManagerTerminationFuture.thenComposeAsync(
			FunctionUtils.uncheckedFunction((ignored) -> {
				jobManagerTerminationFutures.remove(jobId);
				return action.apply(jobGraph);
			}),
			getMainThreadExecutor());
	}

	CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
		if (jobManagerRunnerFutures.containsKey(jobId)) {
			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
		} else {
			return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
		}
	}

其中getJobTerminationFuture()來判斷當前的jobId對應的job是否已在運行中,看方法名是在wait任務終止,實際在getJobTerminationFuture(),方法中並沒有終止任務的操作,只是判斷jobManagerRunnerFutures這個map中是否存在當前jobId。

private final Map<JobID, CompletableFuture<JobManagerRunner>> jobManagerRunnerFutures;

jobManagerRunnerFutures看定義就可以了解,是持有運行中job的以jobId為key,CompletableFuture<JobManagerRunner>為value的映射關系。

繼續回到internalSubmitJob()方法,在waitForTerminatingJobManager()用::(jdk1.8特性)傳入了方法persistAndRunJob(),在該方法中先把jobGraph包裝成SubmittedJobGraph寫到zk中,然后調用runJob()方法,runJob()方法會先根據jobId判斷當前job是否已經提交,然后創建一個jobManagerRunner,接着把CompletableFuture<JobManagerRunner>放到名為jobManagerRunnerFutures的Map里,其中key就是jobId。

private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
          //包裝jobGraph 寫入zk
		submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph));

		final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

		return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
			if (throwable != null) {
				submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
			}
		}));
	}

	private CompletableFuture<Void> runJob(JobGraph jobGraph) {               
                //判斷當前job是否已經提交
		Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

		jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

		return jobManagerRunnerFuture
			.thenApply(FunctionUtils.nullFn())
			.whenCompleteAsync(
				(ignored, throwable) -> {
					if (throwable != null) {
						jobManagerRunnerFutures.remove(jobGraph.getJobID());
					}
				},
				getMainThreadExecutor());
	}

 繼續看createJobManagerRunner()方法,先異步的創建jobManagerRunner,然后執行startJobManagerRunner()方法,在確認jobManagerRunner后,執行start方法啟動jobManagerRunner。

在jobManagerRunner的start方法中,啟動zk選舉服務,讓自身(this)參與選舉獲得執行權,在zk確認后會回調grantLeadership()方法,jobManagerRunner實現了LeaderContender接口。

public void start() throws Exception {
		try {
			leaderElectionService.start(this);
		} catch (Exception e) {
			log.error("Could not start the JobManager because the leader election service did not start.", e);
			throw new Exception("Could not start the leader election service.", e);
		}
	}

 

@Override
	public void grantLeadership(final UUID leaderSessionID) {
		synchronized (lock) {
			if (shutdown) {
				log.info("JobManagerRunner already shutdown.");
				return;
			}

			leadershipOperation = leadershipOperation.thenCompose(
				(ignored) -> {
					synchronized (lock) {
						return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
					}
				});

			handleException(leadershipOperation, "Could not start the job manager.");
		}
	}

 獲得執行權限后調用verifyJobSchedulingStatusAndStartJobManager()方法,先判斷job狀態,如果是DONE(finished),則已經finished,否則執行startJobMaster(),在startJobMaster()方法中先把job狀態設為running,

把job和對應的狀態寫到zk。

如果需要實時的獲取job狀態可以用zk watch這個路徑

private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
		final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

		return jobSchedulingStatusFuture.thenCompose(
			jobSchedulingStatus -> {
				if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
					return jobAlreadyDone();
				} else {
					return startJobMaster(leaderSessionId);
				}
			});
	}

	private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
		log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
			jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

		try {
			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
		} catch (IOException e) {
			return FutureUtils.completedExceptionally(
				new FlinkException(
					String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
					e));
		}

		final CompletableFuture<Acknowledge> startFuture;
		try {
			startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
		} catch (Exception e) {
			return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
		}

		final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
		return startFuture.thenAcceptAsync(
			(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
			executor);
	}

  然后執行jobMasterService.start(),在jobMaster中 start()方法啟動RPC服務,然后startJobExecution來調度作業。

public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
		// make sure we receive RPC and async calls
		start();

		return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
	}

  startJobExecution()方法如下:

private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

		validateRunsInMainThread();

		checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

		if (Objects.equals(getFencingToken(), newJobMasterId)) {
			log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

			return Acknowledge.get();
		}

		setNewFencingToken(newJobMasterId);

		startJobMasterServices();

		log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);

		resetAndScheduleExecutionGraph();

		return Acknowledge.get();
	}

  其中validateRunsInMainThread()使用斷言來確認調用是否發生在RPC endpoint 的主線程中,正常不會執行。然后判斷jobMasterId,並且確認當前jobMaster沒有調度過其他的job。接着到startJobMasterServices()方法,這個方法的主要作用是在調度作業之前啟動jobMaster相關的組件:

  1. 啟動心跳服務
  2. 啟動taskManager的slotPool RPC服務,確保接受當前jobMaster的調用和分配請求
  3. 啟動schedule
  4. 連接到resourceManager

在這些步驟執行完成之后,執行resetAndScheduleExecutionGraph()來開始調度executionGraph。

private void resetAndScheduleExecutionGraph() throws Exception {
		validateRunsInMainThread();

		final CompletableFuture<Void> executionGraphAssignedFuture;

		if (executionGraph.getState() == JobStatus.CREATED) {
			executionGraphAssignedFuture = CompletableFuture.completedFuture(null);
			executionGraph.start(getMainThreadExecutor());
		} else {
			suspendAndClearExecutionGraphFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
			final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
			final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);

			executionGraphAssignedFuture = executionGraph.getTerminationFuture().handle(
				(JobStatus ignored, Throwable throwable) -> {
					newExecutionGraph.start(getMainThreadExecutor());
					assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
					return null;
				});
		}

		executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph);
	}

  首先判斷executionGraph的狀態是否為create,如果不為create會根據jobGraph創建新的executionGraph來代替當前的executionGraph,然后執行scheduleExecutionGraph(),

private void scheduleExecutionGraph() {
		checkState(jobStatusListener == null);
		// register self as job status change listener
		jobStatusListener = new JobManagerJobStatusListener();
		executionGraph.registerJobStatusListener(jobStatusListener);

		try {
			executionGraph.scheduleForExecution();
		}
		catch (Throwable t) {
			executionGraph.failGlobal(t);
		}
	}

  注冊想executionGraph作業狀態變更監聽器,執行executionGraph.scheduleForExecution(),先更新狀態從created到running,然后判斷調度模式,目前有兩種調度模式:

  1. LAZY_FROM_SOURCES
  2. EAGER

  Eager 調度如其名子所示,它會在作業啟動時申請資源將所有的 Task 調度起來。這種調度算法主要用來調度可能沒有終止的流作業。與之對應,Lazy From Source 則是從 Source 開始,按拓撲順序來進行調度。簡單來說,Lazy From Source 會先調度沒有上游任務的 Source 任務,當這些任務執行完成時,它會將輸出數據緩存到內存或者寫入到磁盤中。然后,對於后續的任務,當它的前驅任務全部執行完成后,Flink 就會將這些任務調度起來。這些任務會從讀取上游緩存的輸出數據進行自己的計算。這一過程繼續進行直到所有的任務完成計算。

 

   我們占時可以先不考慮批程序,從流程序scheduleEager()繼續往下看,scheduleEager()方法有點長,我們先把這個方法貼出來一步一步來看。

private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
		assertRunningInJobMasterMainThread();
		checkState(state == JobStatus.RUNNING, "job is not running currently");

		// Important: reserve all the space we need up front.
		// that way we do not have any operation that can fail between allocating the slots
		// and adding them to the list. If we had a failure in between there, that would
		// cause the slots to get lost
		final boolean queued = allowQueuedScheduling;

		// collecting all the slots may resize and fail in that operation without slots getting lost
		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

		final Set<AllocationID> allPreviousAllocationIds =
			Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());

		// allocate the slots (obtain all their futures
		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
			// these calls are not blocking, they only return futures
			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
				slotProvider,
				queued,
				LocationPreferenceConstraint.ALL,
				allPreviousAllocationIds,
				timeout);

			allAllocationFutures.addAll(allocationFutures);
		}

		// this future is complete once all slot futures are complete.
		// the future fails once one slot future fails.
		final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

		return allAllocationsFuture.thenAccept(
			(Collection<Execution> executionsToDeploy) -> {
				for (Execution execution : executionsToDeploy) {
					try {
						execution.deploy();
					} catch (Throwable t) {
						throw new CompletionException(
							new FlinkException(
								String.format("Could not deploy execution %s.", execution),
								t));
					}
				}
			})
			// Generate a more specific failure message for the eager scheduling
			.exceptionally(
				(Throwable throwable) -> {
					final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
					final Throwable resultThrowable;
					if (strippedThrowable instanceof TimeoutException) {
						int numTotal = allAllocationsFuture.getNumFuturesTotal();
						int numComplete = allAllocationsFuture.getNumFuturesCompleted();

						String message = "Could not allocate all requires slots within timeout of " +
							timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete +
								", previous allocation IDs: " + allPreviousAllocationIds;

						StringBuilder executionMessageBuilder = new StringBuilder();

						for (int i = 0; i < allAllocationFutures.size(); i++) {
							CompletableFuture<Execution> executionFuture = allAllocationFutures.get(i);

							try {
								Execution execution = executionFuture.getNow(null);
								if (execution != null) {
									executionMessageBuilder.append("completed: " + execution);
								} else {
									executionMessageBuilder.append("incomplete: " + executionFuture);
								}
							} catch (CompletionException completionException) {
								executionMessageBuilder.append("completed exceptionally: " + completionException + "/" + executionFuture);
							}

							if (i < allAllocationFutures.size() - 1) {
								executionMessageBuilder.append(", ");
							}
						}

						message += ", execution status: " + executionMessageBuilder.toString();

						resultThrowable = new NoResourceAvailableException(message);
					} else {
						resultThrowable = strippedThrowable;
					}

					throw new CompletionException(resultThrowable);
				});
	}

  首先后驗證當前job的狀態,確認當前的job state確實為running,否者拋出異常,job狀態先設置為running然后才開始調度的。接着從ExecutionJobVertex(以后簡稱ejv)開始遍歷分配slot,在ejv的allocateResourcesForAll()方法中其實又把ejv的ExecutionVertex(簡稱ev)遍歷一遍,然后取ev對應的Execution然后調用Execution的allocateAndAssignSlotForExecution()方法分配slot,具體分配算法之后單獨介紹。

  在分配完slot之后,調用execution.deploy()方法來啟動部署。

streamGraph,jobGraph,executionGraph,ExecutionJobVertex,ExecutionVertex,Execution 的關系可以參考下圖:

 

 

解析


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM