Two ways to submit a Spark job


  While learning Spark, the materials I have read describe two main ways (that I know of) to submit a Spark job:

Method 1:

   Submit the job from the command line with the spark-submit tool that ships with Spark. The official documentation and most references use this approach. A sample submit command looks like this:
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
The meaning of each parameter is not explained here; please refer to the official documentation.
 Method 2:

   Submit the job programmatically through the Java API. This approach needs no command line: you can simply click Run in IDEA on the main class that contains the job. Spark provides an API for this with SparkLauncher as the single entry point. It is very convenient (imagine a task that has to run repeatedly but you cannot write a Linux shell script; submitting the job through the Java API is one answer, and it can also be integrated with Spring so the application runs inside Tomcat). The official example is at: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

According to the official example, there are two ways to submit through the Java API:

        The first is to call the startApplication method on a SparkLauncher instance. However, even with every setting correct, this fails when run naively: startApplication has LauncherServer start a process that talks to the cluster, and this operation appears to be asynchronous, so the main thread may finish before that process has even started, causing the run to fail. The fix is either to let the main thread sleep or wait for a while after calling new SparkLauncher().startApplication() (a rough sketch follows), or, more robustly, to block on a CountDownLatch driven by a state listener as in the full example below.
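A minimal sketch of the first workaround, assuming the same illustrative paths and class names used in the examples below (the class name LauncherSleepExample is made up): after startApplication() returns, the main thread simply keeps itself alive, here by polling the handle state with a short sleep rather than sleeping for one fixed, guessed duration, until the application reaches a final state.

package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherSleepExample {
    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");

        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .startApplication();

        // Keep the main thread alive until the launcher reports a final state;
        // a plain Thread.sleep(60_000) would also work but is fragile.
        while (!handle.getState().isFinal()) {
            Thread.sleep(1000);
        }
        System.out.println("Final state: " + handle.getState());
    }
}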

package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class LanuncherAppV {
    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap<String, String> env = new HashMap<>();
        // These two properties must be set
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        // Optional
        //env.put("YARN_CONF_DIR", "");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Even after calling setJavaHome(), the "JAVA_HOME is not set" error still occurred,
        // which is why JAVA_HOME is passed through the environment map above.
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true)
                .startApplication(new SparkAppHandle.Listener() {
                    // Listen for job state changes; once the job has ended
                    // (for whatever reason) isFinal() returns true, otherwise false.
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });

        System.out.println("The task is executing, please wait ....");
        // Block the main thread until the job reaches a final state
        countDownLatch.await();
        System.out.println("The task is finished!");
    }
}

 Note: if the deploy mode is cluster, any standard output produced by the job will not be visible locally; the results need to be written to HDFS instead. In client mode the output can be seen.
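As a hedged illustration of that note, the submitted job itself (an assumed implementation of com.learn.spark.SimpleApp; the data and output path are made up) could persist its result to HDFS rather than relying on standard output:

package com.learn.spark;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SimpleApp {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SimpleApp");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> result = sc.parallelize(Arrays.asList("hello", "spark", "launcher"));

        // In cluster mode, System.out.println would only show up in the YARN container
        // logs, so write the result to HDFS where it can be read back afterwards.
        result.saveAsTextFile("hdfs:///user/spark/output/spark-demo-result");

        sc.stop();
    }
}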

 

The second way is to obtain a Process through SparkLauncher.launch() and then call process.waitFor() to wait for the process to finish. With this approach you have to manage the output produced during the run yourself, which is more work, but the upside is that everything is under your control: the output you capture is the same detailed output you would get when submitting from the command line. Because a child process blocks once its stdout or stderr pipe buffer fills up, the two streams are drained on separate threads in the implementation below:

 
        
package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherApp {

    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap<String, String> env = new HashMap<>();
        // These two properties must be set
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        //env.put("YARN_CONF_DIR", "");

        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true);

        Process process = handle.launch();

        // Drain stdout and stderr on separate threads so the child process
        // does not block when its pipe buffers fill up.
        InputStreamReaderRunnable inputStreamReaderRunnable =
                new InputStreamReaderRunnable(process.getInputStream(), "input");
        Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
        inputThread.start();

        InputStreamReaderRunnable errorStreamReaderRunnable =
                new InputStreamReaderRunnable(process.getErrorStream(), "error");
        Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
        errorThread.start();

        System.out.println("Waiting for finish...");
        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}
 
        
The custom InputStreamReaderRunnable class used above is implemented as follows:
package com.learn.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

// Reads an InputStream line by line on its own thread and echoes it to stdout.
public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;

    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
 
        



 

