Spark Components:
角色組成:
Driver : 由SparkContext創建,運行在main方法,負責資源申請與調度,程序分發,接收每個分區的計算結果
Cluster manager: 獲取集群內資源(模式standalone ,Mesos, YARN)的外部服務
Worker node: 集群中能夠運行計算程序的節點
Executor: work node上啟動的一個進程,能夠運行tasks,能在memory 或者 disk上存儲數據,每個application都有自己的Executors
Task: 發送給executor的一個執行單元(task是以thread形式執行)
Job: actions生成的多個任務組成的並行計算,每個action對應一個job
Statge: 每個job划分為階段性的小型任務集合(一個節點上順序完成的一次計算)
架構說明:
1, 每個application都有自己的Executor進程,每個Executor可以多線程執行任務,存在整個application生命周期內,多個application之間互相獨立(每個app對應一個jvm實例),多個spark application之間只能通過將數據寫入外存儲才能進行數據共享
2,spark計算層與集群管理模式無關,只要獲取到Executor,並且Executor之間能夠互相通信,它就能在集群中運行
3,driver負責監聽接收Executor上的計算結果,driver必須確保其它Worker能夠通過網絡地址尋找到Executor,driver負責管理集群上的task分發,把task運行在較近的worker nodes上,如果執行task在遠端的集群上,他會通過RPC方式提交operations到較近的節點運行task
Spark是以MapReduce為基礎在其上進行功能擴展的集群計算框架,spark計算面向是RDD(resilient distributed dataset)數據源
RDD是編程抽象概念,代表可以跨機器進行分割的只讀的數據集合,所有對數據操作都需通過RDD來處理。
RDD操作:
create:通過hfile 或 scala collection作為數據源
transformation:處理計算轉換,map,flatmap,filter
controler:對中間結果可存儲在memory 或file供其它RDD數據復用
actions:驅動RDD執行計算
Spark程序是一個惰性計算,通過action調用來驅動程序運行,代碼被分發到集群上,由各個RDD分區上的worker來執行,然后結果會被發送回Driver進行聚合處理。
驅動程序創建一個或多個RDD,調用transform來轉換RDD,然后調用reduce處理被轉換后的RDD。在程序處理數據過程中使用的是pipleLine方式。
WordCount執行流程:
spark集群及邏輯划分:
任務分發及調度:
Spark 存儲管理:
BlockManager管理內存,磁盤,堆外內存。其中主要內存使用分為execution和storage。execution主要在shuffles, joins, sorts 和 aggregations時使用,storage用於cache,集群中間數據傳遞。execution與storage內存可共享使用,沒有execution任務,storage可以使用全部內存,當execution需要內存時,剔除storage到一定比例閾值空間。當有計算使用內存時,storage不可擠占execution的內存。
數據緩存分類:
persist:發生pipeline處理時內部存儲
checkpoint: 外存儲點,存儲執行會在下一個action算子執行后開始執行(即滯后執行)且會重復執行,需要結合persist使用
broadcast:發生在driver,獲取得到blockmanager ID,傳到executor,執行時如沒有數據則從blockmanager獲取處理,下一次可直接使用
內存划分:300M空間系統預留,40%空間存儲數據結構,spark元數據,應對不正常大對象產生的oom預留位,60%空間用於execution和storage
BlockManager組成結構:
spark RDD之間 Dependence類別:
spark shuffle組成:
Spark優化:
1,相同的數據使用一個RDD:避免多次計算相同數據的RDD
2,多次使用的RDD計算結果持久化:當一個RDD相同的算子執行多次時,為避免重復計算需要將計算結果進行緩存/持久化,加快下一次的計算
持久化級別:
MEMORY_ONLY(對象內存存儲)
MEMORY_AND_DISK(內存不夠時寫一部分到磁盤)
MEMORY_ONLY_SER(序列化為字節數組存儲在內存,相比memory_only對象存儲更省空間,多出來的性能開銷是序列化與反序列化的開銷)
MEMORY_AND_DISK_SER(序列化存儲內存磁盤)
DISK_ONLY(僅磁盤)
策略選擇:
(1)數據量較少優選MEMORY_ONLY,如果使用MEMORY_ONLY級別時發生了內存溢出,建議嘗試使用MEMORY_ONLY_SER級別,此時每個partition是一個字節數組,降低了內存占用,如果RDD中數據量過多還是可能導致OOM
(2)如果內存級別都無法使用,那么建議使用MEMORY_AND_DISK_SER,而不是MEMORY_AND_DISK。到了這一步說明RDD的數據量很大,內存無法完全放下,序列化后的數據比較少,可以節省內存和磁盤的空間開銷。
同時該策略會優先盡量嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。不建議使用DISK_ONLY和后綴為_2的級別,因為完全基於磁盤文件的數據的讀寫,會導致性能急劇降低,有時還不如重新計算一次所有RDD。
(3)后綴為_2的級別所有數據都復制一份副本,並發送到其它節點上,數據復制及網絡傳輸會導致較大的性能開銷
3,盡量減少shuffle計算:shuffle最耗性能,shuffle不同節點上相同的key拉取到一台機器進行聚合操作,涉及到磁盤IO和網絡傳輸,byKey、join,distinct、repartition等算子會觸發shuffle操作
盡量使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子在本地進行combiner較少拉取的數據量,而groupByKey算子是不會進行聚合全量數據會在集群的各個節點之間分發和傳輸,性能較差。
4,算子優化:
mapPartitions替代map:partition數據量不是很大時效率較高。一次調用會處理一個partition所有的數據,而不是一次函數調用處理一條,性能相對來說會高一些,但如果內存不夠可能出現OOM異常
foreachPartitions替代foreach:類似mapPartitions替代map 如將RDD中所有數據寫MySQL等外存儲時避免foreach頻繁地創建和銷毀數據庫連接,每個partition使用一個connection,提高性能
repartitionAndSortWithinPartitions替代repartition與sort:可以一邊進行重分區的shuffle操作,一邊進行排序,shuffle與sort兩個操作同時進行
mapValues/flatmapValues:當分區器,分區數沒有變化,key沒有變化,只對value進行轉化可使用 mapValues->map 和 flatmapValues->flatmap 來避免產生不必要的shuffle操作
//例wordcount 對統計的value進行轉換且進行分組 val words: RDD[String] = data.flatMap(_.split(" ")) val kv: RDD[(String, Int)] = words.map((_,1)) val res: RDD[(String, Int)] = kv.reduceByKey(_+_) // val res01: RDD[(String, Int)] = res.map(x=>(x._1,x._2*10)) val res01: RDD[(String, Int)] = res.mapValues(x=>x*10) val res02: RDD[(String, Iterable[Int])] = res01.groupByKey() res02.foreach(println)
broadcast外部變量:在算子函數中使用到外部變量時,默認情況下,Spark會將該變量復制多個副本,通過網絡傳輸到task中,此時每個task都有一個變量副本,如果變量本身比較大,task比較多,會占用過多內存和傳輸性能問題
廣播后的變量會保證每個Executor內存中只有一份變量副本,同一個EXcutor內的task共享一個節省內存
5,數據結構優化:
1)減少java包裝類的使用(object header 16byte)盡量使用基礎類型替代
2)使用array+ primitive types替換HashMap,List
3)避免使用過多的小對象嵌套結構
4)使用數值或枚舉類型替換string作為key(string編碼及長度等占用)
5)內存小於32 GB 調整JVM flag -XX:+UseCompressedOops,使pointers 由8byte到4byte
6,數據本地化:
1)PROCESS_LOCAL,待處理的數據在相同的jvm實例內運行,這是最佳級別
2)NODE_LOCAL,數據在一個節點(例如在同一個HDFS節點或者同一個節點的另一個Excutor上)
3)NO_PREF,沒有位置偏好,從任何地方訪問一樣快(Redis,mysql,HBase)
4)RACK_LOCAL,同一機架上的節點
5)ANY,數據不在同一個機架,在網絡任意節點
當task在等待executor執行超時時,有任何空閑executor上沒有未處理數據的情況下,Spark 會切換到較低的local level執行task。
兩種執行策略:a), 等待,直到繁忙的 CPU 釋放后,再次在這節點上啟動task。 b) 立即在需要將數據傳輸到遠端節點上啟動新的task執行。
Spark 通常是等待一段時間等待 CPU 釋放,超時過期后它開始將數據從遠處移動到空閑CPU的節點上執行。每個級別之間的等待超時時間可以單獨配置,也可以一起配置在一個參數spark.locality
相關參數:
spark.locality.wait //本地進程內超時等待時間
spark.locality.wait.node//本機超時等待時間
spark.locality.wait.rack//同一機架超時等待時間
spark.locality.wait.process//本地無引用的外部
其它參數調優
num-executors:作業總共要用多少個Executor進程。如果不設置的話,默認只會給你啟動少量的Executor進程,此時Spark作業運行速度是非常慢,一般設置50~100個左右 太少無法充分利用資源,太多無法給予充分的資源
executor-memory:每個Executor進程的內存。內存設置4G~8G較為合適,num-executors乘以executor-memory不能超過隊列的最大內存
executor-cores:每個Executor進程的CPU core數量。設置為2~4個較為合適
driver-memory:Driver進程的內存。如果使用collect算子將RDD的計算結果數據全部拉取到Driver上處理,那么必須確保Driver的內存足夠大
spark.default.parallelism:設置每個stage的默認task數量,Spark默認設置的數量是偏少,不會使用足夠的資源。如果task數量偏少的話,就會導致前面設置好的Executor的參數白費,無論有多少資源,只有1,2個task導致資源浪費,
官網推薦設置原則是每個core2-3個task,總的為num-executors * executor-cores的2~3倍較為合適
spark.storage.memoryFraction:RDD持久化數據在Executor內存中能占的比例,默認是0.5。有較多的RDD持久化操作,該參數的值可以適當提高一些,保證持久化的數據能夠容納在內存中。如發現作業由於頻繁的gc導致運行緩慢,
task執行用戶代碼的內存不夠用,建議調低。作業中的shuffle操作比較多,而持久化操作比較少,建議調低
spark.shuffle.file.bufferbuffer 文件溢寫緩沖大小,默認32k
spark.shuffle.memoryFraction shuffle使用executor內存占比,默認0.2
spark.reducer.maxSizeInFlight shuffle read buffer 一次數據拉取量,默認48m
spark.shuffle.spill.numElementsForceSpillThreshold 強制文件溢寫數據的條目數閾值,默認integer最大值
spark.shuffle.spill.initialMemoryThreshold 強制文件溢寫數據量內存占用多少空間,默認5m
數據傾斜問題:
發現問題:
1,sample countByKey() wc查看結果。
2,在Spark Web UI查看一下當前這個stage各個task分配的數據量,執行時長
解決方案:
1,雙重聚合:rdd進行key值隨機前綴N 先進行一步combiner,然后去掉前綴再次combiner
2,向上采樣:對rdd內數據隨機key前綴,較少數據的rdd內相同的key進行N(excutor core task)倍的擴容在進行join處理
3,broadcast處理:如果有較少的數據量rdd與較大的rdd進行join則小的rdd進行broadcast后數據量多的rdd進行map join
4,過濾異常數據,如某些無用冗余數據量較大,則先過濾處理
5,提高並行度(緩解作用並未根本解決)RDD多分區,通過算子指定並行度,例如,reduceByKey(_+_,10),配置spark.default.parallelism
推測計划問題:
當spark task中0.75已執行完成,剩余task執行時間達到已完成task中位數的1.5倍,則spark會重新調度一個新的task執行此task未完成的任務。(spark默認關閉,map reduce有開啟)
導致問題:1,導致計算結果數據重復 2,如有數據傾斜發生會使task無法執行完
相關參數配置:
spark.speculation//計划是否開啟,默認false
spark.speculation.interval//檢測間隔 100ms
spark.speculation.multiplier//執行緩慢時間界定,是多少倍的已執行完task中位數 1.5
spark.speculation.quantile//所有已完成task中占總數的比例 0.75