導語
spark 已經成為廣告、報表以及推薦系統等大數據計算場景中首選系統,因效率高,易用以及通用性越來越得到大家的青睞,我自己最近半年在接觸spark以及spark streaming之后,對spark技術的使用有一些自己的經驗積累以及心得體會,在此分享給大家。
本文依次從spark生態,原理,基本概念,spark streaming原理及實踐,還有spark調優以及環境搭建等方面進行介紹,希望對大家有所幫助。
spark 生態及運行原理
Spark 特點
-
運行速度快 => Spark擁有DAG執行引擎,支持在內存中對數據進行迭代計算。官方提供的數據表明,如果數據由磁盤讀取,速度是Hadoop MapReduce的10倍以上,如果數據從內存中讀取,速度可以高達100多倍。
-
適用場景廣泛 => 大數據分析統計,實時數據處理,圖計算及機器學習
-
易用性 => 編寫簡單,支持80種以上的高級算子,支持多種語言,數據源豐富,可部署在多種集群中
-
容錯性高。Spark引進了彈性分布式數據集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一組節點中的只讀對象集合,這些集合是彈性的,如果數據集一部分丟失,則可以根據“血統”(即充許基於數據衍生過程)對它們進行重建。另外在RDD計算時可以通過CheckPoint來實現容錯,而CheckPoint有兩種方式:CheckPoint Data,和Logging The Updates,用戶可以控制采用哪種方式來實現容錯。
Spark的適用場景
目前大數據處理場景有以下幾個類型:
-
復雜的批量處理(Batch Data Processing),偏重點在於處理海量數據的能力,至於處理速度可忍受,通常的時間可能是在數十分鍾到數小時;
-
基於歷史數據的交互式查詢(Interactive Query),通常的時間在數十秒到數十分鍾之間
-
基於實時數據流的數據處理(Streaming Data Processing),通常在數百毫秒到數秒之間
Spark成功案例
目前大數據在互聯網公司主要應用在廣告、報表、推薦系統等業務上。在廣告業務方面需要大數據做應用分析、效果分析、定向優化等,在推薦系統方面則需要大數據優化相關排名、個性化推薦以及熱點點擊分析等。這些應用場景的普遍特點是計算量大、效率要求高。
騰訊 / yahoo / 淘寶 / 優酷土豆
spark運行架構
spark基礎運行架構如下所示:
spark結合yarn集群背后的運行流程如下所示:
spark 運行流程:
Spark架構采用了分布式計算中的Master-Slave模型。Master是對應集群中的含有Master進程的節點,Slave是集群中含有Worker進程的節點。
-
Master作為整個集群的控制器,負責整個集群的正常運行;
-
Worker相當於計算節點,接收主節點命令與進行狀態匯報;
-
Executor負責任務的執行;
-
Client作為用戶的客戶端負責提交應用;
-
Driver負責控制一個應用的執行。
Spark集群部署后,需要在主節點和從節點分別啟動Master進程和Worker進程,對整個集群進行控制。在一個Spark應用的執行過程中,Driver和Worker是兩個重要角色。Driver 程序是應用邏輯執行的起點,負責作業的調度,即Task任務的分發,而多個Worker用來管理計算節點和創建Executor並行處理任務。在執行階段,Driver會將Task和Task所依賴的file和jar序列化后傳遞給對應的Worker機器,同時Executor對相應數據分區的任務進行處理。
-
Excecutor /Task 每個程序自有,不同程序互相隔離,task多線程並行
-
集群對Spark透明,Spark只要能獲取相關節點和進程
-
Driver 與Executor保持通信,協作處理
三種集群模式:
1.Standalone 獨立集群
2.Mesos, apache mesos
3.Yarn, hadoop yarn
基本概念:
-
Application =>Spark的應用程序,包含一個Driver program和若干Executor
-
SparkContext => Spark應用程序的入口,負責調度各個運算資源,協調各個Worker Node上的Executor
-
Driver Program => 運行Application的main()函數並且創建SparkContext
-
Executor => 是為Application運行在Worker node上的一個進程,該進程負責運行Task,並且負責將數據存在內存或者磁盤上。每個Application都會申請各自的Executor來處理任務
-
Cluster Manager =>在集群上獲取資源的外部服務 (例如:Standalone、Mesos、Yarn)
-
Worker Node => 集群中任何可以運行Application代碼的節點,運行一個或多個Executor進程
-
Task => 運行在Executor上的工作單元
-
Job => SparkContext提交的具體Action操作,常和Action對應
-
Stage => 每個Job會被拆分很多組task,每組任務被稱為Stage,也稱TaskSet
-
RDD => 是Resilient distributed datasets的簡稱,中文為彈性分布式數據集;是Spark最核心的模塊和類
-
DAGScheduler => 根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler
-
TaskScheduler => 將Taskset提交給Worker node集群運行並返回結果
-
Transformations => 是Spark API的一種類型,Transformation返回值還是一個RDD,所有的Transformation采用的都是懶策略,如果只是將Transformation提交是不會執行計算的
-
Action => 是Spark API的一種類型,Action返回值不是一個RDD,而是一個scala集合;計算只有在Action被提交的時候計算才被觸發。
Spark核心概念之RDD
Spark核心概念之Transformations / Actions
Transformation返回值還是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算后,變換成另外一個RDD,然后這個RDD又可以進行另外一次轉換。這個過程是分布式的。 Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統中。
Action是返回值返回給driver或者存儲到文件,是RDD到result的變換,Transformation是RDD到RDD的變換。
只有action執行時,rdd才會被計算生成,這是rdd懶惰執行的根本所在。
Spark核心概念之Jobs / Stage
Job => 包含多個task的並行計算,一個action觸發一個job
stage => 一個job會被拆為多組task,每組任務稱為一個stage,以shuffle進行划分
Spark核心概念之Shuffle
以reduceByKey為例解釋shuffle過程。
在沒有task的文件分片合並下的shuffle過程如下:(spark.shuffle.consolidateFiles=false
)
fetch 來的數據存放到哪里?
剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區,經過處理后的數據放在內存 + 磁盤上。這里我們主要討論處理后的數據,可以靈活設置這些數據是“只用內存”還是“內存+磁盤”。如果spark.shuffle.spill = false就只用內存。由於不要求數據有序,shuffle write 的任務很簡單:將數據 partition 好,並持久化。之所以要持久化,一方面是要減少內存存儲空間壓力,另一方面也是為了 fault-tolerance。
shuffle之所以需要把中間結果放到磁盤文件中,是因為雖然上一批task結束了,下一批task還需要使用內存。如果全部放在內存中,內存會不夠。另外一方面為了容錯,防止任務掛掉。
存在問題如下:
-
產生的 FileSegment 過多。每個 ShuffleMapTask 產生 R(reducer 個數)個 FileSegment,M 個 ShuffleMapTask 就會產生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數據文件。
-
緩沖區占用內存空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產生 MR 個 bucket。雖然一個 ShuffleMapTask 結束后,對應的緩沖區可以被回收,但一個 worker node 上同時存在的 bucket 個數可以達到 cores R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask),占用的內存空間也就達到了cores× R × 32 KB。對於 8 核 1000 個 reducer 來說,占用內存就是 256MB。
為了解決上述問題,我們可以使用文件合並的功能。
在進行task的文件分片合並下的shuffle過程如下:(spark.shuffle.consolidateFiles=true
)
可以明顯看出,在一個 core 上連續執行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 形成 ShuffleBlock i,后執行的 ShuffleMapTask 可以將輸出數據直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。下一個 stage 的 reducer 只需要 fetch 整個 ShuffleFile 就行了。這樣,每個 worker 持有的文件數降為 cores× R。FileConsolidation 功能可以通過spark.shuffle.consolidateFiles=true
來開啟。
Spark核心概念之Cache
val rdd1 = ... // 讀取hdfs數據,加載成RDD rdd1.cache val rdd2 = rdd1.map(...) val rdd3 = rdd1.filter(...) rdd2.take(10).foreach(println) rdd3.take(10).foreach(println) rdd1.unpersist
cache和unpersisit兩個操作比較特殊,他們既不是action也不是transformation。cache會將標記需要緩存的rdd,真正緩存是在第一次被相關action調用后才緩存;unpersisit是抹掉該標記,並且立刻釋放內存。只有action執行時,rdd1才會開始創建並進行后續的rdd變換計算。
cache其實也是調用的persist持久化函數,只是選擇的持久化級別為MEMORY_ONLY
。
persist支持的RDD持久化級別如下:
需要注意的問題:
Cache或shuffle場景序列化時, spark序列化不支持protobuf message,需要java 可以serializable的對象。一旦在序列化用到不支持java serializable的對象就會出現上述錯誤。
Spark只要寫磁盤,就會用到序列化。除了shuffle階段和persist會序列化,其他時候RDD處理都在內存中,不會用到序列化。
Spark Streaming運行原理
spark程序是使用一個spark應用實例一次性對一批歷史數據進行處理,spark streaming是將持續不斷輸入的數據流轉換成多個batch分片,使用一批spark應用實例進行處理。
從原理上看,把傳統的spark批處理程序變成streaming程序,spark需要構建什么?
需要構建4個東西:
-
一個靜態的 RDD DAG 的模板,來表示處理邏輯;
-
一個動態的工作控制器,將連續的 streaming data 切分數據片段,並按照模板復制出新的 RDD ;
-
DAG 的實例,對數據片段進行處理;
-
Receiver進行原始數據的產生和導入;Receiver將接收到的數據合並為數據塊並存到內存或硬盤中,供后續batch RDD進行消費;
-
對長時運行任務的保障,包括輸入數據的失效后的重構,處理任務的失敗后的重調。
具體streaming的詳細原理可以參考廣點通出品的源碼解析文章:
對於spark streaming需要注意以下三點:
- 盡量保證每個work節點中的數據不要落盤,以提升執行效率。
- 保證每個batch的數據能夠在batch interval時間內處理完畢,以免造成數據堆積。
- 使用steven提供的框架進行數據接收時的預處理,減少不必要數據的存儲和傳輸。從tdbank中接收后轉儲前進行過濾,而不是在task具體處理時才進行過濾。
Spark 資源調優
內存管理:
Executor的內存主要分為三塊:
第一塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;
第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;
第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。
每個task以及每個executor占用的內存需要分析一下。每個task處理一個partiiton的數據,分片太少,會造成內存不夠。
其他資源配置:
具體調優可以參考美團點評出品的調優文章:
http://tech.meituan.com/spark-tuning-basic.html
http://tech.meituan.com/spark-tuning-pro.html
Spark 環境搭建
spark tdw以及tdbank api文檔:
http://git.code.oa.com/tdw/tdw-spark-common/wikis/api
其他學習資料:
http://km.oa.com/group/2430/articles/show/257492