啟動主類 KubernetesSessionCli
KubernetesSessionCli#main 主類入口
// org.apache.flink.kubernetes.cli.KubernetesSessionCli#main
/**
 * Entry point of the Kubernetes session CLI.
 *
 * <p>Loads the global Flink configuration, resolves the configuration directory, runs the CLI
 * inside the installed security context and terminates the JVM with the resulting exit code.
 *
 * @param args raw command line arguments, forwarded unchanged to {@code run}
 */
public static void main(String[] args) {
    // Base configuration; per the original note this is ${FLINK_CONF_DIR}/flink-conf.yaml.
    final Configuration flinkConf = GlobalConfiguration.loadConfiguration();
    // Configuration directory; per the original note the first existing candidate among
    // FLINK_CONF_DIR, ../conf and conf is used.
    final String confDir = CliFrontend.getConfigurationDirectoryFromEnv();

    int exitCode;
    try {
        final KubernetesSessionCli sessionCli = new KubernetesSessionCli(flinkConf, confDir);
        exitCode = SecurityUtils.getInstalledContext().runSecured(() -> sessionCli.run(args));
    } catch (CliArgsException argsError) {
        // Invalid command line arguments get their own handler.
        exitCode = AbstractCustomCommandLine.handleCliArgsException(argsError, LOG);
    } catch (Exception failure) {
        exitCode = AbstractCustomCommandLine.handleError(failure, LOG);
    }

    System.exit(exitCode);
}
KubernetesSessionCli#run 核心流程
獲取有效配置 -> 加載和匹配ClusterClientFactory -> 構造Descriptor -> 查詢或創建服務 -> Descriptor配置集群內部細節 -> 附屬管道檢測stop,quit命令
//org.apache.flink.kubernetes.cli.KubernetesSessionCli#run
/**
 * Core flow of the session CLI: build the effective configuration, pick the matching
 * {@code ClusterClientFactory}, create the cluster descriptor, retrieve or deploy the session
 * cluster and, in attached mode, serve the interactive prompt until the user quits or stops
 * the cluster.
 *
 * <p>Resource handling: the Kubernetes client and the cluster client are released in
 * {@code finally} blocks, so they are closed even when retrieving/deploying the cluster, the
 * interactive loop, or killing the cluster fails. Failures while closing are only logged.
 *
 * @param args raw command line arguments
 * @return always {@code 0}; failures surface as exceptions
 * @throws FlinkException if the cluster cannot be retrieved or deployed
 * @throws CliArgsException if the command line arguments cannot be parsed
 */
private int run(String[] args) throws FlinkException, CliArgsException {
    // Effective configuration: base configuration, overlaid with the command line options,
    // with execution.target forced by getEffectiveConfiguration.
    final Configuration configuration = getEffectiveConfiguration(args);

    // Pick the single factory compatible with the configured execution target.
    final ClusterClientFactory<String> kubernetesClusterClientFactory =
            clusterClientServiceLoader.getClusterClientFactory(configuration);

    // The descriptor is built from the same configuration.
    final ClusterDescriptor<String> kubernetesClusterDescriptor =
            kubernetesClusterClientFactory.createClusterDescriptor(configuration);

    try {
        // Cluster id as resolved by the factory (kubernetes.cluster-id per the original note).
        String clusterId = kubernetesClusterClientFactory.getClusterId(configuration);
        // Detached unless execution.attached is enabled.
        final boolean detached = !configuration.get(DeploymentOptions.ATTACHED);

        // Separate client used here only to look up the REST service of the cluster.
        final FlinkKubeClient kubeClient =
                FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client");
        try {
            final ClusterClient<String> clusterClient;
            // Retrieve or create a session cluster.
            if (clusterId != null && kubeClient.getRestService(clusterId).isPresent()) {
                clusterClient =
                        kubernetesClusterDescriptor.retrieve(clusterId).getClusterClient();
            } else {
                clusterClient =
                        kubernetesClusterDescriptor
                                .deploySessionCluster(
                                        kubernetesClusterClientFactory.getClusterSpecification(
                                                configuration))
                                .getClusterClient();
                clusterId = clusterClient.getClusterId();
            }

            try {
                if (!detached) {
                    // f0: keep reading user input; f1: kill the cluster afterwards.
                    Tuple2<Boolean, Boolean> continueRepl = new Tuple2<>(true, false);
                    try (BufferedReader in =
                            new BufferedReader(new InputStreamReader(System.in))) {
                        while (continueRepl.f0) {
                            continueRepl = repStep(in);
                        }
                    } catch (Exception e) {
                        LOG.warn(
                                "Exception while running the interactive command line interface.",
                                e);
                    }
                    // "stop" asks for the cluster to be shut down, "quit" only leaves the CLI.
                    if (continueRepl.f1) {
                        kubernetesClusterDescriptor.killCluster(clusterId);
                    }
                }
            } catch (Exception e) {
                LOG.info("Could not properly shutdown cluster client.", e);
            } finally {
                // Always release the cluster client, even if killing the cluster failed.
                try {
                    clusterClient.close();
                } catch (Exception closeFailure) {
                    LOG.info("Could not properly shutdown cluster client.", closeFailure);
                }
            }
        } finally {
            // Always release the Kubernetes client, including when retrieve/deploy threw.
            try {
                kubeClient.close();
            } catch (Exception closeFailure) {
                LOG.info("Could not properly close the kubernetes client.", closeFailure);
            }
        }
    } finally {
        try {
            kubernetesClusterDescriptor.close();
        } catch (Exception e) {
            LOG.info("Could not properly close the kubernetes cluster descriptor.", e);
        }
    }

    return 0;
}
getEffectiveConfiguration 獲取有效配置
// org.apache.flink.kubernetes.cli.KubernetesSessionCli#getEffectiveConfiguration
/**
 * Builds the configuration the session CLI actually runs with.
 *
 * <p>Starts from a copy of {@code baseConfiguration}, overlays everything derived from the
 * command line, and finally forces {@code DeploymentOptions.TARGET} to
 * {@code KubernetesSessionClusterExecutor.NAME}, overriding any target given by the user.
 *
 * @param args raw command line arguments
 * @return the merged configuration
 * @throws CliArgsException if the arguments cannot be parsed
 */
Configuration getEffectiveConfiguration(String[] args) throws CliArgsException {
    // Parse the arguments, stopping at the first non-option token.
    final CommandLine parsedArgs = cli.parseCommandLineOptions(args, true);
    // Command line -> Flink configuration entries.
    final Configuration cliOverrides = cli.toConfiguration(parsedArgs);

    // Copy of the base configuration so the original stays untouched.
    final Configuration merged = new Configuration(baseConfiguration);
    merged.addAll(cliOverrides);
    // The session CLI always targets the Kubernetes session executor.
    merged.set(DeploymentOptions.TARGET, KubernetesSessionClusterExecutor.NAME);
    return merged;
}
parseCommandLineOptions 解析為命令行
// org.apache.flink.client.cli.CustomCommandLine#parseCommandLineOptions
/**
 * Parses the given arguments against the options this command line contributes.
 *
 * @param args raw command line arguments
 * @param stopAtNonOptions forwarded to the parser; controls whether parsing stops at the
 *     first token that is not an option
 * @return the parsed command line
 * @throws CliArgsException if the arguments are invalid
 */
default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions)
        throws CliArgsException {
    final Options supportedOptions = new Options();
    // General options first, then run options (a no-op for GenericCLI per the original note).
    addGeneralOptions(supportedOptions);
    addRunOptions(supportedOptions);
    // Delegate the actual parsing to commons-cli via CliFrontendParser.
    return CliFrontendParser.parse(supportedOptions, args, stopAtNonOptions);
}
// org.apache.flink.client.cli.GenericCLI#addGeneralOptions
/**
 * Registers the options understood by the generic CLI: the executor option, the target
 * option and the dynamic property option.
 *
 * @param baseOptions option set to extend in place
 */
@Override
public void addGeneralOptions(Options baseOptions) {
    // Per the original notes: executor is "-e <name>", target is "-t <name>", and dynamic
    // properties are given as "-Dkey=value".
    baseOptions
            .addOption(executorOption)
            .addOption(targetOption)
            .addOption(DynamicPropertiesUtil.DYNAMIC_PROPERTIES);
}
// org.apache.flink.client.cli.CliFrontendParser#parse
/**
 * Parses {@code args} against {@code options} using the commons-cli {@code DefaultParser}.
 *
 * @param options the options the parser should recognise
 * @param args raw command line arguments
 * @param stopAtNonOptions forwarded to the parser; controls whether parsing stops at the
 *     first token that is not an option
 * @return the parsed command line
 * @throws CliArgsException if parsing fails; the message is the parser's message and the
 *     original {@code ParseException} is attached as the cause
 */
