使用Spark 時,通常會有兩種模式。
一、在交互式編程環境(REPL, a.k.a spark-shell)下實現一些代碼,測試一些功能點。
二、像MapReduce 那樣提前編寫好源代碼並編譯打包(僅限 Java 或 Scala,Python 不需要),然后將程序代碼通過spark-submit 命令提交到 YARN 集群完成計算。
spark-shell
啟動 spark-shell 通常需要指定 master、executor 內存、executor 數量等參數。由於 YARN 集群有審計機制,每個人提交的 spark application 需要指定 name 參數,同時確保 name 是以個人的 LDAP 用戶名為后綴。另外,如果你不確定 driver 是否有足夠的內存能容納一個 RDD 的計算結果,建議不要使用 RDD 的 collect 方法而使用其 take 方法,否則會使 driver 發生 OOM。
1.scala交互式編程環境
通過命令啟動sprak-shell
/opt/tige/spark2/bin/spark-shell \ --master yarn-client \ --queue root.default \ --driver-memory 4g \ --executor-memory 8g\ --conf spark.dynamicAllocation.maxExecutors=10 \ --name spark_test_{your username}
啟動spark后系統自動創建sc和sqlContext(HiveContext實例),可以使用它們來創建RDD或者DataFarme
2.使用Python交互式編程環境
通過命令pyspark
/opt/tiger/spark_deploy/spark2/bin/ipyspark --master yarn-client --queue root.default --driver-memory 4g --executor-memory 8g --num-executors 8 --name spark_test_${your LDAP user name}
spark-submit
首先我們需要使用 Spark 的 API 實現一個擁有入口(main)的程序,然后通過 spark-submit 提交到 YARN 集群。
-
Scala 版本的 WordCount
import org.apache.spark.{SparkConf, SparkContext} object WordCount extends App { val sparkConf = new SparkConf() sparkConf.setAppName("spark_test_${your LDAP user name}") sparkConf.setMaster("yarn-client") sparkConf.set("spark.driver.memory", "4g") sparkConf.set("spark.executor.memory", "8g") sparkConf.set("spark.dynamicAllocation.initialExecutors", "3") sparkConf.set("spark.dynamicAllocation.maxExecutors", "10") val sc = new SparkContext(sparkConf) val words = sc.textFile("/path/to/text/file") val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _).collect() wordCount.foreach(println) }
完成代碼編寫與編譯打包之后就可以通過 spark-submit 來提交應用了,命令如下:
/opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client --class WordCount your_spark_test.jar
- python版本的WordCount
from pyspark import SparkContext, SparkConf from operator import add if __name__ == '__main__': conf = SparkConf() conf.setMaster('yarn-client') conf.setAppName('spark_test_${your LDAP user name}') conf.set("spark.driver.memory", "4g") conf.set("spark.executor.memory", "8g") conf.set("spark.dynamicAllocation.initialExecutors", "3") conf.set("spark.dynamicAllocation.maxExecutors", "10") sc = SparkContext(conf=conf) words = sc.textFile("/path/to/text/file") wordCount = words.map(lambda word: (word, 1)).reduceByKey(add).collect() for key, value in wordCount: print key, value
假設上面這段 Python 代碼的文件名為 your_spark_test.py,那么提交這段代碼到 YARN 集群的命令如下:
/opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client your_spark_test.py