Flink源碼解析(四)——從Flink集群部署和任務提交模式看Flink任務的核心組件


0. 說明

  • 本文基於Flink 1.12;
  • 本文是在閱讀源碼過程結合自己理解所寫,不一定正確,歡迎大伙留言指出;

1. 集群部署

1.1. 部署方式

Flink集群部署可以簡要的分為以下兩種方式:

  • 直接部署在服務器上(物理機、Kubernetes、docker等);
  • 結合其他資源調度框架,如on Yarn、Mesos;

1.2. 部署在服務器上

常見於standalone模式。standalone模式是一種獨立的集群模式,可以不依賴外部資源調度系統直接運行,所需資源是直接基於服務器的,由管理員手工啟動。基於Kubernetes、docker的部署方式,其本質是將JobManager和TaskManager docker化,集群本身還是standalone的。
standalone模式可細分為:HA模式和非HA模式。HA模式需要注意:HA必須依賴於共享存儲文件系統,要保證JobManager的元數據信息對所有節點共享。

1.3. 結合其他資源調度框架

Flink集群所需資源是向調度框架申請,調度框架的資源則是基於物理資源。和直接基於物理機得standalone模式相比,基於資源調度框架更具有靈活性,資源也可以復用。此時,任務的可靠性也是通過資源調度框架來實現的。

2.任務提交模式

2.1 session模式

在該模式,多個任務共享一個提前創建好的Flink集群,整個集群的資源在session集群啟動時已固定了。常見的yarn-session模式,就是在先基於yarn啟動一個Flink集群,然后向該集群提交任務。在集群的standalone模式下,通常提交任務也是session模式,只不過整個集群是由管理員手工啟動。

2.2 Per-job模式

在該模式,每個任務在提交時都會創建各自集群,TM、JM的內存分配在啟動任務的時候可以根據需求制定。任務單獨啟動啟動、恢復都是單獨執行的,任務恢復速度快。

2.2 Application模式

在該模式,每個任務在提交時都會創建一個集群。該模式和其他模式的區別在於:其他模式下,任務的main()方法都是在client側運行,會在client側生成JobGraphs,並將JobGraph提交到集群中,可能對client機器的資源消耗過多;Application模式,則在JobManager上運行application的main()函數,可節省資源。

3. Flink任務涉及核心組件

簡單來說,集群部署模式的不同,在於Flink的啟動是由管理員手動啟動還是有資源調度框架啟動;任務提交模式的不同,在於任務與集群的關系或是任務main函數執行的位置不一樣。這些變化中,任務涉及到的核心組件時相同:JobMaster、ResourceManager、TaskExecutor,所以弄清這些核心組件有利於我們在變化中找到不變的。

3.1. JobMaster

JobMaster是負責單個任務的執行。與之容易混淆的是JobManager,JobManager是一個抽象的概念,其作為Flink集群的manager是一個單獨的進程(JVM),由一些service組成的(主要是Dispatcher、JobMaster、ResourceManager)。JobMaster僅是JobManager中的一個組件,JobManager為JobMaster提供任務資源、jar存儲的等服務。值得注意的是,JobMaster在是由JobManagerRunnerImpl啟動的。

  • JobGraph
    JobGraph的生成是和任務提交的模式相關的,如session模式JobGraph是在client側生成后提交到JobManager。JobMaster啟動之后,JobGraph會被轉換成ExecutionGraph,ExecutionGraph是JobGraph的並行版,兩者的區別在此不詳細展開。
  • SlotPool
    slotPool是JobMaster中管理資源的組件,在分配資源時會先在slotPool中分配,分配slot的策略有兩種:按照位置優先分配;按照之前已分配的slot分配。若從slotPool申請不到slot,則將請求緩存起來,等連接上ResourceManager獲取slot之后再分配slot。
  • SchedulerNG
    SchedulerNG會在JobMaster啟動時啟動,主要負責ExecutionGraph的執行。涉及SchedulerNG主要流程如下:
JobMaster#start()->startScheduling()
|
SchedulerBase#startScheduling()
|
|  //啟動所有的coordinator
|  --startAllOperatorCoordinators()  
|  //根據部署策略(lazy、Eager)分配資源啟動ExecutionGraph
|  --startSchedulingInternal()  

其中,OperatorCoordinator代表的是runtime operators,其運行在JobMaster中,一個OperatorCoordinator對應的是一個operator的Job vertex,其和operators的交互是通過operator event。主要負責subTask的重啟、失敗等,以及operator的checkpoint行為。

  • HeartbeatServices
    主要是為JobMaster和TaskManager、ResourceManager之前的心跳提供服務。
  • HighAvailabilityServices
    HighAvailabilityServices在JobMaster中主要是獲取ResourceManager信息、checkpoint信息。
  • BackPressureStatsTracker
    backPressure是指當一個operator的處理速度小於上游下發的速度,數據就會在input buffer里出現積壓,當buffer滿了,數據就會無處可放,Flink將這種情況稱為backPressure。Dispatcher通過JobMaster的BackPressureStatsTracker對每個TM的subTask做跟蹤。涉及的流程如下:
Dispatcher#構造函數  //在集群啟動時已生成Dispatcher實例
|
|  //從配置文件獲取tracker的參數配置
JobManagerSharedServices#fromConfiguration()

判斷一個operator是否處於backpressure狀態可以看Task#isBackPressured()方法。

  • LeaderRetrievalService
    LeaderRetrievalService獲取當前服務的leader,在JobMaster中,LeaderRetrievalService是負責與ResourceManager鏈接,然后JobMaster會向ResourceManager注冊。

3.2 ResourceManager

ResourceManager是Flink集群內部管理資源(slot)的組件。TaskManager向其提供slot,JobMaster向其請求slot執行任務。與此同時,RM會JobMaster、TaskManager保持心跳,其是心跳請求的發起方,當JobMaster、TaskManager失敗的時候會采取相應的對策。

  • ResourceManager的啟動
    RM的的啟動時在JobManager的過程啟動的,JobManager的啟動入口是ClusuterEntryPoint,過程如下:
  ClusuterEntryPoint#startCluster
  |
  | //在該過程中會啟動Dispatcher、ResourceManager、 WebMonitorEndpoint
  DefaultDispatcherResourceManagerComponentFactory#create
  |
  RpcEndpoint#start

整個過程如下:

  • 在啟動JobManager時,會啟動haServices相關的服務;
  • 在DefaultDispatcherResourceManagerComponentFactory#create()方法中,會先通過HA服務獲取leader節點信息;
  • 通過RpcEndpoint#start啟動RM,以便和其他組件交互;
  • RM中核心組件
    RM中如LeaderElectionService 和JobMaster中的作用是相近的,這里不詳細展開。
    • slotManager
      slotManager維護Flink 集群中一張slot視圖,包括:所有注冊、分配的slot以及待滿足的slot請求(JobMaster中slotPool不能滿足任務需求時發起的slot請求)。為了釋放資源、防止內存泄漏,空閑的TaskManager將會被釋放、超時的slot請求會請求失敗。
      slotManager在slot請求過程的角色如下,整個過程大致分為以下幾個步驟:

      • 過程1:TM向RM注冊后,向RM上報slot信息,slot信息被保存在RM的slotManager中;
      • 過程2:JM會首先向slotPool請求slot,若能,則直接提交任務;若不能滿足,則通過slotPool向RM請求slot資源;
      • 過程3:若是RM中的slotManager的slot能滿足JM的請求,則會向TM發起RPC請求申請對應的slot,TM中TaskSlotTable會把slot信息以slotoffer提供JM的slotPool(過程4);若是不能滿足,則RM會向更底層的系統獲取資源。

3.3 TaskManager

TaskManager與TaskExecutor的關系類似於JobManager與JobMaster。TaskExecutor負責task的執行。
TaskExecutor中的組件有與RM、JM交互服務,如TaskExecutorToResourceManagerConnection、resourceManagerHeartbeatManager等;有與JM交互的服務,jobManagerHeartbeatManager,這些組件功能與RM和JM中的類似,在這里會嘗試分析TaskSlotTable、JobTable、KvStateService等。

  • TaskManager的啟動過程:
    TaskManager的啟動入口在TaskManagerRunner。
  TaskManagerRunner#main
  |
  |-- runTaskManager
  |  //該過程會啟動TaskManager
  |-- createTaskExecutorService
  |  //該過程會從配置中初始化一些服務如:TaskManagerServices、KvStateService,
  |  //返回一個初始化的TaskExecutor,在初始化的過程中會啟動相應的RPC endpoint
  |-- startTaskManager
  |---- TaskManagerServices.fromConfiguration  //可以重點看看
  |-- TaskExecutorService#start

這里我們會着重分析一下runTaskManager:

  • createTaskExecutorService->startTaskManager的過程中會從獲取配置參數,最后會new TaskExecutor時通過調用父類的構造方法啟動相應的RPC endpoint。在這個過程,我們可以看到taskManagerRunner是通過TaskExecutorToServiceAdapter.createFor(),僅僅是一個適配器,其本質還是TaskExecutor。
  • TaskExecutorService#start會逐步調用RpcEndpoint的start()方法,其RpcServer就是在上一步中初始化的。這里會逐步調用到TaskExecutor#onStart()。
  • TaskSlotTable
    TaskSlotTable從不同的維度維護slot信息,如JobID和slot的關系,AllocationID與slot的關系,其具體存儲的方式是map,這樣根據不同的key就可以很快的獲取到slot信息。其中,對於那些已分配但是無法分配到具體JobManage的slot會啟動一個定時任務,若是超時會釋放slot以免內存泄漏。

    • TaskSlotTable的初始化過程是在TaskManagerServices#fromConfiguration方法中被初始化的;
    • TaskSlotTable的啟動時在TaskExecutor#startTaskExecutorServices中,這里對slot明確了free和負責timeout的線程。
      TaskSlotTable在slot整個流轉中的作用見上文。
  • KvStateService
    KvState的注冊服務,其啟動過程TaskManagerServices#fromConfiguration方法中,具體過程如下:

public static TaskManagerServices fromConfiguration(){
  //......
  //從配置文件構造KvStateService
    final KvStateService kvStateService =
            KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    kvStateService.start();
  //......
}

// 上面start()方法,最終會調用AbstractServerBase#start,該方法中要是bind到指定地址上。關鍵步驟如下:
private boolean attemptToBind(final int port) throws Throwable {
        log.debug("Attempting to start {} on port {}.", serverName, port);

        this.queryExecutor = createQueryExecutor();
        this.handler = initializeHandler();

        final NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);

        final ThreadFactory threadFactory =
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("Flink " + serverName + " EventLoop Thread %d")
                        .build();

        final NioEventLoopGroup nioGroup =
                new NioEventLoopGroup(numEventLoopThreads, threadFactory);

        this.bootstrap =
                new ServerBootstrap()
                        .localAddress(bindAddress, port)
                        .group(nioGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.ALLOCATOR, bufferPool)
                        .childOption(ChannelOption.ALLOCATOR, bufferPool)
                        .childHandler(new ServerChannelInitializer<>(handler));

        final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
        //noinspection ConstantConditions
        // (ignore warning here to make this flexible in case the configuration values change)
        if (LOW_WATER_MARK > defaultHighWaterMark) {
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
        } else { // including (newHighWaterMark < defaultLowWaterMark)
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK);
        }

        try {
            final ChannelFuture future = bootstrap.bind().sync();
            if (future.isSuccess()) {
                final InetSocketAddress localAddress =
                        (InetSocketAddress) future.channel().localAddress();
                serverAddress =
                        new InetSocketAddress(localAddress.getAddress(), localAddress.getPort());
                return true;
            }

            // the following throw is to bypass Netty's "optimization magic"
            // and catch the bind exception.
            // the exception is thrown by the sync() call above.

            throw future.cause();
        } catch (BindException e) {
            //.....
        }
        // any other type of exception we let it bubble up.
        return false;
    }


  • JobTable
    JobTable的任務是用來管理一個Job在TaskExecutor上的生命周期。其主要是維護了兩個Map:JobID與Job/Connection、ResourceId與JobID。其中Job接口反映了job和JobMaster的connect。
    JobTable的初始化過程也是在TaskManagerServices#fromConfiguration中,在此僅僅是初始了默認的DefaultJobTable。
    JobTable的初始過程在RM向TM請求slot的過程中初始化的,具體過程如下:
 @Override
    public CompletableFuture<Acknowledge> requestSlot(
            final SlotID slotId,
            final JobID jobId,
            final AllocationID allocationId,
            final ResourceProfile resourceProfile,
            final String targetAddress,
            final ResourceManagerId resourceManagerId,
            final Time timeout) {
        // TODO: Filter invalid requests from the resource manager by using the
        // instance/registration Id

        log.info(
                "Receive slot request {} for job {} from resource manager with leader id {}.",
                allocationId,
                jobId,
                resourceManagerId);

        if (!isConnectedToResourceManager(resourceManagerId)) {
            final String message =
                    String.format(
                            "TaskManager is not connected to the resource manager %s.",
                            resourceManagerId);
            log.debug(message);
            return FutureUtils.completedExceptionally(new TaskManagerException(message));
        }

        try {
            allocateSlot(slotId, jobId, allocationId, resourceProfile);
        } catch (SlotAllocationException sae) {
            return FutureUtils.completedExceptionally(sae);
        }

        final JobTable.Job job;

        try {
          //初始化job
            job =
                    jobTable.getOrCreateJob(
                            jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
        } catch (Exception e) {
            // free the allocated slot
            try {
                taskSlotTable.freeSlot(allocationId);
            } catch (SlotNotFoundException slotNotFoundException) {
                // slot no longer existent, this should actually never happen, because we've
                // just allocated the slot. So let's fail hard in this case!
                onFatalError(slotNotFoundException);
            }

            // release local state under the allocation id.
            localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

            // sanity check
            if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                onFatalError(new Exception("Could not free slot " + slotId));
            }

            return FutureUtils.completedExceptionally(
                    new SlotAllocationException("Could not create new job.", e));
        }

        if (job.isConnected()) {
            offerSlotsToJobManager(jobId);
        }

        return CompletableFuture.completedFuture(Acknowledge.get());
    }

4. 參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM