[Source Code] Flink's Three-Layer Graph Structure: JobGraph Generation


Note: everything here is based on Flink's local mode.

JobGraph generation starts in the execute method of LocalExecutor.java.

// Invoked for local execution. Pipeline is the interface that StreamGraph implements.
@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
    // Generate the JobGraph from the StreamGraph (pipeline) and the effective configuration
    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);

    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
}

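pipeline is the StreamGraph that was just generated, and configuration is the launch configuration.

As the figure in the original post shows, pipeline mainly contains the StreamNodes and other configuration.

effectiveConfig holds {execution.attached=true, execution.target=local}, i.e. local mode.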
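The order of the two addAll calls in execute matters: the per-call configuration is added last, so its entries override the executor's own. Below is a minimal sketch of that precedence, assuming a plain Map as a stand-in for Flink's Configuration. The class name EffectiveConfigSketch and the sample values are illustrative, not taken from Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the merge done in LocalExecutor.execute;
// a plain Map is used here instead of Flink's Configuration.
final class EffectiveConfigSketch {

    // Entries of `perCall` are added last, so they overwrite `executorDefaults`.
    static Map<String, String> merge(Map<String, String> executorDefaults,
                                     Map<String, String> perCall) {
        Map<String, String> effective = new HashMap<>();
        effective.putAll(executorDefaults);
        effective.putAll(perCall);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> executorDefaults = new HashMap<>();
        executorDefaults.put("execution.target", "local");
        executorDefaults.put("execution.attached", "false");

        Map<String, String> perCall = new HashMap<>();
        perCall.put("execution.attached", "true");

        // execution.attached comes from perCall, execution.target from the defaults.
        System.out.println(merge(executorDefaults, perCall));
    }
}
```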

The getJobGraph method

private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
    // This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
    // to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
    // for now.
    // Batch case: the pipeline is a DataSet Plan
    if (pipeline instanceof Plan) {
        Plan plan = (Plan) pipeline;
        final int slotsPerTaskManager = configuration.getInteger(
                TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
        final int numTaskManagers = configuration.getInteger(
                ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

        plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
    }
    // Generate the JobGraph
    return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
}

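PipelineExecutorUtils: a utility class with methods related to job execution.

Note: judging by the naming of the classes that generate the StreamGraph and the JobGraph, they look like the work of two different groups of people. That said, JobGraph is generic, while StreamGraph is tied to the Environment.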

PipelineExecutorUtils.java.getJobGraph

public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    // Accessor for the execution-related settings in the configuration
    final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
    // Build the JobGraph (the default parallelism passed in is 1 here)
    final JobGraph jobGraph = FlinkPipelineTranslationUtil
            .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());

    configuration
            .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
            .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));
    // Set jars, classpaths and savepoint restore settings (all empty defaults in this run)
    jobGraph.addJars(executionConfigAccessor.getJars());
    jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
    jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());

    return jobGraph;
}

Next comes FlinkPipelineTranslationUtil.getJobGraph, which takes the default parallelism as an extra argument.

public static JobGraph getJobGraph(
            Pipeline pipeline,
            Configuration optimizerConfiguration,
            int defaultParallelism) {

    // Pick the translator for this program: stream or batch
    FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
    // Use the pipelineTranslator to translate the pipeline into a JobGraph
    return pipelineTranslator.translateToJobGraph(pipeline,
            optimizerConfiguration,
            defaultParallelism);
}

getPipelineTranslator is rather blunt:

// Create the two translators in turn; return the one that can translate the pipeline, otherwise throw
private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) {
    // PlanTranslator for the DataSet API
    PlanTranslator planTranslator = new PlanTranslator();
    // pipeline instanceof Plan
    if (planTranslator.canTranslate(pipeline)) {
        return planTranslator;
    }
    // Translator for the DataStream API
    StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator();
    // pipeline instanceof StreamGraph
    if (streamGraphTranslator.canTranslate(pipeline)) {
        return streamGraphTranslator;
    }

    throw new RuntimeException("Translator " + streamGraphTranslator + " cannot translate "
        + "the given pipeline " + pipeline + ".");
}
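The selection logic above amounts to "ask each translator whether it can handle the pipeline and take the first one that can". Here is a minimal sketch of that pattern. All types in it (Pipeline, BatchPlan, StreamingGraph, Translator and the two implementations) are hypothetical simplifications for illustration, not Flink classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified types; none of these are Flink classes.
final class TranslatorSelectionSketch {

    interface Pipeline {}
    static final class BatchPlan implements Pipeline {}
    static final class StreamingGraph implements Pipeline {}

    interface Translator {
        boolean canTranslate(Pipeline pipeline);
        String name();
    }

    static final class BatchTranslator implements Translator {
        public boolean canTranslate(Pipeline pipeline) { return pipeline instanceof BatchPlan; }
        public String name() { return "batch"; }
    }

    static final class StreamTranslator implements Translator {
        public boolean canTranslate(Pipeline pipeline) { return pipeline instanceof StreamingGraph; }
        public String name() { return "stream"; }
    }

    // Return the first candidate that accepts the pipeline, otherwise fail loudly.
    static Translator select(Pipeline pipeline) {
        List<Translator> candidates = Arrays.asList(new BatchTranslator(), new StreamTranslator());
        for (Translator candidate : candidates) {
            if (candidate.canTranslate(pipeline)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("No translator can translate the given pipeline " + pipeline);
    }

    public static void main(String[] args) {
        System.out.println(select(new StreamingGraph()).name());
        System.out.println(select(new BatchPlan()).name());
    }
}
```

Iterating over a list keeps the error message independent of any single translator; the original code instead names only the stream translator in its exception text.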

We skip the batch path, of course, and look at the translateToJobGraph method of StreamGraphTranslator.

public JobGraph translateToJobGraph(
            Pipeline pipeline,
            Configuration optimizerConfiguration,
            int defaultParallelism) {
        checkArgument(pipeline instanceof StreamGraph,
                "Given pipeline is not a DataStream StreamGraph.");

    // Cast pipeline back down to StreamGraph
    StreamGraph streamGraph = (StreamGraph) pipeline;
    // Let the streamGraph generate the JobGraph (so we are back in the StreamGraph class)
    return streamGraph.getJobGraph(null);
}

StreamGraph.java

public JobGraph getJobGraph(@Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}

StreamingJobGraphGenerator.java (familiar naming)

public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    // Create a StreamingJobGraphGenerator, then generate the JobGraph
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}

The StreamingJobGraphGenerator.java constructor finally creates the JobGraph object (jobID is null here)

private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
    this.streamGraph = streamGraph;
    this.defaultStreamGraphHasher = new StreamGraphHasherV2();
    this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());

    this.jobVertices = new HashMap<>();
    this.builtVertices = new HashSet<>();
    this.chainedConfigs = new HashMap<>();
    this.vertexConfigs = new HashMap<>();
    this.chainedNames = new HashMap<>();
    this.chainedMinResources = new HashMap<>();
    this.chainedPreferredResources = new HashMap<>();
    this.chainedInputOutputFormats = new HashMap<>();
    this.physicalEdgesInOrder = new ArrayList<>();

    jobGraph = new JobGraph(jobID, streamGraph.getJobName());
}

JobGraph.java constructor

public JobGraph(JobID jobId, String jobName) {
    this.jobID = jobId == null ? new JobID() : jobId;
    this.jobName = jobName == null ? "(unnamed job)" : jobName;

    try {
        setExecutionConfig(new ExecutionConfig());
    } catch (IOException e) {
        // this should never happen, since an empty execution config is always serializable
        throw new RuntimeException("bug, empty execution config is not serializable");
    }
}

The StreamingJobGraphGenerator.java createJobGraph method generates the JobGraph


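This is the core method for building the JobGraph. It does the following:
1. Validates the checkpoint configuration
2. Sets the schedule mode
3. Generates deterministic hashes for the nodes
4. Core step: builds the operator chains, generates the chain names, sets resources, chains the operators and creates the JobVertex instances
5. Adds the physical edges
6. Sets the slot sharing groups and co-location constraints
7. Sets the managed memory fractions
8. Configures checkpointing
9. Sets the savepoint restore settings
10. Adds the user artifacts
11. Copies the StreamGraph's ExecutionConfig into the JobGraph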

private JobGraph createJobGraph() {
    // Validate the checkpoint configuration
    preValidate();

    // make sure that all vertices start immediately
    // (in streaming mode all tasks are started together at launch)
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    // The map goes from each StreamNode id to its hash bytes.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

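    // Core step: build the operator chains and create the JobVertex instances
    setChaining(hashes, legacyHashes);

    // Add the physical edges
    setPhysicalEdges();
    // Set slot sharing groups and co-location constraints
    setSlotSharingAndCoLocation();
    // Set the managed memory fractions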
    setManagedMemoryFraction(
        Collections.unmodifiableMap(jobVertices),
        Collections.unmodifiableMap(vertexConfigs),
        Collections.unmodifiableMap(chainedConfigs),
        id -> streamGraph.getStreamNode(id).getMinResources(),
        id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
    // Configure checkpointing
    configureCheckpointing();

    // Set the savepoint restore settings
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    // Add user artifacts
    JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

    // set the ExecutionConfig last when it has been finalized
    try {
        // Copy the StreamGraph's ExecutionConfig into the JobGraph
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
            "This indicates that non-serializable types (like custom serializers) were registered");
    }

    return jobGraph;
}
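To see why a hash can identify "the same" node across submissions, here is a toy sketch. It derives each node's hash from its position in a fixed traversal order plus the hashes of its inputs, so two structurally identical graphs get identical hashes even when their node ids differ. This is only an illustration under that assumption. It is not the algorithm of StreamGraphHasherV2, and the class DeterministicHashSketch and the use of SHA-256 are my own choices.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration only; this is NOT the algorithm used by StreamGraphHasherV2.
final class DeterministicHashSketch {

    // Maps node id -> ids of its upstream nodes. The iteration order must be
    // topological: every node appears after all of its inputs.
    static Map<Integer, byte[]> hashNodes(LinkedHashMap<Integer, List<Integer>> inputsInTopologicalOrder) {
        Map<Integer, byte[]> hashes = new LinkedHashMap<>();
        int position = 0;
        for (Map.Entry<Integer, List<Integer>> entry : inputsInTopologicalOrder.entrySet()) {
            MessageDigest digest = newSha256();
            // The node's position in the traversal, not its id, goes into the hash.
            digest.update(Integer.toString(position).getBytes(StandardCharsets.UTF_8));
            position++;
            // Mixing in the input hashes ties the node to its upstream structure.
            for (Integer inputId : entry.getValue()) {
                digest.update(hashes.get(inputId));
            }
            hashes.put(entry.getKey(), digest.digest());
        }
        return hashes;
    }

    private static MessageDigest newSha256() {
        try {
            return MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should be available on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        // Two graphs with the same shape (a -> b) but different node ids.
        LinkedHashMap<Integer, List<Integer>> first = new LinkedHashMap<>();
        first.put(1, Collections.emptyList());
        first.put(2, Arrays.asList(1));
        LinkedHashMap<Integer, List<Integer>> second = new LinkedHashMap<>();
        second.put(10, Collections.emptyList());
        second.put(20, Arrays.asList(10));
        System.out.println(Arrays.equals(hashNodes(first).get(2), hashNodes(second).get(20)));
    }
}
```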

Setting up the operator chains is the core of JobGraph generation: starting from the source nodes, it walks downstream and recursively builds the operator chains.

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // Create one chain per source
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        // Arguments: current node id, chain index, and an OperatorChainInfo built from
        // (sourceNodeId, hashes, legacyHashes, streamGraph)
        createChain(
            sourceNodeId,
            // the source node sits at index 0 of its chain
            0,
            new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
    }
}

// Build one chain recursively: wherever the chain breaks, a new chain is started
private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) {
    // Id of the node that starts the current chain
    Integer startNodeId = chainInfo.getStartNodeId();
    // If this chain start has already been built, the else branch returns an empty list
    if (!builtVertices.contains(startNodeId)) {
        // Out edges that leave the current chain, collected across all chained nodes
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        // Out edges whose target can be chained to the current node
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        // Out edges whose target cannot be chained
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
        // The current node
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        // Walk the current node's out edges and check whether each downstream node can be chained
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                // chainable: goes into chainableOutputs
                chainableOutputs.add(outEdge);
            } else {
                // not chainable: goes into nonChainableOutputs
                nonChainableOutputs.add(outEdge);
            }
        }
        // For each chainable edge
        for (StreamEdge chainable : chainableOutputs) {
            // recurse with chainIndex + 1 so that every chainable downstream node joins this chain
            transitiveOutEdges.addAll(
                createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
        }
        // For each non-chainable edge
        for (StreamEdge nonChainable : nonChainableOutputs) {
            // record the edge as an out edge of the current chain
            transitiveOutEdges.add(nonChainable);
            // and recursively start a new chain at the edge's target node
            createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
        }

        // Chain name: the operator names of the chained nodes joined together
        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        // Minimum resources of the chain: ResourceSpec{UNKNOWN} here
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        // Preferred resources of the chain: ResourceSpec{UNKNOWN} here
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        // Add the current node to the chain
        OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
        // Input format
        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        // Output format
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // For the chain start, create the JobVertex (added to the JobGraph's taskVertices);
        // for any other node, just create an empty StreamConfig
        StreamConfig config = currentNodeId.equals(startNodeId)
            ? createJobVertex(startNodeId, chainInfo)
            : new StreamConfig(new Configuration());
        // Set the vertex config
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        // If this node is the chain start, connect the chain's out edges
        if (currentNodeId.equals(startNodeId)) {

            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());

            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }

            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            // Otherwise the node is chained inside another node's chain
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
            // Set the node's index within the chain
            config.setChainIndex(chainIndex);
            // Get the current node
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            // Set the node's operator name
            config.setOperatorName(node.getOperatorName());
            // Store the config under the chain it belongs to
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        // Set the operator id
        config.setOperatorID(currentOperatorId);

        // If the node has no chainable outputs, it ends the current chain
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

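Starting from the source nodes, the operator chains are built recursively: the chainable downstream nodes are first added to the current chain recursively, then each non-chainable downstream node becomes the start of a new operator chain, and the recursion continues from there.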
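The recursion is easier to follow on a stripped-down model. The sketch below keeps only the control flow of createChain: split the out edges into chainable and non-chainable, recurse into the former with the same chain start, and start a new chain for the latter. Everything else is an assumption for illustration: nodes are plain integers, the chainable predicate is a hypothetical stand-in for isChainable, and the result is just a map from chain start to chain members.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

// Simplified model of the chain recursion; not Flink code.
final class ChainSketch {

    private final Map<Integer, List<Integer>> outEdges;      // node id -> downstream node ids
    private final BiPredicate<Integer, Integer> chainable;   // hypothetical stand-in for isChainable
    private final Set<Integer> builtStarts = new HashSet<>(); // chain starts that are finished
    private final Map<Integer, List<Integer>> chains = new LinkedHashMap<>();

    private ChainSketch(Map<Integer, List<Integer>> outEdges, BiPredicate<Integer, Integer> chainable) {
        this.outEdges = outEdges;
        this.chainable = chainable;
    }

    // Returns chain start id -> members of that chain, upstream first.
    // Assumes, as a simplification, that a chainable target has exactly one input edge.
    static Map<Integer, List<Integer>> buildChains(List<Integer> sources,
                                                   Map<Integer, List<Integer>> outEdges,
                                                   BiPredicate<Integer, Integer> chainable) {
        ChainSketch sketch = new ChainSketch(outEdges, chainable);
        for (Integer source : sources) {
            sketch.createChain(source, source);
        }
        return sketch.chains;
    }

    private void createChain(Integer current, Integer start) {
        if (builtStarts.contains(start)) {
            return; // this chain was already built via another path
        }
        List<Integer> chainableTargets = new ArrayList<>();
        List<Integer> nonChainableTargets = new ArrayList<>();
        for (Integer target : outEdges.getOrDefault(current, Collections.emptyList())) {
            if (chainable.test(current, target)) {
                chainableTargets.add(target);
            } else {
                nonChainableTargets.add(target);
            }
        }
        // Chainable targets join the current chain.
        for (Integer target : chainableTargets) {
            createChain(target, start);
        }
        // Each non-chainable target starts a new chain of its own.
        for (Integer target : nonChainableTargets) {
            createChain(target, target);
        }
        // Members are added after the recursion, so prepend to keep upstream-first order.
        chains.computeIfAbsent(start, k -> new ArrayList<>()).add(0, current);
        if (current.equals(start)) {
            builtStarts.add(start);
        }
    }

    public static void main(String[] args) {
        // 1 -> 2 -> 3 -> 4, where only the edge 2 -> 3 breaks the chain.
        Map<Integer, List<Integer>> edges = new HashMap<>();
        edges.put(1, Arrays.asList(2));
        edges.put(2, Arrays.asList(3));
        edges.put(3, Arrays.asList(4));
        System.out.println(buildChains(Arrays.asList(1), edges, (from, to) -> !(from == 2 && to == 3)));
    }
}
```

In this example nodes 1 and 2 end up in one chain and nodes 3 and 4 in another, which mirrors how one JobVertex is created per chain start.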

Operator chains are the core content of the JobGraph (along with the slotSharingGroup).

You are welcome to follow the Flink菜鳥 WeChat official account, which posts articles on Flink (development techniques) from time to time.

 

