Flink Source Code Reading - Deploying a Session-Mode Cluster on K8s (Entry Point and Configuration)


Startup main class: KubernetesSessionCli

KubernetesSessionCli#main: entry point of the main class

// org.apache.flink.kubernetes.cli.KubernetesSessionCli#main
public static void main(String[] args) {
    // Load the configuration from ${FLINK_CONF_DIR}/flink-conf.yaml, located via the environment variable
    final Configuration configuration = GlobalConfiguration.loadConfiguration();
    // Resolve the configuration directory, taking the first existing candidate in priority order: FLINK_CONF_DIR, ../conf, conf
    final String configDir = CliFrontend.getConfigurationDirectoryFromEnv();
    int retCode;
    try {
        final KubernetesSessionCli cli = new KubernetesSessionCli(configuration, configDir);
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
    } catch (CliArgsException e) {
        retCode = AbstractCustomCommandLine.handleCliArgsException(e, LOG);
    } catch (Exception e) {
        retCode = AbstractCustomCommandLine.handleError(e, LOG);
    }
    System.exit(retCode);
}
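
The priority-ordered lookup of the configuration directory described in the comment above can be sketched as follows. This is a minimal illustration, not Flink's implementation: the class `ConfDirLookup` and the method `firstExistingDir` are hypothetical names, and the sketch simply skips a candidate that does not exist, whereas the real `getConfigurationDirectoryFromEnv` may treat a set-but-missing `FLINK_CONF_DIR` as an error.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ConfDirLookup {

    /** Returns the first candidate that is an existing directory; null entries are skipped. */
    static Optional<String> firstExistingDir(List<String> candidates) {
        return candidates.stream()
                .filter(c -> c != null && new File(c).isDirectory())
                .findFirst();
    }

    public static void main(String[] args) {
        // Candidate order taken from the article: FLINK_CONF_DIR, ../conf, conf.
        List<String> candidates =
                Arrays.asList(System.getenv("FLINK_CONF_DIR"), "../conf", "conf");
        System.out.println(
                firstExistingDir(candidates).orElse("<no configuration directory found>"));
    }
}
```

Only the first match is used; later candidates are never merged with it.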

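KubernetesSessionCli#run: core flow

Get the effective configuration -> load and match a ClusterClientFactory -> construct the Descriptor -> retrieve or create the service -> the Descriptor deploys the cluster internals -> in attached mode, watch stdin for the stop and quit commands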

//org.apache.flink.kubernetes.cli.KubernetesSessionCli#run
private int run(String[] args) throws FlinkException, CliArgsException {
    /*
    Build the effective configuration from three sources:
      ${FLINK_CONF_DIR}/flink-conf.yaml
      dynamic properties passed as -Dk=v; a key without a value is treated as true
      execution.target = kubernetes-session
    */
    final Configuration configuration = getEffectiveConfiguration(args);
    // The loader loads every ClusterClientFactory and selects the single one that matches execution.target
    final ClusterClientFactory<String> kubernetesClusterClientFactory =
            clusterClientServiceLoader.getClusterClientFactory(configuration);
    // Load the K8s config to create a NamespacedKubernetesClient, then use it to construct the Descriptor
    final ClusterDescriptor<String> kubernetesClusterDescriptor =
            kubernetesClusterClientFactory.createClusterDescriptor(configuration);
    try {
        final ClusterClient<String> clusterClient;
        // kubernetes.cluster-id
        String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
        // execution.attached (default: false)
        final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);
        // kubernetesClusterDescriptor already holds a client connected to the cluster; a new client is created here to look up the REST Service
        final FlinkKubeClient kubeClient =
                FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client");
        // Retrieve or create a session cluster.
        if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
            clusterClient = kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
        } else {
            clusterClient =
                    kubernetesClusterDescriptor
                            // For the deployment details, see the companion article: Flink Source Code Reading - Deploying a Session-Mode Cluster on K8s (Internal Details)
                            .deploySessionCluster(
                                    kubernetesClusterClientFactory.getClusterSpecification(
                                            configuration))
                            .getClusterClient();
            clusterId = clusterClient.getClusterId();
        }
        // In attached mode, read commands from stdin: stop shuts down the cluster, quit only closes the client
        try {
            if (!detached) {
                Tuple2<Boolean, Boolean> continueRepl = new Tuple2<>(true, false);
                try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
                    // f0 = true: keep reading and parsing interactive user input
                    while (continueRepl.f0) {
                        continueRepl = repStep(in);
                    }
                } catch (Exception e) {
                    LOG.warn(
                            "Exception while running the interactive command line interface.",
                            e);
                }
                // After quit or stop no further input is read; stop (f1 = true) also shuts down the cluster
                if (continueRepl.f1) {
                    kubernetesClusterDescriptor.killCluster(clusterId);
                }
            }
            // Close the clients
            clusterClient.close();
            kubeClient.close();
        } catch (Exception e) {
            LOG.info("Could not properly shutdown cluster client.", e);
        }
    } finally {
        try {
            kubernetesClusterDescriptor.close();
        } catch (Exception e) {
            LOG.info("Could not properly close the kubernetes cluster descriptor.", e);
        }
    }
    return 0;
}

getEffectiveConfiguration: building the effective configuration

// org.apache.flink.kubernetes.cli.KubernetesSessionCli#getEffectiveConfiguration
Configuration getEffectiveConfiguration(String[] args) throws CliArgsException {
    // Parse all arguments received by the main class; cli is a GenericCLI
    final CommandLine commandLine = cli.parseCommandLineOptions(args, true);
    // Tracing the call chain shows that baseConfiguration holds the content of ${FLINK_CONF_DIR}/flink-conf.yaml
    final Configuration effectiveConfiguration = new Configuration(baseConfiguration);
    
    // Convert the CommandLine into a Configuration
    effectiveConfiguration.addAll(cli.toConfiguration(commandLine));
    // execution.target = kubernetes-session
    effectiveConfiguration.set(DeploymentOptions.TARGET, KubernetesSessionClusterExecutor.NAME);
    return effectiveConfiguration;
}
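
The layering order above (file configuration first, then command-line values, then the forced target) can be illustrated with plain maps. This is a sketch only: `EffectiveConfigSketch` and `effective` are hypothetical names, and `Map<String, String>` stands in for Flink's `Configuration`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EffectiveConfigSketch {

    /** Later layers win: base < command line < forced execution.target. */
    static Map<String, String> effective(Map<String, String> base, Map<String, String> cli) {
        Map<String, String> result = new LinkedHashMap<>(base); // copy, base stays untouched
        result.putAll(cli);                                     // command-line values override the file
        result.put("execution.target", "kubernetes-session");   // always forced last
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> base = new LinkedHashMap<>();
        base.put("kubernetes.namespace", "default");
        Map<String, String> cli = new LinkedHashMap<>();
        cli.put("kubernetes.namespace", "flink");
        cli.put("execution.target", "remote"); // overwritten by the forced target
        System.out.println(effective(base, cli));
    }
}
```

Because the target is written last, a `-t` or `-e` value given on the command line has no effect on a session started through this class.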

parseCommandLineOptions: parsing the arguments into a CommandLine

// org.apache.flink.client.cli.CustomCommandLine#parseCommandLineOptions
default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions)
        throws CliArgsException {
    final Options options = new Options();
    // Add the general options
    addGeneralOptions(options);
    // Add the run options; GenericCLI implements this as a no-op
    addRunOptions(options);
    // Use Apache Commons CLI to parse the argument array into a CommandLine
    return CliFrontendParser.parse(options, args, stopAtNonOptions);
}

// org.apache.flink.client.cli.GenericCLI#addGeneralOptions
@Override
public void addGeneralOptions(Options baseOptions) {
    // executorOption = e[executor], takes an argument, e.g.: -e kubernetes-session
    baseOptions.addOption(executorOption);
    // targetOption = t[target], takes an argument, e.g.: -t kubernetes-session
    baseOptions.addOption(targetOption);
    // Matches -Dk=v
    baseOptions.addOption(DynamicPropertiesUtil.DYNAMIC_PROPERTIES);
}

// org.apache.flink.client.cli.CliFrontendParser#parse
public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
        throws CliArgsException {
    final DefaultParser parser = new DefaultParser();
    try {
        return parser.parse(options, args, stopAtNonOptions);
    } catch (ParseException e) {
        throw new CliArgsException(e.getMessage());
    }
}

toConfiguration: converting the CommandLine into a Flink Configuration

// org.apache.flink.client.cli.GenericCLI#toConfiguration
@Override
public Configuration toConfiguration(final CommandLine commandLine) {
    final Configuration resultConfiguration = new Configuration();

    final String executorName = commandLine.getOptionValue(executorOption.getOpt());
    if (executorName != null) {
        resultConfiguration.setString(DeploymentOptions.TARGET, executorName);
    }

    final String targetName = commandLine.getOptionValue(targetOption.getOpt());
    if (targetName != null) {
        resultConfiguration.setString(DeploymentOptions.TARGET, targetName);
    }
    // Both values above are overwritten with execution.target = kubernetes-session once KubernetesSessionCli has called this method

    // CommandLine -> Properties -> Configuration
    DynamicPropertiesUtil.encodeDynamicProperties(commandLine, resultConfiguration);

    // $internal.deployment.config-dir = FLINK_CONF_DIR (or ../conf, conf)
    resultConfiguration.set(DeploymentOptionsInternal.CONF_DIR, configurationDir);

    return resultConfiguration;
}

// org.apache.flink.client.cli.DynamicPropertiesUtil#encodeDynamicProperties
static void encodeDynamicProperties(
        final CommandLine commandLine, final Configuration effectiveConfiguration) {
    final Properties properties = commandLine.getOptionProperties(DYNAMIC_PROPERTIES.getOpt());
    properties
            .stringPropertyNames()
            .forEach(
                    key -> {
                        final String value = properties.getProperty(key);
                        if (value != null) {
                            // Matched -Dk=v
                            effectiveConfiguration.setString(key, value);
                        } else {
                            // Matched -Dk without a value
                            effectiveConfiguration.setString(key, "true");
                        }
                    });
}
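
To make the `-Dk=v` / `-Dk` rule concrete, here is a hand-rolled sketch that does not use Commons CLI. `DynamicPropsSketch` and `parse` are hypothetical names, and the sketch only handles the attached form (`-Dkey=value` in a single argument); it is meant to show the resulting key/value pairs, not to reproduce the parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DynamicPropsSketch {

    /** Collects -Dk=v arguments; a key without a value is stored as "true". */
    static Map<String, String> parse(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String arg : args) {
            if (!arg.startsWith("-D") || arg.length() == 2) {
                continue; // not a dynamic property
            }
            String body = arg.substring(2);
            int eq = body.indexOf('=');
            if (eq == 0) {
                continue; // "-D=v" has no key, ignore it
            } else if (eq < 0) {
                props.put(body, "true"); // -Dk
            } else {
                props.put(body.substring(0, eq), body.substring(eq + 1)); // -Dk=v
            }
        }
        return props;
    }

    public static void main(String[] args) {
        String[] demo = {"-Dkubernetes.cluster-id=demo", "-Dexecution.attached"};
        System.out.println(parse(demo));
    }
}
```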

getClusterClientFactory: obtaining the cluster client factory

// org.apache.flink.client.deployment.DefaultClusterClientServiceLoader#getClusterClientFactory
@Override
public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(
        final Configuration configuration) {
    checkNotNull(configuration);
    // Java's standard ServiceLoader loads providers lazily; see the JDK documentation for details
    final ServiceLoader<ClusterClientFactory> loader =
            ServiceLoader.load(ClusterClientFactory.class);
    final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
    /*
    Iterates over the service resource META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory:
       org.apache.flink.client.deployment.StandaloneClientFactory
       org.apache.flink.kubernetes.KubernetesClusterClientFactory
       org.apache.flink.yarn.YarnClusterClientFactory
    This matches the Deployment / Resource Providers chapter of the official documentation, which offers only three resource providers: Standalone, Native Kubernetes, and YARN
    */
    final Iterator<ClusterClientFactory> factories = loader.iterator();
    while (factories.hasNext()) {
        try {
            final ClusterClientFactory factory = factories.next();
            // Check whether the factory is compatible with the given configuration
            if (factory != null && factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
            }
        } catch (Throwable e) {
            if (e.getCause() instanceof NoClassDefFoundError) {
                LOG.info("Could not load factory due to missing dependencies.");
            } else {
                throw e;
            }
        }
    }
    if (compatibleFactories.size() > 1) {
        final List<String> configStr =
                configuration.toMap().entrySet().stream()
                        .map(e -> e.getKey() + "=" + e.getValue())
                        .collect(Collectors.toList());
        throw new IllegalStateException(
                "Multiple compatible client factories found for:\n"
                        + String.join("\n", configStr)
                        + ".");
    }
    if (compatibleFactories.isEmpty()) {
        throw new IllegalStateException(
                "No ClusterClientFactory found. If you were targeting a Yarn cluster, "
                        + "please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your "
                        + "classpath. For more information refer to the \"Deployment\" section of the official "
                        + "Apache Flink documentation.");
    }
    // Exactly one compatible factory is allowed
    return (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
}

// org.apache.flink.kubernetes.KubernetesClusterClientFactory#isCompatibleWith
@Override
public boolean isCompatibleWith(Configuration configuration) {
    checkNotNull(configuration);
    // At this point execution.target = kubernetes-session
    final String deploymentTarget = configuration.getString(DeploymentOptions.TARGET);
    return KubernetesDeploymentTarget.isValidKubernetesTarget(deploymentTarget);
}

// org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget#isValidKubernetesTarget
public static boolean isValidKubernetesTarget(final String configValue) {
    return configValue != null
            && Arrays.stream(KubernetesDeploymentTarget.values()) // values = [kubernetes-session, kubernetes-application]
                    .anyMatch(
                            kubernetesDeploymentTarget ->
                                    kubernetesDeploymentTarget.name.equalsIgnoreCase(
                                            configValue));
}
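
The "exactly one compatible factory" rule can be sketched without ServiceLoader. Everything here is illustrative: `FactorySelectionSketch`, the nested `Factory` interface, `named`, and `DEMO` are hypothetical, and the `yarn-session` target string is an assumption added only to have a second factory in the list. The Kubernetes targets are the two values noted in the listing above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FactorySelectionSketch {

    /** Hypothetical stand-in for ClusterClientFactory, reduced to the compatibility check. */
    interface Factory {
        String name();
        boolean isCompatibleWith(String target);
    }

    /** Builds a factory that accepts any of the given targets, ignoring case. */
    static Factory named(String name, String... targets) {
        return new Factory() {
            @Override public String name() { return name; }
            @Override public boolean isCompatibleWith(String target) {
                return target != null
                        && Arrays.stream(targets).anyMatch(t -> t.equalsIgnoreCase(target));
            }
        };
    }

    static final List<Factory> DEMO = Arrays.asList(
            named("kubernetes", "kubernetes-session", "kubernetes-application"),
            named("yarn", "yarn-session")); // assumed target name, for illustration only

    /** Returns the single compatible factory, failing on zero or several matches. */
    static Factory select(List<Factory> all, String target) {
        List<Factory> compatible = all.stream()
                .filter(f -> f.isCompatibleWith(target))
                .collect(Collectors.toList());
        if (compatible.size() > 1) {
            throw new IllegalStateException("Multiple compatible factories for " + target);
        }
        if (compatible.isEmpty()) {
            throw new IllegalStateException("No compatible factory for " + target);
        }
        return compatible.get(0);
    }

    public static void main(String[] args) {
        System.out.println(select(DEMO, "Kubernetes-Session").name());
    }
}
```

Failing on several matches, rather than picking the first, keeps the result independent of classpath ordering.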

createClusterDescriptor: creating the cluster Descriptor

// org.apache.flink.kubernetes.KubernetesClusterClientFactory#createClusterDescriptor
@Override
public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
    checkNotNull(configuration);
    if (!configuration.contains(KubernetesConfigOptions.CLUSTER_ID)) {
        final String clusterId = generateClusterId();
        configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
    }
    return new KubernetesClusterDescriptor(
            configuration,
            // FlinkKubeClient, which wraps the native K8s client
            FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client"));
}

// org.apache.flink.kubernetes.KubernetesClusterClientFactory#generateClusterId
// Generates a clusterId of the form flink-cluster-xxx, 45 characters in total
private String generateClusterId() {
    final String randomID = new AbstractID().toString();
    return (CLUSTER_ID_PREFIX + randomID)
            .substring(0, Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID);
}
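
The length arithmetic behind the 45-character ID can be checked with a small sketch. Assumptions: `ClusterIdSketch` is a hypothetical class, the prefix `flink-cluster-` and the limit 45 are taken from the comment above, and a dash-free `UUID` (32 hex characters) stands in for Flink's `AbstractID`.

```java
import java.util.UUID;

public class ClusterIdSketch {

    static final String PREFIX = "flink-cluster-"; // 14 characters
    static final int MAX_LEN = 45;                 // limit stated in the article

    /** Prefix plus 32 random hex characters gives 46 characters, truncated to 45. */
    static String generate() {
        String randomHex = UUID.randomUUID().toString().replace("-", "");
        String id = PREFIX + randomHex;
        return id.substring(0, Math.min(id.length(), MAX_LEN));
    }

    public static void main(String[] args) {
        String id = generate();
        System.out.println(id + " (" + id.length() + " characters)");
    }
}
```

In this sketch the truncation drops exactly one random character.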

fromConfiguration: creating the FlinkKubeClient

Flink configuration -> K8s configuration and namespace -> NamespacedKubernetesClient -> Fabric8FlinkKubeClient(Flink configuration, K8s client, IO thread pool)

// org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory#fromConfiguration
public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) {
    final Config config;
    // kubernetes.context: selects the kubeconfig context, so different contexts can manage different Flink clusters
    final String kubeContext = flinkConfig.getString(KubernetesConfigOptions.CONTEXT);
    if (kubeContext != null) {
        LOG.info("Configuring kubernetes client to use context {}.", kubeContext);
    }
    // kubernetes.config.file
    final String kubeConfigFile =
            flinkConfig.getString(KubernetesConfigOptions.KUBE_CONFIG_FILE);
    if (kubeConfigFile != null) {
        LOG.debug("Trying to load kubernetes config from file: {}.", kubeConfigFile);
        try {
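            /* 
            Built via Config fromKubeconfig(String context, String kubeconfigContents, String kubeconfigPath).
            If kubeContext is null, the default context in kubeConfigFile is used. Note: the third
            parameter, kubeconfigPath, is optional and is set to null here. It is only used to rewrite
            relative TLS asset paths inside the kubeconfig when a file is passed and the kubeconfig
            references some assets via relative paths.
            */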
            config = Config.fromKubeconfig(kubeContext,
                    FileUtils.readFileUtf8(new File(kubeConfigFile)), null);
        } catch (IOException e) {
            throw new KubernetesClientException("Load kubernetes config failed.", e);
        }
    } else {
        LOG.debug("Trying to load default kubernetes config.");
        config = Config.autoConfigure(kubeContext);
    }
    // kubernetes.namespace
    final String namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
    LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
    config.setNamespace(namespace);
    // This could be removed after we bump the fabric8 Kubernetes client version to 4.13.0+ or
    // use a shared connection for all ConfigMap watches. See FLINK-22006 for more
    // information. (This workaround may be removed in the future.)
    trySetMaxConcurrentRequest(config);
    // The underlying client that talks to K8s
    final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
    
    // kubernetes.client.io-pool.size = 4
    final int poolSize =
            flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
    return new Fabric8FlinkKubeClient(
            flinkConfig, client, createThreadPoolForAsyncIO(poolSize, useCase));
}

@VisibleForTesting
static void trySetMaxConcurrentRequest(Config config) {
    // kubernetes.max.concurrent.requests = 64
    final String configuredMaxConcurrentRequests =
            Utils.getSystemPropertyOrEnvVar(
                    Config.KUBERNETES_MAX_CONCURRENT_REQUESTS,
                    String.valueOf(Config.DEFAULT_MAX_CONCURRENT_REQUESTS));
    if (configuredMaxConcurrentRequests != null) {
        LOG.debug(
                "Setting max concurrent requests of Kubernetes client to {}",
                configuredMaxConcurrentRequests);
        config.setMaxConcurrentRequests(Integer.parseInt(configuredMaxConcurrentRequests));
    }
}
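
The lookup order "system property, then environment variable, then default" used for the concurrency limit can be sketched as below. This is an assumption-laden illustration: `PropOrEnvSketch`, `envName`, and `resolve` are hypothetical, the environment is passed in as a map so the example is deterministic, and the upper-case/underscore naming of the environment variable is assumed rather than taken from the fabric8 source.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

public class PropOrEnvSketch {

    /** Assumed convention: "a.b.c" maps to the environment variable "A_B_C". */
    static String envName(String property) {
        return property.toUpperCase(Locale.ROOT).replace('.', '_');
    }

    /** System property first, then environment variable, then the default. */
    static String resolve(String property, Map<String, String> env, String defaultValue) {
        String value = System.getProperty(property);
        if (value != null) {
            return value;
        }
        value = env.get(envName(property));
        return value != null ? value : defaultValue;
    }

    public static void main(String[] args) {
        String key = "kubernetes.max.concurrent.requests";
        Map<String, String> env = Collections.singletonMap(envName(key), "32");
        System.out.println(Integer.parseInt(resolve(key, env, "64")));
    }
}
```

Because a default is always supplied, `resolve` never returns null in this sketch.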

repStep: repeatedly polling for commands in attached mode

// org.apache.flink.kubernetes.cli.KubernetesSessionCli#repStep

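// Decides whether to keep reading input and whether to shut down the cluster
// f0 = true: keep reading interactive user input
// f1 = true: shut down the cluster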
private Tuple2<Boolean, Boolean> repStep(BufferedReader in)
        throws IOException, InterruptedException {
    final long startTime = System.currentTimeMillis();
    // Poll for up to 3s while the input stream has nothing to read
    while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVAL_MS
            && (!in.ready())) {
        Thread.sleep(200L);
    }
    // ------------- handle interactive command by user. ----------------------
    if (in.ready()) {
        final String command = in.readLine();
        switch (command) {
            case "quit":
                return new Tuple2<>(false, false);
            case "stop":
                return new Tuple2<>(false, true);
            case "help":
                System.err.println(KUBERNETES_CLUSTER_HELP);
                break;
            default:
                System.err.println("Unknown command '" + command + "'. Showing help:");
                System.err.println(KUBERNETES_CLUSTER_HELP);
                break;
        }
    }
    return new Tuple2<>(true, false);
}
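
The command handling reduces to a small decision table, sketched here without any I/O or polling. `ReplStepSketch`, `step`, and `runUntilExit` are hypothetical names; a two-element `boolean[]` stands in for `Tuple2<Boolean, Boolean>`, and the null check is an addition of this sketch.

```java
import java.util.Arrays;
import java.util.List;

public class ReplStepSketch {

    /** Returns {continueReading, stopCluster}, mirroring f0 and f1. */
    static boolean[] step(String command) {
        if (command == null) {
            return new boolean[] {true, false}; // nothing read, keep polling
        }
        switch (command) {
            case "quit":
                return new boolean[] {false, false}; // leave, cluster keeps running
            case "stop":
                return new boolean[] {false, true};  // leave and shut the cluster down
            default:
                return new boolean[] {true, false};  // help or unknown command
        }
    }

    /** Feeds commands in order; returns true if the cluster should be shut down. */
    static boolean runUntilExit(List<String> commands) {
        boolean[] state = {true, false};
        for (String command : commands) {
            if (!state[0]) {
                break; // a previous command ended the loop
            }
            state = step(command);
        }
        return state[1];
    }

    public static void main(String[] args) {
        System.out.println(runUntilExit(Arrays.asList("help", "stop")));
        System.out.println(runUntilExit(Arrays.asList("quit", "stop")));
    }
}
```

Only `stop` sets the second flag, so `quit` detaches the client while leaving the session cluster running.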

Subsequent source-code reading notes have moved to Gitee.

