如何在Java應用中提交Spark任務?


最近看到有幾個Github友關注了Streaming的監控工程——Teddy,所以思來想去還是優化下代碼,不能讓別人看笑話啊。於是就想改一下之前覺得最丑陋的一個地方——任務提交。

本博客內容基於Spark2.2版本~在閱讀文章並想實際操作前,請確保你有:

  1. 一台配置好Spark和yarn的服務器
  2. 支持正常spark-submit --master yarn xxxx 的任務提交

老版本

老版本任務提交是基於 ** 啟動本地進程,執行腳本spark-submit xxx ** 的方式做的。其中一個關鍵的問題就是獲得提交Spark任務的Application-id,因為這個id是跟任務狀態的跟蹤有關系的。如果你的資源管理框架用的是yarn,應該知道每個運行的任務都有一個applicaiton_id,這個id的生成規則是:

appplication_時間戳_數字

老版本的spark通過修改SparkConf參數spark.app.id就可以手動指定id,新版本的代碼是直接讀取的taskBackend中的applicationId()方法,這個方法具體的實現是根據實現類來定的。在yarn中,是通過Yarn的YarnClusterSchedulerBackend實現的,具體的實現邏輯可以參考對應的鏈接。

感興趣的同學可以看一下,生成applicaiton_id的邏輯在hadoop-yarn工程的ContainerId中定義。

總結一句話就是,想要自定義id,甭想了!!!!

於是當時腦袋瓜不靈光的我,就想到那就等應用創建好了之后,直接寫到數據庫里面唄。怎么寫呢?

  1. 我事先生成一個自定義的id,當做參數傳遞到spark應用里面;
  2. 等spark初始化后,就可以通過sparkContext取得對應的application_id以及url
  3. 然后再driver連接數據庫,插入一條關聯關系

新版本

還是歸結於互聯網時代的信息大爆炸,我看到群友的聊天,知道了SparkLauncer這個東西,調查后發現他可以基於Java代碼自動提交Spark任務。SparkLauncher支持兩種模式:

  1. new SparkLauncher().launch() 直接啟動一個Process,效果跟以前一樣
  2. new SparkLauncher().startApplicaiton(監聽器) 返回一個SparkAppHandler,並(可選)傳入一個監聽器

當然是更傾向於第二種啦,因為好處很多:

  1. 自帶輸出重定向(Output,Error都有,支持寫到文件里面),超級爽的功能
  2. 可以自定義監聽器,當信息或者狀態變更時,都能進行操作(對我沒啥用)
  3. 返回的SparkAppHandler支持 暫停、停止、斷連、獲得AppId、獲得State等多種功能,我就想要這個!!!!

一步一步,代碼展示

首先創建一個最基本的Spark程序:

import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;

public class HelloWorld {
    public static void main(String[] args) throws InterruptedException {
        SparkSession spark = SparkSession
                .builder()
                //.master("yarn")
                //.appName("hello-wrold")
                //.config("spark.some.config.option", "some-value")
                .getOrCreate();

        List<Person> persons = new ArrayList<>();

        persons.add(new Person("zhangsan", 22, "male"));
        persons.add(new Person("lisi", 25, "male"));
        persons.add(new Person("wangwu", 23, "female"));


        spark.createDataFrame(persons, Person.class).show(false);

        spark.close();

    }
}

然后創建SparkLauncher類:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;

public class Launcher {
    public static void main(String[] args) throws IOException {
        SparkAppHandle handler = new SparkLauncher()
                .setAppName("hello-world")
                .setSparkHome(args[0])
                .setMaster(args[1])
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.cores", "3")
                .setAppResource("/home/xinghailong/launcher/launcher_test.jar")
                .setMainClass("HelloWorld")
                .addAppArgs("I come from Launcher")
                .setDeployMode("cluster")
                .startApplication(new SparkAppHandle.Listener(){
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        System.out.println("**********  state  changed  **********");
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                        System.out.println("**********  info  changed  **********");
                    }
                });


        while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){
            System.out.println("id    "+handler.getAppId());
            System.out.println("state "+handler.getState());

            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

然后打包工程,打包過程可以參考之前的博客:
http://www.cnblogs.com/xing901022/p/7891867.html

打包完成后上傳到部署Spark的服務器上。由於SparkLauncher所在的類引用了SparkLauncher,所以還需要把這個jar也上傳到服務器上。

[xinghailong@hnode10 launcher]$ ls
launcher_test.jar  spark-launcher_2.11-2.2.0.jar
[xinghailong@hnode10 launcher]$ pwd
/home/xinghailong/launcher

由於SparkLauncher需要指定SPARK_HOME,因此如果你的機器可以執行spark-submit,那么就看一下spark-submit里面,SPARK_HOME是在哪

[xinghailong@hnode10 launcher]$ which spark2-submit
/var/lib/hadoop-hdfs/bin/spark2-submit

最后幾行就能看到:

export SPARK2_HOME=/var/lib/hadoop-hdfs/app/spark

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK2_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

綜上,我們需要的是:

  1. 一個自定義的Jar,里面包含spark應用和SparkLauncher類
  2. 一個SparkLauncher的jar,spark-launcher_2.11-2.2.0.jar 版本根據你自己的來就行
  3. 一個當前目錄的路徑
  4. 一個SARK_HOME環境變量指定的目錄

然后執行命令啟動測試:

java -Djava.ext.dirs=/home/xinghailong/launcher -cp launcher_test.jar Launcher /var/lib/hadoop-hdfs/app/spark yarn

說明:

  1. -Djava.ext.dirs 設置當前目錄為java類加載的目錄
  2. 傳入兩個參數,一個是SPARK_HOME;一個是啟動模式

觀察刪除發現成功啟動運行了:

id    null
state UNKNOWN
Mar 10, 2018 12:00:52 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
**********  state  changed  **********
...省略一大堆拷貝jar的日志
**********  info  changed  **********
**********  state  changed  **********
Mar 10, 2018 12:00:55 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:55 INFO yarn.Client: Application report for application_1518263195995_37615 (state: ACCEPTED)
... 省略一堆重定向的日志
application_1518263195995_37615 (state: ACCEPTED)
id    application_1518263195995_37615
state SUBMITTED
Mar 10, 2018 12:01:00 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:00 INFO yarn.Client: Application report for application_1518263195995_37615 (state: RUNNING)
**********  state  changed  **********
... 省略一堆重定向的日志
INFO: 	 user: hdfs
**********  state  changed  **********
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Shutdown hook called
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f07e0213-61fa-4710-90f5-2fd2030e0701

總結

這樣就實現了基於Java應用提交Spark任務,並獲得其Appliation_id和狀態進行定位跟蹤的需求了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM