背景
項目需要處理很多文件,而一些文件很大有幾十GB,因此考慮對於這種文件,專門編寫Spark程序處理,為了程序的統一處理,需要在代碼中調用Spark作業來處理大文件。
實現方案
經過調研,發現可以使用Spark提供的SparkLauncher類進行Spark作業的提交,這個類的使用有很多參數需要注意,經過項目驗證后,本文給出相對完整的使用方式以及說明
首先項目中要添加pom依賴,注意加上自己的版本
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-launcher_2.11</artifactId>
</dependency>
其次,可以把Spark作業本身的一些參數放在配置文件里,靈活修改,我這里是配置kerberos安全認證的CDH集群,Spark作業提交時使用的模式為yarn-client,主要使用到了一下配置,配置中的路徑這里是作為例子隨便填的,實際按照自己環境填寫,另外,整個應用是在CDH客戶端節點執行的。每個配置項都有說明:
#spark application use
#driver的日志輸出
driverLogDir=/root/test/logs/
#kerberos認證keytab文件
keytab=/root/test/dw_hbkal.keytab
# keyberos認證主體
principal=dw_hbkal
# yarn集群上運行spark作業
master=yarn
# yarn-client模式
deployMode=client
# spark-executor個數和內存配置
minExecutors=16
maxExecutors=16
executorMemory=1g
# driver內存配置
driverMemory=256M
# spark-executor使用的core數量配置
executorCores=2
# spark作業的主類
mainClass=com.unionpay.css.fcmp.compare.cp.spark.nonprikey.FileCompare
# spark作業的jar包
jarPath=/root/test/my-spark-job-1.0-SNAPSHOT.jar
# spark作業依賴的第三方jar
extjars=/root/test/mysql-connector-java-8.0.27.jar,/root/test/jedis-2.8.1.jar
# CHD客戶端上存放的集群配置文件,表明向哪個集群提交spark作業
HADOOP_CONF_DIR=/root/CDH/bjc/CDH/etc/conf/hadoop-conf
JAVA_HOME=/usr/java/jdk1.8.0_141
SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
# spark作業執行的yarn隊列
yarnQueue=mysparkqueue
上述配置可以在代碼中讀取,並結合SparkLauncher一起使用,可以參看以下例子代碼:
//負責發起spark作業
public class SparkJobService{
private static final Logger logger = LoggerFactory.getLogger(SparkJobService.class);
static Config config;
//spark任務參數
static String keytabPath;
static String principal ;
static String master;
static String deployMode;
static String minExecutods;
static String maxExecutors;
static String executorMemory;
static String driverMemory;
static String executorCores;
static String mainClass;
static String jarPath;
static String extjars;
static String yarnQueue;
static String HADOOP_CONF_DIR;
static String JAVA_HOME;
static String SPARK_HOME;
static String driverLogDir;
static {
config = new Config("job.properties");
keytabPath = config.getString("keytab");
principal = config.getString("principal");
master = config.getString("master");
deployMode = config.getString("deployMode");
minExecutods = config.getString("minExecutods");
maxExecutors = config.getString("maxExecutors");
executorMemory = config.getString("executorMemory");
driverMemory = config.getString("driverMemory");
executorCores = config.getString("executorCores");
mainClass = config.getString("mainClass");
jarPath = config.getString("jarPath");
extjars = config.getString("extjars");
yarnQueue = config.getString("yarnQueue");
HADOOP_CONF_DIR=config.getString("HADOOP_CONF_DIR");
JAVA_HOME = config.getString("JAVA_HOME");
SPARK_HOME = config.getString("SPARK_HOME");
driverLogDir = config.getString("driverLogDir");
}
public static void main(String[] args) {
try{
//spark任務設置
//如果在系統環境變量中添加了,可以不加
HashMap<String,String> env = new HashMap();
env.put("HADOOP_CONF_DIR",HADOOP_CONF_DIR);
env.put("JAVA_HOME",JAVA_HOME);
env.put("SPARK_HOME",SPARK_HOME);
String jobArgs1 = "test1";
String jobArgs2 = "test2"
//......
SparkLauncher launcher = new SparkLauncher(env).addSparkArg("--keytab",keytabPath).addSparkArg("--principal",principal).setMaster(master).setDeployMode(deployMode)
.setConf("spark.dynamicAllocation.minExecutors",minExecutods).setConf("spark.dynamicAllocation.maxExecutors",maxExecutors).setConf("spark.driver.memory",driverMemory).setConf("spark.executor.memory",executorMemory).setConf("spark.executor.cores",executorCores)
.setConf("spark.yarn.queue",yarnQueue)
.setAppResource(jarPath).setMainClass(mainClass).addAppArgs(jobArgs1,jobArgs2);
//spark job中依賴jar,如mysql-connector.jar...
for(String jarName : extjars.split(",")){
launcher.addJar(jarName);
}
launcher.setAppName("SparkJob");
//spark本地driver日志
launcher.redirectError(new File(driverLogDir + "spark_driver.log"));
final String[] jobId = new String[]{""};
//用來等待spark作業結束
CountDownLatch latch = new CountDownLatch(1);
SparkAppHandle sparkAppHandle = launcher.setVerbose(false).startApplication(new SparkAppHandle.Listener() {
@Override
public void stateChanged(SparkAppHandle sparkAppHandle) {
SparkAppHandle.State state = sparkAppHandle.getState();
switch (state){
case SUBMITTED:
logger.info("提交spark作業成功");
//yarn上spark作業的jobId
jobId[0] = sparkAppHandle.getAppId();
break;
case FINISHED:
logger.info("spark job success");
break;
case FAILED:
case KILLED:
case LOST:
logger.info("spark job failed");
}
if (state.isFinal())
latch.countDown();
}
@Override
public void infoChanged(SparkAppHandle sparkAppHandle) {
}
});
//等待Spark作業執行結束
latch.await();
}catch (Exception e){
logger.error("error",e);
}finally {
//...
}
}
}
上述代碼中,尤其注意spark作業參數是怎么配置的,不同的參數使用的是不同的方法調用,一些參數使用addSparkArg方法添加,一些使用setConf添加。特別提示,如果是傳給spark應用本身的參數,需要使用addAppArgs方法傳遞,該方法形參為變長參數。
另外,代碼中設置了spark本地driver日志路徑,這樣可以方便產看日志。通過SparkAppHandle的stateChanged回調函數,獲得spark作業的執行狀態,本例子中需要等待spark作業執行結束,因此提交作業之后,通過CountDownLatch機制來等待,在stateChanged中,當發現spark作業為結束狀態,計數器減一,整個程序結束。
以上便是一種代碼中調用Spark作業的一種實現方案,有問題可以一起交流。