1 節點說明
| IP |
Role |
| 192.168.1.111 |
ActiveNameNode |
| 192.168.1.112 |
StandbyNameNode,Master,Worker |
| 192.168.1.113 |
DataNode,Master,Worker |
| 192.168.1.114 |
DataNode,Worker |
HDFS集群和Spark集群之間節點共用。
2 安裝HDFS
見HDFS2.X和Hive的安裝部署文檔:http://www.cnblogs.com/Scott007/p/3614960.html
3 Spark部署
Spark常用的安裝部署模式有Spark On Yarn和Standalone,可以同時使用。
3.1 Spark on Yarn
這種模式,借助Yarn資源分配的功能,使用Spark客戶端來向Yarn提交任務運行。只需將Spark的部署包放置到Yarn集群的某個節點上即可(或者是Yarn的客戶端,能讀取到Yarn集群的配置文件即可)。Spark本身的Worker節點、Master節點不需要啟動。
但是,Spark的部署包須是基於對應的Yarn版本正確編譯后的,否則會出現Spark和Yarn的兼容性問題。
on Yarn的兩種運行方式,其運行結束后的日志不能在Yarn的Application管理界面看到,目前只能在客戶端通過:
yarn logs -applicationId <applicationId>
命令查看每個Application的日志。
3.1.1 配置
部署這種模式,需要修改conf目錄下的spark-env.sh文件。在其中新增如下配置選項:
export HADOOP_HOME= /home/hadoop/hadoop-2.0.0-cdh4.5.0
export HADOOP_CONF_DIR= $HADOOP_HOME/etc/hadoop
SPARK_EXECUTOR_INSTANCES=2
SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=400M
SPARK_DRIVER_MEMORY=400M
SPARK_YARN_APP_NAME="Spark 1.0.0"
其中:
(1) HADOOP_HOME:當前節點中HDFS的部署路徑,因為Spark需要和HDFS中的節點在一起;
(2) HADOOP_CONF_DIR:HDFS節點中的conf配置文件路徑,正常情況下此目錄為$HADOOP_HOME/etc/hadoop;
(3) SPARK_EXECUTOR_INSTANCES:在Yarn集群中啟動的Worker的數目,默認為2個;
(4) SPARK_EXECUTOR_CORES:每個Worker所占用的CPU核的數目;
(5) SPARK_EXECUTOR_MEMORY:每個Worker所占用的內存大小;
(6) SPARK_DRIVER_MEMORY:Spark應用程序Application所占的內存大小,這里的Driver對應Yarn中的ApplicationMaster;
(7) SPARK_YARN_APP_NAME:Spark Application在Yarn中的名字;
配置完成后,將Spark部署文件放置到Yarn的節點中即可。這里,將spark-1.0.0整個目錄放到Yarn集群的一個節點192.168.1.112的/home/hadoop(設為spark的安裝路徑的父目錄)路徑下。
3.1.2 測試
在Spark的部署路徑的bin路徑下,執行spark-submit腳本來運行spark-examples包中的例子。執行如下:
./bin/spark-submit --master yarn \
--class org.apache.spark.examples.JavaWordCount \
--executor-memory 400M \
--driver-memory 400M \
/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar ./hdfs-site.xml
這個例子是計算WordCount的,例子被打包在/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar包中,對應的Class為org.apache.spark.examples.JavaWordCount,./hdfs-site.xml是HDFS中指定路徑下的一個文件,WordCount就是針對它來做的。而--master yarn就是指定運行在Yarn集群中,以yarn模式運行。
Spark On Yarn有兩種運行模式,一種是Yarn Cluster方式,一種是Yarn Client方式。
(1) Yarn Cluster: Spark Driver程序將作為一個ApplicationMaster在YARN集群中先啟動,然后再由ApplicationMaster向RM申請資源啟動executor以運行Task。因為Driver程序在Yarn中運行,所以程序的運行結果不能在客戶端顯示,所以最好將結果保存在HDFS上,客戶端的終端顯示的是作為Yarn的job的運行情況。
(2) Yarn Client: Spark Driver程序在客戶端上運行,然后向Yarn申請運行exeutor以運行Task,本地程序負責最后的結果匯總等。客戶端的Driver將應用提交給Yarn后,Yarn會先后啟動ApplicationMaster和executor,另外ApplicationMaster和executor都是裝載在container里運行,container默認的內存是1G,ApplicationMaster分配的內存是driver-memory,executor分配的內存是executor-memory。同時,因為Driver在客戶端,所以程序的運行結果可以在客戶端顯示,Driver以進程名為SparkSubmit的形式存在。
上面命令中的提交方式“yarn”就是默認按照“Yarn Client”方式運行。用戶可自定義運行方式,通過“--master”指定程序以yarn、yarn-cluster或者yarn-client中的一種方式運行。
需要重點說明的是最后文件的路徑,是相當於HDFS中的/user/hadoop而言,hadoop是當前命令的用戶。“./hdfs-site.xml”在HDFS中的全路徑為“hdfs://namespace/user/hadoop/hdfs-site.xml”,其中hadoop是當前的用戶,namespace是HDFS的命名空間;如果寫成“/hdfs-site.xml”則在HDFS中指的是“hdfs://namespace/hdfs-site.xml”;當然也可以直接傳入“hdfs://namespace/user/hadoop/hdfs-site.xml”用於指定在HDFS中的要進行WordCount計算的文件。
另外,Spark應用程序需要的CPU Core數目和內存,需要根據當前Yarn的NodeManager的硬件條件相應設置,不能超過NodeManager的硬件條件。
./bin/spark-submit --master yarn \
--class org.apache.spark.examples.JavaWordCount \
--executor-memory 400M \
--driver-memory 400M \
/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar hdfs://namespace/user/hadoop/hdfs-site.xml
在Yarn的ResourceManager對應的Web界面中查看啟動的Application。
Running:
Success:
同時可以在啟動腳本的客戶端看到WordCount的運行結果:

3.2 Spark Standalone
這種模式,就是把Spark單獨作為一個集群來進行部署。集群中有兩種節點,一種是Master,另一種是Worker節點。Master負責分配任務給Worker節點來執行,並負責最后的結果合並,Worker節點負責具體的任務執行。
3.2.1 配置
所需修改的配置文件除了spark-env.sh文件以外,還有slave文件,都位於conf目錄中。
slave文件中保存的是worker節點host或者IP,此處的配置為:
192.168.1.112
192.168.1.113
192.168.1.114
至於spark-env.sh文件,可以配置如下屬性:
(1) SPARK_MASTER_PORT:Master服務端口,默認為7077;
(2) SPARK_WORKER_CORES:每個Worker進程所需要的CPU核的數目;
(3) SPARK_WORKER_MEMORY:每個Worker進程所需要的內存大小;
(4) SPARK_WORKER_INSTANCES:每個Worker節點上運行Worker進程的數目;
(5) SPARK_MASTER_WEBUI_PORT:Master節點對應Web服務的端口;
(6)export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark":用於指定Master的HA,依賴於zookeeper集群;
(7) export SPARK_JAVA_OPTS="-Dspark.cores.max=4":用於限定每個提交的Spark Application的使用的CPU核的數目,因為缺省情況下提交的Application會使用所有集群中剩余的CPU Core。
注意在Worker進程的CPU個數和內存大小的時候,要結合機器的實際硬件條件,如果一個Worker節點上的所有Worker進程需要的CPU總數目或者內存大小超過當前Worker節點的硬件條件,則Worker進程會啟動失敗。
將配置好的Spark文件拷貝至每個Spark集群的節點上的相同路徑中。為方便使用spark-shell,可以在環境變量中配置上SPARK_HOME。
3.2.2 啟動
配置結束后,就該啟動集群了。這里使用Master的HA方式,選取192.168.1.112、192.168.1.113節點作為Master,192.168.1.112、192.168.1.113、192.168.1.114節點上運行兩個Worker進程。
首先在192.168.1.113節點上做此操作:
啟動之后,可以查看當前節點的進程:
另外,為了保證Master的HA,在192.168.1.112節點上只啟動Master:
192.168.1.112節點的進程為:
啟動過后,通過Web頁面查看集群的情況,這里訪問的是:
再看standby節點192.168.1.112的web界面http://192.168.1.112:8090/
3.2.3 測試
Spark的bin子目錄中的spark-submit腳本是用於提交程序到集群中運行的工具,我們使用此工具做一個關於pi的計算。命令如下:
./bin/spark-submit --master spark://spark113:7077 \ --class org.apache.spark.examples.SparkPi \ --name Spark-Pi --executor-memory 400M \ --driver-memory 512M \ /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar
其中--master參數用於指定Master節點的URI,但是這里填的是Host,不是IP!
任務啟動之后,在Spark的Master的Web界面可以看到運行中的Application。
任務運行結束之后,在Web界面中Completed Applications表格中會看到對應的結果。

同時,命令行中會打印出來運行的結果,如下所示:
4 spark-submit工具
上面測試程序的提交都是使用的spark-submit腳本,其位於$SPARK_HOME/bin目錄中,執行時需要傳入的參數說明如下:
Usage: spark-submit [options] <app jar | python file> [app options]
| 參數名稱 |
含義 |
| --master MASTER_URL |
可以是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local |
| --deploy-mode DEPLOY_MODE |
Driver程序運行的地方,client或者cluster |
| --class CLASS_NAME |
主類名稱,含包名 |
| --name NAME |
Application名稱 |
| --jars JARS |
Driver依賴的第三方jar包 |
| --py-files PY_FILES |
用逗號隔開的放置在Python應用程序PYTHONPATH上的.zip, .egg, .py文件列表 |
| --files FILES |
用逗號隔開的要放置在每個executor工作目錄的文件列表 |
| --properties-file FILE |
設置應用程序屬性的文件路徑,默認是conf/spark-defaults.conf |
| --driver-memory MEM |
Driver程序使用內存大小 |
| --driver-java-options |
|
| --driver-library-path |
Driver程序的庫路徑 |
| --driver-class-path |
Driver程序的類路徑 |
| --executor-memory MEM |
executor內存大小,默認1G |
| --driver-cores NUM |
Driver程序的使用CPU個數,僅限於Spark Alone模式 |
| --supervise |
失敗后是否重啟Driver,僅限於Spark Alone模式 |
| --total-executor-cores NUM |
executor使用的總核數,僅限於Spark Alone、Spark on Mesos模式 |
| --executor-cores NUM |
每個executor使用的內核數,默認為1,僅限於Spark on Yarn模式 |
| --queue QUEUE_NAME |
提交應用程序給哪個YARN的隊列,默認是default隊列,僅限於Spark on Yarn模式 |
| --num-executors NUM |
啟動的executor數量,默認是2個,僅限於Spark on Yarn模式 |
| --archives ARCHIVES |
僅限於Spark on Yarn模式 |
另外,在執行spark-submit.sh工具進行提交應用之前,可以使用如下方式提前定義好當前Spark Application所使用的CPU Core數目和內存大小:
SPARK_JAVA_OPTS="-Dspark.cores.max=2 -Dspark.executor.memory=600m" \ ./bin/spark-submit --master spark://update113:7077 \ --class org.apache.spark.examples.SparkPi \ … …
5 Spark HistoryServer
類似於Mapreduce的JobHistoryServer,Spark也有一個服務可以保存歷史Application的運行記錄。
修改$SPARK_HOME/conf下的spark-defaults.conf文件(注意,修改后的配置文件在每個節點都要有),其中可修改的配置屬性為:
| 屬性名稱 |
默認值 |
含義 |
| spark.history.updateInterval |
10 |
以秒為單位,更新日志相關信息的時間間隔 |
| spark.history.retainedApplications |
250 |
保存Application歷史記錄的個數,如果超過這個值,舊的應用程序信息將被刪除 |
| spark.history.ui.port |
18080 |
HistoryServer的web端口 |
| spark.history.kerberos.enabled |
False |
是否使用kerberos方式登錄訪問HistoryServer,對於持久層位於安全集群的HDFS上是有用的,如果設置為true,就要配置下面的兩個屬性 |
| spark.history.kerberos.principal |
|
用於HistoryServer的kerberos主體名稱 |
| spark.history.kerberos.keytab |
|
用於HistoryServer的kerberos keytab文件位置 |
| spark.history.ui.acls.enable |
False |
授權用戶查看應用程序信息的時候是否檢查acl。如果啟用,只有應用程序所有者和spark.ui.view.acls指定的用戶可以查看應用程序信息;否則,不做任何檢查 |
| spark.eventLog.enabled |
False |
是否記錄Spark事件 |
| spark.eventLog.dir |
|
保存日志相關信息的路徑,可以是hdfs://開頭的HDFS路徑,也可以是file://開頭的本地路徑,都需要提前創建 |
| spark.yarn.historyServer.address |
|
Server端的URL:Ip:port 或者host:port |
此處的設置如下:
spark.eventLog.enabled true spark.eventLog.dir hdfs://yh/user/hadoop/sparklogs spark.yarn.historyServer.address update113:18080
設置完文件之后,進入sbin目錄啟動服務:

運行完成的Application歷史記錄可以通過訪問上面指定的HistoryServer地址查看,這里是http://192.168.1.113:18080/。

無論運行時是本地模式,還是yarn-client、yarn-cluster,運行記錄均可在此頁面查看。
並且程序運行時的環境變量、系統參數、各個階段的耗時均可在此查看,很強大!
6 Spark可配置參數
Spark參數的配置可通過三種方式:SparkConf方式 > 命令行參數方式 >文件配置方式。
6.1 應用屬性
| 屬性名 |
默認值 |
含義 |
| spark.app.name |
|
應用程序名稱 |
| spark.master |
|
要連接的Spark集群Master的URL |
| spark.executor.memory |
512 m |
每個executor使用的內存大小 |
| spark.serializer |
org.apache.spark .serializer.JavaSerializer |
序列化方式,官方建議使用org.apache.spark.serializer.KryoSerializer,當然也可以任意是定義為org.apache.spark.Serializer子類的序化器 |
| spark.kryo.registrator |
|
如果要使用 Kryo序化器,需要創建一個繼承KryoRegistrator的類並設置系統屬性spark.kryo.registrator指向該類 |
| spark.local.dir |
/tmp |
用於保存map輸出文件或者轉儲RDD。可以多個目錄,之間以逗號分隔。在Spark 1.0 及更高版本此屬性會被環境變量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 代替 |
| spark.logConf |
False |
SparkContext 啟動時是否記錄有效 SparkConf信息 |
6.2 運行環境變量
| 屬性名 |
默認值 |
含義 |
| spark.executor.extraJavaOptions |
|
傳遞給executor的額外JVM 選項,但是不能使用它來設置Spark屬性或堆空間大小 |
| spark.executor.extraClassPath |
|
追加到executor類路徑中的附加類路徑 |
| spark.executor.extraLibraryPath |
|
啟動executor JVM 時要用到的特殊庫路徑 |
| spark.files.userClassPathFirst |
False |
executor在加載類的時候是否優先使用用戶自定義的JAR包,而不是Spark帶有的JAR包,目前,該屬性只是一項試驗功能 |
6.3 Shuffle操作相關屬性
| 屬性名 |
默認值 |
含義 |
| spark.shuffle.consolidateFiles |
False |
如果為true,在shuffle時就合並中間文件,對於有大量Reduce任務的shuffle來說,合並文件可以提高文件系統性能,如果使用的是ext4 或 xfs 文件系統,建議設置為true;對於ext3,由於文件系統的限制,設置為true反而會使內核>8的機器降低性能 |
| spark.shuffle.spill |
True |
如果為true,在shuffle期間通過溢出數據到磁盤來降低了內存使用總量,溢出閾值是由spark.shuffle.memoryFraction指定的 |
| spark.shuffle.spill.compress |
True |
是否壓縮在shuffle期間溢出的數據,如果壓縮將使用spark.io.compression.codec。 |
| spark.shuffle.compress |
True |
是否壓縮map輸出文件,壓縮將使用spark.io.compression.codec。 |
| spark.shuffle.file.buffer.kb |
100 |
每個shuffle的文件輸出流內存緩沖區的大小,以KB為單位。這些緩沖區可以減少磁盤尋道的次數,也減少創建shuffle中間文件時的系統調用 |
| spark.reducer.maxMbInFlight |
48 |
每個reduce任務同時獲取map輸出的最大大小 (以兆字節為單位)。由於每個map輸出都需要一個緩沖區來接收它,這代表着每個 reduce 任務有固定的內存開銷,所以要設置小點,除非有很大內存 |
6.4 SparkUI相關屬性
| 屬性名 |
默認值 |
含義 |
| spark.ui.port |
4040 |
應用程序webUI的端口 |
| spark.ui.retainedStages |
1000 |
在GC之前保留的stage數量 |
| spark.ui.killEnabled |
True |
允許在webUI將stage和相應的job殺死 |
| spark.eventLog.enabled |
False |
是否記錄Spark事件,用於應用程序在完成后重構webUI |
| spark.eventLog.compress |
False |
是否壓縮記錄Spark事件,前提spark.eventLog.enabled為true |
| spark.eventLog.dir |
file:///tmp/spark-events |
如果spark.eventLog.enabled為 true,該屬性為記錄spark事件的根目錄。在此根目錄中,Spark為每個應用程序創建分目錄,並將應用程序的事件記錄到在此目錄中。可以將此屬性設置為HDFS目錄,以便history server讀取歷史記錄文件 |
6.5 壓縮和序列化相關屬性
| 屬性名 |
默認值 |
含義 |
| spark.broadcast.compress |
True |
是否在發送之前壓縮廣播變量 |
| spark.rdd.compress |
False |
是否壓縮RDD分區 |
| spark.io.compression.codec |
org.apache.spark.io. LZFCompressionCodec |
用於壓縮內部數據如 RDD分區和shuffle輸出的編碼解碼器, org.apache.spark.io.LZFCompressionCodec和org.apache.spark.io.SnappyCompressionCodec。其中,Snappy提供更快速的壓縮和解壓縮,而LZF提供了更好的壓縮比 |
| spark.io.compression.snappy .block.size |
32768 |
使用Snappy編碼解碼器時,編碼解碼器使用的塊大小 (以字節為單位) |
| spark.closure.serializer |
org.apache.spark.serializer. JavaSerializer |
用於閉包的序化器,目前只有支持Java序化器 |
| spark.serializer. |
10000 |
org.apache.spark.serializer.JavaSerializer序列化時,會緩存對象以防止寫入冗余數據,此時會停止這些對象的垃圾收集。通過調用重置序化器,刷新該信息就可以收集舊對象。若要關閉這重定期重置功能將其設置為< = 0 。默認情況下每10000個對象將重置序化器 |
| spark.kryo.referenceTracking |
True |
當使用Kryo序化數據時,是否跟蹤對同一對象的引用。如果你的對象圖有回路或者同一對象有多個副本,有必要設置為true;其他情況下可以禁用以提高性能 |
| spark.kryoserializer.buffer.mb |
2 |
在Kryo 里允許的最大對象大小(Kryo會創建一個緩沖區,至少和序化的最大單個對象一樣大)。每個worker的每個core只有一個緩沖區 |
6.6 執行時相關屬性
| 屬性名 |
默認值 |
含義 |
| spark.default.parallelism |
本地模式:機器核數 Mesos:8 其他:max(executor的core,2) |
如果用戶不設置,系統使用集群中運行shuffle操作的默認任務數(groupByKey、 reduceByKey等) |
| spark.broadcast.factory |
org.apache.spark.broadcast. HttpBroadcastFactory |
廣播的實現類 |
| spark.broadcast.blockSize |
4096 |
TorrentBroadcastFactory塊大小(以kb為單位)。過大會降低廣播速度;過小會使印象BlockManager性能 |
| spark.files.overwrite |
Fale |
通過 SparkContext.addFile() 添加的文件在目標中已經存在並且內容不匹配時,是否覆蓋目標文件 |
| spark.files.fetchTimeout |
False |
在獲取由driver通過SparkContext.addFile() 添加的文件時,是否使用通信時間超時 |
| spark.storage.memoryFraction |
0.6 |
Java堆用於cache的比例 |
| spark.tachyonStore.baseDir |
System.getProperty("java.io.tmpdir") |
用於存儲RDD的techyon目錄,tachyon文件系統的URL由spark.tachyonStore.url設置,也可以是逗號分隔的多個techyon目錄 |
| spark.storage. memoryMapThreshold |
8192 |
以字節為單位的塊大小,用於磁盤讀取一個塊大小時進行內存映射。這可以防止Spark在內存映射時使用很小塊,一般情況下,對塊進行內存映射的開銷接近或低於操作系統的頁大小 |
| spark.tachyonStore.url |
tachyon://localhost:19998 |
基於techyon文件的URL |
| spark.cleaner.ttl |
|
spark記錄任何元數據(stages生成、task生成等)的持續時間。定期清理可以確保將超期的元數據丟棄,這在運行長時間任務是很有用的,如運行7*24的sparkstreaming任務。RDD持久化在內存中的超期數據也會被清理 |
6.7 網絡相關屬性
| 屬性名 |
默認值 |
含義 |
| spark.driver.host |
|
運行driver的主機名或 IP 地址 |
| spark.driver.port |
隨機 |
driver偵聽的端口 |
| spark.akka.frameSize |
10 |
以MB為單位的driver和executor之間通信信息的大小,設置值越大,driver可以接受更大的計算結果 |
| spark.akka.threads |
4 |
用於通信的actor線程數,在大型集群中擁有更多CPU內核的driver可以增加actor線程數 |
| spark.akka.timeout |
100 |
以秒為單位的Spark節點之間超時時間 |
| spark.akka.heartbeat.pauses |
600 |
下面3個參數是用於設置Akka自帶的故障探測器。啟用的話,以秒為單位設置如下這三個參數,有助於對惡意的executor的定位,而對於由於GC暫停或網絡滯后引起的情況下,不需要開啟故障探測器;另外故障探測器的開啟會導致由於心跳信息的頻繁交換而引起的網絡泛濫。 本參數是設置可接受的心跳停頓時間 |
| spark.akka.failure-detector.threshold |
300.0 |
對應Akka的akka.remote.transport-failure-detector.threshold |
| spark.akka.heartbeat.interval |
1000 |
心跳間隔時間 |
6.8 調度相關屬性
| 屬性名 |
默認值 |
含義 |
| spark.task.cpus |
1 |
為每個任務分配的內核數 |
| spark.task.maxFailures |
4 |
Task的最大重試次數 |
| spark.scheduler.mode |
FIFO |
Spark的任務調度模式,還有一種Fair模式 |
| spark.cores.max |
|
當應用程序運行在Standalone集群或者粗粒度共享模式Mesos集群時,應用程序向集群請求的最大CPU內核總數(不是指每台機器,而是整個集群)。如果不設置,對於Standalone集群將使用spark.deploy.defaultCores中數值,而Mesos將使用集群中可用的內核 |
| spark.mesos.coarse |
False |
如果設置為true,在Mesos集群中運行時使用粗粒度共享模式 |
| spark.speculation |
False |
以下幾個參數是關於Spark推測執行機制的相關參數。此參數設定是否使用推測執行機制,如果設置為true則spark使用推測執行機制,對於Stage中拖后腿的Task在其他節點中重新啟動,並將最先完成的Task的計算結果最為最終結果 |
| spark.speculation.interval |
100 |
Spark多長時間進行檢查task運行狀態用以推測,以毫秒為單位 |
| spark.speculation.quantile |
0.75 |
推測啟動前,Stage必須要完成總Task的百分比 |
| spark.speculation.multiplier |
1.5 |
比已完成Task的運行速度中位數慢多少倍才啟用推測 |
| spark.locality.wait |
3000 |
以下幾個參數是關於Spark數據本地性的。本參數是以毫秒為單位啟動本地數據task的等待時間,如果超出就啟動下一本地優先級別的task。該設置同樣可以應用到各優先級別的本地性之間(本地進程 -> 本地節點 -> 本地機架 -> 任意節點 ),當然,也可以通過spark.locality.wait.node等參數設置不同優先級別的本地性 |
| spark.locality.wait.process |
spark.locality.wait |
本地進程級別的本地等待時間 |
| spark.locality.wait.node |
spark.locality.wait |
本地節點級別的本地等待時間 |
| spark.locality.wait.rack |
spark.locality.wait |
本地機架級別的本地等待時間 |
| spark.scheduler.revive.interval |
1000 |
復活重新獲取資源的Task的最長時間間隔(毫秒),發生在Task因為本地資源不足而將資源分配給其他Task運行后進入等待時間,如果這個等待時間內重新獲取足夠的資源就繼續計算 |
6.9 安全相關屬性
| 屬性名 |
默認值 |
含義 |
| spark.authenticate |
False |
是否啟用內部身份驗證 |
| spark.authenticate.secret |
|
設置組件之間進行身份驗證的密鑰。如果不是YARN上運行並且spark.authenticate為true時,需要設置密鑰 |
| spark.core.connection. auth.wait.timeout |
30 |
進行身份認證的超時時間 |
| spark.ui.filters |
|
Spark web UI 要使用的以逗號分隔的篩選器名稱列表。篩選器要符合javax servlet Filter標准,每個篩選器的參數可以通過設置java系統屬性來指定: spark.<class name of filter>.params='param1=value1,param2=value2' 例如: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing' |
| spark.ui.acls.enable |
False |
Spark webUI存取權限是否啟用。如果啟用,在用戶瀏覽web界面的時候會檢查用戶是否有訪問權限 |
| spark.ui.view.acls |
|
以逗號分隔Spark webUI訪問用戶的列表。默認情況下只有啟動Spark job的用戶才有訪問權限 |
6.10 SparkStreaming相關屬性
| 屬性名 |
默認值 |
含義 |
| spark.streaming.blockInterval |
200 |
Spark Streaming接收器將接收數據合並成數據塊並存儲在Spark里的時間間隔,毫秒 |
| spark.streaming.unpersist |
True |
如果設置為true,強迫將SparkStreaming持久化的RDD數據從Spark內存中清理,同樣的,SparkStreaming接收的原始輸入數據也會自動被清理;如果設置為false,則允許原始輸入數據和持久化的RDD數據可被外部的Streaming應用程序訪問,因為這些數據不會自動清理 |
6.11 Standalone模式特有屬性
可以在文件conf/spark-env.sh中來設置此模式的特有相關屬性:
(1)SPARK_MASTER_OPTS:配置master使用的屬性
(2)SPARK_WORKER_OPTS:配置worker使用的屬性
(3)SPARK_DAEMON_JAVA_OPTS:配置master和work都使用的屬性
配置的時候,使用類似的語句:
export SPARK_MASTER_OPTS="-Dx1=y1 -Dx2=y2"
其中x代表屬性,y代表屬性值。
SPARK_MASTER_OPTS所支持的屬性有:
| 屬性名 |
默認值 |
含義 |
| spark.deploy.spreadOut |
True |
Standalone集群管理器是否自由選擇節點還是固定到盡可能少的節點,前者會有更好的數據本地性,后者對於計算密集型工作負載更有效 |
| spark.worker.timeout |
60 |
master因為沒有收到心跳信息而認為worker丟失的時間(秒) |
| spark.deploy.defaultCores |
|
如果沒有設置spark.cores.max,該參數設置Standalone集群分配給應用程序的最大內核數,如果不設置,應用程序獲取所有的有效內核。注意在一個共享的集群中,設置一個低值防止攫取了所有的內核,影響他人的使用 |
SPARK_WORKER_OPTS所支持的屬性有
| 屬性名 |
默認值 |
含義 |
| spark.worker.cleanup.enabled |
False |
是否定期清理worker的應用程序工作目錄,只適用於Standalone模式,清理的時候將無視應用程序是否在運行 |
| spark.worker.cleanup.interval |
1800 |
清理worker本地過期的應用程序工作目錄的時間間隔(秒) |
| spark.worker.cleanup.appDataTtl |
7*24*3600 |
worker保留應用程序工作目錄的有效時間。該時間由磁盤空間、應用程序日志、應用程序的jar包以及應用程序的提交頻率來設定 |
SPARK_DAEMON_JAVA_OPTS所支持的屬性有:
| 屬性名 |
含義 |
| spark.deploy.recoveryMode |
下面3個參數是用於配置zookeeper模式的master HA。設置為ZOOKEEPER表示啟用master備用恢復模式,默認為NONE |
| spark.deploy.zookeeper.url |
zookeeper集群URL |
| spark.deploy.zookeeper.dir |
zooKeeper保存恢復狀態的目錄,缺省為/spark |
| spark.deploy.recoveryMode |
設成FILESYSTEM啟用master單節點恢復模式,缺省值為NONE |
| spark.deploy.recoveryDirectory |
Spark保存恢復狀態的目錄 |
6.12 Spark on Yarn特有屬性
| 屬性名 |
默認值 |
含義 |
| spark.yarn.applicationMaster.waitTries |
10 |
RM等待Spark AppMaster啟動重試次數,也就是SparkContext初始化次數。超過這個數值,啟動失敗 |
| spark.yarn.submit.file.replication |
3 |
應用程序上傳到HDFS的文件的副本數 |
| spark.yarn.preserve.staging.files |
False |
若為true,在job結束后,將stage相關的文件保留而不是刪除 |
| spark.yarn.scheduler.heartbeat.interval-ms |
5000 |
Spark AppMaster發送心跳信息給YARN RM的時間間隔 |
| spark.yarn.max.executor.failures |
2倍於executor數 |
導致應用程序宣告失敗的最大executor失敗次數 |
| spark.yarn.historyServer.address |
|
Spark history server的地址(不要加http://)。這個地址會在Spark應用程序完成后提交給YARN RM,然后RM將信息從RM UI寫到history server UI上。 |
7 示例配置
主要的配置文件均位於$SPARK_HOME/conf中,包括slave、spark-env.sh、spark-defaults.conf文件等。
7.1 slave文件
192.168.1.112 192.168.1.113 192.168.1.114
7.2 spark-env.sh文件
export JAVA_HOME="/export/servers/jdk1.6.0_25" #yarn export HADOOP_HOME=/home/hadoop/hadoop-2.0.0-cdh4.5.0 export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop SPARK_EXECUTOR_INSTANCES=2 SPARK_EXECUTOR_CORES=1 SPARK_EXECUTOR_MEMORY=400M SPARK_DRIVER_MEMORY=400M SPARK_YARN_APP_NAME="Spark 1.0.0" #alone SPARK_MASTER_WEBUI_PORT=8090 SPARK_WORKER_MEMORY=400M SPARK_WORKER_CORES=1 SPARK_WORKER_INSTANCES=2 #Master HA export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark"
7.3 spark-defaults.conf文件
#history server spark.eventLog.enabled true spark.eventLog.dir hdfs://namespace/user/hadoop/sparklogs spark.yarn.historyServer.address spark113:18080 #shuffle spark.shuffle.consolidateFiles true #task spark.task.cpus 1 spark.task.maxFailures 3 #scheduler type spark.scheduler.mode FAIR #security park.authenticate true spark.authenticate.secret hadoop spark.core.connection.auth.wait.timeout 1500 spark.ui.acls.enable true spark.ui.view.acls root,hadoop #each executor used max memory spark.executor.memory 400m #spark on yarn spark.yarn.applicationMaster.waitTries 5 spark.yarn.submit.file.replication 3 spark.yarn.preserve.staging.files false spark.yarn.scheduler.heartbeat.interval-ms 5000 #park standalone and on mesos spark.cores.max 4
8 Spark SQL
Spark支持Scala、Python等語言寫的腳本直接在Spark環境執行,更重要的是支持對Hive語句進行包裝后在Spark上運行。這就是Spark SQL。
8.1 相關配置
配置的步驟比較簡單,把Hive的配置文件hive-site.xml直接放置到$SPARK_HOME的conf路徑下即可。如果是想在Spark集群本地執行SQL的話,每個對應的節點都要做同樣的配置。
8.2 運行SQL
啟動bin目錄下的spark-shell腳本,依次執行如下語句:
val sc: SparkContext val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext._ hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") hql("LOAD DATA LOCAL INPATH '/examples /data.txt' INTO TABLE src") hql("FROM src SELECT key, value").collect().foreach(println)
上面的命令,分別是聲明SparkContext對象,利用hql方法執行Hive的SQL語句,在執行SQL語句的過程中,可以通過Hive的Cli客戶端進行查看相應操作的結果。
8.3 on yarn模式
由於spark-shell腳本是在本地執行的,如果想放到Yarn上去執行的話,可以使用上面第4節中的spark-submit工具,這時候需要對需要輸入的sql語句進行包裝,將包裝類打包成jar文件,再提交。
包裝類的代碼如下:
1 package spark; 2 3 import java.util.List; 4 5 import org.apache.spark.SparkConf; 6 import org.apache.spark.api.java.JavaSparkContext; 7 import org.apache.spark.sql.api.java.Row; 8 import org.apache.spark.sql.hive.api.java.JavaHiveContext; 9 10 /** 11 * Description: 12 * Author: ITScott@163.com 13 * Date: 2014/7/15 14 */ 15 public class SparkSQL { 16 17 public static void main(String[] args) { 18 if(args.length != 2){ 19 System.out.println("usage: <applicationName> <sql statments>"); 20 System.exit(1); 21 } 22 23 String applicationName = args[0]; 24 String sql = args[1]; 25 26 SparkConf conf = new SparkConf().setAppName(applicationName); 27 JavaSparkContext sc = new JavaSparkContext(conf); 28 JavaHiveContext hiveContext = new JavaHiveContext(sc); 29 List<Row> results = hiveContext.hql(sql).collect(); 30 31 System.out.println("Sql is:" + sql + ", has been executed over."); 32 System.out.println("The result size is " + results.size() + ", they are:"); 33 for(int i=0; i<results.size(); i++){ 34 System.out.println(results.get(i).toString()); 35 } 36 37 System.out.println("Execute over ..."); 38 sc.stop(); 39 System.out.println("Stop over ..."); 40 } 41 42 }
將其打包成jar文件spark-0.0.1-SNAPSHOT.jar,再使用spark-submit工具進行任務的提交,命令如下:
./spark-submit \ --class spark.SparkSQL \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 400m --executor-memory 400m --executor-cores 1 \ --jars /home/hadoop/spark-1.0.0/examples/libs/spark-core_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/examples/libs/spark-hive_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-core-3.2.2.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-rdbms-3.2.1.jar,/home/hadoop/hive-0.12.0/lib/mysql-connector-java-5.1.27-bin.jar --files /home/hadoop/spark-1.0.0/conf/hive-site.xml \ /home/hadoop/spark-1.0.0/examples/libs/spark-0.0.1-SNAPSHOT.jar "hiveTest" "CREATE TABLE IF NOT EXISTS test4 (key INT, value STRING)"
其中,--master參數指定的是yarn-cluster模式,當然也可以使用yarn-client模式,至於區別,已經在上文說了;--class指定的是我們包裝類的主類,見上文源碼;--jars是依賴的四個jar包;--files是指定的hive-site.xml配置文件,提交到Yarn中的Application在執行的時候,需要把此配置文件分發到每個Executor上;最后的兩個參數,一個是Application的名稱,一個是運行的SQL語句。
運行結束后,可以到Spark HistoryServer中查看運行結果。
-------------------------------------------------------------------------------
如果您看了本篇博客,覺得對您有所收獲,請點擊右下角的 [推薦]
如果您想轉載本博客,請注明出處
如果您對本文有意見或者建議,歡迎留言
感謝您的閱讀,請關注我的后續博客
