Flink Yarn 任務提交獲取YarnId和Web訪問地址


Flink Yarn 任務提交

代碼啟動是參考FlinkX,主要的核心是加載Hadoop中的yarn配置文件以及Flink的配置文件。最后配置啟動任務的屬性,上傳文件到Hdfs上提供給yarn啟動。
最終的主要目的是獲取YarnId和Web訪問的地址方便項目的集成以及任務監控

環境

  • 開發環境: Windows 11
  • 開發工具: IDEA 2021.2.3
  • Flink版本: 1.12.2
  • Hadoop: 2.9.2
  • Java JDK: 1.8

Maven

      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-hdfs</artifactId>
         <version>2.9.2</version>
      </dependency>

      <dependency>
         <groupId>org.yaml</groupId>
         <artifactId>snakeyaml</artifactId>
         <version>1.30</version>
      </dependency>

      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-java</artifactId>
         <version>1.12.2</version>
      </dependency>

      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-java_2.12</artifactId>
         <version>1.12.2</version>
      </dependency>

      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-yarn_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
         <exclusions>
            <exclusion>
               <artifactId>flink-shaded-hadoop2</artifactId>
               <groupId>org.apache.flink</groupId>
            </exclusion>
         </exclusions>
      </dependency>
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterlog.informationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.yaml.snakeyaml.Yaml;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

/**
 * @author Administrator
 */
@Slf4j
public final class StartupFlinkYarn {

    /**
     * Yarn主機名
     */
    private static final String YARN_RESOURCE_MANAGER_HOST_NAME = "yarn.resourceManager.hostname.";

    /**
     * Yarn資源管理器地址
     */
    private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourceManager.address.";

    /**
     * 過濾Hadoop配置文件的后綴
     */
    private static final String CONFIG_XML_SUFFIX = ".xml";

    /**
     * Yarn客戶端
     */
    private YarnClient yarnClient;

    /**
     * Yarn配置
     */
    private YarnConfiguration yarnConfig;

    /**
     * Flink配置
     */
    private Configuration flinkConfig;

    /**
     * 當前程序的啟動配置信息
     */
    private StartupYarnConfig startupConfig;

    /**
     * 初始化配置文件
     * @throws Exception 加載異常
     */
    private void initiation() throws Exception {
        loadYarnConfig();
        loadYarnClient();
        loadFlinkConfig();
    }

    /**
     * 加載Yarn的配置信息
     */
    private void loadYarnConfig() throws MalformedURLException {
        yarnConfig = new YarnConfiguration();

        //Hadoop的根目錄(一般直接通過System.getEnv("HADOOP_HOME")獲取)
        String hadoopPath = "/opt/hadoop";

        //Hadoop配置文件的目錄
        hadoopPath = hadoopPath + "/etc/hadoop";

        File file = new File(hadoopPath);

        //如果不加載Hadoop的配置文件啟動時會無限等待,沒有結果
        if(!file.exists()){
            log.error("Hadoop目錄不存在:{}",hadoopPath);
            return;
        }

        if(!file.isDirectory()){
            log.error("Hadoop路徑是一個文件而不是一個目錄:{}",hadoopPath);
            return;
        }

        //過濾所有的文件,找出所有的xml配置文件
        List<File> configList = Arrays.stream(Objects.requireNonNull(file.listFiles())).filter(log.info -> log.info.getName().endsWith(CONFIG_XML_SUFFIX)).collect(Collectors.toList());

        //遍歷xml文件加入到Yarn的配置資源中
        for(File fileItem : configList){
            log.info("加載Hadoop配置文件:{}",fileItem.getAbsolutePath());
            yarnConfig.addResource(fileItem.toURI().toURL());
        }

        //遍歷Yarn解析出來的配置信息
        for (Map.Entry<String, String> entry : yarnConfig) {
            String key = entry.getKey();

            String value = entry.getValue();

            //簡單處理一下地址
            if (key.startsWith(YARN_RESOURCE_MANAGER_HOST_NAME.toLowerCase())) {
                String rm = key.substring(YARN_RESOURCE_MANAGER_HOST_NAME.length());
                String addressKey = YARN_RESOURCE_MANAGER_ADDRESS.toLowerCase() + rm;
                if (yarnConfig.get(addressKey) == null) {
                    yarnConfig.set(addressKey, value + ":" + YarnConfiguration.DEFAULT_RM_PORT);
                }
            }

            //初始化HDFS文件系統,這里直接使用Hadoop配置的Hdfs的實現,方便初始化Hdfs的文件系統
            //否則后面無法識別hdfs://協議
            //也可以再Resource中放入Hadoop的配置文件core-site.xml,可以跳過此步
            //也可以在yarnConfig中設置FileSystem.FS_DEFAULT_NAME_KEY
            if(key.equals(FileSystem.FS_DEFAULT_NAME_KEY)){
                log.info("初始化HDFS FileSystem:{}",value);
                //初始化Hadoop的FileSystem
                yarnConfig.set(FileSystem.FS_DEFAULT_NAME_KEY,value);
            }
        }

    }

    /**
     * 加載Flink任務信息
     */
    private void loadFlinkConfig() throws IOException {
        flinkConfig = new Configuration();

        //Flink的目錄(一般直接通過System.getEnv("FLINK_HOME")獲取)
        String flinkHome = "/opt/flink";

        //Flink目錄下的lib
        String flinkLib = flinkHome + "/lib";
        //Flink目錄下的conf
        String flinkConf = flinkHome + "/conf";

        //讀取Flink的配置文件
        //主要是加載一些默認的配置,不加載會出現jobmanager.process.size....等配置找不到報錯(也可以自己一個個加入到flinkConfig中)
        String flinkConfigFile = flinkConf + "/flink-conf.yaml";

        if(Files.exists(Paths.get(flinkConfigFile))){
            Yaml yaml = new Yaml();

            FileInputStream fileInputStream = new FileInputStream(flinkConfigFile);

            Map<String,Object> map = yaml.load(fileInputStream);

            for(String key : map.keySet()){
                flinkConfig.setString(key,map.get(key).toString());
            }

            fileInputStream.close();
        }

        //設置Web的端口范圍,可以不用設置
        flinkConfig.setString(RestOptions.BIND_PORT,"20000-40000");

        //這個文件是提交到Yarn上面跑的jar文件
        File jarFile = new File("/opt/task/task.jar");

        //上傳到Yarn要運行的Jar文件位置,這個Hdfs文件路徑自行更改(這個jar是最終跑起來的Flink任務)
        String targetJar = "hdfs://master1:8082/flink/data/test/task.jar";

        //運行你要跑的jar依賴的其他jar存放的Hdfs路徑
        String targetLib = "hdfs://master1:8082/flink/data/test/lib";

        //這個Jar是Flink運行需要的jar,必須的
        String flinkDistJar = "hdfs://master1:8082/flink/data/test/lib/flink-dist_2.12-1.12.2.jar");

        //獲得Hadoop的文件系統
        FileSystem fileSystem = FileSystem.get(yarnConfig);

        //將本地的文件上傳到HDFS,要上傳其他文件請自己設定
        fileSystem.copyFromLocalFile(new Path("./task.jar"),new Path(targetJar));
        fileSystem.copyFromLocalFile(new Path(flinkLib + "/flink-dist_2.12-1.12.2.jar"),new Path(flinkDistJar));

        //設置應用的顯示名稱(Yarn Web后台顯示的名稱)
        flinkConfig.set(YarnConfigOptions.APPLICATION_NAME,"Test");

        //設置為Application模式(固定)
        flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());

        //設置你要運行的jar(HDFS的路徑)
        flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(targetJar));

        //設置運行jar時依賴的包的目錄
        flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(new Path(targetLib).toString()));

        //設置dist包(固定)
        flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);

        //設置Class的加載方式[child-first/parent-first]
        flinkConfig.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

        //設置日志的輸出的配置(這里指向Flink的配置下的日志文件)
        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfig,flinkConf);
    }

    /**
     * 加載Yarn客戶端信息
     */
    private void loadYarnClient(){
        yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfig);
        yarnClient.start();
    }

    /**
     * 創建Yarn集群的連接器
     * @return
     */
    private YarnClusterDescriptor createYarnClusterDescriptor(){
        //設置日志否則不會有日志打印信息
        //YarnClientYarnClusterlog.informationRetriever.create(yarnClient);
        return new YarnClusterDescriptor(
                flinkConfig,
                yarnConfig,
                yarnClient,
                YarnClientYarnClusterlog.informationRetriever.create(yarnClient),
                false);
    }

    @Override
    public void startup() throws Exception {
        //初始化配置信息
        initiation();

        //創建集群描述器
        YarnClusterDescriptor descriptor = createYarnClusterDescriptor();

        //設置啟動應用的配置信息:第一個參數args是啟動你的jar附帶的參數,第二個參數是啟動你的jar的main函數所在的類名完整名稱
        ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(args,"run.main");

        //配置任務的一些內存以及插槽
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(512).setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(1).createClusterSpecification();

        //直接部署jar到Yarn上跑(提交任務並在Yarn上運行)
        //每調用一次deployApplicationCluster就會多啟動一個任務在Yarn的列表中
        ClusterClientProvider<ApplicationId> provider = descriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);

        //獲得客戶端
        ClusterClient<ApplicationId> clusterClient = provider.getClusterClient();

        //Yarn的Id
        applicationId = clusterClient.getClusterId().toString();

        //訪問Flink Web界面的地址
        webAddress = clusterClient.getWebInterfaceURL();

        log.info("ApplicationId:{}",applicationId);
        log.info("WebInterface:{}",webAddress);
    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM