Spark底層原理簡化版


Spark SQL/DF的執行過程

將上層的SQL語句映射為底層的RDD模型。

  • 寫代碼(DF/Dataset/SQL)並提交
  • Parser解析后得到unresolved logical plan(代碼合法但未判斷data是否存在、數據類型)
  • Analyzer分析對比Catalog(里面綁定了數據信息)后得到 analyzed logical plan(有數據類型的計划)。
  • Optimizer根據預定的邏輯優化得到optimized logical plan。這些邏輯為Rule,例如PushdownPredicate(即提前filter,需要storage支持,如mysql)、ConstantFolding(將常量合並,如1+1直接變為2)、ColumnPruning(減少讀取的列,根據select判斷,同樣需要storage支持,如Parquet、ORC)等
  • Query Planner從optimized locical plan中得出不同的物理執行策略Iterator[PhysicalPlan]
  • CBO利用Cost Model得出最優physical plan(截止2.2還沒實現),它包含一系列RDDs和transformation,即UI顯示的DAG。
  • 提交前准備,如確保分區操作正確、物理算子樹節點重用、Code Generation等,得到Prepared SparkPlan
  • 得到QueryExecution
  • 在集群執行(執行過程通過生成本地Java字節碼去除整個tasks或者stages來進一步優化)。Adaptive Execution還會根據信息動態調整執行計划。

上述除了第一步和最后一步,其他都在driver中完成。

Catalog主要用於各種函數資源信息和元數據信息的統一管理,包括全局的臨時視圖管理、函數資源加載器、函數注冊接口、外部系統Catalog(數據庫)、配置。

集群運行部分

  1. Driver進程執行代碼,當發現action時,會調用sc.runJob() -> dagScheduler.runJob()。
  2. DAGScheduler通過submitJob將job提交到任務隊列eventProcessLoop中。然后DAGScheduler通過doOnReceive對任務隊列中的信息進行模式匹配,如果匹配到JobSubmitted,就通過handleJobSubmitted,從job中創建ResultStage,然后調用submitStage。這個函數會通過getMissingParentStages,從ResultStage的rdd開始沿着rdd的依賴遍歷,遇到ShuffleDependency,則創建ShuffleMapStage。然后對這些MissingParentStages調用submitMissingTasks。里面會根據Stage的類型創建相應的Task並放進一個taskSet中。然后調用taskScheduler.submitTasks(taskSet)
  3. taskScheduler的這個函數根據taskSet新建TaskSetManager(包含taskset),並將manager放入schedulableQueue。然后調用CoarseGrainedSchedulerBackend的reviveOffers,發送ReviveOffers給自己。
  4. 當CoarseGrainedSchedulerBackend接收到這個信息后就會調用makeOffers,篩選出activeExecutors,然后調用TaskScheduler的resourceOffers獲取TaskDescription,記錄task要被發送給哪個executor、jar包地址等信息。接着把TaskDescription作為參數調用launchTasks。這個函數會根據將序列化后的task創建LaunchTask,並send到相應的executor。這樣,任務就被發送到集群。各個方法后續都有獲取結果的代碼返回給runJob。在任務執行期間,Driver繼續執行代碼,遇到action就重復上述步驟。

一個job結束后會進行checkpoint。

  1. ExecutorBackend調用receive,如果匹配到LaunchTask,就會調用executor的launchtask,該函數根據任務創建taskrunner,並放入線程池中執行。

Aggregation

AggregateFunction抽象類,它有兩個子抽象類:ImperativeAggregate命令式和DeclarativeAggregate聲明式。AggregateExpression是AggregateFunction的封裝。

聚合緩沖區(AggregateBuffer)與聚合模式(AggregateMode)

AggregateBuffer每個key一個,保存中間結果。一個共享區,能多個聚合函數訪問。

聚合模式有4種:

  • Final模式和Partial模式一般都是組合使用。Partial模式可以看作是局部數據的聚合,返回的是聚合緩沖區中的中間數據。而Final模式所起到的作用是將聚合緩沖區的數據進行合並,然后返回最終的結果。
  • Complete模式不進行局部聚合計算
  • PartialMerge:對聚合緩沖區進行合並,但還不是最終結果,主要用於distinct語句中,相當耗時。

執行

RDD的每個partition作為InputIterator,經過AggregateExec得到相應的AggregationIterator。

HashAggregateExec、SortAggregateExec和ObjectHashAggregateExec分別創建TungstenAggregationIterator、SortBasedAggregationIterator和ObjectAggregationIterator。

通常用HashAggregate,但在一些情況下會變為SortAggregate:

  • 查詢中存在不支持Partial方式的聚合函數
  • 聚合函數結果不支持Bufer方式,例如collect_set和collect_list函數
  • 內存不足imperative

SortAggregateExec:如果有partial agg,在map端,SortAggregateExec在進行聚合之前會在分區內排序(從而達到分組的目的),然后再聚合。在reduce端一樣。

HashAggregateExec:構建一個Map,將數據保存到map中並進行聚合計算。

Window的執行

WindowExec規定了數據的分布和有序性,所以在執行前要用exchange和sort完成重分區和分區內數據的排序。而WindowExec根據窗口的定義又不同的執行方式。

window的這一操作就少了partial agg,shuffle的數據量大。

Join

Join策略:廣播、ShuffledHashJoinExec、SortMergeJoinExec(最常見)、其他不包含join條件的語句。

  • 廣播:當一個大表和一個小表進行Join操作時,為了避免數據的Shuffle,可以將小表的全部數據分發到每個節點上。

    在Outer類型的Join中,基表不能被廣播,例如當A left outer join B時,只能廣播右表B。

  • ShuffledHashJoinExec:先對兩個表進行hash shuffle,然后把小表變成map完全存儲到內存,最后進行join。

    開啟條件:spark.sql.join.preferSortMergeJoin為false;小表的大小 小於 廣播閾值 * 默認分區數;小表3倍小於另一個表。不適合兩個表都很大的情況,因為其中一個表的hash部分要全部放到內存。

  • SortMergeJoinExec:先hash shuffle將兩表數據數據相同key的分到同一個分區,然后sort,最后join。由於排序的特性,每次處理完一條記錄后只需要從上一次結束的位置開始繼續查找。適合大表join大表。

Shuffle

shuffle是根據partitioner(key或ranger)將不同節點上的數據移動到其對應的(同hash划分或range范圍)節點上,便於同類數據的聚合或join等計算。這個過程中,map side組織數據,如果shuffle的數據過大,會把數據溢出到磁盤,reduce side拉取數據。ByKey類shuffle的性能消耗更大,它們會在兩端為每類key創建聚合對象(同樣內存不夠進磁盤,等GC刪除)。

以上為官網內容,下面為底層實現部分。

從2.0開始,Spark就只有Tungsten Sort Shuffle。在實現層面,Spark在啟動時會創建ShuffleManager來管理Shuffle,默認情況下SortShuffleManger(tungstensort對應)是ShuffleManager的具體實現。ShuffleMapTask從SortShuffleManger中獲得ShuffleWriter。下游的task獲取ShuffleReader。

ShuffleWriter的具體實現:

當ShuffleDependency注冊一個Shuffle時就會得到一個ShuffleHandle對象,根據它獲取相應的writer。

  • BypassMergeSortShuffleHandle(可以獲得BypassMergeSortShuffleWriter),即可以忽略掉聚合排序的Shuffle過程(從Shuffle數據讀取任務看來,數據文件和索引文件的格式和內部是否做過聚合排序是完全相同的。),直接將每個分區寫入單獨的文件,並在最后做一個合並處理,並創建一個index索引文件來標記不同分區的位置信息。適合數據量少的情況。
  • SerializedShuffleHandle(可以獲得UnsafeShuffleWriter),對應Tungsten方式的Shuffle過程,這種情況下ShuffleMapTask的輸出數據能夠先序列化為二進制數據存儲在內存中,再執行相關的操作,在內存使用上是一種更高效的方式。
  • BaseShuffleHandle(可以獲得SortShuffleWriter),在不滿足上面兩種handle條件時獲得BaseShuffleHandle對象,意味着以反序列化的格式處理Shuffle輸出數據。過程是創建ExternalSorter對象,將全部數據插入該對象,生成Shuffle數據文件和索引文件,最后創建MapStatus對象,將數據和索引進行傳輸。關鍵實現在於外部排序器,根據是否需要聚合采用不同的map數據結構,當數據量過大,便會溢出到磁盤。

ShuffleReader的具體實現

BlockStoreShuffleReader方面,根據上述map信息對ShuffleBlockFetcherIterator進行不同的封裝,得到相應的iterator。

writer和reader都會根據是否有aggregator、ordering進行相應的處理。writer還有partitioner參數。這些對於shuffle都是optional。

Tungsten

內存管理機制

Executor中對象的處理實際由JVM執行,Spark的統計數據無法准確計算數據量的大小,所以無法避免OOM。

Tungsten的內存管理(需要設置ofHeap.enabled和ofHeap.size)讓Spark直接操作二進制數據而不是JVM對象,從而提升內存使用率。

內存管理器

內存管理由MemoryManager通過MemoryPool管理。MP根據heap和ofheap分為兩大類。每類再分為execration和storage。

MemoryManager的具體實現有1.6之前的StaticMemoryManager和之后的UnifiedMemoryManager。任務通過這些manager來完成內存申請或釋放操作。下圖為統一內存管理器的所管理的內存結構。其中Reserved用於Spark系統內部,應用內存為用戶程序中的數據結構。

執行內存和存儲內存之間能互相借用(當空間不足,即放不下一個完整的block,且對方有空余時。),歸還時可讓對方多占用的部分轉到磁盤,但有些復雜的因素會導致無法歸還。

存儲內存

如上面所說,存儲內存管理器為StorageMemoryPool,它的使用者是存儲模塊,具體實現是BlockManager。它負責管理計算過程中產生的各種數據,可以看作是一個獨立的分布式存儲管理系統。Driver端的為主BlockManagerMaster,負責對全部數據塊的元數據信息進行管理和維護,Executor端將數據塊的狀態上報到driver端,並接收住節點的相關操作命令。

在rdd被緩存到存儲內存之前,它是屬於應用內存部分的,而且是不連續的,上層通過迭代器訪問。持久化后才到存儲內存,且連續。而根據持久化的級別,是否序列化,會采用不同的數據結構。如果有新的block需要緩存而沒有足夠的存儲內存,BlockManager會分局LRU淘汰Block,這個淘汰要么刪除,要么溢出到磁盤,看被淘汰的block是否設置了usedisk的持久化。

執行內存

主要用於滿足shuffle、join、sort、agg等計算過程對內存的需求。

內存管理最底層實現

內存分配管理的基礎是MemoryAllocator,manager通過它來申請和釋放內存。其實現包括HeapMemoryAllocator和UnsafeMemoryAllocator。

Tungsten使用另外的一些數據結構和方法來實現其計算。例如重新實現的ByteArray、LongArray、UTF8String、BytesToMap等。

緩存敏感計算(Cacheaware computation)

通過設計緩存友好的數據結構來提高緩存命中率和本地化的特性。

動態代碼生成(Code generation)

代碼生成能夠去掉原始數據類型的封裝和多態函數調度。

參考
Spark 2.2.2 源碼
Spark SQL 內核剖析


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM