Flink整體執行流程


以Flink源碼中自帶的WordCount為例,執行的入口從用戶程序的execute()函數入手,execute()的源碼如下:

 1 public JobExecutionResult execute(String jobName) throws Exception {
 2         StreamGraph streamGraph = getStreamGraph();
 3         streamGraph.setJobName(jobName);
 4         JobGraph jobGraph = streamGraph.getJobGraph();
 5         . . . . . . .
 6         LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
 7         try {
 8             exec.start();
 9             return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
10         }
11         finally {
12             transformations.clear();
13             exec.stop();
14         }
15     }

  函數內部主要有getStreamGraph()、getJobGraph()、exec.start()、exec.submitJobAndWait()等。getStreamGraph()的作用是生成StreamGraph圖,getJobGraph()的作用是生成JobGraph的圖,exec.start()的作用是建立Client、JobManager、TaskManager三者之間通信初始化,exec.submitJobAndWait()的作用提交job並且等待job執行后的結果,該函數提供了任務執行調度執行的入口,進入Client類中,首先執行createUserCodeClassLoader()函數,創建用戶代碼的加載器,然后執行jobClient.SubmitJobAndWait(),進入JobClient類,在函數內部會執行submit函數,從該函數開始進入AKKA通信階段,首先會進入JobClientActor,會創建一個jobclientActor來對JobManager和client進行通信,當通信對象創建之后,會執行akka機制的ask函數,該函數的作用是發出一個消息,然后要求收到方給予回復。當消息發出之后,OnReceive()函數會收到actor發出的消息請求,然后調用handleMessage()方法來處理消息請求,該函數內部有connectToJobManager()方法,此方法內部的tryToSubmitJob()函數是正式提交任務的操作,主要做的工作就是uploadUserJars()上傳用戶程序的jar文件,接着會jobManager.tell()向JobManager發出一個submit消息請求。

  當JobManager收到Client發送的消息之后,會執行JobManager內部的submitJob方法,

1 case SubmitJob(jobGraph, listeningBehaviour) =>
2       val client = sender()
3 
4       val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
5         jobGraph.getSessionTimeout)
6       log.info("liuzf---開始執行JobManager的submitJob()")
7       submitJob(jobGraph, jobInfo)

 首先會把由client收到的job信息封裝在jobinfo中,然后把jobinfo以及job的任務圖jobGraph一起發送給submit()去執行,在JobManager的submit函數中處理的函數邏輯比較復雜,比較重要的函數執行過程如下:

 1 private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
 2         try {
 3           libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
 4             jobGraph.getClasspaths)
 5         }
 6         val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
 7       
 8         }
 9         executionGraph = ExecutionGraphBuilder.buildGraph()
10             try {
11               submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
12           jobInfo.notifyClients(
13             decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))
14             log.info(s"開始調度 job $jobId ($jobName).")
15             executionGraph.scheduleForExecution()

  首先執行libraryCacheManager.registerJob(),向CacheManager進行注冊,請求緩存,然后執行getClassLoader()來加載用戶的代碼加載器,接下來會調用ExecutionGraph中的buildGraph()構造ExecutionGraph的並行化版本的執行圖,當邏輯執行圖構造完畢之后,這時候可以通知Client任務已經成功提交,並且提交過程結束。接下來會調用sheduleForExecution()來會整體的資源進行調度分配,主要是每個TaskManager中的slot的分配,並且當slot分配完成之后,所有的task的任務狀態發生改變,由CREATEDàSCHEDULED。接下分配完之后,接下來執行depolyToSlot()函數,就要進入部署狀態,同樣會執行transitionState()函數,將SCHEDULED狀態變為DEPOLYING狀態,接着的重要函數是shumitTask()函數,該函數會通過AKKA機制,向TaskManager發出一個submitTask的消息請求,TaskManager收到消息請求后,會執行submitTask()方法,該函數的重要執行過程如下:

 1 public submitTask(){
 2      val task = new Task(. . . .)
 3       log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks()}")
 4       val execId = tdd.getExecutionAttemptId
 5       val prevTask = runningTasks.put(execId, task)
 6       if (prevTask != null) {
 7               runningTasks.put(execId, prevTask)
 8         throw new IllegalStateException("TaskM}anager already contains a task for id " + execId)
 9       }
10       task.startTaskThread()
11       sender ! decorateMessage(Acknowledge.get())
12     }

  首先執行Task的構造函數,生成具體物理執行的相關組件,比如ResultPartition等,最后創建執行Task的線程,然后調用startTaskThread()來啟動具體的執行線程,Task線程內部的run()方法承載了被執行的核心邏輯,該方法具體的內容為:

 

 1 public void run() {
 2         while (true) {
 3             ExecutionState current = this.executionState;
 4             if (current == ExecutionState.CREATED) {
 5                 if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
 6                     break;
 7                 }
 8             }
 9             invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
10             network.registerTask(this);
11             Environment env = new RuntimeEnvironment(. . . . );    
12             invokable.setEnvironment(env);
13             // ----------------------------------------------------------------
14             //  actual task core work
15             if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
16             }
17             // notify everyone that we switched to running
18             notifyObservers(ExecutionState.RUNNING, null);
19             executingThread.setContextClassLoader(userCodeClassLoader);
20             // run the invokable
21             invokable.invoke();
22 
23             if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
24                 notifyObservers(ExecutionState.FINISHED, null);
25             }
26             Finally{
27                 // free the network resources
28                 network.unregisterTask(this);
29                 // free memory resources
30                 if (invokable != null) {
31                     memoryManager.releaseAll(invokable);
32                 }
33                 libraryCache.unregisterTask(jobId, executionId);
34                 removeCachedFiles(distributedCacheEntries, fileCache);

  首先執行transitionState()函數將TaskManager的狀態由CREATED轉變為DEPOLYING狀態,然后調用loadAndTrantiateInvokable()對用戶代碼打包成jar包,並且生成用戶代碼加載器,然后執行network.registerTask(),執行該函數之前,會執行NetworkEnvironment的構造函數,該類是TaskManager通信的主對象,主要用於跟蹤中間結果並負責所有的數據交換,在該類中會創建協助通信的關鍵部件,比如網絡緩沖池,連接管理器,結果分區管理器,結果分區可消費通知器等。當網絡對象准備完成后,創建一個運行環境,然后執行invoke.setEnvironment(env),將各種配置打包到運行環境中。

  當運行環境准備之后,接下來到了具體分析任務執行的時候,首先會調用transitionState()函數將任務狀態由DEPOLYING改為RUNNING狀態,然后會調用notifyObservers()通知所有的task觀察者也改變狀態,然后執行setContextClassLoader()將執行的類加載器設置為用戶執行的加載器,然后執行invokable.invoke(),該函數是分界點,執行前用戶邏輯沒有被觸發,執行之后說明用戶邏輯已完成。當執行完成之后,調用transitionState()函數執行的RUNNING狀態改成FINISHED狀態。同樣調用notifyObservers()來通知其他觀察者改變狀態,最后,釋放資源。

總體的函數執行圖如下:因圖片太大------>>>>>

鏈接: https://pan.baidu.com/s/15F1rBAmTSmNrkC8I4GmKnA 密碼: du23

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM