Spark版本:1.6.2
spark-submit提供了在所有集群平台提交應用的統一接口,你不需要因為平台的遷移改變配置。Spark支持三種集群:Standalone、Apache Mesos和Hadoop Yarn。
綁定應用程序依賴庫
如果你的應用程序依賴其他項目,需要將其一起打包,打包時需要包括依賴的第三方庫。sbt和maven都有裝配插件,可以指定hadoop和spark版本,而不將其打入jar包中,因為hadoop和spark的庫由集群環境提供。然后通過spark安裝目錄下的spark-submit工具提交你的應用程序。
對於python程序,需要添加--py-files參數,若有多個Python文件,推薦將其打包zip或egg,然后執行。
spark-submit提交應用
spark-submit支持對三種集群提交應用,主要語法如下:
./bin/spark-submit \
--class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments]
參數詳解:
- --class: 應用程序的入口,例如
org.apache.spark.example.SpariPi
- --master: 指定集群類型,例如local(本地)、spark://master:7077(stanalone模式)、yarn-client
- --deploy-mode: 是否將Driver部署到worker節點,默認是在client
- --conf: 配置spark環境,在引號中使用key=value形式
- appliaction-jar: 指定應用程序的jar包
- application-arguments: 應用程序的參數
還有一些針對各個集群平台的非通用的設置,例如使用Spark standalone cluster時,可以配置--supervise
參數,確保driver在返回值為非零時,自動重啟。下面是一些常用的配置用例:
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \ --master local[8] \ /path/to/examples.jar \ 100 # Run on a Spark standalone cluster in client deploy mode ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ 1000 # Run on a Spark standalone cluster in cluster deploy mode with supervise ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --deploy-mode cluster \ --supervise \ --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ 1000 # Run on a YARN cluster export HADOOP_CONF_DIR=XXX ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ # can be client for client mode --executor-memory 20G \ --num-executors 50 \ /path/to/examples.jar \ 1000 # Run a Python application on a Spark standalone cluster ./bin/spark-submit \ --master spark://207.184.161.138:7077 \ examples/src/main/python/pi.py \ 1000 # Run on a Mesos cluster in cluster deploy mode with supervise ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master mesos://207.184.161.138:7077 \ --deploy-mode cluster \ --supervise \ --executor-memory 20G \ --total-executor-cores 100 \ http://path/to/examples.jar \ 1000
Master URLs
下面對spark-submit的--master參數進行介紹,主要包含以下幾種類型:
Master URL | 簡介 |
---|---|
local | 在本地使用一個worker線程運行spark |
local[K] | 在本地使用K個worker線程運行spark |
local[*] | 在本地運行CPU核心數個worker線程 |
spark://HOST: PORT | 連接到Spark standalone集群的master節點,默認port是7077 |
mesos://HOST: PORT | 連接到mesos集群,默認port是5050 |
yarn | 連接到yarn集群,通過--deploy-mode指定yarn-client和yarn-cluster兩種模式。集群的位置通過HADOOP_CONF_DIR或YARN_CONF_DIR變量配置 |
通過文件加載配置
Spark可以通過配置文件或應用代碼、或者spark-submit參數加載相關的配置。默認情況下,spark讀取conf/spark-defaults.conf配置。默認的spark配置參見下一文檔。
若是通過代碼設置spark.master參數,則--master參數會被忽略。一般來說,可以通過SparkConf配置的屬性優先級最高,其次是spark-submit的屬性,最后是配置文件。代碼優先級 > spark-submit參數 > 配置文件。
先進的依賴管理
spark-submit的**--jars**選項會根據集群不同選擇不同的處理策略。spark支持以下幾種URL模式,並使用不同策略:
- file: 絕對的文件路徑,各個worker通過http服務從driver節點copy文件;
- hdfs:http: https ftp: 通過相應的協議拉取jar文件到本地;
- local: 這種URL代表在每個worker的本地路徑下都已經存在該文件,不會觸發網絡IO
由於每個worker都會拷貝文件到本地,如何清理是個問題。yarn會自動定期處理,spark standalone集群可以配置spark.worker.cleanup.appDataTtl
配置保存的時間,默認是7天。
用戶還可以通過--packages
包含其他的依賴,這些依賴庫的傳播依賴也會被包含。--repositories
可以包含額外的庫倉儲。這些參數在pyspark, spark-shell, spark-submit
中都支持。
spark測試RDD所占存儲
(獲取部分記錄,並根據RDD記錄數對RDD所占空間進行預估):
def getTotalSize(rdd: RDD[Row]): Long = { // This can be a parameter val NO_OF_SAMPLE_ROWS = 10l; val totalRows = rdd.count(); var totalSize = 0l if (totalRows > NO_OF_SAMPLE_ROWS) { val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS) val sampleRDDSize = getRDDSize(sampleRDD) totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS) } else { // As the RDD is smaller than sample rows count, we can just calculate the total RDD size totalSize = getRDDSize(rdd) } totalSize } def getRDDSize(rdd: RDD[Row]) : Long = { var rddSize = 0l val rows = rdd.collect() for (i <- 0 until rows.length) { rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] }) } rddSize }
更多信息
當部署好應用程序后,集群模式概述對分布式執行、如何監控和調試程序進行了闡述。