Spark 要點總結及優化


  

Spark Components:

 

 

 角色組成:

  Driver :  由SparkContext創建,運行在main方法,負責資源申請與調度,程序分發,接收每個分區的計算結果
  Cluster manager:  獲取集群內資源(模式standalone ,Mesos, YARN)的外部服務
  Worker node:  集群中能夠運行計算程序的節點
  Executor:  work node上啟動的一個進程,能夠運行tasks,能在memory 或者 disk上存儲數據,每個application都有自己的Executors
  Task:   發送給executor的一個執行單元(task是以thread形式執行)
  Job:     actions生成的多個任務組成的並行計算,每個action對應一個job
  Statge: 每個job划分為階段性的小型任務集合(一個節點上順序完成的一次計算)

 

架構說明:

1, 每個application都有自己的Executor進程,每個Executor可以多線程執行任務,存在整個application生命周期內,多個application之間互相獨立(每個app對應一個jvm實例),多個spark application之間只能通過將數據寫入外存儲才能進行數據共享
    
2,spark計算層與集群管理模式無關,只要獲取到Executor,並且Executor之間能夠互相通信,它就能在集群中運行

3,driver負責監聽接收Executor上的計算結果,driver必須確保其它Worker能夠通過網絡地址尋找到Executor,driver負責管理集群上的task分發,把task運行在較近的worker nodes上,如果執行task在遠端的集群上,他會通過RPC方式提交operations到較近的節點運行task

 

Spark是以MapReduce為基礎在其上進行功能擴展的集群計算框架,spark計算面向是RDD(resilient distributed dataset)數據源
RDD是編程抽象概念,代表可以跨機器進行分割的只讀的數據集合,所有對數據操作都需通過RDD來處理。

 

RDD操作:

  create:通過hfile 或 scala collection作為數據源

  transformation:處理計算轉換,map,flatmap,filter

  controler:對中間結果可存儲在memory 或file供其它RDD數據復用

  actions:驅動RDD執行計算

 

 

 

Spark程序是一個惰性計算,通過action調用來驅動程序運行,代碼被分發到集群上,由各個RDD分區上的worker來執行,然后結果會被發送回Driver進行聚合處理。
驅動程序創建一個或多個RDD,調用transform來轉換RDD,然后調用reduce處理被轉換后的RDD。在程序處理數據過程中使用的是pipleLine方式。

 

WordCount執行流程:

 

  

 

spark集群及邏輯划分:

 

 

 

 任務分發及調度:

 

 

 Spark 存儲管理:

  BlockManager管理內存,磁盤,堆外內存。其中主要內存使用分為execution和storage。execution主要在shuffles, joins, sorts 和 aggregations時使用,storage用於cache,集群中間數據傳遞。execution與storage內存可共享使用,沒有execution任務,storage可以使用全部內存,當execution需要內存時,剔除storage到一定比例閾值空間。當有計算使用內存時,storage不可擠占execution的內存。

 數據緩存分類:
  persist:發生pipeline處理時內部存儲
  checkpoint: 外存儲點,存儲執行會在下一個action算子執行后開始執行(即滯后執行)且會重復執行,需要結合persist使用
  broadcast:發生在driver,獲取得到blockmanager ID,傳到executor,執行時如沒有數據則從blockmanager獲取處理,下一次可直接使用

 內存划分:300M空間系統預留,40%空間存儲數據結構,spark元數據,應對不正常大對象產生的oom預留位,60%空間用於execution和storage

BlockManager組成結構:

 

 

 

 

 

spark RDD之間 Dependence類別:

 

  

spark shuffle組成:

 

 

 

 

 

 

Spark優化:

  1,相同的數據使用一個RDD:避免多次計算相同數據的RDD

  2,多次使用的RDD計算結果持久化:當一個RDD相同的算子執行多次時,為避免重復計算需要將計算結果進行緩存/持久化,加快下一次的計算
    持久化級別:
    MEMORY_ONLY(對象內存存儲)
    MEMORY_AND_DISK(內存不夠時寫一部分到磁盤)
    MEMORY_ONLY_SER(序列化為字節數組存儲在內存,相比memory_only對象存儲更省空間,多出來的性能開銷是序列化與反序列化的開銷)
    MEMORY_AND_DISK_SER(序列化存儲內存磁盤)
    DISK_ONLY(僅磁盤)

  策略選擇

  (1)數據量較少優選MEMORY_ONLY,如果使用MEMORY_ONLY級別時發生了內存溢出,建議嘗試使用MEMORY_ONLY_SER級別,此時每個partition是一個字節數組,降低了內存占用,如果RDD中數據量過多還是可能導致OOM

  (2)如果內存級別都無法使用,那么建議使用MEMORY_AND_DISK_SER,而不是MEMORY_AND_DISK。到了這一步說明RDD的數據量很大,內存無法完全放下,序列化后的數據比較少,可以節省內存和磁盤的空間開銷。

  同時該策略會優先盡量嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。不建議使用DISK_ONLY和后綴為_2的級別,因為完全基於磁盤文件的數據的讀寫,會導致性能急劇降低,有時還不如重新計算一次所有RDD。

  (3)后綴為_2的級別所有數據都復制一份副本,並發送到其它節點上,數據復制及網絡傳輸會導致較大的性能開銷

 

  3,盡量減少shuffle計算:shuffle最耗性能,shuffle不同節點上相同的key拉取到一台機器進行聚合操作,涉及到磁盤IO和網絡傳輸,byKey、join,distinct、repartition等算子會觸發shuffle操作
  盡量使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子在本地進行combiner較少拉取的數據量,而groupByKey算子是不會進行聚合全量數據會在集群的各個節點之間分發和傳輸,性能較差。

 

  4,算子優化:
  mapPartitions替代map:partition數據量不是很大時效率較高。一次調用會處理一個partition所有的數據,而不是一次函數調用處理一條,性能相對來說會高一些,但如果內存不夠可能出現OOM異常

  foreachPartitions替代foreach:類似mapPartitions替代map 如將RDD中所有數據寫MySQL等外存儲時避免foreach頻繁地創建和銷毀數據庫連接,每個partition使用一個connection,提高性能

  repartitionAndSortWithinPartitions替代repartition與sort:可以一邊進行重分區的shuffle操作,一邊進行排序,shuffle與sort兩個操作同時進行

  mapValues/flatmapValues:當分區器,分區數沒有變化,key沒有變化,只對value進行轉化可使用 mapValues->map 和 flatmapValues->flatmap 來避免產生不必要的shuffle操作

 

//例wordcount 對統計的value進行轉換且進行分組
   val words: RDD[String] = data.flatMap(_.split(" "))
    val kv: RDD[(String, Int)] = words.map((_,1))
      val res: RDD[(String, Int)] = kv.reduceByKey(_+_)
//    val res01: RDD[(String, Int)] = res.map(x=>(x._1,x._2*10))
    val res01: RDD[(String, Int)] = res.mapValues(x=>x*10)
    val res02: RDD[(String, Iterable[Int])] = res01.groupByKey()
    res02.foreach(println)

  broadcast外部變量:在算子函數中使用到外部變量時,默認情況下,Spark會將該變量復制多個副本,通過網絡傳輸到task中,此時每個task都有一個變量副本,如果變量本身比較大,task比較多,會占用過多內存和傳輸性能問題

  廣播后的變量會保證每個Executor內存中只有一份變量副本,同一個EXcutor內的task共享一個節省內存

 

  5,數據結構優化:
  1)減少java包裝類的使用(object header 16byte)盡量使用基礎類型替代
  2)使用array+ primitive types替換HashMap,List
  3)避免使用過多的小對象嵌套結構
  4)使用數值或枚舉類型替換string作為key(string編碼及長度等占用)
  5)內存小於32 GB 調整JVM flag -XX:+UseCompressedOops,使pointers 由8byte到4byte

 

  6,數據本地化:
    1)PROCESS_LOCAL,待處理的數據在相同的jvm實例內運行,這是最佳級別
    2)NODE_LOCAL,數據在一個節點(例如在同一個HDFS節點或者同一個節點的另一個Excutor上)
    3)NO_PREF,沒有位置偏好,從任何地方訪問一樣快(Redis,mysql,HBase)
    4)RACK_LOCAL,同一機架上的節點
    5)ANY,數據不在同一個機架,在網絡任意節點

  當task在等待executor執行超時時,有任何空閑executor上沒有未處理數據的情況下,Spark 會切換到較低的local level執行task。

    兩種執行策略:a), 等待,直到繁忙的 CPU 釋放后,再次在這節點上啟動task。    b) 立即在需要將數據傳輸到遠端節點上啟動新的task執行。
  Spark 通常是等待一段時間等待 CPU 釋放,超時過期后它開始將數據從遠處移動到空閑CPU的節點上執行。每個級別之間的等待超時時間可以單獨配置,也可以一起配置在一個參數spark.locality

  相關參數:

  spark.locality.wait //本地進程內超時等待時間
  spark.locality.wait.node//本機超時等待時間
  spark.locality.wait.rack//同一機架超時等待時間
  spark.locality.wait.process//本地無引用的外部

 

  其它參數調優
  num-executors:作業總共要用多少個Executor進程。如果不設置的話,默認只會給你啟動少量的Executor進程,此時Spark作業運行速度是非常慢,一般設置50~100個左右 太少無法充分利用資源,太多無法給予充分的資源

  executor-memory:每個Executor進程的內存。內存設置4G~8G較為合適,num-executors乘以executor-memory不能超過隊列的最大內存

  executor-cores:每個Executor進程的CPU core數量。設置為2~4個較為合適

  driver-memory:Driver進程的內存。如果使用collect算子將RDD的計算結果數據全部拉取到Driver上處理,那么必須確保Driver的內存足夠大

  spark.default.parallelism:設置每個stage的默認task數量,Spark默認設置的數量是偏少,不會使用足夠的資源。如果task數量偏少的話,就會導致前面設置好的Executor的參數白費,無論有多少資源,只有1,2個task導致資源浪費,

  官網推薦設置原則是每個core2-3個task,總的為num-executors * executor-cores的2~3倍較為合適

  spark.storage.memoryFraction:RDD持久化數據在Executor內存中能占的比例,默認是0.5。有較多的RDD持久化操作,該參數的值可以適當提高一些,保證持久化的數據能夠容納在內存中。如發現作業由於頻繁的gc導致運行緩慢,

  task執行用戶代碼的內存不夠用,建議調低。作業中的shuffle操作比較多,而持久化操作比較少,建議調低

  spark.shuffle.file.bufferbuffer    文件溢寫緩沖大小,默認32k
  spark.shuffle.memoryFraction   shuffle使用executor內存占比,默認0.2
  spark.reducer.maxSizeInFlight   shuffle read buffer 一次數據拉取量,默認48m
  spark.shuffle.spill.numElementsForceSpillThreshold   強制文件溢寫數據的條目數閾值,默認integer最大值
  spark.shuffle.spill.initialMemoryThreshold   強制文件溢寫數據量內存占用多少空間,默認5m

 

 

  數據傾斜問題:

     發現問題:

    1,sample countByKey()  wc查看結果。 

    2,在Spark Web UI查看一下當前這個stage各個task分配的數據量,執行時長

    解決方案:

    1,雙重聚合:rdd進行key值隨機前綴N 先進行一步combiner,然后去掉前綴再次combiner

    2,向上采樣:對rdd內數據隨機key前綴,較少數據的rdd內相同的key進行N(excutor core task)倍的擴容在進行join處理

    3,broadcast處理:如果有較少的數據量rdd與較大的rdd進行join則小的rdd進行broadcast后數據量多的rdd進行map join

    4,過濾異常數據,如某些無用冗余數據量較大,則先過濾處理

    5,提高並行度(緩解作用並未根本解決)RDD多分區,通過算子指定並行度,例如,reduceByKey(_+_,10),配置spark.default.parallelism

 

  推測計划問題:

    當spark task中0.75已執行完成,剩余task執行時間達到已完成task中位數的1.5倍,則spark會重新調度一個新的task執行此task未完成的任務。(spark默認關閉,map reduce有開啟)

   導致問題:1,導致計算結果數據重復  2,如有數據傾斜發生會使task無法執行完 

   相關參數配置:

  spark.speculation//計划是否開啟,默認false
  spark.speculation.interval//檢測間隔 100ms
  spark.speculation.multiplier//執行緩慢時間界定,是多少倍的已執行完task中位數 1.5
  spark.speculation.quantile//所有已完成task中占總數的比例 0.75

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM