利用SparkLauncher在代碼中調用Spark作業


背景

項目需要處理很多文件,而一些文件很大有幾十GB,因此考慮對於這種文件,專門編寫Spark程序處理,為了程序的統一處理,需要在代碼中調用Spark作業來處理大文件。

實現方案

經過調研,發現可以使用Spark提供的SparkLauncher類進行Spark作業的提交,這個類的使用有很多參數需要注意,經過項目驗證后,本文給出相對完整的使用方式以及說明

首先項目中要添加pom依賴,注意加上自己的版本

<dependency>
	<groupId>org.apache.spark</groupId>
     <artifactId>spark-launcher_2.11</artifactId>
</dependency>

其次,可以把Spark作業本身的一些參數放在配置文件里,靈活修改,我這里是配置kerberos安全認證的CDH集群,Spark作業提交時使用的模式為yarn-client,主要使用到了一下配置,配置中的路徑這里是作為例子隨便填的,實際按照自己環境填寫,另外,整個應用是在CDH客戶端節點執行的。每個配置項都有說明:

#spark application use
#driver的日志輸出
driverLogDir=/root/test/logs/
#kerberos認證keytab文件
keytab=/root/test/dw_hbkal.keytab
# keyberos認證主體
principal=dw_hbkal
# yarn集群上運行spark作業
master=yarn
# yarn-client模式
deployMode=client
# spark-executor個數和內存配置
minExecutors=16
maxExecutors=16
executorMemory=1g
# driver內存配置
driverMemory=256M
# spark-executor使用的core數量配置
executorCores=2
# spark作業的主類
mainClass=com.unionpay.css.fcmp.compare.cp.spark.nonprikey.FileCompare
# spark作業的jar包
jarPath=/root/test/my-spark-job-1.0-SNAPSHOT.jar
# spark作業依賴的第三方jar
extjars=/root/test/mysql-connector-java-8.0.27.jar,/root/test/jedis-2.8.1.jar
# CHD客戶端上存放的集群配置文件,表明向哪個集群提交spark作業
HADOOP_CONF_DIR=/root/CDH/bjc/CDH/etc/conf/hadoop-conf
JAVA_HOME=/usr/java/jdk1.8.0_141
SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
# spark作業執行的yarn隊列
yarnQueue=mysparkqueue

上述配置可以在代碼中讀取,並結合SparkLauncher一起使用,可以參看以下例子代碼:

//負責發起spark作業
public class SparkJobService{

    private static final Logger logger = LoggerFactory.getLogger(SparkJobService.class);
    static Config config;
    //spark任務參數
    static String keytabPath;
    static String principal ;
    static String master;
    static String deployMode;
    static String minExecutods;
    static String maxExecutors;
    static String executorMemory;
    static String driverMemory;
    static String executorCores;
    static String mainClass;
    static String jarPath;
    static String extjars;
    static String yarnQueue;
    static String HADOOP_CONF_DIR;
    static String JAVA_HOME;
    static String SPARK_HOME;
    static String driverLogDir;

    static {
        config = new Config("job.properties");
        keytabPath = config.getString("keytab");
        principal = config.getString("principal");
        master = config.getString("master");
        deployMode = config.getString("deployMode");
        minExecutods = config.getString("minExecutods");
        maxExecutors = config.getString("maxExecutors");
        executorMemory = config.getString("executorMemory");
        driverMemory = config.getString("driverMemory");
        executorCores = config.getString("executorCores");
        mainClass = config.getString("mainClass");
        jarPath = config.getString("jarPath");
        extjars = config.getString("extjars");
        yarnQueue = config.getString("yarnQueue");
        HADOOP_CONF_DIR=config.getString("HADOOP_CONF_DIR");
        JAVA_HOME = config.getString("JAVA_HOME");
        SPARK_HOME = config.getString("SPARK_HOME");
        driverLogDir = config.getString("driverLogDir");
    }

    public static void main(String[] args) {
        try{
            //spark任務設置
            //如果在系統環境變量中添加了,可以不加
            HashMap<String,String> env = new HashMap();
            env.put("HADOOP_CONF_DIR",HADOOP_CONF_DIR);
            env.put("JAVA_HOME",JAVA_HOME);
            env.put("SPARK_HOME",SPARK_HOME);
			
			String jobArgs1 = "test1";
			String jobArgs2 = "test2"
			//......

            SparkLauncher launcher = new SparkLauncher(env).addSparkArg("--keytab",keytabPath).addSparkArg("--principal",principal).setMaster(master).setDeployMode(deployMode)
                    .setConf("spark.dynamicAllocation.minExecutors",minExecutods).setConf("spark.dynamicAllocation.maxExecutors",maxExecutors).setConf("spark.driver.memory",driverMemory).setConf("spark.executor.memory",executorMemory).setConf("spark.executor.cores",executorCores)
                    .setConf("spark.yarn.queue",yarnQueue)
                    .setAppResource(jarPath).setMainClass(mainClass).addAppArgs(jobArgs1,jobArgs2);

            //spark job中依賴jar,如mysql-connector.jar...
            for(String jarName : extjars.split(",")){
                launcher.addJar(jarName);
            }
            launcher.setAppName("SparkJob");
            //spark本地driver日志
            launcher.redirectError(new File(driverLogDir + "spark_driver.log"));
            final String[] jobId = new String[]{""};
            //用來等待spark作業結束
            CountDownLatch latch = new CountDownLatch(1);
            SparkAppHandle sparkAppHandle = launcher.setVerbose(false).startApplication(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle sparkAppHandle) {
                    SparkAppHandle.State state = sparkAppHandle.getState();
                    switch (state){
                        case SUBMITTED:
                            logger.info("提交spark作業成功");
							//yarn上spark作業的jobId
                            jobId[0] = sparkAppHandle.getAppId();
                            break;
                        case FINISHED:
                            logger.info("spark job success");
                            break;
                        case FAILED:
                        case KILLED:
                        case LOST:
                            logger.info("spark job failed");
                    }
                    if (state.isFinal())
                        latch.countDown();
                }

                @Override
                public void infoChanged(SparkAppHandle sparkAppHandle) {
                }
            });
			//等待Spark作業執行結束
            latch.await();
        }catch (Exception e){
            logger.error("error",e);
        }finally {
			//...
        }
    }
}

上述代碼中,尤其注意spark作業參數是怎么配置的,不同的參數使用的是不同的方法調用,一些參數使用addSparkArg方法添加,一些使用setConf添加。特別提示,如果是傳給spark應用本身的參數,需要使用addAppArgs方法傳遞,該方法形參為變長參數。

另外,代碼中設置了spark本地driver日志路徑,這樣可以方便產看日志。通過SparkAppHandle的stateChanged回調函數,獲得spark作業的執行狀態,本例子中需要等待spark作業執行結束,因此提交作業之后,通過CountDownLatch機制來等待,在stateChanged中,當發現spark作業為結束狀態,計數器減一,整個程序結束。

以上便是一種代碼中調用Spark作業的一種實現方案,有問題可以一起交流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM