Flink YARN Job Submission
The launcher code is modeled on FlinkX. The core steps are loading the YARN configuration files from the Hadoop installation together with Flink's own configuration, setting the job's startup properties, and uploading the required files to HDFS so YARN can launch the job.
The end goal is to obtain the YARN ApplicationId and the web UI address, which makes project integration and job monitoring straightforward.
Environment
- Development environment: Windows 11
- IDE: IDEA 2021.2.3
- Flink version: 1.12.2
- Hadoop: 2.9.2
- Java JDK: 1.8
Maven
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>org.yaml</groupId>
    <artifactId>snakeyaml</artifactId>
    <version>1.30</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_2.12</artifactId>
    <version>1.12.2</version>
    <exclusions>
        <exclusion>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <groupId>org.apache.flink</groupId>
        </exclusion>
    </exclusions>
</dependency>
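Note that the YarnClient and YarnConfiguration classes used below live in the Hadoop YARN client libraries, not in hadoop-hdfs, so the POM will most likely also need the following two dependencies (versions assumed to match the 2.9.2 cluster above):
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-client</artifactId>
    <version>2.9.2</version>
</dependency>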
Flink YARN Job Submission Code
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.yaml.snakeyaml.Yaml;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author Administrator
*/
@Slf4j
public final class StartupFlinkYarn {
    /**
     * Key prefix for the YARN ResourceManager hostname entries
     */
    private static final String YARN_RESOURCE_MANAGER_HOST_NAME = "yarn.resourcemanager.hostname.";
    /**
     * Key prefix for the YARN ResourceManager address entries
     */
    private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address.";
    /**
     * Suffix used to filter Hadoop configuration files
     */
    private static final String CONFIG_XML_SUFFIX = ".xml";
    /**
     * YARN client
     */
    private YarnClient yarnClient;
    /**
     * YARN configuration
     */
    private YarnConfiguration yarnConfig;
    /**
     * Flink configuration
     */
    private Configuration flinkConfig;
    /**
     * ApplicationId of the job submitted to YARN
     */
    private String applicationId;
    /**
     * Address of the Flink web UI
     */
    private String webAddress;
    /**
     * Initialize all configuration
     * @throws Exception thrown when loading fails
     */
    private void initiation() throws Exception {
        loadYarnConfig();
        loadYarnClient();
        loadFlinkConfig();
    }
    /**
     * Load the YARN configuration
     */
    private void loadYarnConfig() throws MalformedURLException {
        yarnConfig = new YarnConfiguration();
        //Hadoop home directory (usually obtained via System.getenv("HADOOP_HOME"))
        String hadoopPath = "/opt/hadoop";
        //Directory containing the Hadoop configuration files
        hadoopPath = hadoopPath + "/etc/hadoop";
        File file = new File(hadoopPath);
        //Without the Hadoop configuration files, the submission waits forever and never returns
        if(!file.exists()){
            log.error("Hadoop directory does not exist: {}", hadoopPath);
            return;
        }
        if(!file.isDirectory()){
            log.error("Hadoop path is a file, not a directory: {}", hadoopPath);
            return;
        }
        //Filter the directory listing down to the xml configuration files
        List<File> configList = Arrays.stream(Objects.requireNonNull(file.listFiles())).filter(item -> item.getName().endsWith(CONFIG_XML_SUFFIX)).collect(Collectors.toList());
        //Add each xml file to the YARN configuration as a resource
        for(File fileItem : configList){
            log.info("Loading Hadoop configuration file: {}", fileItem.getAbsolutePath());
            yarnConfig.addResource(fileItem.toURI().toURL());
        }
        //Walk the configuration entries YARN parsed out of those files
        for (Map.Entry<String, String> entry : yarnConfig) {
            String key = entry.getKey();
            String value = entry.getValue();
            //Derive the ResourceManager address from the hostname when it is not set explicitly
            if (key.startsWith(YARN_RESOURCE_MANAGER_HOST_NAME)) {
                String rm = key.substring(YARN_RESOURCE_MANAGER_HOST_NAME.length());
                String addressKey = YARN_RESOURCE_MANAGER_ADDRESS + rm;
                if (yarnConfig.get(addressKey) == null) {
                    yarnConfig.set(addressKey, value + ":" + YarnConfiguration.DEFAULT_RM_PORT);
                }
            }
            //Initialize the HDFS file system from the fs.defaultFS value in the Hadoop configuration,
            //otherwise the hdfs:// scheme cannot be resolved later.
            //Alternatively, put Hadoop's core-site.xml on the classpath as a resource and skip this step,
            //or set FileSystem.FS_DEFAULT_NAME_KEY on yarnConfig directly.
            if(key.equals(FileSystem.FS_DEFAULT_NAME_KEY)){
                log.info("Initializing the HDFS FileSystem: {}", value);
                yarnConfig.set(FileSystem.FS_DEFAULT_NAME_KEY, value);
            }
        }
    }
    /**
     * Load the Flink job configuration
     */
    private void loadFlinkConfig() throws IOException {
        flinkConfig = new Configuration();
        //Flink home directory (usually obtained via System.getenv("FLINK_HOME"))
        String flinkHome = "/opt/flink";
        //lib directory under the Flink home
        String flinkLib = flinkHome + "/lib";
        //conf directory under the Flink home
        String flinkConf = flinkHome + "/conf";
        //Read Flink's configuration file, mainly to pick up the defaults. Without it,
        //startup fails with errors about missing options such as jobmanager.memory.process.size
        //(the values could also be added to flinkConfig one by one).
        String flinkConfigFile = flinkConf + "/flink-conf.yaml";
        if(Files.exists(Paths.get(flinkConfigFile))){
            Yaml yaml = new Yaml();
            FileInputStream fileInputStream = new FileInputStream(flinkConfigFile);
            Map<String, Object> map = yaml.load(fileInputStream);
            for(String key : map.keySet()){
                flinkConfig.setString(key, map.get(key).toString());
            }
            fileInputStream.close();
        }
        //Restrict the port range of the web UI (optional)
        flinkConfig.setString(RestOptions.BIND_PORT, "20000-40000");
        //The jar that will actually run on YARN (the Flink job itself)
        File jarFile = new File("/opt/task/task.jar");
        //HDFS location the job jar is uploaded to; adjust this path as needed
        String targetJar = "hdfs://master1:8082/flink/data/test/task.jar";
        //HDFS directory holding the other jars the job depends on
        String targetLib = "hdfs://master1:8082/flink/data/test/lib";
        //The jar Flink itself needs to run; this one is mandatory
        String flinkDistJar = "hdfs://master1:8082/flink/data/test/lib/flink-dist_2.12-1.12.2.jar";
        //Get the Hadoop file system
        FileSystem fileSystem = FileSystem.get(yarnConfig);
        //Upload the local files to HDFS; add further files here if the job needs them
        fileSystem.copyFromLocalFile(new Path(jarFile.getAbsolutePath()), new Path(targetJar));
        fileSystem.copyFromLocalFile(new Path(flinkLib + "/flink-dist_2.12-1.12.2.jar"), new Path(flinkDistJar));
        //Display name of the application in the YARN web UI
        flinkConfig.set(YarnConfigOptions.APPLICATION_NAME, "Test");
        //Use Application mode (fixed)
        flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
        //The jar to run (HDFS path)
        flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(targetJar));
        //Directory of the jars the job depends on
        flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(new Path(targetLib).toString()));
        //The dist jar (fixed)
        flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);
        //Classloading order [child-first/parent-first]
        flinkConfig.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
        //Point the log output at the log configuration in Flink's conf directory
        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfig, flinkConf);
    }
    /**
     * Create and start the YARN client
     */
    private void loadYarnClient(){
        yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfig);
        yarnClient.start();
    }
    /**
     * Create the descriptor for connecting to the YARN cluster
     * @return the YARN cluster descriptor
     */
    private YarnClusterDescriptor createYarnClusterDescriptor(){
        //The information retriever must be set, otherwise no log information is printed
        return new YarnClusterDescriptor(
                flinkConfig,
                yarnConfig,
                yarnClient,
                YarnClientYarnClusterInformationRetriever.create(yarnClient),
                false);
    }
    /**
     * Submit the job to YARN
     * @param args arguments passed through to the job's main method
     * @throws Exception thrown when the submission fails
     */
    public void startup(String[] args) throws Exception {
        //Initialize the configuration
        initiation();
        //Create the cluster descriptor
        YarnClusterDescriptor descriptor = createYarnClusterDescriptor();
        //Application configuration: the first argument is the args passed to the job jar,
        //the second is the fully qualified name of the class containing its main method
        ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(args, "run.main");
        //Memory and slot settings for the job
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(512).setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(1).createClusterSpecification();
        //Deploy the jar to YARN and run it (submit the job);
        //every call to deployApplicationCluster starts one more application in the YARN list
        ClusterClientProvider<ApplicationId> provider = descriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
        //Obtain the cluster client
        ClusterClient<ApplicationId> clusterClient = provider.getClusterClient();
        //The YARN ApplicationId
        applicationId = clusterClient.getClusterId().toString();
        //The address of the Flink web UI
        webAddress = clusterClient.getWebInterfaceURL();
        log.info("ApplicationId:{}", applicationId);
        log.info("WebInterface:{}", webAddress);
    }
}
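To round things off, here is a minimal usage sketch. The StartupFlinkYarnDemo class name and the printJobState helper are illustrative additions rather than part of the original code; the status query simply reuses the same YarnClient API the submitter is built on, which is what makes the saved ApplicationId useful for job monitoring:
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class StartupFlinkYarnDemo {

    public static void main(String[] args) throws Exception {
        //Submit the job; startup() logs the ApplicationId and the web UI address
        StartupFlinkYarn starter = new StartupFlinkYarn();
        starter.startup(args);
    }

    /**
     * Illustrative monitoring helper: given the ApplicationId string logged by
     * startup(), e.g. "application_1650000000000_0001", ask YARN for the job's
     * current state. Assumes the Hadoop configuration files are on the classpath
     * so that new YarnConfiguration() can locate the ResourceManager.
     */
    public static void printJobState(String applicationId) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        try {
            ApplicationReport report = yarnClient.getApplicationReport(ApplicationId.fromString(applicationId));
            System.out.println("State: " + report.getYarnApplicationState() + ", tracking URL: " + report.getTrackingUrl());
        } finally {
            yarnClient.stop();
        }
    }
}
In an integrated project, the web address can be surfaced on a dashboard directly, while the ApplicationId is the stable handle for later status or kill requests against YARN.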