Apache Spark是一個圍繞速度、易用性和復雜分析構建的大數據處理框架,最初在2009年由加州大學伯克利分校的AMPLab開發,並於2010年成為Apache的開源項目之一,與Hadoop和Storm等其他大數據和MapReduce技術相比,Spark有如下優勢:
1.運行速度快,Spark擁有DAG執行引擎,支持在內存中對數據進行迭代計算。官方提供的數據表明,如果數據由磁盤讀取,速度是Hadoop MapReduce的10倍以上,如果數據從內存中讀取,速度可以高達100多倍。
2.適用場景廣泛,大數據分析統計,實時數據處理,圖計算及機器學習
3.易用性,編寫簡單,支持80種以上的高級算子,支持多種語言,數據源豐富,可部署在多種集群中
4.容錯性高。Spark引進了彈性分布式數據集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一組節點中的只讀對象集合,這些集合是彈性的,如果數據集一部分丟失,則可以根據“血統”(即充許基於數據衍生過程)對它們進行重建。另外在RDD計算時可以通過CheckPoint來實現容錯,而CheckPoint有兩種方式:CheckPoint Data,和Logging The Updates,用戶可以控制采用哪種方式來實現容錯。
Spark的適用場景
目前大數據處理場景有以下幾個類型:
1.復雜的批量處理(Batch Data Processing),偏重點在於處理海量數據的能力,至於處理速度可忍受,通常的時間可能是在數十分鍾到數小時;
2.基於歷史數據的交互式查詢(Interactive Query),通常的時間在數十秒到數十分鍾之間
3.基於實時數據流的數據處理(Streaming Data Processing),通常在數百毫秒到數秒之間
架構及生態
通常當需要處理的數據量超過了單機尺度(比如我們的計算機有4GB的內存,而我們需要處理100GB以上的數據)這時我們可以選擇spark集群進行計算,有時我們可能需要處理的數據量並不大,但是計算很復雜,需要大量的時間,這時我們也可以選擇利用spark集群強大的計算資源,並行化地計算,其架構示意圖如下:

Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構建在RDD和Spark Core之上的。
Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每個數據庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。Spark提供的sql形式的對接Hive、JDBC、HBase等各種數據渠道的API,用Java開發人員的思想來講就是面向接口、解耦合,ORMapping、Spring Cloud Stream等都是類似的思想。
Spark Streaming:基於SparkCore實現的可擴展、高吞吐、高可靠性的實時數據流處理。支持從Kafka、Flume等數據源處理后存儲到HDFS、DataBase、Dashboard中。對實時數據流進行處理和控制。Spark Streaming允許程序能夠像普通RDD一樣處理實時數據。
MLlib:一個常用機器學習算法庫,算法被實現為對RDD的Spark操作。這個庫包含可擴展的學習算法,比如分類、回歸等需要對大量數據集進行迭代的操作。
GraphX:控制圖、並行圖操作和計算的一組算法和工具的集合。GraphX擴展了RDD API,包含控制圖、創建子圖、訪問路徑上所有頂點的操作
Spark的架構設計
Hadoop存在缺陷:
基於磁盤,無論是MapReduce還是YARN都是將數據從磁盤中加載出來,經過DAG,然后重新寫回到磁盤中,計算過程的中間數據又需要寫入到HDFS的臨時文件,這些都使得Hadoop在大數據運算上表現太“慢”,Spark應運而生。

Cluster Manager在standalone模式中即為Master主節點,控制整個集群,監控worker。在YARN模式中為資源管理器負責分配資源,有點像YARN中ResourceManager那個角色,大管家握有所有的干活的資源,屬於乙方的總包。
WorkerNode是可以干活的節點,聽大管家ClusterManager差遣,是真正有資源干活的主。從節點,負責控制計算節點,啟動Executor或者Driver。
Executor是在WorkerNode上起的一個進程,相當於一個包工頭,負責准備Task環境和執行
Task,負責內存和磁盤的使用。Task是施工項目里的每一個具體的任務。
Driver是統管Task的產生與發送給Executor的,運行Application 的main()函數,是甲方的司令員。
SparkContext是與ClusterManager打交道的,負責給錢申請資源的,是甲方的接口人。
整個互動流程是這樣的:
1 甲方來了個項目,創建了SparkContext,SparkContext去找ClusterManager申請資源同時給出報價,需要多少CPU和內存等資源。ClusterManager去找WorkerNode並啟動Excutor,並介紹Excutor給Driver認識。
2 Driver根據施工圖拆分一批批的Task,將Task送給Executor去執行。
3 Executor接收到Task后准備Task運行時依賴並執行,並將執行結果返回給Driver
4 Driver會根據返回回來的Task狀態不斷的指揮下一步工作,直到所有Task執行結束。
運行流程及特點

1.構建Spark Application的運行環境,啟動SparkContext
2.SparkContext向資源管理器(可以是Standalone,Mesos,Yarn)申請運行Executor資源,並啟動StandaloneExecutorbackend,
3.Executor向SparkContext申請Task
4.SparkContext將應用程序分發給Executor
5.SparkContext構建成DAG圖,將DAG圖分解成Stage、將Taskset發送給Task Scheduler,最后由Task Scheduler將Task發送給Executor運行
6.Task在Executor上運行,運行完釋放所有資源
Spark運行特點:
1.每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行Task。這種Application隔離機制是有優勢的,無論是從調度角度看(每個Driver調度他自己的任務),還是從運行角度看(來自不同Application的Task運行在不同JVM中),當然這樣意味着Spark Application不能跨應用程序共享數據,除非將數據寫入外部存儲系統
2.Spark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了
提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換
Task采用了數據本地性和推測執行的優化機制
Spark任務調度模塊DAGScheduler、TaskScheduler:


用戶編排的代碼由一個個的RDD Objects組成,DAGScheduler負責根據RDD的寬依賴拆分DAG為一個個的Stage,買個Stage包含一組邏輯完全相同的可以並發執行的Task。TaskScheduler負責將Task推送給從ClusterManager那里獲取到的Worker啟動的Executor。
DAGScheduler(統一化的,Spark說了算):
詳細的案例分析下如何進行Stage划分,請看下圖

1 stage是觸發action的時候從后往前划分的,所以本圖要從RDD_G開始划分。
2 RDD_G依賴於RDD_B和RDD_F,隨機決定先判斷哪一個依賴,但是對於結果無影響。
3 RDD_B與RDD_G屬於窄依賴,所以他們屬於同一個stage,RDD_B與老爹RDD_A之間是寬依賴的關系,所以他們不能划分在一起,所以RDD_A自己是一個stage1
4 RDD_F與RDD_G是屬於寬依賴,他們不能划分在一起,所以最后一個stage的范圍也就限定了,RDD_B和RDD_G組成了Stage3
5 RDD_F與兩個爹RDD_D、RDD_E之間是窄依賴關系,RDD_D與爹RDD_C之間也是窄依賴關系,所以他們都屬於同一個stage2
6 執行過程中stage1和stage2相互之間沒有前后關系所以可以並行執行,相應的每個stage內部各個partition對應的task也並行執行
7 stage3依賴stage1和stage2執行結果的partition,只有等前兩個stage執行結束后才可以啟動stage3.
8 我們前面有介紹過Spark的Task有兩種:ShuffleMapTask和ResultTask,其中后者在DAG最后一個階段推送給Executor,其余所有階段推送的都是ShuffleMapTask。在這個案例中stage1和stage2中產生的都是ShuffleMapTask,在stage3中產生的ResultTask。
9 雖然stage的划分是從后往前計算划分的,但是依賴邏輯判斷等結束后真正創建stage是從前往后的。也就是說如果從stage的ID作為標識的話,先需要執行的stage的ID要小於后需要執行的ID。就本案例來說,stage1和stage2的ID要小於stage3,至於stage1和stage2的ID誰大誰小是隨機的,是由前面第2步決定的。
雖然理論上Task應該交給workerNode上的executor來執行的,但是有一種情況下是是在DAG划分結束后直接在本地執行的。
1 Spark.localExecution.enabled設置為true;
2 用戶顯示指定允許本地執行;
3 整個DAG只有一個stage;
4 僅有一個Partition。
同時滿足上面4個條件下,可以直接在SparkContext(Driver)節點上本地執行。
TaskScheduler(接口化的,根據不同的部署方式Standalone、Mesos、YARN、Local):

每個TaskScheduler對應着一個幫手SchedulerBackend,SchedulerBackend負責與ClusterManager交互獲得資源,然后將這些資源信息傳給TaskScheduler,TaskScheduler負責監督Task的執行狀態並進行相應的調度。這里主要做的工作有:就近原則、失敗重試、慢任務推測性執行
任務調度的時候默認是FIFO(先到先得)的,由jobID和stageID的大小來決定;也可以配置成FAIR(公平)的,重新確定調度順序推送task給Executor。
Executor執行完Task后會通過向Driver發送StatusUpdate的消息來通知Driver任務更新Task的狀態。Driver會將Task狀態通知轉告給TaskSchedule,后者會重新分配計算任務。
假如Task有執行失敗的,根據失敗原因和閾值進行該Task的重試或者放棄。
假如所有Task執行成功,如果Task是ResultTask,那么任務結束;如果是ShuffleMapTask那么啟動下一個stage。
Spark運行模式:
Local模式:
比較簡單,只適用於自己玩和測試,甲方SparkContext乙方Executor等都部署在一起,物理位置上角色定位不明確。
Mesos模式:

Worker部分采用Master/Slaver模式,Master是整個系統的核心部件所以用ZooKeeper做高可用性加固,Slaver真正創建Executor執行Task並將自己的物理計算資源匯報給Master,Master負責將slavers的資源按照策略分配給Framework。
Mesos資源調度分為粗粒度和細粒度兩種方式:
粗粒度方式是啟動時直接向Master申請執行全部Task的資源,並等所有計算任務結束后才釋放資源;細粒度方式是根據Task需要的資源不停的申請和歸還。兩個方式各有利弊,粗粒度的優點是調度成本小,但是會因木桶效應造成資源長期被霸占;細粒度沒有木桶效應,但是調度上的管理成本較高。
YARN模式:

YRAN模式下分為Cluster和Client兩種模式,上圖中的為Cluster模式。
Cluster模式下就是將Spark作為一個普通的YARN任務,Client端通過ResourceManager申請到資源,創建ApplicationMaster、Task到Container中去。ApplicationMaster負責監督Task的執行情況。
Yarn-Client模式中,Driver在客戶端本地運行,這種模式可以使得Spark Application和客戶端進行交互,因為Driver在客戶端,所以可以通過webUI訪問Driver的狀態,默認是http://hadoop1:4040訪問,而YARN通過http:// hadoop1:8088訪問
YARN-client的工作流程步驟為:

Spark Cluster模式:
在YARN-Cluster模式中,當用戶向YARN中提交一個應用程序后,YARN將分兩個階段運行該應用程序:
第一個階段是把Spark的Driver作為一個ApplicationMaster在YARN集群中先啟動;
第二個階段是由ApplicationMaster創建應用程序,然后為它向ResourceManager申請資源,並啟動Executor來運行Task,同時監控它的整個運行過程,直到運行完成
YARN-cluster的工作流程分為以下幾個步驟

Spark Client 和 Spark Cluster的區別:
理解YARN-Client和YARN-Cluster深層次的區別之前先清楚一個概念:Application Master。在YARN中,每個Application實例都有一個ApplicationMaster進程,它是Application啟動的第一個容器。它負責和ResourceManager打交道並請求資源,獲取資源之后告訴NodeManager為其啟動Container。從深層次的含義講YARN-Cluster和YARN-Client模式的區別其實就是ApplicationMaster進程的區別
YARN-Cluster模式下,Driver運行在AM(Application Master)中,它負責向YARN申請資源,並監督作業的運行狀況。當用戶提交了作業之后,就可以關掉Client,作業會繼續在YARN上運行,因而YARN-Cluster模式不適合運行交互類型的作業
YARN-Client模式下,Application Master僅僅向YARN請求Executor,Client會和請求的Container通信來調度他們工作,也就是說Client不能離開
Standalone(也叫Deploy)模式:
與Mesos模式有點像,也是Master/Slavers的架構。Standalone模式使用Spark自帶的資源調度框架,選用ZooKeeper來實現Master的HA
框架結構圖如下:

該模式主要的節點有Client節點、Master節點和Worker節點。其中Driver既可以運行在Master節點上中,也可以運行在本地Client端。當用spark-shell交互式工具提交Spark的Job時,Driver在Master節點上運行;當使用spark-submit工具提交Job或者在Eclips、IDEA等開發平台上使用”new SparkConf.setManager(“spark://master:7077”)”方式運行Spark任務時,Driver是運行在本地Client端上的
運行過程如下圖

1.SparkContext連接到Master,向Master注冊並申請資源(CPU Core 和Memory)
2.Master根據SparkContext的資源申請要求和Worker心跳周期內報告的信息決定在哪個Worker上分配資源,然后在該Worker上獲取資源,然后啟動StandaloneExecutorBackend;
StandaloneExecutorBackend向SparkContext注冊;
3.SparkContext將Applicaiton代碼發送給StandaloneExecutorBackend;並且SparkContext解析Applicaiton代碼,構建DAG圖,並提交給DAG Scheduler分解成Stage(當碰到Action操作時,就會催生Job;每個Job中含有1個或多個Stage,Stage一般在獲取外部數據和shuffle之前產生),然后以Stage(或者稱為TaskSet)提交給Task Scheduler,Task Scheduler負責將Task分配到相應的Worker,最后提交給StandaloneExecutorBackend執行;
4.StandaloneExecutorBackend會建立Executor線程池,開始執行Task,並向SparkContext報告,直至Task完成
5.所有Task完成后,SparkContext向Master注銷,釋放資源
Spark的容錯處理
請注意這里使用的是容錯而不是容災,因為這倆不是一個概念。
容災是洪水、火災、地震等導致的災難性的毀滅性的故障,是非常小概率的事件,需要做數據級別甚至應用級別的異地備份;而容錯是解決由於網絡阻塞、磁盤損壞、內存溢出、機器掉電等引起的單點故障或者模塊化的故障,是每時每刻都有可能發生的大概率事件。
所以說容錯和容災不是一個級別的,我們對架構拓撲稍作優化通過很小的成本就可以達到容錯的效果;但是要想達到容災那將是巨大的開銷而且很難存在一個100%容災的設計,例如地球被炸了難道還要將數據在外太空做定期備份么~~
前面介紹模式的時候我們一再強調,Master是核心部件,是心臟和大腦,所以Master的故障是我們不能接受的,所以需要通過Zookeeper來做高可用性(雖然也有fileSystem模式但是自從我做過的一個產品真的出現磁盤損壞導致單點故障然后連續加班了40多小時候以后我再也不相信硬件了)。一旦Leader的Master掛掉,其它Master會自主推優出新的Leader。新的Leader會從ZooKeeper中讀取所有元數據並通知到大家(Worker、Client)自己登基上位。
Worker節點眾多,出現故障的概率最高,workers定時的向Master上報心跳,一旦超時Master將對它進行卸磨殺驢。Master會將worker上所有Executor設置為失效並通知給Client,Client會通知SchedulerBackend如果有該worker上的executor正在執行你的task請重新調度。Master通知完以后會嘗試kill掉該壞掉的worker。
Worker節點上運行着很多executor進程,如果worker心跳沒問題但是某個executor進程出線了問題怎么辦?這個概率比worker出現異常的概率更大!其實worker節點除了明着干活的executor,還有監督executor的executorRunner,它會將executor退出的信息告知自己所在的worker,worker在通知自己的master,剩下的就是跟上面一樣的套路了。
Executor詳解:
Executor干了兩件事情:運行Task和將結果反饋給Driver。。
Master在給Application分配Worker時有兩種方式:盡量打散和盡量集中。
盡量打散適用於內存密集型,盡量集中適用於CPU密集型。
1個物理節點可以部署多個Worker,但是一個Worker中對於1個Application只能有1個Executor。
關於Executor的內存設置:
Executor是執行Task的真正苦力,內存設置的過小,會導致內存溢出或者頻繁GC影響效率;內存設置過大會導致占用過多資源(內存資源從價格上和槽道數量上來講還是比較珍貴的)。所以合理的設置Executor內存是Spark處理任務的關鍵。Executor支持的任務的數量取決於持有的CPU的核數,所以一種思路是如果集群普遍的CPU核數夠多但是內存緊張,可以采用更多的分區來增加Task的個數減少單個Task執行對內存的要求。
Executor最終將Task的執行結果反饋給Driver,會根據大小采用不同的策略:
1 如果大於MaxResultSize,默認1G,直接丟棄;
2 如果“較大”,大於配置的frameSize(默認10M),以taksId為key存入BlockManager
3 else,全部吐給Driver。
Shuffle詳解:
Hash Base Shuffle(spark1.2以前默認):
下圖是將4個Partition洗牌成3個Partition的案例,假設當前是StageA,下一個是StageB

在洗牌過程中StageA每個當前的Task會把自己的Partition按照stageB中Partition的要求做Hash產生stageB中task數量的Partition(這里特別強調是每個stageA的task),這樣就會有len(stageA.task)*len(stageB.task)這么多的小file在中間過程產生,如果要緩存RDD結果還需要維護到內存,下個stageB需要merge這些file又涉及到網絡的開銷和離散文件的讀取,所以說超過一定規模的任務用Hash Base模式是非常吃硬件的。
盡管后來Spark版本推出了Consolidate對基於Hash的模式做了優化,但是只能在一定程度上減少block file的數量,沒有根本解決上面的缺陷。
Sort Base Shuffle(spark1.2開始默認):

Sort模式下StageA每個Task會產生2個文件:內容文件和索引文件。內容文件是根據StageB中Partition的要求自己先sort好並生成一個大文件;索引文件是對內容文件的輔助說明,里面維護了不同的子partition之間的分界,配合StageB的Task來提取信息。這樣中間過程產生文件的數量由len(stageA.task)*len(stageB.task)減少到2* len(stageB.task),StageB對內容文件的讀取也是順序的。Sort帶來的另一個好處是,一個大文件對比與分散的小文件更方便壓縮和解壓,通過壓縮可以減少網絡IO的消耗。(PS:但是壓縮和解壓的過程吃CPU,所以要合理評估)
Sort和Hash模式通過spark.shuffle.manager來配置的。
Storage模塊:
存儲介質:內存、磁盤、Tachyon(這貨是個分布式內存文件,與Redis不一樣,Redis是分布式內存數據庫),存儲級別就是它們單獨或者相互組合,再配合一些容錯、序列化等策略。例如內存+磁盤。
負責存儲的組件是BlockManager,在Master(Dirver)端和Slaver(Executor)端都有BlockManager,分工不同。Slaver端的將自己的BlockManager注冊給Master,負責真正block;Master端的只負責管理和調度。
Storage模塊運行時內存默認占Executor分配內存的60%,所以合理的分配Executor內存和選擇合適的存儲級別需要平衡下Spark的性能和穩定。
RDD(Resilient Distributed Datasets) 彈性分布式數據集
RDD支持兩種操作:轉換(transiformation)和動作(action)
轉換就是將現有的數據集創建出新的數據集,像Map;動作就是對數據集進行計算並將結果返回給Driver,像Reduce。
RDD中轉換是惰性的,只有當動作出現時才會做真正運行。這樣設計可以讓Spark更見有效的運行,因為我們只需要把動作要的結果送給Driver就可以了而不是整個巨大的中間數據集。
緩存技術(不僅限內存,還可以是磁盤、分布式組件等)是Spark構建迭代式算法和快速交互式查詢的關鍵,當持久化一個RDD后每個節點都會把計算分片結果保存在緩存中,並對此數據集進行的其它動作(action)中重用,這就會使后續的動作(action)變得跟迅速(經驗值10倍)。例如RDD0àRDD1àRDD2,執行結束后RDD1和RDD2的結果已經在內存中了,此時如果又來RDD0àRDD1àRDD3,就可以只計算最后一步了。
RDD之間的寬依賴和窄依賴:
窄依賴:父RDD的每個Partition只被子RDD的一個Partition使用。
寬依賴:父RDD的每個Partition會被子RDD的多個Partition使用。
寬和窄可以理解為褲腰帶,褲腰帶扎的緊下半身管的嚴所以只有一個兒子;褲腰帶幫的比較寬松下半身管的不禁會搞出一堆私生子,這樣就記住了。
對於窄依賴的RDD,可以用一個計算單元來處理父子partition的,並且這些Partition相互獨立可以並行執行;對於寬依賴完全相反。
在故障回復時窄依賴表現的效率更高,兒子壞了可以通過重算爹來得到兒子,反正就這一個兒子當爹的恢復效率就是100%。但是對於寬依賴效率就很低了,如下圖:

如果兒子b1壞了a1、a2、a3三個當爹的都運算了一次恢復了b1,但是其實它們的運算同時也會覆蓋一遍b2這個無辜的兒子,有效率只有50%。
代碼實現上窄依賴NarrowDependency有2種:OneToOneDependency和RangeDependency
寬依賴只有1種ShuffleDependency,但是內部參數ShuffleManager有Hash和Sort兩種,后面會詳細介紹Hash和Sork的區別。
有了以上RDD寬窄依賴和父子之間的血緣關系,我們就可以繪制DAG:
繪制原則就是由於寬依賴的“斷點”效應,根據寬依賴將整個DAG分為不同的階段(Stage),每個Stage之間有先后關系從前向后進行,在每個Stage內部窄依賴RDD是並行執行的。
Stage的划分是從最后一個RDD從后往前進行的。
注意:轉化和動作只是決定惰性執行的時機,寬窄依賴才是划分Stage的唯一標准。ReduceByKey是轉化,但它包含ShuffleDependency,所以轉化和動作與寬窄依賴沒關,不要混淆。
RDD的計算:
Spark的Task有兩種:ShuffleMapTask和ResultTask,其中后者在DAG最后一個階段推送給Executor,其余所有階段推送的都是ShuffleMapTask。
Executor在准備好Task運行環境后會調用scheduler.Task#run,scheduler.Task#run會調用ShuffleMapTask或ResultTask的runTask,runTask會調用RDD的#iterator,每個RDD真正的計算邏輯實現在RDD的computer方法中。用戶創建SparkContext時會創建SparkEnv負責管理所有運行環境的信息,最核心的是cacheManager。
CheckPoint:
CheckPoint是對RDD緩存不足被擦寫等中間block斷丟失導致重新計算這一缺點的彌補,CheckPoint會啟動一個job來計算並將計算結果寫入磁盤中,最后修改原始RDD的依賴為當前CheckPoint。當緩存沒有命中時先來看CheckPoint中有沒有記錄,再決定是否重新計算。CheckPoint是RDD磁盤緩存的一種表現,穩定性更高,但是IO更慢。
Spark與hadoop:
- Hadoop有兩個核心模塊,分布式存儲模塊HDFS和分布式計算模塊Mapreduce
- spark本身並沒有提供分布式文件系統,因此spark的分析大多依賴於Hadoop的分布式文件系統HDFS
- Hadoop的Mapreduce與spark都可以進行數據計算,而相比於Mapreduce,spark的速度更快並且提供的功能更加豐富
- 關系圖如下:

