Note: everything here is based on Flink's local mode.
JobGraph generation starts in the execute method of LocalExecutor.java:
```java
// Called for local execution; Pipeline is the interface that StreamGraph implements
@Override
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

    // Generate the JobGraph from the StreamGraph and the merged configuration
    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);

    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
}
```
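pipeline is the StreamGraph that was just generated, and configuration is the startup configuration.
The contents of pipeline are shown in the figure below: mainly the StreamNodes, plus other settings.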

effectiveConfig here is {execution.attached=true, execution.target=local}, i.e. local mode.
The getJobGraph method:
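The two addAll calls in execute decide which settings win. A minimal sketch, assuming plain java.util.Map objects stand in for Flink's Configuration (the class and method names below are hypothetical): later writes overwrite earlier keys, so the configuration passed to execute overrides the executor's own, and only the passed configuration is checked for attached mode.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: Maps stand in for Flink's Configuration objects.
class EffectiveConfigSketch {

    static Map<String, String> merge(Map<String, String> executorConfig, Map<String, String> passedConfig) {
        Map<String, String> effective = new HashMap<>();
        effective.putAll(executorConfig); // like effectiveConfig.addAll(this.configuration)
        effective.putAll(passedConfig);   // like effectiveConfig.addAll(configuration): overrides on key clashes
        return effective;
    }

    // Mirrors checkState(configuration.getBoolean(DeploymentOptions.ATTACHED)),
    // assuming a missing key counts as "not attached".
    static void checkAttached(Map<String, String> passedConfig) {
        if (!Boolean.parseBoolean(passedConfig.getOrDefault("execution.attached", "false"))) {
            throw new IllegalStateException("the local executor only supports attached execution");
        }
    }

    public static void main(String[] args) {
        Map<String, String> executorConfig = new HashMap<>();
        Map<String, String> passed = new HashMap<>();
        passed.put("execution.attached", "true");
        passed.put("execution.target", "local");
        checkAttached(passed);
        System.out.println(merge(executorConfig, passed));
    }
}
```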
```java
private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
    // This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
    // to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
    // for now.
    // Only applies to batch (DataSet) programs
    if (pipeline instanceof Plan) {
        Plan plan = (Plan) pipeline;
        final int slotsPerTaskManager = configuration.getInteger(
                TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
        final int numTaskManagers = configuration.getInteger(
                ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

        plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
    }

    // Generate the JobGraph
    return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
}
```
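The batch-only branch above boils down to one multiplication with two fallbacks. A minimal sketch, assuming nullable Integers stand in for "option not set in the Configuration" (the helper is hypothetical, not a Flink API):

```java
// Hypothetical sketch of the local-mode default parallelism for a batch Plan.
class LocalBatchParallelismSketch {

    static int defaultParallelism(Integer configuredSlotsPerTm, int planMaxParallelism, Integer configuredNumTms) {
        // NUM_TASK_SLOTS falls back to the plan's maximum parallelism
        int slotsPerTaskManager = configuredSlotsPerTm != null ? configuredSlotsPerTm : planMaxParallelism;
        // LOCAL_NUMBER_TASK_MANAGER falls back to 1
        int numTaskManagers = configuredNumTms != null ? configuredNumTms : 1;
        return slotsPerTaskManager * numTaskManagers;
    }

    public static void main(String[] args) {
        System.out.println(defaultParallelism(4, 8, 3));       // prints 12 (4 slots x 3 TaskManagers)
        System.out.println(defaultParallelism(null, 6, null)); // prints 6 (falls back to 6 x 1)
    }
}
```

Streaming jobs (StreamGraph) skip this branch entirely, which is why the rest of this walkthrough ignores it.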
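PipelineExecutorUtils is a utility class with helper methods for job execution.
Note: judging from how the classes that generate StreamGraph and JobGraph are named, they look like the work of two different groups of people. JobGraph, however, is generic, while StreamGraph is tied to the Environment.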
PipelineExecutorUtils.java.getJobGraph
```java
public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    // Accessor for execution settings (defaults unless configured)
    final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);

    // Build the JobGraph
    final JobGraph jobGraph = FlinkPipelineTranslationUtil
            // default parallelism: 1
            .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());

    configuration
            .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
            .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));

    // Jars, classpaths and savepoint restore settings (empty defaults here)
    jobGraph.addJars(executionConfigAccessor.getJars());
    jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
    jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());

    return jobGraph;
}
```
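Two small patterns in this method are worth isolating: a parallelism read that falls back to 1, and a job id that is overridden only when a fixed one is configured. A hypothetical sketch with plain Maps and Strings in place of Configuration and JobID; "parallelism.default" is the key behind CoreOptions.DEFAULT_PARALLELISM, while "fixed-job-id" is a made-up placeholder, not Flink's real internal key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; not Flink's ExecutionConfigAccessor or JobID API.
class ExecutorUtilsSketch {

    // Fall back to parallelism 1 when nothing is configured
    static int parallelism(Map<String, String> conf) {
        return Integer.parseInt(conf.getOrDefault("parallelism.default", "1"));
    }

    // Mirrors getOptional(...).ifPresent(id -> jobGraph.setJobID(...)):
    // keep the generated id unless a fixed one is present ("fixed-job-id" is a placeholder key)
    static String jobId(Map<String, String> conf, String generatedId) {
        return Optional.ofNullable(conf.get("fixed-job-id")).orElse(generatedId);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(parallelism(conf));       // prints 1
        System.out.println(jobId(conf, "generated")); // prints generated
    }
}
```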
Next comes FlinkPipelineTranslationUtil.getJobGraph, which additionally receives the default parallelism:
```java
public static JobGraph getJobGraph(
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {

    // Pick the translator for this program: stream or batch
    FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);

    // Let the translator translate the pipeline into a JobGraph
    return pipelineTranslator.translateToJobGraph(pipeline,
            optimizerConfiguration,
            defaultParallelism);
}
```
getPipelineTranslator takes a rather brute-force approach:
```java
// Create the two translators in turn; return the first that can translate the pipeline, otherwise throw
private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) {
    // PlanTranslator for DataSet programs
    PlanTranslator planTranslator = new PlanTranslator();

    // pipeline instanceof Plan
    if (planTranslator.canTranslate(pipeline)) {
        return planTranslator;
    }

    // Translator for DataStream programs
    StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator();

    // pipeline instanceof StreamGraph
    if (streamGraphTranslator.canTranslate(pipeline)) {
        return streamGraphTranslator;
    }

    throw new RuntimeException("Translator " + streamGraphTranslator + " cannot translate "
            + "the given pipeline " + pipeline + ".");
}
```
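The selection is a "first translator that accepts the pipeline wins" dispatch. A hypothetical sketch of that pattern: the nested Pipeline, Plan, StreamGraph and Translator types are simplified stand-ins, not Flink's classes, and canTranslate is reduced to an instanceof-style check as the comments in the original suggest.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the translator dispatch; all types are simplified stand-ins.
class TranslatorDispatchSketch {

    interface Pipeline {}
    static final class Plan implements Pipeline {}        // stands in for the DataSet Plan
    static final class StreamGraph implements Pipeline {} // stands in for the StreamGraph

    static final class Translator {
        final String name;
        final Class<? extends Pipeline> accepts;

        Translator(String name, Class<? extends Pipeline> accepts) {
            this.name = name;
            this.accepts = accepts;
        }

        // like canTranslate: pipeline instanceof Plan / StreamGraph
        boolean canTranslate(Pipeline pipeline) {
            return accepts.isInstance(pipeline);
        }
    }

    static Translator pick(Pipeline pipeline) {
        // Order matters: the batch translator is tried first, as in the original
        List<Translator> candidates = Arrays.asList(
                new Translator("PlanTranslator", Plan.class),
                new Translator("StreamGraphTranslator", StreamGraph.class));
        for (Translator t : candidates) {
            if (t.canTranslate(pipeline)) {
                return t;
            }
        }
        throw new RuntimeException("No translator can translate the given pipeline " + pipeline + ".");
    }

    public static void main(String[] args) {
        System.out.println(pick(new StreamGraph()).name); // prints StreamGraphTranslator
    }
}
```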
We skip the batch path here and look at StreamGraphTranslator.translateToJobGraph:
```java
public JobGraph translateToJobGraph(
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {
    checkArgument(pipeline instanceof StreamGraph,
            "Given pipeline is not a DataStream StreamGraph.");

    // Cast the pipeline back to its concrete type, StreamGraph
    StreamGraph streamGraph = (StreamGraph) pipeline;
    // Generate the JobGraph from the StreamGraph (we are back in the StreamGraph class again)
    return streamGraph.getJobGraph(null);
}
```
StreamGraph.java
```java
public JobGraph getJobGraph(@Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
```
StreamingJobGraphGenerator.java (that naming pattern again):
```java
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    // Create a StreamingJobGraphGenerator, then generate the JobGraph
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
```
The StreamingJobGraphGenerator constructor is where the JobGraph object finally gets created (jobID is null at this point):
```java
private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
    this.streamGraph = streamGraph;
    this.defaultStreamGraphHasher = new StreamGraphHasherV2();
    this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());

    this.jobVertices = new HashMap<>();
    this.builtVertices = new HashSet<>();
    this.chainedConfigs = new HashMap<>();
    this.vertexConfigs = new HashMap<>();
    this.chainedNames = new HashMap<>();
    this.chainedMinResources = new HashMap<>();
    this.chainedPreferredResources = new HashMap<>();
    this.chainedInputOutputFormats = new HashMap<>();
    this.physicalEdgesInOrder = new ArrayList<>();

    jobGraph = new JobGraph(jobID, streamGraph.getJobName());
}
```
The JobGraph.java constructor:
```java
public JobGraph(JobID jobId, String jobName) {
    this.jobID = jobId == null ? new JobID() : jobId;
    this.jobName = jobName == null ? "(unnamed job)" : jobName;

    try {
        setExecutionConfig(new ExecutionConfig());
    } catch (IOException e) {
        // this should never happen, since an empty execution config is always serializable
        throw new RuntimeException("bug, empty execution config is not serializable");
    }
}
```
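Because the generator passes a null jobID, the constructor's null handling decides the id. A hypothetical sketch of those two defaults, where a random UUID string stands in for Flink's randomly generated JobID (the class name is made up for illustration):

```java
import java.util.UUID;

// Hypothetical sketch of the null defaults in the JobGraph constructor.
class JobGraphDefaultsSketch {
    final String jobId;
    final String jobName;

    JobGraphDefaultsSketch(String jobId, String jobName) {
        // No id supplied (the StreamingJobGraphGenerator path passes null): generate a fresh random one
        this.jobId = jobId == null ? UUID.randomUUID().toString() : jobId;
        // No name supplied: fall back to the same placeholder name JobGraph uses
        this.jobName = jobName == null ? "(unnamed job)" : jobName;
    }

    public static void main(String[] args) {
        JobGraphDefaultsSketch g = new JobGraphDefaultsSketch(null, null);
        System.out.println(g.jobName); // prints (unnamed job)
    }
}
```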
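StreamingJobGraphGenerator.createJobGraph then builds out the JobGraph.
This is the core method of JobGraph generation. It does the following:
1. Validate the checkpoint configuration
2. Set the schedule (startup) mode
3. Generate deterministic node hashes
4. The core step: build operator chains, generate chain names, set resources, chain operators, and create the JobVertex objects
5. Add the physical edges
6. Set slot sharing groups and co-location
7. Set the managed memory fraction
8. Configure checkpointing (and the savepoint restore settings)
9. Add user artifacts
10. Copy the StreamGraph's ExecutionConfig into the JobGraph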
```java
private JobGraph createJobGraph() {
    // Validate the checkpoint configuration
    preValidate();

    // make sure that all vertices start immediately
    // (in streaming mode, all tasks are started at once)
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    // One hash (byte array) per StreamNode id
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    // Core step: build operator chains and create the JobVertex objects
    setChaining(hashes, legacyHashes);

    // Add the physical edges
    setPhysicalEdges();

    // Set slot sharing groups and co-location
    setSlotSharingAndCoLocation();

    // Set the managed memory fraction
    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getMinResources(),
            id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());

    // Configure checkpointing
    configureCheckpointing();

    // Savepoint restore settings
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    // Add user artifacts
    JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

    // set the ExecutionConfig last when it has been finalized
    try {
        // Copy the StreamGraph's ExecutionConfig into the JobGraph
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                "This indicates that non-serializable types (like custom serializers) were registered");
    }

    return jobGraph;
}
```
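To illustrate what "deterministic hashes" buys, here is a hypothetical, simplified sketch. It is NOT StreamGraphHasherV2's algorithm: each node is hashed with SHA-256 over its position in a topological traversal from the sources plus its inputs' hashes, so the same graph structure always yields the same hashes, even if the node ids are renumbered. The graph representation (a map from node id to downstream ids) is an assumption for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

// Hypothetical illustration of deterministic node hashing; not Flink's hasher.
class DeterministicHashSketch {

    // edges: node id -> downstream node ids; sources: nodes without inputs, in a fixed order
    static Map<Integer, String> hashes(List<Integer> sources, Map<Integer, List<Integer>> edges) {
        // Record each node's inputs and how many of them are still unhashed
        Map<Integer, List<Integer>> inputs = new HashMap<>();
        Map<Integer, Integer> pending = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> e : edges.entrySet()) {
            for (Integer target : e.getValue()) {
                inputs.computeIfAbsent(target, k -> new ArrayList<>()).add(e.getKey());
                pending.merge(target, 1, Integer::sum);
            }
        }

        Map<Integer, String> result = new LinkedHashMap<>();
        Deque<Integer> ready = new ArrayDeque<>(sources);
        int position = 0;
        while (!ready.isEmpty()) {
            Integer node = ready.poll();
            MessageDigest md = sha256();
            // Hash the traversal position instead of the node id, so renumbered
            // but otherwise identical graphs get the same hashes
            md.update(Integer.toString(position++).getBytes(StandardCharsets.UTF_8));
            // Fold in the inputs' hashes (sorted, so edge iteration order does not matter)
            List<String> inputHashes = new ArrayList<>();
            for (Integer in : inputs.getOrDefault(node, Collections.emptyList())) {
                inputHashes.add(result.get(in));
            }
            Collections.sort(inputHashes);
            for (String h : inputHashes) {
                md.update(h.getBytes(StandardCharsets.UTF_8));
            }
            result.put(node, toHex(md.digest()));

            // A downstream node becomes ready once all of its inputs are hashed
            for (Integer next : edges.getOrDefault(node, Collections.emptyList())) {
                if (pending.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        return result;
    }

    private static MessageDigest sha256() {
        try {
            return MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 must be available on every Java platform", e);
        }
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> graph = new HashMap<>();
        graph.put(1, Arrays.asList(2));
        graph.put(2, Arrays.asList(3));
        // Same graph, same hashes on every run
        System.out.println(hashes(Arrays.asList(1), graph).equals(hashes(Arrays.asList(1), graph))); // prints true
    }
}
```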
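Setting up operator chains (setChaining) is the heart of JobGraph generation: starting from each source node, it walks downstream and builds operator chains recursively.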
```java
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // Create one chain per source
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        // arguments: source id, chain index, OperatorChainInfo (source node as chain head, hashes, legacy hashes, streamGraph)
        createChain(
                sourceNodeId,
                // the source node sits at position 0 of its chain
                0,
                new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
    }
}

// Build a chain recursively: whenever the chain breaks, start a new one
private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) {
    // id of the node that starts this chain
    Integer startNodeId = chainInfo.getStartNodeId();
    // if this chain was already built, return an empty list
    if (!builtVertices.contains(startNodeId)) {
        // out edges of the whole chain (collected transitively)
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        // outputs that can be chained
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        // outputs that cannot be chained
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

        // check each out edge of the current node: can the downstream node be chained?
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // chainable edges: recurse with chainIndex + 1, pulling every chainable downstream node into this chain
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
        }

        // non-chainable edges: record the edge, then start a new chain at its target
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
        }

        // chain name: the names of the chained operators joined together
        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        // minimum resources of the chain: ResourceSpec{UNKNOWN}
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        // preferred resources of the chain: ResourceSpec{UNKNOWN}
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        // add the current node to the chain
        OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));

        // input format
        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        // output format
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // chain head: create the JobVertex and add it to the jobGraph's taskVertices
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, chainInfo)
                : new StreamConfig(new Configuration());

        // set the vertex config
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            // chain head: connect the edges that leave the chain
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());

            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }

            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {
            // not the chain head
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
            // position of this node within the chain
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            // register the config under its chain
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        // set the operator id
        config.setOperatorID(currentOperatorId);

        // no chainable outputs: this node ends the chain
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}
```
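In summary: starting from each source node, operator chains are built recursively. Chainable downstream nodes are first added to the current chain (recursively); then, for every non-chainable edge, a new operator chain is started at its target node and the recursion continues from there.
Operator chains (together with slotSharingGroup) are the core of the JobGraph.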
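To make the recursion concrete, here is a heavily simplified, hypothetical model of createChain. Assumptions: nodes are plain operator-name strings, each edge carries a precomputed `chainable` flag (Flink's real isChainable checks several conditions that this sketch does not model), a chainable edge's target has a single input, and the naming rule only loosely follows createChainedName. It records which chain head each node belongs to and builds chain names such as `Source -> Map`.

```java
import java.util.*;

// Hypothetical, simplified model of setChaining/createChain; not Flink code.
class ChainingSketch {

    static final class Edge {
        final String source;
        final String target;
        final boolean chainable; // decided up front in this sketch

        Edge(String source, String target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }
    }

    private final Map<String, List<Edge>> outEdges = new HashMap<>();
    private final Set<String> builtHeads = new HashSet<>();   // like builtVertices
    final Map<String, String> chainedNames = new HashMap<>(); // node -> chain name from that node on
    final Map<String, String> headOf = new HashMap<>();       // node -> start node of its chain

    ChainingSketch(List<Edge> edges) {
        for (Edge e : edges) {
            outEdges.computeIfAbsent(e.source, k -> new ArrayList<>()).add(e);
        }
    }

    // like setChaining: one chain per source
    void chainFromSources(List<String> sources) {
        for (String source : sources) {
            createChain(source, source);
        }
    }

    // returns the edges leaving the chain (like transitiveOutEdges)
    private List<Edge> createChain(String current, String start) {
        if (builtHeads.contains(start)) {
            return new ArrayList<>();
        }
        List<Edge> chainableOutputs = new ArrayList<>();
        List<Edge> nonChainableOutputs = new ArrayList<>();
        for (Edge e : outEdges.getOrDefault(current, Collections.emptyList())) {
            (e.chainable ? chainableOutputs : nonChainableOutputs).add(e);
        }
        List<Edge> transitiveOutEdges = new ArrayList<>();
        // chainable targets join the current chain
        for (Edge e : chainableOutputs) {
            transitiveOutEdges.addAll(createChain(e.target, start));
        }
        // non-chainable targets leave the chain and start a new one
        for (Edge e : nonChainableOutputs) {
            transitiveOutEdges.add(e);
            createChain(e.target, e.target);
        }
        chainedNames.put(current, chainedName(current, chainableOutputs));
        headOf.put(current, start);
        if (current.equals(start)) {
            builtHeads.add(start); // in Flink, the chain head's JobVertex is created at this point
        }
        return transitiveOutEdges;
    }

    // loosely modeled on createChainedName: "A -> B", or "A -> (B, C)" for several chained outputs
    private String chainedName(String current, List<Edge> chainableOutputs) {
        if (chainableOutputs.isEmpty()) {
            return current;
        }
        List<String> names = new ArrayList<>();
        for (Edge e : chainableOutputs) {
            names.add(chainedNames.get(e.target));
        }
        return chainableOutputs.size() == 1
                ? current + " -> " + names.get(0)
                : current + " -> (" + String.join(", ", names) + ")";
    }

    public static void main(String[] args) {
        ChainingSketch g = new ChainingSketch(Arrays.asList(
                new Edge("Source", "Map", true),
                new Edge("Map", "Window", false),
                new Edge("Window", "Sink", true)));
        g.chainFromSources(Collections.singletonList("Source"));
        System.out.println(g.chainedNames.get("Source")); // prints Source -> Map
        System.out.println(g.chainedNames.get("Window")); // prints Window -> Sink
    }
}
```

The post-order structure mirrors the original: a node's chain name is computed only after all of its chainable descendants have been visited, which is why chainedNames can be looked up for the children.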
Follow the "Flink菜鳥" (Flink Newbie) WeChat official account for occasional posts on Flink development.

