1. Spark概述
一種基於內存的快速、通用、可擴展的大數據分析引擎;
內置模塊:
Spark Core(封裝了rdd、任務調度、內存管理、錯誤恢復、與存儲系統交互);
Spark SQL(處理結構化數據)、
Spark Streaming(對實時數據進行流式計算) 、
Spark Mlib(機器學習程序庫包括分類、回歸、聚合、協同過濾等)、
Spark GraghX(圖計算);
獨立調度器、Yarn、Mesos
特點:
快( 基於內存(而MR是基於磁盤)、多線程模型(而mapReduce是基於多進程的,每個MR都是獨立的JVM進程)、可進行迭代計算(而hadoop需要多個mr串行) )、
易用(支持java、scala、python等的API,支持超過80多種算法,支持交互式的 Python 和 Scala 的 shell,可方便地在shell中使用spark集群來驗證解決問題,而不像以前需要打包上傳驗證)、
通用(spark提供了統一解決方案,可用於批處理、交互式查詢(spark sql)\ 實時流式處理(spark streaming)\機器學習和圖計算,可在同一應用中無縫使用)
兼容性(與其他開源產品的融合,如hadoop的yarn、Mesos、HDFS、Hbase等);
http://spark.apache.org/ 文檔查看地址 https://spark.apache.org/docs/2.1.1/
集群角色
Master和Workers
1)Master
Spark特有資源調度系統的Leader。掌管着整個集群的資源信息,類似於Yarn框架中的ResourceManager,主要功能:
(1)監聽Worker,看Worker是否正常工作;
(2)Master對Worker、Application等的管理(接收worker的注冊並管理所有的worker,接收client提交的application,(FIFO)調度等待的application並向worker提交)。
2)Worker
Spark特有資源調度系統的Slave,有多個。每個Slave掌管着所在節點的資源信息,類似於Yarn框架中的NodeManager,主要功能:
(1)通過RegisterWorker注冊到Master;
(2)定時發送心跳給Master;
(3)根據master發送的application配置進程環境,並啟動StandaloneExecutorBackend(執行Task所需的臨時進程)
Driver和Executor
1)Driver(驅動器)
Spark的驅動器是執行開發程序中的main方法的進程。它負責開發人員編寫的用來創建SparkContext、創建RDD,以及進行RDD的轉化操作和行動操作代碼的執行。如果你是用spark shell,那么當你啟動Spark shell的時候,系統后台自啟了一個Spark驅動器程序,就是在Spark shell中預加載的一個叫作 sc的SparkContext對象。如果驅動器程序終止,那么Spark應用也就結束了。主要負責:
(1)把用戶程序轉為任務
(2)跟蹤Executor的運行狀況
(3)為執行器節點調度任務
(4)UI展示應用運行狀況
2)Executor(執行器)
Spark Executor是一個工作進程,負責在 Spark 作業中運行任務,任務間相互獨立。Spark 應用啟動時,Executor節點被同時啟動,並且始終伴隨着整個 Spark 應用的生命周期而存在。如果有Executor節點發生了故障或崩潰,Spark 應用也可以繼續執行,會將出錯節點上的任務調度到其他Executor節點上繼續運行。主要負責:
(1)負責運行組成 Spark 應用的任務,並將狀態信息返回給驅動器進程;
(2)通過自身的塊管理器(Block Manager)為用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接緩存在Executor進程內的,因此任務可以在運行時充分利用緩存數據加速運算。
總結:Master和Worker是Spark的守護進程,即Spark在特定模式下正常運行所必須的進程。Driver和Executor是臨時進程,當有具體任務提交到Spark集群才會開啟的進程。
1. Local模式-本地單機
Linux中查看有多少核數: [kris@hadoop101 ~]$ cat /proc/cpuinfo ... [kris@hadoop101 ~]$ cat /proc/cpuinfo | grep 'processor' | wc -l 8
Local模式
在一台計算機,可以設置Master; (提交任務時需要指定--master)Local模式又分為:
① Local所有計算都運行在一個線程中(單節點單線程),沒有任何並行計算;
②Local[K] ,如local[4]即運行4個Worker線程(單機也可以並行有多個線程),可指定幾個線程來運行計算,通常CPU有幾個Core就執行幾個線程,最大化利用cpu的計算能力;
③Local[*], 直接幫你安裝Cpu最多Cores來設置線程數,這種是默認的;
bin/spark-submit \ //提供任務的命令 --class org.apache.spark.examples.SparkPi \ //指定運行jar的主類 --master //它有默認值是local[*] =>spark://host:port, mesos://host:port, yarn, or local. --executor-memory 1G \ //指定每個executor可用內存 --total-executor-cores 2 \ 指定executor總核數 ./examples/jars/spark-examples_2.11-2.1.1.jar \ \\jar包 100 //main方法中的args參數 ./bin/spark-submit 回車可查看所有的參數
[kris@hadoop101 spark-local]$ bin/spark-shell Spark context Web UI available at http://192.168.1.101:4040 Spark context available as 'sc' (master = local[*], app id = local-1554255531204). ##spark core的入口sc Spark session available as 'spark'. ##它是spark sql程序的入口 再起一個spark-shell會報錯: spark sql也有一個默認的元數據也是存在derby數據庫里邊 Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@63e5b8aa, see the next exception for details. Caused by: org.apache.derby.iapi.error.StandardException: Another instance of Derby may have already booted the datab 查看頁面:hadoop101:4040 scala> sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect res0: Array[(String, Int)] = Array((Hello,3), (smile,1), (java,2), (world,1), (kris,1)) 提交任務(或者開啟spark-shell)的時候會有driver和executor進程,Local模式下它被封裝到了SparkSubmit中
提交任務分析
driver和executor是干活的;
① Client提交任務--->②起一個Driver ---> ③注冊應用程序,申請資源--資源管理者有 (Master(Standalone模式)、ResourceManage(yarn模式))----->④拿到資源后去其他節點啟動Executor----> ⑤Executor會反向注冊給Driver匯報;
⑥(把提交的jar包做任務切分,把任務發給具體執行的節點Executor)--->Driver會進行初始化sc、任務划分、任務調度 <===>Executor具體執行任務(負責具體執行任務、textFile、flatMap、map...)
⑦ Driver把任務發到Executor不一定會執行,有可能資源cpu或內存不夠了或者executor掛了,spark會有一個容錯機制,某一個掛了可轉移到其他的Executor;
最后任務跑完了,Driver會向資源管理者申請注銷(Executor也會注銷)
數據流程
textFile("input"):讀取本地文件input文件夾數據;
flatMap(_.split(" ")):壓平操作,按照空格分割符將一行數據映射成一個個單詞;
map((_,1)):對每一個元素操作,將單詞映射為元組;
reduceByKey(_+_):按照key將值進行聚合,相加;
collect:將數據收集到Driver端展示。
2. Standalone模式--完全分布式
概述
構建一個由Master+Slave構成的Spark集群,Spark運行在集群中;它的調度器是其實就是Master
提交任務時需要有一個客戶端Client,Master和Worker是守護進程它們是資源管理系統,提交任務(運行spark-shell或者spark-submit)之前它們就已經啟動了;
①提交--->起Driver就是初始化SparkContext,然后啟動Executor時需要資源;②向Master申請資源(即注冊),啟動ExecutorBackend
啟動Executor---->反向注冊給Driver匯報信息;
③ Driver划分切分任務把Task發送給Executor,如果Executor會有一個容錯機制,Executor運行時會給Driver發送報告Task運行狀態直至結束;
④最后任務運行完之后driver向master申請注銷,Executor也會注銷掉;
不一定非要在Client中起Driver(SparkContext),cluster模式,具體在哪個節點起sc由Master決定,隨機的在worker節點上選擇一個一個;
Driver在哪個節點起的原因:driver和executor之間是有通訊,每個 executor都要向driver匯報信息,互相通訊(消耗內存、資源+cpu數); 所有的executor節點都去跟driver做通訊,客戶端的壓力就會特別大;
Client是本地調試用,輸入之后馬上能看到輸入的結果;
1)修改slave文件,添加work節點:
[kris@hadoop101 conf]$ vim slaves
hadoop101
hadoop102
hadoop103
2)修改spark-env.sh文件,添加如下配置:
在高可用集群需把下面內容這給注釋掉:
#SPARK_MASTER_HOST=hadoop101
#SPARK_MASTER_PORT=7077
[kris@hadoop101 conf]$ vim spark-env.sh export JAVA_HOME=/opt/module/jdk1.8.0_144 ##如果遇到JAVA_HOME not set異常時可配置 SPARK_MASTER_HOST=hadoop101 SPARK_MASTER_PORT=7077 #配置歷史服務 export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://hadoop101:9000/directory" #配置高可用 export SPARK_DAEMON_JAVA_OPTS=" -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop101,hadoop102,hadoop103 -Dspark.deploy.zookeeper.dir=/spark"
3)修改spark-default.conf文件,開啟Log:
[kris@hadoop101 conf]$ vi spark-defaults.conf spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop101:9000/directory
注意:HDFS上的目錄需要提前存在。 hadoop fs -mkdir /directory
4) 分發spark包 分發的原因,因為這種模式下的資源調度是master和worker,各個節點需要自己去啟進程;
[kris@hadoop101 module]$ xsync spark/spark-standalone
5)啟動
② [kris@hadoop101 spark]$ sbin/start-all.sh 網頁查看Master:hadoop101:8080
可看到Status:ALIVE;Memory in use 等信息
高可用集群的啟動,要① 先啟動zookeeper; 在hadoop102上(也可以是其他節點)單獨啟動master節點 [kris@hadoop102 spark]$ sbin/start-master.sh
啟動歷史服務之前要先啟動 ③ start-dfs.sh
sbin/start-history-server.sh --->HistoryServer
查看歷史服務hadoop101:18080
官方求PI案例
##運行之前上邊的① ② ③步都要啟動其他; 默認的是client模式
[kris@hadoop101 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop101:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
===>>
Pi is roughly 3.1417439141743913 啟動spark shell /opt/module/spark/bin/spark-shell \ --master spark://hadoop101:7077 \ --executor-memory 1g \ --total-executor-cores 2 只要提交了任務就可以看到driver和executor,driver被封裝在了SparkSubmit里邊;CoarseGrainedExecutorBackend就是啟動的executor 提交任務提交給哪個executor都是有可能的
執行WordCount程序 scala>sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((Hello,2), (World,1), (java,2), (sbase,1), (spark,2), (Hi,1))
[kris@hadoop101 ~]$ jpsall -------hadoop101-------
6675 DataNode 5971 Master 6100 Worker 7463 CoarseGrainedExecutorBackend 7895 Jps 7368 SparkSubmit 6527 NameNode 5855 QuorumPeerMain -------hadoop102-------
4647 CoarseGrainedExecutorBackend 4075 QuorumPeerMain 4875 Jps 4380 DataNode 4188 Worker -------hadoop103-------
4432 SecondaryNameNode 4353 DataNode 4085 QuorumPeerMain 4198 Worker 4778 Jps
在Standalone--cluster模式下
[kris@hadoop101 spark-standalone]$ bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://hadoop101:7077 \ --deploy-mode cluster \ --executor-memory 1G \ --total-executor-cores 2 \ ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100 任務未執行完時的進程: [kris@hadoop101 spark-standalone]$ jpsall -------hadoop101------- 16740 CoarseGrainedExecutorBackend 16404 HistoryServer 16006 NameNode 15686 Master 16842 Jps 15805 Worker 16127 DataNode -------hadoop102------- 10240 CoarseGrainedExecutorBackend 10021 DataNode 9911 Worker 10334 Jps -------hadoop103------- 9824 DataNode 9714 Worker 9944 SecondaryNameNode 10299 Jps 10093 DriverWrapper ##cluster 模式下的Driver 任務執行完的進程: [kris@hadoop101 spark-standalone]$ jpsall -------hadoop101------- 16404 HistoryServer 16006 NameNode 15686 Master 15805 Worker 17166 Jps 16127 DataNode -------hadoop102------- 10021 DataNode 9911 Worker 10447 Jps -------hadoop103------- 10416 Jps 9824 DataNode 9714 Worker 9944 SecondaryNameNode
spark-shell的 spark HA集群訪問,前提是另外一個Master啟起來了;
/opt/module/spark/bin/spark-shell \ --master spark://hadoop101:7077,hadoop102:7077 \ --executor-memory 1g \ --total-executor-cores 2
把其中ACTIVE狀態節點的kill掉,另外一個Master的狀態將從standby模式--->active狀態;
可驗證下:
scala>sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((Hello,2), (World,1), (java,2), (sbase,1), (spark,2), (Hi,1))
提交任務時: --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). client和cluster的區別: SparkContext的位置不同(也就是運行Driver的位置不一樣),由Master決定,隨機的在其他節點初始化一個sc Driver和Executor之間會有通信,通信需要消耗資源內存cpu等,所有的executor去和客戶端(如果是client模式,Driver是啟在Client上的)去通信, 客戶端的壓力會非常大,如果有大量的executor再加上提交多個任務就啟動多個Driver,那么Client單點就掛掉被拖垮; cluster模式,每次提交任務時的sc的位置分散在不同節點上,分擔了壓力, Client本地調試時候用,可以看到輸出的結果,如可看到打印的π
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode) bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://hadoop101:7077 \ --deploy-mode cluster \ --executor-memory 1G \ --total-executor-cores 2 \ ##總的是2,默認1個cores/executor--->推導出有2/1個executor;可控制executor的數量; ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100 cluster模式下,driver叫DriverWrapper
3. Yarn模式
概述
之前的standalone模式,是自己Master和worker管理資源,分發是為了在各個節點啟進程;yarn模式資源由RM、NM來管理
Spark客戶端直接連接Yarn,不需要額外構建Spark集群。有yarn-client和yarn-cluster兩種模式,主要區別在於:Driver程序的運行節點。
yarn-client:Driver程序運行在客戶端,適用於交互、調試,希望立即看到app的輸出;
yarn-cluster:Driver程序運行在由RM(ResourceManager)啟動的AM(APPMaster)適用於生產環境。分擔壓力不會拖垮某個節點;
提交任務之前,客戶端Client、ResourceManager、NodeManager都是要啟動好的;
提交任務,App Submit; RM選擇一個NM啟動AM,AM來啟動Driver(即初始化sc),yarn的cluster模式SparkAppMaster(用來申請資源,啟動driver)和SparkContext在一個進程里邊;
AM(SparkAppMaster)向RM申請啟動Executor;(默認情況下一個節點啟一個executor這樣子負載比較均衡,也可以啟兩個),executor也是有個反向注冊的過程;
切分分配任務,同時executor上報集群狀況;跑完之后申請注銷;
安裝使用
1)修改hadoop配置文件yarn-site.xml,添加如下內容:
[kris@hadoop101 hadoop]$ vim yarn-site.xml
<!--是否啟動一個線程檢查每個任務正使用的物理內存量,如果任務超出分配值,則直接將其殺掉,默認是true --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!--是否啟動一個線程檢查每個任務正使用的虛擬內存量,如果任務超出分配值,則直接將其殺掉,默認是true --> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
2)配置歷史服務JobHistoryServer| 配置日志查看功能
修改spark-env.sh,添加如下配置:
[kris@hadoop101 conf]$ vim spark-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144 YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop # 配置JobHistoryServer 注意:HDFS上的目錄需要提前存在。 export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://hadoop101:9000/directory"
從這里看到歷史日志:http://hadoop102:8088/cluster點擊直接跳轉到spark中 http://hadoop101:18080/history/application_1554294467331_0001/jobs/
[kris@hadoop101 conf]$ vim spark-defaults.conf
#修改spark-default.conf文件,開啟Log: spark.eventLog.enabled true spark.eventLog.dir hdfs://hadoop101:9000/directory # 日志查看 spark.yarn.historyServer.address=hadoop101:18080 spark.history.ui.port=18080
提交任務到Yarn執行 [kris@hadoop101 spark-yarn]$ bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100
[kris@hadoop101 spark-yarn]$ bin/spark-shell --master yarn ##shell只能用client模式啟動,默認的也是這種模式; Spark context Web UI available at http://192.168.1.101:4040 Spark context available as 'sc' (master = yarn, app id = application_1554290192113_0004). Spark session available as 'spark'. -------hadoop101------- 25920 NodeManager 25751 DataNode 28075 SparkSubmit ##Driver還是被封裝到這里邊的 28252 Jps 25469 QuorumPeerMain 25630 NameNode -------hadoop102------- 14995 CoarseGrainedExecutorBackend 15076 Jps 13447 DataNode 13672 NodeManager 13369 QuorumPeerMain 13549 ResourceManager 14942 ExecutorLauncher #Executor啟動器,就是AppMaster,Cluster模式,AM和sc在一個進程里邊的,這種模式AM的任務是:既可以申請資源又可以做任務切分和調度;
Client模式它們就不在一個進程了,由RM隨機選擇一個節點來啟動AM,這種模式它的作用僅僅是用來申請資源去啟動Executor; -------hadoop103------- 13536 DataNode 14610 CoarseGrainedExecutorBackend 14691 Jps 13638 NodeManager 13464 QuorumPeerMain 13710 SecondaryNameNode
yarn--cluster模式
[kris@hadoop101 spark-yarn]$ bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ./examples/jars/spark-examples_2.11-2.1.1.jar 在任務未完成之前的進程: [kris@hadoop101 spark-yarn]$ jpsall -------hadoop101------- 13328 SparkSubmit 12146 NodeManager 13706 CoarseGrainedExecutorBackend 12555 NameNode 12702 DataNode 13951 Jps -------hadoop102------- 6864 ResourceManager 8101 Jps 7403 DataNode 6990 NodeManager -------hadoop103------- 7984 ApplicationMaster ## Yarn-Cluster模式下SparkAppMaster和Sparkcontext即Driver是在一個進程的 8432 Jps 7560 SecondaryNameNode 8158 CoarseGrainedExecutorBackend 7230 NodeManager 7438 DataNode 任務完成之后的進程: [kris@hadoop101 spark-yarn]$ jpsall -------hadoop101------- 12146 NodeManager 12555 NameNode 12702 DataNode 14031 Jps -------hadoop102------- 6864 ResourceManager 8153 Jps 7403 DataNode 6990 NodeManager -------hadoop103------- 7560 SecondaryNameNode 8537 Jps 7230 NodeManager 7438 DataNode
[kris@hadoop101 spark-yarn]$ sbin/start-history-server.sh ##開啟歷史服務
提交任務到Yarn執行 [kris@hadoop101 spark-yarn]$ bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100
Mesos模式
Spark客戶端直接連接Mesos;不需要額外構建Spark集群。國內應用比較少,更多的是運用yarn調度。
幾種模式對比
package com.atguigu.spark import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { //1.創建SparkConf並設置App名稱 val conf = new SparkConf().setAppName("WordCount") //2.創建SparkContext,該對象是提交Spark App的入口 val context = new SparkContext(conf) //3.使用sc創建RDD並執行相應的transformation和action context.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).sortBy(_._2, false).saveAsTextFile(args(1)) //4.關閉連接 context.stop() } }
/wc.txt必須在HDFS上有這個文件
[kris@hadoop101 spark-yarn]$ hadoop fs -put wc.txt / [kris@hadoop101 spark-yarn]$ bin/spark-submit --class com.atguigu.spark.WordCount --master yarn --deploy-mode client /opt/module/spark/spark-yarn/WordCount.jar /wc.txt /out 結果: (Hello,3) (smile,2) (kris,2) (alex,1) (hi,1)