Flink Yarn 任务提交获取YarnId和Web访问地址


Flink Yarn 任务提交

代码启动是参考FlinkX,主要的核心是加载Hadoop中的yarn配置文件以及Flink的配置文件。最后配置启动任务的属性,上传文件到Hdfs上提供给yarn启动。
最终的主要目的是获取YarnId和Web访问的地址方便项目的集成以及任务监控

环境

  • 开发环境: Windows 11
  • 开发工具: IDEA 2021.2.3
  • Flink版本: 1.12.2
  • Hadoop: 2.9.2
  • Java JDK: 1.8

Maven

      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-hdfs</artifactId>
         <version>2.9.2</version>
      </dependency>

      <dependency>
         <groupId>org.yaml</groupId>
         <artifactId>snakeyaml</artifactId>
         <version>1.30</version>
      </dependency>

      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-java</artifactId>
         <version>1.12.2</version>
      </dependency>

      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-java_2.12</artifactId>
         <version>1.12.2</version>
      </dependency>

      <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-yarn_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
         <exclusions>
            <exclusion>
               <artifactId>flink-shaded-hadoop2</artifactId>
               <groupId>org.apache.flink</groupId>
            </exclusion>
         </exclusions>
      </dependency>
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterlog.informationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.yaml.snakeyaml.Yaml;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;

/**
 * @author Administrator
 */
@Slf4j
public final class StartupFlinkYarn {

    /**
     * Yarn主机名
     */
    private static final String YARN_RESOURCE_MANAGER_HOST_NAME = "yarn.resourceManager.hostname.";

    /**
     * Yarn资源管理器地址
     */
    private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourceManager.address.";

    /**
     * 过滤Hadoop配置文件的后缀
     */
    private static final String CONFIG_XML_SUFFIX = ".xml";

    /**
     * Yarn客户端
     */
    private YarnClient yarnClient;

    /**
     * Yarn配置
     */
    private YarnConfiguration yarnConfig;

    /**
     * Flink配置
     */
    private Configuration flinkConfig;

    /**
     * 当前程序的启动配置信息
     */
    private StartupYarnConfig startupConfig;

    /**
     * 初始化配置文件
     * @throws Exception 加载异常
     */
    private void initiation() throws Exception {
        loadYarnConfig();
        loadYarnClient();
        loadFlinkConfig();
    }

    /**
     * 加载Yarn的配置信息
     */
    private void loadYarnConfig() throws MalformedURLException {
        yarnConfig = new YarnConfiguration();

        //Hadoop的根目录(一般直接通过System.getEnv("HADOOP_HOME")获取)
        String hadoopPath = "/opt/hadoop";

        //Hadoop配置文件的目录
        hadoopPath = hadoopPath + "/etc/hadoop";

        File file = new File(hadoopPath);

        //如果不加载Hadoop的配置文件启动时会无限等待,没有结果
        if(!file.exists()){
            log.error("Hadoop目录不存在:{}",hadoopPath);
            return;
        }

        if(!file.isDirectory()){
            log.error("Hadoop路径是一个文件而不是一个目录:{}",hadoopPath);
            return;
        }

        //过滤所有的文件,找出所有的xml配置文件
        List<File> configList = Arrays.stream(Objects.requireNonNull(file.listFiles())).filter(log.info -> log.info.getName().endsWith(CONFIG_XML_SUFFIX)).collect(Collectors.toList());

        //遍历xml文件加入到Yarn的配置资源中
        for(File fileItem : configList){
            log.info("加载Hadoop配置文件:{}",fileItem.getAbsolutePath());
            yarnConfig.addResource(fileItem.toURI().toURL());
        }

        //遍历Yarn解析出来的配置信息
        for (Map.Entry<String, String> entry : yarnConfig) {
            String key = entry.getKey();

            String value = entry.getValue();

            //简单处理一下地址
            if (key.startsWith(YARN_RESOURCE_MANAGER_HOST_NAME.toLowerCase())) {
                String rm = key.substring(YARN_RESOURCE_MANAGER_HOST_NAME.length());
                String addressKey = YARN_RESOURCE_MANAGER_ADDRESS.toLowerCase() + rm;
                if (yarnConfig.get(addressKey) == null) {
                    yarnConfig.set(addressKey, value + ":" + YarnConfiguration.DEFAULT_RM_PORT);
                }
            }

            //初始化HDFS文件系统,这里直接使用Hadoop配置的Hdfs的实现,方便初始化Hdfs的文件系统
            //否则后面无法识别hdfs://协议
            //也可以再Resource中放入Hadoop的配置文件core-site.xml,可以跳过此步
            //也可以在yarnConfig中设置FileSystem.FS_DEFAULT_NAME_KEY
            if(key.equals(FileSystem.FS_DEFAULT_NAME_KEY)){
                log.info("初始化HDFS FileSystem:{}",value);
                //初始化Hadoop的FileSystem
                yarnConfig.set(FileSystem.FS_DEFAULT_NAME_KEY,value);
            }
        }

    }

    /**
     * 加载Flink任务信息
     */
    private void loadFlinkConfig() throws IOException {
        flinkConfig = new Configuration();

        //Flink的目录(一般直接通过System.getEnv("FLINK_HOME")获取)
        String flinkHome = "/opt/flink";

        //Flink目录下的lib
        String flinkLib = flinkHome + "/lib";
        //Flink目录下的conf
        String flinkConf = flinkHome + "/conf";

        //读取Flink的配置文件
        //主要是加载一些默认的配置,不加载会出现jobmanager.process.size....等配置找不到报错(也可以自己一个个加入到flinkConfig中)
        String flinkConfigFile = flinkConf + "/flink-conf.yaml";

        if(Files.exists(Paths.get(flinkConfigFile))){
            Yaml yaml = new Yaml();

            FileInputStream fileInputStream = new FileInputStream(flinkConfigFile);

            Map<String,Object> map = yaml.load(fileInputStream);

            for(String key : map.keySet()){
                flinkConfig.setString(key,map.get(key).toString());
            }

            fileInputStream.close();
        }

        //设置Web的端口范围,可以不用设置
        flinkConfig.setString(RestOptions.BIND_PORT,"20000-40000");

        //这个文件是提交到Yarn上面跑的jar文件
        File jarFile = new File("/opt/task/task.jar");

        //上传到Yarn要运行的Jar文件位置,这个Hdfs文件路径自行更改(这个jar是最终跑起来的Flink任务)
        String targetJar = "hdfs://master1:8082/flink/data/test/task.jar";

        //运行你要跑的jar依赖的其他jar存放的Hdfs路径
        String targetLib = "hdfs://master1:8082/flink/data/test/lib";

        //这个Jar是Flink运行需要的jar,必须的
        String flinkDistJar = "hdfs://master1:8082/flink/data/test/lib/flink-dist_2.12-1.12.2.jar");

        //获得Hadoop的文件系统
        FileSystem fileSystem = FileSystem.get(yarnConfig);

        //将本地的文件上传到HDFS,要上传其他文件请自己设定
        fileSystem.copyFromLocalFile(new Path("./task.jar"),new Path(targetJar));
        fileSystem.copyFromLocalFile(new Path(flinkLib + "/flink-dist_2.12-1.12.2.jar"),new Path(flinkDistJar));

        //设置应用的显示名称(Yarn Web后台显示的名称)
        flinkConfig.set(YarnConfigOptions.APPLICATION_NAME,"Test");

        //设置为Application模式(固定)
        flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());

        //设置你要运行的jar(HDFS的路径)
        flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(targetJar));

        //设置运行jar时依赖的包的目录
        flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(new Path(targetLib).toString()));

        //设置dist包(固定)
        flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);

        //设置Class的加载方式[child-first/parent-first]
        flinkConfig.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

        //设置日志的输出的配置(这里指向Flink的配置下的日志文件)
        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfig,flinkConf);
    }

    /**
     * 加载Yarn客户端信息
     */
    private void loadYarnClient(){
        yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfig);
        yarnClient.start();
    }

    /**
     * 创建Yarn集群的连接器
     * @return
     */
    private YarnClusterDescriptor createYarnClusterDescriptor(){
        //设置日志否则不会有日志打印信息
        //YarnClientYarnClusterlog.informationRetriever.create(yarnClient);
        return new YarnClusterDescriptor(
                flinkConfig,
                yarnConfig,
                yarnClient,
                YarnClientYarnClusterlog.informationRetriever.create(yarnClient),
                false);
    }

    @Override
    public void startup() throws Exception {
        //初始化配置信息
        initiation();

        //创建集群描述器
        YarnClusterDescriptor descriptor = createYarnClusterDescriptor();

        //设置启动应用的配置信息:第一个参数args是启动你的jar附带的参数,第二个参数是启动你的jar的main函数所在的类名完整名称
        ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(args,"run.main");

        //配置任务的一些内存以及插槽
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(512).setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(1).createClusterSpecification();

        //直接部署jar到Yarn上跑(提交任务并在Yarn上运行)
        //每调用一次deployApplicationCluster就会多启动一个任务在Yarn的列表中
        ClusterClientProvider<ApplicationId> provider = descriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);

        //获得客户端
        ClusterClient<ApplicationId> clusterClient = provider.getClusterClient();

        //Yarn的Id
        applicationId = clusterClient.getClusterId().toString();

        //访问Flink Web界面的地址
        webAddress = clusterClient.getWebInterfaceURL();

        log.info("ApplicationId:{}",applicationId);
        log.info("WebInterface:{}",webAddress);
    }

}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM