Spark集群搭建與運行流程
服務器准備
我使用的是ubuntu-16.04版本的4個虛擬主機,主機名為s200,s201,s202,s203
spark安裝包spark-2.1.0-bin-hadoop2.7.tgz,在這里我使用了root用戶進行集群的搭建
集群規划
s200 : 用作Master
s201-s203:用作Worker
服務器的ssh配置
1)ssh-keygen -t rsa 生成公鑰和私鑰
2)將id_rsa.pub分發至遠程服務器上,並在遠程服務器將分發得到的公鑰cat id_rsa.pub >> ~/.ssh/authorized_keys中就可免密碼登陸
3)ssh下的各目錄說明
known_hosts:ssh會把每個你訪問過計算的公鑰(public key)都記錄在此文件中,當下次訪問相同計算機時,OpenSSH會核對公鑰,如果公鑰不同,OpenSSH會發出警告, 避免你受到DNS Hijack之類的攻擊
authorized_keys:就是為了讓兩個linux機器之間使用ssh不需要用戶名和密碼。采用了數字簽名RSA或者DSA來完成這個操作
spark集群搭建
1)修改spark-env.sh配置文件
# 把SPARK_HOME/conf/下的spark-env.sh.template文件復制為spark-env.sh
# 修改spark-env.sh配置文件,添加一些配置信息
# vim spark-env.sh
# 配置JAVA_HOME,一般來說,不配置也可以,但是可能會出現問題,還是配上吧
export JAVA_HOME=/usr/local/java/jdk1.8.0_73
# 一般來說,spark任務有很大可能性需要去HDFS上讀取文件,所以配置上
# 如果說你的spark就讀取本地文件,也不需要yarn管理,不用配
export HADOOP_CONF_DIR=/home/hadoop/apps/hadoop-2.7.4/etc/hadoop
# 設置Master的主機名
export SPARK_MASTER_HOST=hadoop01
# 提交Application的端口,默認就是這個,萬一要改呢,改這里
# 為什么默認就是7077呢?我們在sbin/ vim start-master.sh可以看到
# if [ "$SPARK_MASTER_PORT" = "" ]; then
# SPARK_MASTER_PORT=7077
# fi
# 但是我們最好在這里統一配置
export SPARK_MASTER_PORT=7077
# 每一個Worker最多可以使用的cpu core的個數,我虛擬機就一個...
# 真實服務器如果有32個,你可以設置為32個
export SPARK_WORKER_CORES=1
# 每一個Worker最多可以使用的內存,我的虛擬機就2g
# 真實服務器如果有128G,你可以設置為100G
export SPARK_WORKER_MEMORY=1g
2)修改slaves配置文件,添加Worker的主機列表
# 把SPARK_HOME/conf/下的slaves.sh.template文件復制為slaves.sh
s201
s202
s203
3)把spark_home/sbin下的start-all.sh和stop-all.sh這個文件重命名
這一步可做可不做。
如果集群中也配置HADOOP_HOME,那么在HADOOP_HOME/sbin目錄下也有start-all.sh和stop-all.sh這兩個文件,當你執行這兩個文件,系統不知道是操作hadoop集群還是spark集群。修改后就不會沖突了,當然,不修改的話,你需要進入它們的sbin目錄下執行這些文件,這肯定就不會發生沖突了。我們配置SPARK_HOME主要也是為了執行其他spark命令方便。
4)把spark安裝包分發給其他節點
scp -r spark-2.1.0 s201:`pwd`
scp -r spark-2.1.0 s202:`pwd`
scp -r spark-2.1.0 s203:`pwd`
5)在集群所有節點上配置SPARK_HOME環境變量
vim /etc/profile
export SPARK_HOME=/application/spark2.1.0
export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
source /etc/profile
6)在spark master節點啟動saprk集群
# 注意,如果你沒有執行第4步,一定要進入SPARK_HOME/sbin目錄下執行這個命令
# 或者你在Master節點分別執行start-master.sh和start-slaves.sh
start-spark-all.sh
# 注意,如果配置了HADOOP_CONF_DIR,那么在啟動spark集群之前,先啟動hadoop集群
7)驗證
root@s200:~# jps
3729 Master
4050 Jps
root@s200:~#
root@s201:~# jps
14805 Jps
8678 Worker
root@s201:~#
root@s202:~# jps
12307 Jps
7397 Worker
root@s202:~#
root@s203:~# jps
2363 Worker
2462 Jps
root@s203:~#
術語解釋
Master :資源管理的主節點(進程)
Cluster Manager :在集群上獲取資源的外部服務(例如standalone,Mesos,Yarn)
Worker Node(standalone):資源管理的從節點(進程)或者說管理本機資源的進程
Application:基於Spark的用戶程序,包含了driver程序和運行在集群上的executor程序
Driver Program:用於來連接工作進程(Worker)的程序
Executor:是在一個worker進程所管理的節點上為某Application啟動的一個進程,該進程負責運行任務,
並且負責將數據存在內存或者磁盤上,每個應用都有各自獨立的executors
Task:被送到某個executor上的工作單元
Job:包含很多任務(Task)的並行計算,可以看作和action對應
Stage:一個Job會被拆分成很多組任務,每組任務被稱為Stage(就像Mapreduce分map task和reduce task一樣)
spark資源調度
集群啟動后,Worker節點會像Master節點匯報資源情況,Master掌握了集群資源情況。當Spark提交一個Application后,根據RDD之間的依賴關系將Application形成一個DAG有向無環圖。任務提交后,Spark會在Driver端創建兩個對象:DAGScheduler和TaskScheduler,DAGScheduler是任務調度的高層調度器,是一個對象。DAGScheduler的主要作用就是將DAG根據RDD之間的寬窄依賴關系划分為一個個的Stage,然后將這些Stage以TaskSet的形式提交給TaskScheduler(TaskScheduler是任務調度的低層調度器,這里TaskSet其實就是一個集合,里面封裝的就是一個個的task任務,也就是stage中的並行度task任務),TaskSchedule會遍歷TaskSet集合,拿到每個task后會將task發送到計算節點Executor中去執行(其實就是發送到Executor中的線程池ThreadPool去執行)。task在Executor線程池中的運行情況會向TaskScheduler反饋,當task執行失敗時,則由TaskScheduler負責重試,將task重新發送給Executor去執行,默認重試3次。如果重試3次依然失敗,那么這個task所在的stage就失敗了。stage失敗了則由DAGScheduler來負責重試,重新發送TaskSet到TaskSchdeuler,Stage默認重試4次。如果重試4次以后依然失敗,那么這個job就失敗了。job失敗了,Application就失敗了。
TaskScheduler不僅能重試失敗的task,還會重試straggling(落后,緩慢)task(也就是執行速度比其他task慢太多的task)。如果有運行緩慢的task那么TaskScheduler會啟動一個新的task來與這個運行緩慢的task執行相同的處理邏輯。兩個task哪個先執行完,就以哪個task的執行結果為准。這就是Spark的推測執行機制。在Spark中推測執行默認是關閉的。推測執行可以通過spark.speculation屬性來配置。
1)待集群spark啟動成功后,worker與master通信,此時worker的各種信息(IP,port等)會存在Master中的workers集合中,其數據類型是HashSet。
2)當spark submit向Master為Driver申請資源時,申請信息會封裝在Master中的waitingDrivers集合中,此時有個Schedule() 方法會監控waitingDrivers集合是否為空,若不為空,說明客戶端向Master申請資源,然后查看當前集群的資源情況,從而找到符合要求的節點啟動Driver,待Driver啟動成功,就把這個申請信息從waitingDrivers集合中刪除。
3)Driver啟動成功后,Driver會像Master為Application申請資源,申請信息會封裝在Master中的waitingApps集合中,同樣Schedule()方法會監控waitingApps集合是否為空,若不為空,說明有Driver為當前Application申請資源,然后查看當前集群的資源情況,從而找到符合要求的節點去啟動Executor進程,待Executor啟動成功后,就把這個申請信息從waitingApps集合中刪除。
總結:
當為Driver或者當前Application申請資源時,會直接調用Schedule()方法,根據不同的申請去反調相應的方法,即一個Schedule()方法中有兩套處理邏輯
資源調度結論:
1、默認情況下,每一個Worker會為當前的Application啟動一個Executor進程,並且這個Executor會使用1G內存和當前Worker所能管理的所有core
2、如果想要在一個Worker上啟動多個Executor,可以在提交Application的時候,指定Executor使用的core數,命令為:spark -submit --executor-cores。
3、默認情況下Executor的啟動方式為輪訓啟動。
Executor在worker上啟動的條件是什么?
1、Worker分配給Executor的cores大於Executor所需要的最小cores。
2、Worker空閑 cores大於Executor所需最小cores。
3、Worker的空閑內存大於Executor所需要的內存。
spark任務調度
Spark是一個分布式並行計算框架。我們寫的Application要在集群中分布式計算,由於大數據中的計算原則是計算數據,為了將每一個task精准的分發到節點上,此時需要任務調度器,找到數據的位置,從而分發task到節點上。
任務調度過程:
首先根據RDD的依賴關系生成DAG有向無環圖,然后將有向無環圖交給DAGScheduler;
1)根據RDD的寬窄依賴關系,將DAG切割成一個個的stage,將切割出來的stage封裝到TaskSet對象,然后將一個個的TaskSet給TaskScheduler
2)TaskScheduler拿到TaskSet以后,會遍歷這個結果,拿到每一個task,然后去調用HDFS上的某一個方法,獲取數據的位置,根據數據的位置來分發task到worker節點的Executor進程中的線程池中執行
3)TaskScheduler會實時跟蹤每一個task的執行情況,若執行失敗,TaskScheduler會重試提交task,不會無休止的重試,默認是重試3次,如果重試3次依舊失敗,那么這個task所在stage就失敗了,此時TS向DAGScheduler匯報
4)TaskScheduler向DAGScheduler匯報當前stage失敗,此時DAGScheduler會重試提交stage。注意:每一次重試提交的stage,已經成功執行的不會被再次分發到Executor進程執行,只是提交重試失敗的。如果DAGScheduler重試了4次依然失敗,那么stage所在的job就失敗了,job失敗是不會進行重試的。DAGScheduler重試次數spark.stage.maxConsecutiveAttempts可設置。
資源調度與任務調度整合
整個過程中的部分問題
1、客戶端沒有向Master為Driver申請資源,Driver是如何啟動起來的?
通過Application程序中SparkContext上下文啟動起來的
2、為什么waitingWorker的數據類型是HashSet?
這里主要是用到了Set的唯一性,當worker與Master失去通信的時候,Master會設置一個時間間隔,若超過時間間隔worker還沒有與Master建立通信,那么Master會認為這個worker掛掉了,從而釋放這部分資源,若建立通信,Master則把原來那部分資源分配給worker,從而保證每個worker分配到一份資源
3、啟動Executor的飯是鋼hi為輪詢啟動,其優點是什么?
這種啟動方式可以使計算能夠更好地找到數據,有助於數據的本地化。
粗細粒度資源調度
粗粒度資源申請(spark)
當application執行之前,會將所有的資源申請完畢,申請到資源,就執行application,申請不到一直等待。當所有的task執行完畢之后才會釋放這批資源
優點:application執行之前資源申請完畢,每個task執行時就不需要自己去申請資源,task執行就快了, task快了,job就快了,application執行就快了
缺點:所有的task執行完畢之后才會釋放資源,容器使集群資源不能充分利用(如有數據傾斜的時候,99個task都執行完了,就剩一個task一直在執行)
細粒度資源申請(Map Reduce)
當application執行時,由執行任務的task自己去申請資源
優點:資源得到充分利用
缺點:效率不高