spark在集群上運行


1.spark在集群上運行應用的詳細過程

(1)用戶通過spark-submit腳本提交應用
(2)spark-submit腳本啟動驅動器程序,調用用戶定義的main()方法
(3)驅動器程序與集群管理器通信,申請資源以啟動執行器節點
(4)集群管理器為驅動器程序啟動執行器節點
(5)驅動器進程執行用戶應用中的操作。根據程序中所定義的對RDD的轉化操作和行動操作,驅動器節點把工作以任務的形式發送到執行器進程
(6)任務在執行器程序中進行計算並保存結果
(7)如果驅動器程序的main()方法退出,或者調用了SparkContext.stop(),驅動器程序會終止執行器進程,並且通過集群管理器釋放資源
2.集群上運行的app和本地運行的區別
集群上運行的app,還沒有指定master,而本地運行的app,指定了運行的主機master是“local”,即本地主機。以下是spark WordCount的程序實例,我們沒有指定 master,需要提交到集群上運行。
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class WordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");//關鍵是沒有指定master,需要提交到集群上運行
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaRDD<String> lines = ctx.textFile("C:\\test.txt", 1);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                return Arrays.asList(SPACE.split(s));
            }
        });

        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        ctx.stop();
    }
}
3.使用spark-submit部署應用
(1)spark-submit的一般形式
bin/spark-submit [options] <app jar | python file> [app options] 
<app jar | python file>   表示包含應用入口的JAR包或者Python腳本
[app options]   是傳給你的應用的選項
spark-submit的一些常見標記
標記 描述
--master 表示要連接的集群管理器
--deploy-mode 選擇在本地啟動驅動器程序,還是在集群中的一台工作節點機器上啟動。在客戶端模式下,spark-submit會將驅動器程序運行在spark-submit被調用的這台機器上。在集群模式下,驅動器程序會被傳輸並被執行於集群的一個工作節點上,默認是本地模式。
--class 運行Java或者Scala程序應用的主類
--name 應用的顯示名,會顯示在spark的網頁用戶界面中
--jars 需要上傳並放在應用的CLASSPATH中的JAR包的雷彪。如果應用依賴於少量第三方的jar包,可以把它們放在這個參數中
--files 需要放在應用工作目錄中的文件雷彪。這個參數一般用來放需要分發到各節點的數據文件
--py-files 需添加到PYTHONPATH中的文件的雷彪。其中可以包含.py /.egg以及.zip文件
--executor-memory 執行器進程使用的內存量,以字節為單位,可以使用后綴指定更大的單位,比如512M或者15g
--driver-memory 驅動器進程使用的內存量,以字節為單位。可以使用后綴指定更大的單位,比如512m或者15g
 
使用各種選項調用spark-submit
./bin/spark-submit 
--master spark:// hostname:7077 
--deploy-mode cluster
--class com.databricks.examples.SparkExample
--name "Example program"
--jars dep1.jar,dep2.jar,dep3.jar
--total-executor-core 300
--executor-memory 10g
4.選擇合適的集群管理器
(1)如果是從零開始,可以先選擇獨立集群管理器。獨立模式安裝起來最簡單,而且如果你只是使用spark的話,獨立集群管理器提供與其他集群管理器完全一樣的全部功能。
(2)如果你要在使用spark的同時使用其他應用,或者是要用到更豐富的資源調度功能(例如隊列),那么YARN和Mesos都能滿足你的需求。而在這兩者中,對於大多數的hadoop發行版來說,一般YARN已經預裝好了。
(3)Mesos相對於YARN和獨立模式的一大優點在於其細粒度共享的選項,該選項可以將類似Spark shell這樣的交互式應用中的不同命令分配到不同的CPU上。因此這對於多用戶同時運行交互式shell的用例更有用處。
(4)在任何時候,最好把Spark運行在運行HDFS的節點還是那個,這樣能快速訪問存儲。你可以自行在同樣的節點上安裝Mesos或獨立集群管理器。如果使用YARN的話,大多數發行版已經把YARN和HDFS安裝在了一起。
 
 
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM