原文連接 http://xiguada.org/spark/
Spark概述
當前,MapReduce編程模型已經成為主流的分布式編程模型,它極大地方便了編程人員在不會分布式並行編程的情況下,將自己的程序運行在分布式系統上。但是MapReduce也存在一些缺陷,如高延遲、不支持DAG模型、Map與Reduce的中間數據落地等。因此在近兩年,社區出現了優化改進MapReduce的項目,如交互查詢引擎Impala、支持DAG的TEZ、支持內存計算Spark等。Spark是UC Berkeley AMP lab開源的通用並行計算框架,以其先進的設計理念,已經成為社區的熱門項目。Spark相對與MapReduce的優勢有:低延遲、支持DAG和分布式內存計算。雖然Spark有許多優勢,但是畢竟沒有經過大規模生產的驗證,所以暫未能代替MapReduce,慶幸的是,由於許多人意識到Spark的優勢所在,社區Spark已成熱門項目。本文對Spark的分析基於社區spark 1.0.2版本。
Spark生態圈介紹
Spark力圖整合機器學習(MLib)、圖算法(GraphX)、流式計算(Spark Streaming)和數據倉庫(Spark SQL)等領域,通過計算引擎Spark,彈性分布式數據集(RDD),架構出一個新的大數據應用平台。
Spark生態圈以HDFS、S3、Techyon為底層存儲引擎,以Yarn、Mesos和Standlone作為資源調度引擎;使用Spark,可以實現MapReduce應用;基於Spark,Spark SQL可以實現即席查詢,Spark Streaming可以處理實時應用,MLib可以實現機器學習算法,GraphX可以實現圖計算,SparkR可以實現復雜數學計算。
圖1 Spark生態圈
Spark包與目錄介紹
下載源碼包:http://spark.apache.org/downloads.html
Spark 1.0.2源碼包目錄結構:
圖2 spark代碼目錄結構
源代碼下子目錄很多,下表是幾個關鍵目錄的介紹。
子目錄 |
功能 |
core |
Spark核心代碼都在此目錄下 |
sql |
Spark sql相關的代碼 |
streaming |
Spark Streaming(實時計算)相關的代碼 |
mlib |
MLib(機器學習)相關代碼 |
graphx |
GraphX(圖計算)相關代碼 |
yarn |
支持Spark運行在Yarn上的模塊 |
example |
各種spark作業的例子 |
非關鍵目錄
子目錄 |
功能 |
assembly |
組裝spark項目的地方 |
bagel |
基於Spark的輕量Pregel實現,bagel將被GraphX代替 |
ec2 |
提交spark集群到Amazon EC2 |
external |
與一些外部系統的依賴 |
extra |
此目錄包含了spark默認不構建的組件 |
repl |
Spark shell功能模塊 |
tools |
工具包 |
Spark編譯
Spark使用mvn編譯,並為我們提交了構建項目的腳本:make-distribution.sh,推薦在Linux下編譯,編譯命令:./make-distribution.sh --hadoop 2.2.0 --with-yarn –tgz
編譯成功后會工程目錄下生成dist目錄,即項目可執行包:
圖3
如何運行Spark作業
參考社區文檔: https://spark.apache.org/docs/latest/quick-start.html
Spark運行模式介紹
Spark任務的運行模式有local、standalone、OnYarn等,各種運行模式的詳細流程可以參考博客: http://www.cnblogs.com/shenh062326/p/3658543.html
Spark作業執行簡要流程
無論運行在哪種模式下,Spark作業的執行流程都是相似的,主要有如下八步:
-
客戶端提交作業
-
Driver啟動流程
-
Driver申請資源並啟動其余Executor(即Container)
-
Executor啟動流程
-
作業調度,生成stages與tasks。
-
Task調度到Executor上,Executor啟動線程執行Task邏輯
-
Driver管理Task狀態
-
Task完成,Stage完成,作業完成
Spark原理詳細介紹
DAGScheduler與TaskScheduler詳解
DAGScheduler把一個spark作業轉換成成stage的DAG(Directed Acyclic Graph有向無環圖),根據RDD和stage之間的關系,找出開銷最小的調度方法,然后把stage以TaskSet的形式提交給TaskScheduler。圖3與圖4展示了DAGScheduler與TaskScheduler的工作。
圖4 DAGScheduler的作用
圖5 TaskScheduler作用
DAGScheduler還決定了運行task的理想位置,並把這些信息傳遞給下層的TaskScheduler。此外,DAGScheduler還處理由於shuffle數據丟失導致的失敗,這有可能需要重新提交運行之前的stage(非shuffle數據丟失導致的task失敗由TaskScheduler處理)。
TaskScheduler維護所有TaskSet,當Executor向Driver發送心跳時,TaskScheduler會根據其資源剩余情況分配相應的Task。另外TaskScheduler還維護着所有Task的運行狀態,重試失敗的Task。
RDD詳解
RDD(Resilient Distributed Datasets彈性分布式數據集),是spark中最重要的概念,用戶可以簡單的把RDD理解成一個提供了許多操作接口的數據集合,和一般數據集不同的是,其實際數據分布存儲於一批機器中(內存或磁盤中)。當然,RDD肯定不會這么簡單,它的功能還包括容錯、集合內的數據可以並行處理等。圖5是RDD類的視圖,圖6簡要展示了RDD的底層實現。更多RDD的操作描述和編程方法請參考社區文檔:https://spark.apache.org/docs/latest/programming-guide.html。
圖6 RDD提供了許多操作
圖7 RDD的實現
RDD cache的原理
RDD的轉換過程中,並不是每個RDD都會存儲,如果某個RDD會被重復使用,或者計算其代價很高,那么可以通過顯示調用RDD提供的cache()方法,把該RDD存儲下來。那RDD的cache是如何實現的呢?
RDD中提供的cache()方法只是簡單的把該RDD放到cache列表中。當RDD的iterator被調用時,通過CacheManager把RDD計算出來,並存儲到BlockManager中,下次獲取便可直接通過CacheManager從BlockManager取出。
Shuffle原理簡介
在Spark編程時,不僅僅只有reduce才會產生shuffle過程,RDD提供的groupByKey,countApproxDistinctByKey等操作都會生成shuffle。Spark中shuffle的實現與MapReduce的shuffle有比較大的差別,首先是map階段,map的輸出不再需要排序,直接寫到文件中,一個map會把屬於不同reduce的數據分別輸出到不同的文體中,而reduce則通過aggregator處理所有shuffle fetch獲取的partition。
從流程上看,MapTask結束后,Driver的MapOutPutTracker會注冊MapOutPuts,ReduceTask啟動后向Driver獲取MapOutPutStatuses,然后fetch相應的MapOutPuts。
圖8 Shuffle簡介
圖9 Shuffle的原理(摘自網絡)
Spark Streaming詳解
Spark Streaming是建立在Spark上的實時計算框架,通過它提供豐富的API、基於內存的高速執行引擎,用戶可以結合流式、批處理和交互試查詢應用。
Spark Streaming的基本原理是將輸入數據流以時間片(秒級)為單位進行拆分,然后以類似批處理的方式處理每個時間片數據,其基本原理如下圖所示。
圖10 Spark Streaming基本原理圖
首先,Spark Streaming把實時輸入數據流以時間片Δt (如1秒)為單位切分成塊。Spark Streaming會把每塊數據作為一個RDD,並使用RDD操作處理每一小塊數據。每個塊都會生成一個Spark Job處理,最終結果也返回多塊。
使用Spark Streaming編寫的程序與編寫Spark程序非常相似,在Spark程序中,主要通過操作RDD(Resilient Distributed Datasets彈性分布式數據集)提供的接口,如map、reduce、filter等,實現數據的批處理。而在Spark Streaming中,則通過操作DStream(表示數據流的RDD序列)提供的接口,這些接口和RDD提供的接口類似。
圖11 Spark Streaming程序轉換為DStream Graph
圖12 DStream Graph轉換為Spark jobs
在圖12中,Spark Streaming把程序中對DStream的操作轉換為DStream Graph,圖4中,對於每個時間片,DStream Graph都會產生一個RDD Graph;針對每個輸出操作(如print、foreach等),Spark Streaming都會創建一個Spark action;對於每個Spark action,Spark Streaming都會產生一個相應的Spark job,並交給JobManager。JobManager中維護着一個Jobs隊列, Spark job存儲在這個隊列中,JobManager把Spark job提交給Spark Scheduler,Spark Scheduler負責調度Task到相應的Spark Executor上執行。
圖13
Spark Streaming的另一大優勢在於其容錯性,RDD會記住創建自己的操作,每一批輸入數據都會在內存中備份,如果由於某個結點故障導致該結點上的數據丟失,這時可以通過備份的數據在其它結點上重算得到最終的結果。