Flink Yarn 任務提交
代碼啟動是參考FlinkX,主要的核心是加載Hadoop中的yarn配置文件以及Flink的配置文件。最後配置啟動任務的屬性,上傳文件到Hdfs上提供給yarn啟動。
最終的主要目的是獲取YarnId和Web訪問的地址方便項目的集成以及任務監控
環境
- 開發環境: Windows 11
- 開發工具: IDEA 2021.2.3
- Flink版本: 1.12.2
- Hadoop: 2.9.2
- Java JDK: 1.8
Maven
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.30</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.12</artifactId>
<version>1.12.2</version>
<exclusions>
<exclusion>
<artifactId>flink-shaded-hadoop2</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
Flink Yarn 任務提交代碼
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.yaml.snakeyaml.Yaml;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author Administrator
*/
@Slf4j
public final class StartupFlinkYarn {
/**
* Yarn主機名
*/
private static final String YARN_RESOURCE_MANAGER_HOST_NAME = "yarn.resourceManager.hostname.";
/**
* Yarn資源管理器地址
*/
private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourceManager.address.";
/**
* 過濾Hadoop配置文件的后綴
*/
private static final String CONFIG_XML_SUFFIX = ".xml";
/**
* Yarn客戶端
*/
private YarnClient yarnClient;
/**
* Yarn配置
*/
private YarnConfiguration yarnConfig;
/**
* Flink配置
*/
private Configuration flinkConfig;
/**
* 當前程序的啟動配置信息
*/
private StartupYarnConfig startupConfig;
/**
* 初始化配置文件
* @throws Exception 加載異常
*/
private void initiation() throws Exception {
loadYarnConfig();
loadYarnClient();
loadFlinkConfig();
}
/**
* 加載Yarn的配置信息
*/
private void loadYarnConfig() throws MalformedURLException {
yarnConfig = new YarnConfiguration();
//Hadoop的根目錄(一般直接通過System.getEnv("HADOOP_HOME")獲取)
String hadoopPath = "/opt/hadoop";
//Hadoop配置文件的目錄
hadoopPath = hadoopPath + "/etc/hadoop";
File file = new File(hadoopPath);
//如果不加載Hadoop的配置文件啟動時會無限等待,沒有結果
if(!file.exists()){
log.error("Hadoop目錄不存在:{}",hadoopPath);
return;
}
if(!file.isDirectory()){
log.error("Hadoop路徑是一個文件而不是一個目錄:{}",hadoopPath);
return;
}
//過濾所有的文件,找出所有的xml配置文件
List<File> configList = Arrays.stream(Objects.requireNonNull(file.listFiles())).filter(log.info -> log.info.getName().endsWith(CONFIG_XML_SUFFIX)).collect(Collectors.toList());
//遍歷xml文件加入到Yarn的配置資源中
for(File fileItem : configList){
log.info("加載Hadoop配置文件:{}",fileItem.getAbsolutePath());
yarnConfig.addResource(fileItem.toURI().toURL());
}
//遍歷Yarn解析出來的配置信息
for (Map.Entry<String, String> entry : yarnConfig) {
String key = entry.getKey();
String value = entry.getValue();
//簡單處理一下地址
if (key.startsWith(YARN_RESOURCE_MANAGER_HOST_NAME.toLowerCase())) {
String rm = key.substring(YARN_RESOURCE_MANAGER_HOST_NAME.length());
String addressKey = YARN_RESOURCE_MANAGER_ADDRESS.toLowerCase() + rm;
if (yarnConfig.get(addressKey) == null) {
yarnConfig.set(addressKey, value + ":" + YarnConfiguration.DEFAULT_RM_PORT);
}
}
//初始化HDFS文件系統,這里直接使用Hadoop配置的Hdfs的實現,方便初始化Hdfs的文件系統
//否則后面無法識別hdfs://協議
//也可以再Resource中放入Hadoop的配置文件core-site.xml,可以跳過此步
//也可以在yarnConfig中設置FileSystem.FS_DEFAULT_NAME_KEY
if(key.equals(FileSystem.FS_DEFAULT_NAME_KEY)){
log.info("初始化HDFS FileSystem:{}",value);
//初始化Hadoop的FileSystem
yarnConfig.set(FileSystem.FS_DEFAULT_NAME_KEY,value);
}
}
}
/**
* 加載Flink任務信息
*/
private void loadFlinkConfig() throws IOException {
flinkConfig = new Configuration();
//Flink的目錄(一般直接通過System.getEnv("FLINK_HOME")獲取)
String flinkHome = "/opt/flink";
//Flink目錄下的lib
String flinkLib = flinkHome + "/lib";
//Flink目錄下的conf
String flinkConf = flinkHome + "/conf";
//讀取Flink的配置文件
//主要是加載一些默認的配置,不加載會出現jobmanager.process.size....等配置找不到報錯(也可以自己一個個加入到flinkConfig中)
String flinkConfigFile = flinkConf + "/flink-conf.yaml";
if(Files.exists(Paths.get(flinkConfigFile))){
Yaml yaml = new Yaml();
FileInputStream fileInputStream = new FileInputStream(flinkConfigFile);
Map<String,Object> map = yaml.load(fileInputStream);
for(String key : map.keySet()){
flinkConfig.setString(key,map.get(key).toString());
}
fileInputStream.close();
}
//設置Web的端口范圍,可以不用設置
flinkConfig.setString(RestOptions.BIND_PORT,"20000-40000");
//這個文件是提交到Yarn上面跑的jar文件
File jarFile = new File("/opt/task/task.jar");
//上傳到Yarn要運行的Jar文件位置,這個Hdfs文件路徑自行更改(這個jar是最終跑起來的Flink任務)
String targetJar = "hdfs://master1:8082/flink/data/test/task.jar";
//運行你要跑的jar依賴的其他jar存放的Hdfs路徑
String targetLib = "hdfs://master1:8082/flink/data/test/lib";
//這個Jar是Flink運行需要的jar,必須的
String flinkDistJar = "hdfs://master1:8082/flink/data/test/lib/flink-dist_2.12-1.12.2.jar");
//獲得Hadoop的文件系統
FileSystem fileSystem = FileSystem.get(yarnConfig);
//將本地的文件上傳到HDFS,要上傳其他文件請自己設定
fileSystem.copyFromLocalFile(new Path("./task.jar"),new Path(targetJar));
fileSystem.copyFromLocalFile(new Path(flinkLib + "/flink-dist_2.12-1.12.2.jar"),new Path(flinkDistJar));
//設置應用的顯示名稱(Yarn Web后台顯示的名稱)
flinkConfig.set(YarnConfigOptions.APPLICATION_NAME,"Test");
//設置為Application模式(固定)
flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
//設置你要運行的jar(HDFS的路徑)
flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(targetJar));
//設置運行jar時依賴的包的目錄
flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(new Path(targetLib).toString()));
//設置dist包(固定)
flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);
//設置Class的加載方式[child-first/parent-first]
flinkConfig.setString(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
//設置日志的輸出的配置(這里指向Flink的配置下的日志文件)
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfig,flinkConf);
}
/**
* 加載Yarn客戶端信息
*/
private void loadYarnClient(){
yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfig);
yarnClient.start();
}
/**
* 創建Yarn集群的連接器
* @return
*/
private YarnClusterDescriptor createYarnClusterDescriptor(){
//設置日志否則不會有日志打印信息
//YarnClientYarnClusterlog.informationRetriever.create(yarnClient);
return new YarnClusterDescriptor(
flinkConfig,
yarnConfig,
yarnClient,
YarnClientYarnClusterlog.informationRetriever.create(yarnClient),
false);
}
@Override
public void startup() throws Exception {
//初始化配置信息
initiation();
//創建集群描述器
YarnClusterDescriptor descriptor = createYarnClusterDescriptor();
//設置啟動應用的配置信息:第一個參數args是啟動你的jar附帶的參數,第二個參數是啟動你的jar的main函數所在的類名完整名稱
ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(args,"run.main");
//配置任務的一些內存以及插槽
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(512).setTaskManagerMemoryMB(1024)
.setSlotsPerTaskManager(1).createClusterSpecification();
//直接部署jar到Yarn上跑(提交任務並在Yarn上運行)
//每調用一次deployApplicationCluster就會多啟動一個任務在Yarn的列表中
ClusterClientProvider<ApplicationId> provider = descriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
//獲得客戶端
ClusterClient<ApplicationId> clusterClient = provider.getClusterClient();
//Yarn的Id
applicationId = clusterClient.getClusterId().toString();
//訪問Flink Web界面的地址
webAddress = clusterClient.getWebInterfaceURL();
log.info("ApplicationId:{}",applicationId);
log.info("WebInterface:{}",webAddress);
}
}