While learning Spark, the material I studied describes three main ways to submit a Spark job:
Method 1:
Submit the job from the command line using the spark-submit tool that ships with Spark. The official documentation and most references submit jobs this way. An example submit command:
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
The meaning of each parameter is not explained here; please refer to the official documentation.
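For context, the com.learn.spark.SimpleApp class packaged in spark-demo.jar is not shown in the original post. A minimal sketch of what such an entry class might look like, assuming a trivial line-count job; the input path is a placeholder:

```java
package com.learn.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical stand-in for the main class inside spark-demo.jar.
public class SimpleApp {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SimpleApp")
                .getOrCreate();

        // Placeholder input path; replace with a real file on HDFS or the local FS.
        Dataset<Row> lines = spark.read().text("hdfs:///tmp/input.txt");
        System.out.println("Line count: " + lines.count());

        spark.stop();
    }
}
```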
Method 2:
Submit the job programmatically through the Java API. This approach requires no command line: you can simply click Run in IDEA on the main class that contains the job. Spark provides this API with SparkLauncher as the single entry point. It is very convenient (imagine a task that has to be run repeatedly by someone who cannot write Linux shell scripts; submitting the job through the Java API handles that, and it can also be integrated with Spring so the application runs inside Tomcat). The official example: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
According to the official example, there are two ways to submit through the Java API:
The first is to call the startApplication method on a SparkLauncher instance. However, even when all of the configuration is correct, this can fail on its own: startApplication goes through LauncherServer to start a process that talks to the cluster, and this operation appears to be asynchronous, so the main thread may exit before that process has even come up, causing the run to fail. The fix is either to let the main thread sleep for a while after calling new SparkLauncher().startApplication(), or to block on the application state as in the example below:
```java
package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class LanuncherAppV {
    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        // These two properties must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        // Optional:
        //env.put("YARN_CONF_DIR", "");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Calling setJavaHome() here does not help: the "JAVA_HOME is not set" error still occurs.
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true)
                .startApplication(new SparkAppHandle.Listener() {
                    // Listen for state changes: when the application ends (for whatever reason),
                    // isFinal() returns true, otherwise false.
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });

        System.out.println("The task is executing, please wait ....");
        // Block the main thread until the application reaches a final state.
        countDownLatch.await();
        System.out.println("The task is finished!");
    }
}
```
Note: if the deploy mode is cluster, anything the job writes to standard output will not be visible; write the results to HDFS instead. In client mode the output can be seen.
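To illustrate the note above, a minimal sketch of a job that persists its result to HDFS instead of printing it, so the result stays visible in cluster mode; the input and output paths are placeholders, not taken from the original post:

```java
package com.learn.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical variant of the job that writes its result to HDFS,
// so it remains accessible when the job is submitted in cluster mode.
public class SimpleAppToHdfs {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SimpleAppToHdfs")
                .getOrCreate();

        // Placeholder input path for illustration only.
        Dataset<Row> lines = spark.read().text("hdfs:///tmp/input.txt");

        // In cluster mode System.out.println() on the driver is not shown locally,
        // so persist the result to HDFS instead.
        lines.write().mode("overwrite").text("hdfs:///tmp/spark-demo-output");

        spark.stop();
    }
}
```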
The second way is to obtain a Process via SparkLauncher.launch() and then call process.waitFor() to wait for it to finish. With this approach you have to manage the output produced during the run yourself, which is more work, but the benefit is that everything is under your control: the captured output is as detailed as what you get when submitting from the command line. The implementation:
```java
package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherApp {
    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        // These two properties must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        //env.put("YARN_CONF_DIR", "");

        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true);

        Process process = handle.launch();

        // Read stdout and stderr of the spark-submit child process on separate threads.
        InputStreamReaderRunnable inputStreamReaderRunnable =
                new InputStreamReaderRunnable(process.getInputStream(), "input");
        Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
        inputThread.start();

        InputStreamReaderRunnable errorStreamReaderRunnable =
                new InputStreamReaderRunnable(process.getErrorStream(), "error");
        Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
        errorThread.start();

        System.out.println("Waiting for finish...");
        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}
```
The custom InputStreamReaderRunnable class used above is implemented as follows:
```java
package com.learn.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

// Drains one of the child process's streams and echoes every line to stdout.
public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;
    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
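A side note that is not in the original post: Spark 1.6 and later also let SparkLauncher redirect the child process output itself, which avoids the hand-written reader threads above. A minimal sketch, assuming redirectOutput(File)/redirectError(File) are available in the Spark version in use; the log file paths are placeholders:

```java
package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.File;
import java.io.IOException;

// Hypothetical variant: let SparkLauncher redirect the child process output
// to files instead of reading the streams manually.
public class LauncherAppRedirect {
    public static void main(String[] args) throws IOException, InterruptedException {
        Process process = new SparkLauncher()
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                // Placeholder log file locations for illustration only.
                .redirectOutput(new File("/tmp/spark-demo-stdout.log"))
                .redirectError(new File("/tmp/spark-demo-stderr.log"))
                .launch();

        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}
```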
Method 3: submit through the YARN REST API (not commonly used, but also covered here):
Example POST request URL: http://<rm http address:port>/ws/v1/cluster/apps (an illustrative request sketch follows the parameter table below).
Parameters carried in the request:
Item | Data Type | Description |
---|---|---|
application-id | string | The application id |
application-name | string | The application name |
queue | string | The name of the queue to which the application should be submitted |
priority | int | The priority of the application |
am-container-spec | object | The application master container launch context, described below |
unmanaged-AM | boolean | Is the application using an unmanaged application master |
max-app-attempts | int | The max number of attempts for this application |
resource | object | The resources the application master requires, described below |
application-type | string | The application type (MapReduce, Pig, Hive, etc.) |
keep-containers-across-application-attempts | boolean | Should YARN keep the containers used by this application instead of destroying them |
application-tags | object | List of application tags, please see the request examples on how to specify the tags |
log-aggregation-context | object | Represents all of the information needed by the NodeManager to handle the logs for this application |
attempt-failures-validity-interval | long | Attempt failures that happen outside of the validity interval are not counted towards the failure count |
reservation-id | string | Represent the unique id of the corresponding reserved resource allocation in the scheduler |
am-black-listing-requests | object | Contains blacklisting information such as “enable/disable AM blacklisting” and “disable failure threshold” |
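The original post does not show a request body for this endpoint. Below is a rough sketch of driving it from Java with only the standard library; the ResourceManager address, the application id (which in practice must first be obtained from POST /ws/v1/cluster/apps/new-application), the launch command and the resource values are all placeholders for illustration:

```java
package com.learn.spark;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of submitting an application through the YARN REST API.
// The ResourceManager host, application id, command and resource values are placeholders.
public class YarnRestSubmit {
    public static void main(String[] args) throws IOException {
        // The application-id must come from POST /ws/v1/cluster/apps/new-application first.
        String body = "{"
                + "\"application-id\": \"application_1234567890123_0001\","
                + "\"application-name\": \"spark-demo\","
                + "\"application-type\": \"SPARK\","
                + "\"am-container-spec\": {"
                + "  \"commands\": { \"command\": \"/usr/local/spark/bin/spark-submit --class com.learn.spark.SimpleApp /usr/local/spark/spark-demo.jar\" }"
                + "},"
                + "\"resource\": { \"memory\": 2048, \"vCores\": 1 }"
                + "}";

        URL url = new URL("http://resourcemanager:8088/ws/v1/cluster/apps");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        // Send the JSON request body.
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // 202 Accepted indicates the submission was accepted by the ResourceManager.
        System.out.println("Response code: " + conn.getResponseCode());
        conn.disconnect();
    }
}
```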
Reposted from: https://www.cnblogs.com/itboys/p/9998666.html