public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
        throws CliArgsException {
    final DefaultParser parser = new DefaultParser();
    try {
        return parser.parse(options, args, stopAtNonOptions);
    } catch (ParseException e) {
        // Keep the cause so the parser's stack trace is not lost when this is logged.
        throw new CliArgsException(e.getMessage(), e);
    }
}
toConfiguration 解析為Flink配置
// org.apache.flink.client.cli.GenericCLI#toConfiguration
/**
 * Converts a parsed command line into a Flink configuration.
 *
 * <p>The resulting configuration contains the deployment target (the target option takes
 * precedence over the executor option when both are present), all dynamic properties, and
 * the configuration directory of this CLI.
 *
 * @param commandLine the parsed command line
 * @return a new configuration holding the command line derived settings
 */
@Override
public Configuration toConfiguration(final CommandLine commandLine) {
    final Configuration cliConfiguration = new Configuration();

    final String executorName = commandLine.getOptionValue(executorOption.getOpt());
    final String targetName = commandLine.getOptionValue(targetOption.getOpt());
    // The target option wins over the executor option.
    final String deploymentTarget = targetName != null ? targetName : executorName;
    if (deploymentTarget != null) {
        // Note: KubernetesSessionCli overrides this value after calling this method.
        cliConfiguration.setString(DeploymentOptions.TARGET, deploymentTarget);
    }

    // CommandLine -> Properties -> Configuration.
    DynamicPropertiesUtil.encodeDynamicProperties(commandLine, cliConfiguration);

    // Remember which configuration directory this CLI was started with.
    cliConfiguration.set(DeploymentOptionsInternal.CONF_DIR, configurationDir);

    return cliConfiguration;
}
// org.apache.flink.client.cli.DynamicPropertiesUtil#encodeDynamicProperties
/**
 * Copies every dynamic property found on the command line into the given configuration.
 *
 * <p>A property without a value is stored as {@code "true"}.
 *
 * @param commandLine the parsed command line to read the dynamic properties from
 * @param effectiveConfiguration the configuration to write the properties into
 */
static void encodeDynamicProperties(
        final CommandLine commandLine, final Configuration effectiveConfiguration) {
    final Properties dynamicProperties =
            commandLine.getOptionProperties(DYNAMIC_PROPERTIES.getOpt());

    for (final String key : dynamicProperties.stringPropertyNames()) {
        final String rawValue = dynamicProperties.getProperty(key);
        // "-Dk=v" keeps its value; a bare "-Dk" falls back to "true".
        effectiveConfiguration.setString(key, rawValue != null ? rawValue : "true");
    }
}
getClusterClientFactory 獲取配置工廠
// org.apache.flink.client.deployment.DefaultClusterClientServiceLoader#getClusterClientFactory
/**
 * Finds the single {@code ClusterClientFactory} that is compatible with the given
 * configuration.
 *
 * <p>Factories are discovered through {@link ServiceLoader}. A factory whose loading fails
 * with a {@link NoClassDefFoundError} cause is skipped with an info log; any other failure
 * is rethrown.
 *
 * @param configuration the configuration the factory must be compatible with
 * @param <ClusterID> cluster id type of the returned factory
 * @return the only compatible factory
 * @throws IllegalStateException if no factory or more than one factory is compatible
 */
@Override
public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(
        final Configuration configuration) {
    checkNotNull(configuration);

    // Providers are listed in
    // META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory; per the
    // original note these are the standalone, Kubernetes and YARN factories.
    final ServiceLoader<ClusterClientFactory> loader =
            ServiceLoader.load(ClusterClientFactory.class);
    final List<ClusterClientFactory> matches = new ArrayList<>();

    for (final Iterator<ClusterClientFactory> it = loader.iterator(); it.hasNext(); ) {
        try {
            final ClusterClientFactory candidate = it.next();
            // Keep only factories that accept this configuration.
            if (candidate != null && candidate.isCompatibleWith(configuration)) {
                matches.add(candidate);
            }
        } catch (Throwable e) {
            if (!(e.getCause() instanceof NoClassDefFoundError)) {
                throw e;
            }
            LOG.info("Could not load factory due to missing dependencies.");
        }
    }

    if (matches.isEmpty()) {
        throw new IllegalStateException(
                "No ClusterClientFactory found. If you were targeting a Yarn cluster, "
                        + "please make sure to export the HADOOP_CLASSPATH environment variable or have hadoop in your "
                        + "classpath. For more information refer to the \"Deployment\" section of the official "
                        + "Apache Flink documentation.");
    }

    if (matches.size() > 1) {
        // Render the configuration as one "key=value" per line for the error message.
        final String renderedConfig =
                configuration.toMap().entrySet().stream()
                        .map(entry -> entry.getKey() + "=" + entry.getValue())
                        .collect(Collectors.joining("\n"));
        throw new IllegalStateException(
                "Multiple compatible client factories found for:\n" + renderedConfig + ".");
    }

    // Exactly one compatible factory remains.
    return (ClusterClientFactory<ClusterID>) matches.get(0);
}
// org.apache.flink.kubernetes.KubernetesClusterClientFactory#isCompatibleWith
/**
 * Tells whether this factory can serve the given configuration, i.e. whether its
 * {@code DeploymentOptions.TARGET} value is a valid Kubernetes deployment target.
 *
 * @param configuration the configuration to check; must not be {@code null}
 * @return {@code true} if the configured target is a Kubernetes target
 */
@Override
public boolean isCompatibleWith(Configuration configuration) {
    checkNotNull(configuration);
    // When reached from KubernetesSessionCli the target has been set to kubernetes-session.
    return KubernetesDeploymentTarget.isValidKubernetesTarget(
            configuration.getString(DeploymentOptions.TARGET));
}
// org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget#isValidKubernetesTarget
/**
 * Checks whether the given value names one of the Kubernetes deployment targets,
 * ignoring case.
 *
 * @param configValue the configured target, may be {@code null}
 * @return {@code true} if {@code configValue} matches the name of any enum constant
 */
public static boolean isValidKubernetesTarget(final String configValue) {
    if (configValue == null) {
        return false;
    }
    // Per the original note the values are kubernetes-session and kubernetes-application.
    for (final KubernetesDeploymentTarget target : KubernetesDeploymentTarget.values()) {
        if (target.name.equalsIgnoreCase(configValue)) {
            return true;
        }
    }
    return false;
}
createClusterDescriptor 創建集群Descriptor
// org.apache.flink.kubernetes.KubernetesClusterClientFactory#createClusterDescriptor
/**
 * Creates the Kubernetes cluster descriptor for the given configuration.
 *
 * <p>If the configuration carries no cluster id yet, a generated one is written into it
 * before the descriptor is built, so the passed-in configuration may be modified.
 *
 * @param configuration the Flink configuration; must not be {@code null}
 * @return a descriptor backed by a newly created Flink Kubernetes client
 */
@Override
public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
    checkNotNull(configuration);

    final boolean hasClusterId = configuration.contains(KubernetesConfigOptions.CLUSTER_ID);
    if (!hasClusterId) {
        configuration.setString(KubernetesConfigOptions.CLUSTER_ID, generateClusterId());
    }

    // The second argument is a FlinkKubeClient created by the factory for the "client" use case.
    return new KubernetesClusterDescriptor(
            configuration,
            FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client"));
}
// org.apache.flink.kubernetes.KubernetesClusterClientFactory#generateClusterId
/**
 * Generates a random cluster id: {@code CLUSTER_ID_PREFIX} followed by a random
 * {@code AbstractID}, truncated to {@code Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID}
 * characters (per the original note: "flink-cluster-xxx", 45 characters in total).
 *
 * @return the generated cluster id
 */
private String generateClusterId() {
    final String untruncatedId = CLUSTER_ID_PREFIX + new AbstractID().toString();
    return untruncatedId.substring(0, Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID);
}
fromConfiguration 創建FlinkKubeClient
Flink配置 -> K8s配置, namespace -> NamespacedKubernetesClient -> Fabric8FlinkKubeClient(Flink配置, K8sCli, IO線程池)
// org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory#fromConfiguration
/**
 * Creates a {@code FlinkKubeClient} from the Flink configuration.
 *
 * <p>Flow: Flink configuration -> Kubernetes client config (from the configured kubeconfig
 * file, or auto-configured when none is set) -> namespace applied -> namespaced Kubernetes
 * client -> {@code Fabric8FlinkKubeClient} with an async I/O thread pool.
 *
 * @param flinkConfig the Flink configuration to read the Kubernetes settings from
 * @param useCase passed to the thread pool creation together with the pool size
 * @return the Flink Kubernetes client
 * @throws KubernetesClientException if the configured kubeconfig file cannot be read
 */
public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) {
    // kubernetes.context: selects which kubeconfig context to use.
    final String kubeContext = flinkConfig.getString(KubernetesConfigOptions.CONTEXT);
    if (kubeContext != null) {
        LOG.info("Configuring kubernetes client to use context {}.", kubeContext);
    }

    // kubernetes.config.file
    final String kubeConfigFile =
            flinkConfig.getString(KubernetesConfigOptions.KUBE_CONFIG_FILE);

    final Config config;
    if (kubeConfigFile == null) {
        LOG.debug("Trying to load default kubernetes config.");
        config = Config.autoConfigure(kubeContext);
    } else {
        LOG.debug("Trying to load kubernetes config from file: {}.", kubeConfigFile);
        try {
            final String kubeConfigContents = FileUtils.readFileUtf8(new File(kubeConfigFile));
            // Per the original note: with a null context the kubeconfig's default context is
            // used; the third argument (kubeconfig path) is optional and left null, as it is
            // only needed to resolve relative TLS asset paths inside the kubeconfig.
            config = Config.fromKubeconfig(kubeContext, kubeConfigContents, null);
        } catch (IOException e) {
            throw new KubernetesClientException("Load kubernetes config failed.", e);
        }
    }

    // kubernetes.namespace
    final String namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
    LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
    config.setNamespace(namespace);

    // This could be removed after we bump the fabric8 Kubernetes client version to 4.13.0+ or
    // use a shared connection for all ConfigMap watches. See FLINK-22006 for more
    // information.
    trySetMaxConcurrentRequest(config);

    // Low-level client that talks to Kubernetes.
    final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);

    // kubernetes.client.io-pool.size (4 per the original note).
    final int poolSize =
            flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);

    return new Fabric8FlinkKubeClient(
            flinkConfig, client, createThreadPoolForAsyncIO(poolSize, useCase));
}
/**
 * Applies the maximum number of concurrent requests to the Kubernetes client config.
 *
 * <p>The value is looked up through {@code Utils.getSystemPropertyOrEnvVar} under
 * {@code Config.KUBERNETES_MAX_CONCURRENT_REQUESTS}, with
 * {@code Config.DEFAULT_MAX_CONCURRENT_REQUESTS} as the fallback (per the original note:
 * kubernetes.max.concurrent.requests, 64).
 *
 * @param config the Kubernetes client config to update in place
 */
@VisibleForTesting
static void trySetMaxConcurrentRequest(Config config) {
    final String fallback = String.valueOf(Config.DEFAULT_MAX_CONCURRENT_REQUESTS);
    final String maxConcurrentRequests =
            Utils.getSystemPropertyOrEnvVar(Config.KUBERNETES_MAX_CONCURRENT_REQUESTS, fallback);

    if (maxConcurrentRequests == null) {
        return;
    }

    LOG.debug(
            "Setting max concurrent requests of Kubernetes client to {}",
            maxConcurrentRequests);
    config.setMaxConcurrentRequests(Integer.parseInt(maxConcurrentRequests));
}
repStep 附屬管道對命令的重復檢測
// org.apache.flink.kubernetes.cli.KubernetesSessionCli#repStep
/**
 * Runs one step of the interactive prompt used in attached mode.
 *
 * <p>Waits until input is available or {@code CLIENT_POLLING_INTERVAL_MS} has elapsed
 * (3 s per the original note), then handles at most one line of user input.
 *
 * @param in reader over the user's input
 * @return a pair where {@code f0 == true} means "keep reading input" and {@code f1 == true}
 *     means "shut the cluster down"
 * @throws IOException if reading from {@code in} fails
 * @throws InterruptedException if the thread is interrupted while waiting for input
 */
private Tuple2<Boolean, Boolean> repStep(BufferedReader in)
        throws IOException, InterruptedException {
    final long pollStart = System.currentTimeMillis();
    // Poll every 200 ms until there is input or the polling interval is over.
    while ((System.currentTimeMillis() - pollStart) < CLIENT_POLLING_INTERVAL_MS
            && !in.ready()) {
        Thread.sleep(200L);
    }

    // Nothing typed during this step: keep the prompt alive, do not stop the cluster.
    if (!in.ready()) {
        return new Tuple2<>(true, false);
    }

    // ------------- handle interactive command by user. ----------------------
    final String userInput = in.readLine();
    switch (userInput) {
        case "quit":
            // Leave the prompt, keep the cluster running.
            return new Tuple2<>(false, false);
        case "stop":
            // Leave the prompt and request the cluster to be shut down.
            return new Tuple2<>(false, true);
        default:
            // "help" prints the help text; anything else is reported as unknown first.
            if (!"help".equals(userInput)) {
                System.err.println("Unknown command '" + userInput + "'. Showing help:");
            }
            System.err.println(KUBERNETES_CLUSTER_HELP);
            return new Tuple2<>(true, false);
    }
}
后續源碼研讀轉移到 Gitee