Flink Source Code Reading (1.7.2)


Client Job Submission

The execution modes include local, remote Standalone and others; only the YARN mode is covered below.

Yarn Mode

In job mode, every Flink job brings up its own Flink cluster on YARN.

In session mode, a Flink master (one YARN ApplicationMaster) is kept running in the cluster and executes multiple jobs.

Job mode (the key is -m yarn-cluster): ./flink run -m yarn-cluster -d -yst -yqu flinkqu -yn 4 -ys 2 -c flinkdemoclass flinkdemo.jar args1 args2 ...
Session mode:
First start a session: ./bin/yarn-session.sh
Then submit jobs to it: ./bin/flink run ./path/to/job.jar

Detached mode: the -d in the job-mode command above stands for detached. In this mode the Flink YARN client only submits the job to the cluster and then shuts itself down, so accumulator results and exceptions cannot be obtained from env.execute(). When a YARN session is started detached, Flink itself cannot stop the session either; it has to be stopped with YARN tooling, e.g. yarn application -kill <applicationId>.
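
To make the difference concrete, here is a minimal sketch of a job that uses an accumulator (the class name, job name and accumulator name are made up for illustration). Run attached, env.execute() blocks and returns a JobExecutionResult from which the accumulator can be read; run with -d, the client detaches right after submission and this result is simply not available:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AttachedVsDetachedDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4)
           .map(new RichMapFunction<Integer, Integer>() {
               private final IntCounter seen = new IntCounter();

               @Override
               public void open(Configuration parameters) {
                   getRuntimeContext().addAccumulator("seen", seen);
               }

               @Override
               public Integer map(Integer value) {
                   seen.add(1);
                   return value * 2;
               }
           })
           .print();

        // Attached mode: blocks until the job finishes and returns the accumulator results.
        // Detached mode (-d): the client exits after submission, so this result cannot be obtained.
        JobExecutionResult result = env.execute("accumulator-demo");
        System.out.println("seen = " + result.getAccumulatorResult("seen"));
    }
}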

From the flink startup script we can see that the entry class is org.apache.flink.client.cli.CliFrontend.

main -> cliFrontend.parseParameters -> run() -> runProgram -> {
	get the yarnClusterDescriptor: customCommandLine.createClusterDescriptor()
	
    if (clusterId == null && runOptions.getDetachedMode()) { // job + detached mode
      build the jobGraph from the jar
      create a RestClusterClient: clusterDescriptor.deployJobCluster(); -> {
          start the application on the YARN cluster: deployInternal -> yarnClusterDescriptor.startAppMaster -> yarnClient.submitApplication() // Flink thereby starts a ClusterEntrypoint in the YARN cluster; this class is described below.
      }
    }
    else {
        if (clusterId != null) { // session mode
          clusterDescriptor.retrieve(clusterId);
        }
        else { // job + non-detached mode
            For the non-detached job mode, the job might consist of multiple parts (e.g. when using collect). A RestClusterClient is created here as well (via deploySessionCluster), only without a pre-built jobGraph and without detached mode.
        }
    }
    
	executeProgram -> this is the run method of the parent class ClusterClient -> {
        if the program exposes a Plan (non-interactive mode) {
            RestClusterClient's run method, which blocks until execution finishes -> {
                getOptimizedPlan // obtain the optPlan that is later turned into a JobGraph; see the graph-structure section below
                run // continues in restClusterClient.run() below
            }
        }
        // the pseudocode below can be skipped
        else interactive mode {
            prog.invokeInteractiveModeForExecution() enters the user's Flink code -> env.execute(), where env (here and below) refers to StreamContextEnvironment -> {
                get the streamGraph: this.getStreamGraph();
                if detached mode, then setDetachedPlan(streamGraph)
                else call ContextEnvironment.getClient().run()
	        }
	        if detached mode {
                ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute(), which calls DetachedEnvironment's finalizeExecute, which in turn calls RestClusterClient's run method
	        }
	        else { // blocking mode
                return this.lastJobExecutionResult;
	        }
        }
	}
}

User-defined Flink code calls the execute method of whichever ExecutionEnvironment it runs in. Under YARN, the job and session modes correspond to DetachedEnvironment and ContextEnvironment respectively.

ClusterEntrypoint

The parent class of YarnJobClusterEntrypoint and YarnSessionClusterEntrypoint. As the pseudocode below shows, a ClusterEntrypoint hosts the webMonitor, resourceManager and dispatcher services.

The main method of either one -> ClusterEntrypoint.runClusterEntrypoint -> startCluster -> runCluster -> {
    createDispatcherResourceManagerComponentFactory // create the factory for the corresponding mode
    create -> {
        webMonitorEndpoint.start();
        resourceManagerRetrievalService.start();
        dispatcherLeaderRetrievalService.start();
    }
}

restClusterClient.run()

run() -> {
    getJobGraph
    restClusterClient.submitJob(jobGraph, classLoader) -> restClusterClient.submitJob(jobGraph), which also uploads some files -> sendRetriableRequest -> restClient.sendRequest()
}

To sum up, the client-side submission flow in Flink-on-YARN mode is roughly as follows:

  • Run the flink script, which creates the yarnClusterDescriptor

  • job or session

    • Job + detached mode: from the jar, the Flink client prepares everything needed to start the AM, i.e. the yarnClusterDescriptor and a RestClusterClient (carrying the jobGraph), and starts a YarnJobClusterEntrypoint in the YARN cluster

    • Session mode: the Flink Master (AM) is already running; the client calls clusterDescriptor.retrieve(clusterId)

  • Call restClusterClient.run(), which builds the jobGraph and submits the job to the cluster

After that the job is handed to the Dispatcher; this is picked up again in the "Flink Deployment and Execution Model" section.

Flink's Graph Structures

The getOptimizedPlan path shown above applies to DataSet (batch) programs: their Plan is compiled into an OptimizedPlan and then into a JobGraph. DataStream programs instead build a StreamGraph and generate the JobGraph from that, regardless of whether they run on YARN or locally.
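
As a small, self-contained illustration (the class name is made up; the APIs are the standard 1.7 ones), the two kinds of programs look like this:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoKindsOfPrograms {
    public static void main(String[] args) throws Exception {
        // DataSet (batch) program: the program's Plan is compiled by the Optimizer
        // into an OptimizedPlan and only then into a JobGraph.
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        long n = batchEnv.fromElements("a", "b", "c").count(); // count() triggers execution
        System.out.println("batch count = " + n);

        // DataStream program: StreamExecutionEnvironment collects transformations and
        // builds a StreamGraph, from which the JobGraph is generated.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.fromElements("a", "b", "c").print();
        streamEnv.execute("streaming-job");
    }
}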

Flink's graph structures are mainly the JobGraph and the ExecutionGraph.

JobGraph

The JobGraph represents a low-level Flink dataflow program, as accepted by the JobManager. All programs from the higher-level APIs are transformed into JobGraphs. Abstractly, the JobGraph is a DAG made of vertices and intermediate results. Nowadays iterations (feedback edges) are no longer compiled into the JobGraph; instead they go into special vertices that establish the feedback channel among themselves. The JobGraph defines the job-wide configuration, while each vertex and intermediate result it contains defines the characteristics of the concrete operator and of the intermediate data.
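
For reference, a small sketch (hypothetical class name, standard 1.7 APIs) that builds the JobGraph of a trivial streaming program on the client and lists its vertices:

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class JobGraphPeek {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").print();

        // StreamGraph -> JobGraph, the same structure the client ships to the cluster
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();

        for (JobVertex vertex : jobGraph.getVertices()) {
            System.out.println(vertex.getID() + " " + vertex.getName()
                    + " parallelism=" + vertex.getParallelism());
        }
    }
}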

ExecutionGraph

The central data structure coordinating the distributed execution of a dataflow. It keeps a representation of every parallel task, every intermediate stream and the communication between them. It mainly consists of the following three parts (a short traversal sketch follows the list):

  • ExecutionJobVertex: corresponds to one vertex of the JobGraph, typically one operator such as map or join. It holds the aggregated state of a group of parallel subtasks and is identified by the JobVertexID.
  • ExecutionVertex: represents a single parallel subtask of an ExecutionJobVertex. The number of ExecutionVertices is determined by the parallelism, and each one is identified by its ExecutionJobVertex and the index of the parallel subtask.
  • Execution: one attempt to execute an ExecutionVertex; an ExecutionVertex may have several Executions to cope with failures or recomputation. It is identified by an ExecutionAttemptID, and all task deployment and update messages between the JM and the TM address the task by this ExecutionAttemptID.
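
A short traversal sketch of this hierarchy (illustrative only, not code from the Flink source; it assumes you already hold an ExecutionGraph, e.g. inside the JobMaster):

import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;

public final class ExecutionGraphDump {
    // Walks ExecutionJobVertex -> ExecutionVertex -> (current) Execution and prints their IDs.
    public static void dump(ExecutionGraph executionGraph) {
        for (ExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {
            System.out.println("ExecutionJobVertex " + jobVertex.getJobVertexId()
                    + " parallelism=" + jobVertex.getParallelism());
            for (ExecutionVertex subtask : jobVertex.getTaskVertices()) {
                Execution attempt = subtask.getCurrentExecutionAttempt();
                System.out.println("  subtask #" + subtask.getParallelSubtaskIndex()
                        + " attemptId=" + attempt.getAttemptId());
            }
        }
    }

    private ExecutionGraphDump() {}
}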

Operators: in general, an operator call uses reflection to obtain the return type of the function passed in, and then uses transform to create the stream instance that results from applying that function. Before returning the new stream instance, Flink also registers the transformation, i.e. .addOperator(resultTransform).
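
A runnable sketch of what this boils down to (hypothetical class name): calling transform() directly with a StreamMap operator is essentially what map() does after its reflective type extraction, and the returned transformation is registered with the environment before the new stream is handed back:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamMap;

public class TransformDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("a", "b", "c");

        // map() is essentially sugar for this: extract the output type, wrap the function
        // in a StreamMap operator, and let transform() register the resulting
        // OneInputTransformation with the environment (addOperator).
        words.transform(
                "Map",
                BasicTypeInfo.STRING_TYPE_INFO,
                new StreamMap<>(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                }))
             .print();

        env.execute("transform-demo");
    }
}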

StreamGraph

The StreamGraph is obtained via getStreamGraph in StreamExecutionEnvironment.

for (StreamTransformation<?> transformation: transformations) {
	transform(transformation);
}
return streamGraph;

private Collection<Integer> transform(StreamTransformation<?> transform) {

    // the various transformXXX methods below also call this transform method recursively
	if (alreadyTransformed.containsKey(transform)) {
		return alreadyTransformed.get(transform);
	}

	// parallelism handling (omitted here)

	// call at least once to trigger exceptions about MissingTypeInfo
	transform.getOutputType();

	Collection<Integer> transformedIds;
	if (transform instanceof OneInputTransformation<?, ?>) {
		transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
	} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
		// ...
	}

	// need this check because the iterate transformation adds itself before
	// transforming the feedback edges
	if (!alreadyTransformed.containsKey(transform)) {
		alreadyTransformed.put(transform, transformedIds);
	}

	if (transform.getBufferTimeout() >= 0) {
		streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
	}
	if (transform.getUid() != null) {
		streamGraph.setTransformationUID(transform.getId(), transform.getUid());
	}
	if (transform.getUserProvidedNodeHash() != null) {
		streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
	}

	if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
		streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
	}

	return transformedIds;
}

transformOneInputTransform

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

	Collection<Integer> inputIds = transform(transform.getInput());

	// during the recursive traversal a node may already have been transformed via another child, so skip it
	if (alreadyTransformed.containsKey(transform)) {
		return alreadyTransformed.get(transform);
	}

    // determine the slotSharingGroup: it defines which other operators the one currently being processed may be placed into the same slot with,
    // because we are not always happy with the grouping Flink would otherwise do for us.
    // A slot is the basic container in which a task is executed.
	String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

	streamGraph.addOperator(transform...);
    
    // for a keyed stream we also record its KeySelector;
    // Flink does not actually store a key for each keyed stream, it re-evaluates the KeySelector whenever the key is needed,
    // so a user-defined KeySelector must be deterministic (idempotent).
    // This point comes up again when key groups are introduced later.
	if (transform.getStateKeySelector() != null) {
		TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
		streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
	}

	streamGraph.setParallelism(...);
	streamGraph.setMaxParallelism(...);
    
    // create edges between the current node and the nodes it depends on;
    // this is where the logical select/union/partition nodes mentioned earlier get folded into the edges
	for (Integer inputId: inputIds) {
		streamGraph.addEdge(inputId, transform.getId(), 0);
	}

	return Collections.singleton(transform.getId());
}
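
Two of the things recorded above are directly visible in the user API (a minimal sketch; the group name and the key format are made up): slotSharingGroup() sets the group later resolved by determineSlotSharingGroup, and keyBy() supplies the KeySelector that ends up in setOneInputStateKey, which, as noted above, must be deterministic:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotGroupAndKeyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a:1", "b:2", "a:3")
           .slotSharingGroup("sources")        // hypothetical group name, picked up by determineSlotSharingGroup
           .keyBy(new KeySelector<String, String>() {
               @Override
               public String getKey(String value) {
                   return value.split(":")[0]; // deterministic: the same record always yields the same key
               }
           })
           .print();

        env.execute("slot-group-and-key-demo");
    }
}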

// the implementation behind addEdge; it folds away some of the logical nodes
private void addEdgeInternal(...) {
	// if the upstream is a virtual side-output node, use that node's own input as this node's input and recurse
	if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
		int virtualId = upStreamVertexID;
		upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
		if (outputTag == null) {
			outputTag = virtualSideOutputNodes.get(virtualId).f1;
		}
		addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
		// if the upstream is a select node, use the select node's input as this node's input
	} else if (
        // virtualSelectNodes and virtualPartitionNodes are handled the same way as virtualSideOutputNodes above
	} else {
	// normal edge handling
		StreamNode upstreamNode = getStreamNode(upStreamVertexID);
		StreamNode downstreamNode = getStreamNode(downStreamVertexID);

		// If no partitioner was specified and the parallelism of upstream and downstream
		// operator matches use forward partitioning, use rebalance otherwise.
		if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
			partitioner = new ForwardPartitioner<Object>();
		} else if (partitioner == null) {
			partitioner = new RebalancePartitioner<Object>();
		}

        // throws if a ForwardPartitioner was set but upstream and downstream parallelism differ
		// ...
        
		StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);

		getStreamNode(edge.getSourceId()).addOutEdge(edge);
		getStreamNode(edge.getTargetId()).addInEdge(edge);
	}
}

Additional notes:

How StreamTransformations are wrapped

How StreamTransformations are put together

Not every StreamTransformation is turned into a physical operation in the runtime layer. Some of them, such as union, split/select and partition, are only logical concepts and are optimized away at runtime.
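
For example (a minimal sketch; the class name is made up), union and rebalance below only create UnionTransformation / PartitionTransformation objects; in the StreamGraph they become virtual nodes and are eventually folded into the input edges of the map operator, as shown in addEdgeInternal above:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VirtualNodesDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> a = env.fromElements(1, 2, 3);
        DataStream<Integer> b = env.fromElements(4, 5, 6);

        a.union(b)          // logical only: UnionTransformation, no physical operator
         .rebalance()       // logical only: PartitionTransformation, becomes an edge property
         .map(new MapFunction<Integer, Integer>() {
             @Override
             public Integer map(Integer value) {
                 return value + 1;   // the only physical operator in this chain of calls
             }
         })
         .print();

        env.execute("virtual-nodes-demo");
    }
}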

OptimizedPlan

The OptimizedPlan is made up of PlanNodes and Channels. They define the operator strategies (e.g. sort-merge join, hash join, sorted grouping), the shipping strategies (e.g. local pipe, shuffle, broadcast, rebalance) and the data exchange modes (batched, pipelined). The code below starts from getOptimizedPlan in the "Client Job Submission" section.

 -> compiler.compile(prog.getPlan()), where prog comes from PackagedProgram.getPlanWithJars and compiler is actually an Optimizer -> compile() -> {
     // first create the optimizer-plan representation: create an optimizer plan node for every operator, connect them with channels, look for clues about local strategies and channel types and set them, and estimate the data sizes
     new GraphCreatingVisitor; much like how Spark builds its execution plan, the visitor pattern is used here.
     program.accept(graphCreator); // the visitor first walks from the sinks to create the corresponding nodes, then connects the nodes starting from the sources.
     // the code that follows fine-tunes details such as node memory; the inline comments there are helpful, so it is skipped here.
     // finally obtain the plan
     new PlanFinalizer().createFinalPlan()
 }

JobGraph

After the OptimizedPlan has been obtained, another run method of ClusterClient is invoked.

run -> getJobGraph -> {
  // depending on whether we have a StreamingPlan or an OptimizedPlan, a different method generates the jobGraph; the following is for OptimizedPlan
  job = gen.compileJobGraph((OptimizedPlan) optPlan) -> compileJobGraph() -> {
      program.accept(JobGraphGenerator); // traverse and accumulate the objects needed to produce the job graph
      // per-task configuration
      // generate the job graph
      new JobGraph()
      for (JobVertex vertex : this.vertices.values()) { // vertices maps PlanNodes to JobVertices
			graph.addVertex(vertex);
	  }

  }
  // add jar URLs and classpaths to the job
}

Below is StreamGraph's getJobGraph, which calls StreamingJobGraphGenerator.createJobGraph(this, jobID);

private JobGraph createJobGraph() {
	// make sure that all vertices start immediately, i.e. all nodes are scheduled right from the start
	jobGraph.setScheduleMode(ScheduleMode.EAGER);
	// Generate deterministic hashes for the nodes in order to identify them across
	// submission iff they didn't change.
	Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
	// Generate legacy version hashes for backwards compatibility
	List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
	for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
		legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
	}
	Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
    // traverse the streamGraph and check which nodes can be chained, i.e. which operators can be merged into a single task/thread
	setChaining(hashes, legacyHashes, chainedOperatorHashes);
	setPhysicalEdges();
	setSlotSharingAndCoLocation();
	configureCheckpointing();
	JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);
	// set the ExecutionConfig last when it has been finalized
	try {
		jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
	}
	catch (IOException e) { /* rethrown as IllegalConfigurationException in the real code */ }
	return jobGraph;
}

// Why chaining: fewer thread switches, less message serialization/deserialization, less data exchange through buffers, and therefore lower latency and higher overall throughput. The chaining condition is as follows:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
	// ...
	return downStreamVertex.getInEdges().size() == 1 // the downstream node has no input from any other node
			&& outOperator != null
			&& headOperator != null
			&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
			&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
			&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
				headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
			&& (edge.getPartitioner() instanceof ForwardPartitioner)
			&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
			&& streamGraph.isChainingEnabled();
}
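
Chaining can also be influenced from the user API; a small sketch (standard DataStream API methods): disableChaining() keeps an operator out of any chain, startNewChain() begins a new chain at that operator, and env.disableOperatorChaining() turns chaining off for the whole job, all of which isChainable reflects through the chaining strategy and the isChainingEnabled flag:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingControlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining();   // uncomment to turn chaining off for the whole job

        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .startNewChain()        // this map starts a new chain instead of chaining to the source
           .filter(v -> v > 2)
           .disableChaining()      // the filter is never chained with its neighbours
           .print();

        env.execute("chaining-control-demo");
    }
}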

ExecutionGraph

JobMaster's createAndRestoreExecutionGraph is invoked, which internally calls ExecutionGraphBuilder.buildGraph(...).

The method is fairly long and also sets up checkpointing, the state backend and so on; those parts are omitted in the code below.

// create a new execution graph, if none exists so far; the construction is mostly plain assignments and the creation of standard Java collections
// ...

// set the basic properties
executionGraph.setScheduleMode(jobGraph.getScheduleMode());
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));

// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits

log.info("Running initialization on master for job {} ({}).", jobName, jobId);

for (JobVertex vertex : jobGraph.getVertices()) {
    
	// check that the vertex has an invokable class, and set the vertex parallelism
    // ...
    
    // sources and sinks get their specific initialization here
	vertex.initializeOnMaster(classLoader);
}

log.info("Successfully ran initialization on master in {} ms.",..);

// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
// Adding {} vertices from job graph
executionGraph.attachJobGraph(sortedTopology);
// Successfully created execution graph from job graph

Flink Deployment and Execution Model

For the scheduling concepts and the overall flow, see the article "Flink架構及其工作原理" (Flink architecture and how it works).

The following draws on FLIP-6 - Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.; some of the new features described there have not been implemented yet.

"JobManager" and "TaskManager" below are the customary names; in newer Flink versions such as 1.7, the underlying JobManager role is mainly implemented by JobMaster, and the TaskManager role by TaskExecutor.

Single Job JobManager

The most important change is that the JobManager handles only a single job. The JobManager will be created with a JobGraph and will be destroyed after the job execution is finished. This model more naturally maps what happens with most jobs anyways. Cross-job functionality is handled by other components that wrap and create JobManagers. This leads to a better separation of concerns, and a more modular composability for various cluster managers. The JobManager constructor will also optionally take a Savepoint or Checkpoint to initialize the job from.

The JobManager has a SlotPool which holds the slots that were offered to it and accepted. The JobManager’s scheduler grabs slots from the SlotPool and can thus access all currently registered slots even if the ResourceManager is down. The SlotPool is a modification of what is currently the InstanceManager. The SlotPool will attempt to acquire new slots from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available, or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The SlotPool releases slots that are unused to the ResourceManager. Slots count as unused if they are not used when the job is fully running (fully recovered).

ResourceManager

  • Acquire new TaskManager (or slots) by starting containers, or allocating them to a job
  • Giving failure notifications to JobManagers and TaskManagers
  • Caching TaskManagers (containers) to be reused, releasing TaskManagers (containers) that are unused for a certain period.
  • The ResourceManager may or may not be task slot aware (probably will be). A slot aware ResourceManager maintains a map of available TaskManager slots

For the sake of simplicity, the following talks about “slots”, but one can think simply of “TaskManager” instead, for the common case of a one-slot TaskManager.

The following are core aspects of the ResourceManager design:

  • The ResourceManager accepts slot requests; jobs can request TaskManagers with different resource profiles.

  • The ResourceManager works across multiple jobs and JobManagers. It is responsible for registering TaskManagers and maintains a pool of available TaskManagers and their slots.

  • A ResourceManager failure does not affect the currently running jobs; they just cannot obtain new slots while it is down.

  • ResourceManager fault tolerance should work without persistent state in general

    • All that the ResourceManager does is negotiate between the cluster-manager, the JobManager, and the TaskManagers. Its state can hence be reconstructed from re-acquiring containers and re-registration from JobManagers and TaskManagers
    • Note that certain specialization (for example for Mesos or Yarn) may still persist cluster-manager-specific state, if that is required.
  • JobManagers can register at the ResourceManager. A registered JM receives notifications about TM and slot failover.

If an already registered TaskManager has free slots, steps (2) and (3) can be skipped.

YarnResourceManager is the RM implementation for YARN mode. From its initialize() method we can see that it creates a resourceManagerClient and a nodeManagerClient, which wrap YARN's AMRMClientAsync and NMClient and are responsible for communicating with YARN's ResourceManager and NodeManager respectively.
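
For orientation, here is a minimal sketch of what this ApplicationMaster-side setup looks like with the plain Hadoop YARN APIs. This is illustrative rather than the actual YarnResourceManager code: the heartbeat interval, host, port and tracking URL are made-up values, and in Flink the YarnResourceManager itself acts as the callback handler:

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AmClientsSketch {

    public static void run(AMRMClientAsync.CallbackHandler handler) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();

        // Client towards YARN's ResourceManager: heartbeats, container requests/releases.
        AMRMClientAsync<AMRMClient.ContainerRequest> rmClient =
                AMRMClientAsync.createAMRMClientAsync(5000, handler);   // 5s heartbeat, illustrative
        rmClient.init(conf);
        rmClient.start();
        rmClient.registerApplicationMaster("am-host", 0, "http://am-host:8081"); // made-up values

        // Client towards the NodeManagers: used to start containers (TaskExecutors in Flink's case).
        NMClient nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();

        // ... request containers via rmClient.addContainerRequest(...),
        // start them via nmClient.startContainer(...), and eventually:
        rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
    }
}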

TaskManager

TaskManagers are both in contact with the ResourceManager and JobManager:

ResourceManager interaction

  • The TaskManager initially registers at the ResourceManager. A disconnect from the ResourceManager simply results in re-trying to register and advertise the currently available slots.
  • With each heartbeat, the TaskManager also transmits its slot availability.
  • The ResourceManager may tell the TaskManager to give a slot to a specific JobManager, and the TaskManager will offer that slot to the JobManager.
  • The ResourceManager may tell the TaskManager to shut down (exit the process)

JobManager interaction

  • The TaskManager offers a slot to a JobManager at the ResourceManager’s behest. That slot is then tied to that JobManager until the JobManager releases the slot.
  • The TaskManager watches all JobManagers to which it has offered slots. Loss of connection to the JobManager results in triggering master-failure recovery (currently: cancel all tasks from that master)
  • JobManagers can deploy tasks only into slots they allocated.
  • Upon loss of connection to the JobManager, the TaskManager will try to re-register the slots at the new JobManager for that job (retrieved via the HA leader lookup). After a moderate timeout period, it releases the slots and makes them available again. If a backup JobManager does not take over within that period, it will have to re-request the slots from the ResourceManager.

YARN

Compared to the state in Flink 1.1, the new Flink-on-YARN architecture offers the following benefits:

  • The client directly starts the Job in YARN, rather than bootstrapping a cluster and after that submitting the job to that cluster. The client can hence disconnect immediately after the job was submitted
  • All user code libraries and config files are directly in the Application Classpath, rather than in the dynamic user code class loader
  • Containers are requested as needed and will be released when not used any more
  • The “as needed” allocation of containers allows for different profiles of containers (CPU / memory) to be used for different operators

Yarn-specific Fault Tolerance Aspects

ResourceManager and JobManager run inside the ApplicationMaster process. Failure detection and restart of that process is done by YARN.

JobGraph and libraries are always part of the working directory from which the ApplicationMaster processes is spawned. Internally, YARN stores them in a private HDFS directory.

In YARN mode, the dispatcher is optional.

FLIP-6 illustrates the two deployment variants here: without dispatcher and with dispatcher (diagrams not reproduced).

Dispatcher

The Dispatcher component is responsible for receiving job submissions, persisting them, spawning JobManagers to execute the jobs and to recover them in case of a master failure. Furthermore, it knows about the state of the Flink session cluster.

The dispatcher is introduced because:

  • Some cluster managers need a central job spawning and monitoring instance
  • It subsumes the role of the standalone JobManager, waiting for jobs to be submitted

In some setups, the dispatcher is optional (YARN) or not applicable (Kubernetes).

In the future, the dispatcher will also help with the following aspects:

  • The dispatcher is a cross-job service that can run a long-lived web dashboard
  • Future versions of the dispatcher should receive only HTTP calls and thus can act as a bridge in firewalled clusters
  • The dispatcher never executes code and can thus be viewed as a trusted process. It can run with higher privileges (superuser credentials) and spawn jobs on behalf of other users (acquiring their authentication tokens). Building on that, the dispatcher can manage user authentications

In the source code, the MiniDispatcher is instantiated by a JobClusterEntrypoint and handles exactly one JobGraph. If the execution mode is DETACHED, the MiniDispatcher terminates once the job has completed. The StandaloneDispatcher corresponds to session mode.

JobMaster

The JobMaster is responsible for executing a single JobGraph. JobManager is the older runtime component; it still exists in version 1.7, but most of the work is done by JobMaster, and from 1.8 on the legacy JobManager class is gone.

The JM's main execution path is touched on in the source code analysis at the end of this section.

YarnTaskExecutorRunner

The TaskExecutor is the TaskManager counterpart in a YARN cluster. It may own several slots, and each slot runs one concrete subtask. Every TaskExecutor registers its slots with the SlotManager and reports whether they are busy or idle.
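
On YARN the number of slots per TaskExecutor comes from -ys (taskmanager.numberOfTaskSlots); the same knob can be seen in a small local sketch (illustrative values, hypothetical class name):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotsPerTaskManagerDemo {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // taskmanager.numberOfTaskSlots: how many subtasks one TaskExecutor/TaskManager can host
        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);

        // local mini cluster with parallelism 2, so both slots of the single TM are used
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, config);

        env.fromElements("a", "b", "c", "d")
           .rebalance()
           .print();

        env.execute("slots-demo");
    }
}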

main -> {
    SignalHandler.register(LOG);
    run(args) -> {
        TaskManagerRunner.runTaskManager -> {
            new TaskManagerRunner -> startTaskManager -> {
                create TaskManagerServices; TaskSlots are created when it is instantiated
                new TaskExecutor 
            }
            taskManagerRunner.start() -> {
                connect to the RM: resourceManagerLeaderRetriever.start()
                tell the task slots who handles slot actions: taskSlotTable.start(new SlotActionsImpl());
                jobLeaderService.start();
                startRegistrationTimeout();
            }
        }
    }
}

Source Code Analysis

Starting the Dispatcher
Dispatcher.start() -> leaderElectionService.start(), i.e. ZooKeeperLeaderElectionService's start()

The Dispatcher receives the client's submitJob
RedirectHandler.channelRead0(), a netty handler -> AbstractHandler.respondAsLeader() -> respondToRequest -> JobSubmitHandler.handleRequest() -> gateway.submitJob(), i.e. the Dispatcher's method -> persistAndRunJob() -> runJob -> createJobManagerRunner(jobGraph){
  jobManagerRunnerFactory.createJobManagerRunner -> {
	create the DefaultJobMasterServiceFactory
	register the job: libraryCacheManager.registerJob
	obtain (not yet started) haServices.getJobManagerLeaderElectionService
	call jobMasterFactory.createJobMasterService{
        instantiate JobMaster{ JobMaster.createAndRestoreExecutionGraph }; the JM is responsible for executing one jobGraph
	}
  } 
  dispatcher.startJobManagerRunner -> {
    jobManagerRunner.start() -> ZooKeeperLeaderElectionService.start -> isLeader -> JobManagerRunner.grantLeadership -> verifyJobSchedulingStatusAndStartJobManager -> startJobMaster -> JobMaster.start -> startJobExecution -> {
  startJobMasterServices: this starts the slotPool and the scheduler and tells them the address of the Flink ResourceManager leader; once the Flink RM and the JM are connected, slots can start being requested
  // execute the job
  resetAndScheduleExecutionGraph -> {
      createAndRestoreExecutionGraph -> scheduleExecutionGraph -> executionGraph.scheduleForExecution() -> scheduleEager -> {
          allocate slots for the Executions: ExecutionJobVertex.allocateResourcesForAll -> Execution.allocateAndAssignSlotForExecution -> ProviderAndOwner.allocateSlot -> SlotPool.allocateSlot -> {
              if task.getSlotSharingGroupId() == null {
	          	  return allocateSingleSlot() -> return SingleLogicalSlot
	          } else {
	          	  return allocateSharedSlot() -> {
                      if (task.getCoLocationConstraint() != null) {
                          return allocateCoLocatedMultiTaskSlot()
                      }
                      else {
                          return allocateMultiTaskSlot() -> {
                              multiTaskSlotLocality: an already resolved slot for this sharing group (one that was allocated earlier)
                              if multiTaskSlotLocality != null && the slot is local {
                                  return multiTaskSlotLocality
                              }
                              available slots: polledSlotAndLocality   
                              if polledSlotAndLocality != null && (polledSlotAndLocality.getLocality() == Locality.LOCAL || multiTaskSlotLocality == null) {
                                  allocatedSlot.tryAssignPayload(multiTaskSlot) // try it: return on success, fall through on failure
                              }
                              
                              if multiTaskSlotLocality != null {
                                  return multiTaskSlotLocality
                              }
                              
                              if (allowQueuedScheduling) { // if queued scheduling is allowed
                                  check whether any of the not-yet-resolved slots can be used
                                  if not, request a new one from the RM: requestNewAllocatedSlot -> requestSlotFromResourceManager -> resourceManagerGateway.requestSlot -> slotManager.registerSlotRequest() -> internalRequestSlot() -> allocateResource -> resourceActions.allocateResource() (in ResourceActionsImpl) -> YarnResourceManager.startNewWorker -> requestYarnContainer(), i.e. request a TM container
                              }
                              else {
                                  do not restrict locality; return whatever slot the SlotSharingManager has
                              }
                          }
                      }
	          	  }
	          }
          }
          for each Execution, call its deploy method -> {
              a few checks, e.g. whether the assigned target slot (TaskManager) is still alive
              vertex.createDeploymentDescriptor()
              slot.getTaskManagerGateway(); // actually an RpcTaskManagerGateway -> taskExecutorGateway
              taskManagerGateway.submitTask(); -> taskExecutorGateway.submitTask() -> {
                check the JM connection, the JM id and the slots
                download the user jar files via the BlobServer
                new Task
                task.startTaskThread(); // at this point the task actually starts running
              }
          }
      }
    } 
  }
}

Lessons Learned from Reading the Flink Source Code

  1. Keyboard shortcuts (on macOS)
    • Follow a call forward: control + click
    • Trace callers backward: select the method or class name, then shift + command + g
  2. Split the code into blocks, the way an article is split into paragraphs, and read each block's comments or guarding conditions first, like the opening sentence of a paragraph. Exception handling, checks and the like can usually be skipped.
  3. Record every jump, e.g. A -> B, so you can backtrack.
  4. Pay attention to return statements, especially when there is more than one; work out their conditions so you do not follow the wrong branch later.
  5. Reading forward:
    • Constructors may contain important logic (even though that is not good practice). For example, createJobManagerRunner mentioned above instantiates further objects, and the JobMaster constructor among them calls createAndRestoreExecutionGraph.
    • When you hit an abstraction, go back and check which concrete implementation the calling instance actually corresponds to.
  6. Reading backward:
    • Only follow the relevant packages; anything under test can usually be skipped.
  7. Building Flink first makes reading the source more convenient.
  8. Keep an eye on the official design documents, e.g. the Flink Improvement Proposals; FLIP-6 there describes the new Flink scheduling model.

References:

flink on yarn部分源碼解析

追源索驥:透過源碼看懂Flink核心框架的執行流程

