Spark在處理數據的時候,會將數據都加載到內存再做處理嗎?


對於Spark的初學者,往往會有一個疑問:Spark(如SparkRDDSparkSQL)在處理數據的時候,會將數據都加載到內存再做處理嗎?

很顯然,答案是否定的!

對該問題產生疑問的根源還是對Spark計算模型理解不透徹。

對於Spark RDD,它是一個分布式的彈性數據集,不真正存儲數據。如果你沒有在代碼中調用persist或者cache算子,Spark是不會真正將數據都放到內存里的。

此外,還要考慮persist/cache的緩存級別,以及對什么進行緩存(比如是對整張表生成的DataSet緩存還是列裁剪之后生成的DataSet緩存)(關於Spark RDD的特性解析參考《Spark RDD詳解》

既然Spark RDD不存儲數據,那么它內部是如何讀取數據的呢?其實Spark內部也實現了一套存儲系統:BlockManager。為了更深刻的理解Spark RDD數據的處理流程,先拋開BlockManager本身原理,從源碼角度闡述RDD內部函數的迭代體系。

我們都知道RDD算子最終會被轉化為shuffle map task和result task,這些task通過調用RDD的iterator方法獲取對應partition數據,而這個iterator方法又會逐層調用父RDD的iterator方法獲取數據(通過重寫scala.collection.iterator的hasNext和next方法實現)。主要過程如下:

首先看ShuffleMapTask和ResultTask中runTask方法的源碼:

關鍵看這部分處理邏輯:

rdd.iterator(partition, context)

 

 

getOrCompute方法會先通過當前executor上的BlockManager獲取指定blockId的block,如果block不存在則調用computeOrReadCheckpoint,如果要處理的RDD沒有被checkpoint或者materialized,則接着調用compute方法進行計算。

compute方法是RDD的抽象方法,由繼承RDD的子類具體實現。

以WordCount為例:

sc.textFile(input)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .saveAsTextFile(output)

 

  1. textFile會構建一個HadoopRDD

  2. flatMap/map會構建一個MapPartitionsRDD

  3. reduceByKey觸發shuffle時會構建一個ShuffledRDD

  4. saveAsTextFile作為action算子會觸發整個任務的執行

以flatMap/map產生的MapPartitionsRDD實現的compute方法為例:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

 

底層調用了parent RDD的iterator方法,然后作為參數傳入到了當前的MapPartitionsRDD。而f函數就是對parent RDD的iterator調用了相同的map類函數以執行用戶給定的函數。

所以,這是一個逐層嵌套的rdd.iterator方法調用,子RDD調用父RDD的iterator方法並在其結果之上調用Iterator的map函數以執行用戶給定的函數,逐層調用直到調用到最初的iterator(比如上述WordCount示例中HadoopRDD partition的iterator)。

而scala.collection.Iterator的map/flatMap方法返回的Iterator就是基於當前Iterator重寫了next和hasNext方法的Iterator實例。比如,對於map函數,結果Iterator的hasNext就是直接調用了self iterator的hasNext,next方法就是在self iterator的next方法的結果上調用了指定的map函數。

flatMap和filter函數稍微復雜些,但本質上一樣,都是通過調用self iterator的hasNext和next方法對數據進行遍歷和處理。

所以,當我們調用最終結果iterator的hasNext和next方法進行遍歷時,每遍歷一個數據元素都會逐層調用父層iterator的hasNext和next方法。各層的map函數組成一個pipeline,每個數據元素都經過這個pipeline的處理得到最終結果。

這也是Spark的優勢之一,map類算子整個形成類似流式處理的pipeline管道,一條數據被該鏈條上的各個RDD所包裹的函數處理。

再回到WordCount例子。HadoopRDD直接跟數據源關聯,內存中存儲多少數據跟讀取文件的buffer和該RDD的分區數相關(比如buffer*partitionNum,當然這是一個理論值),saveAsTextFile與此類似。MapPartitionsRDD里實際在內存里的數據也跟partition數有關系。ShuffledRDD稍微復雜些,因為牽扯到shuffle,但是RDD本身的特性仍然滿足(記錄文件的存儲位置)。

說完了Spark RDD,再來看另一個問題:Spark SQL對於多表之間join操作,會先把所有表中數據加載到內存再做處理嗎?

當然,肯定也不需要!

具體可以查看Spark SQL針對相應的Join SQL的查詢計划,以及在之前的文章《Spark SQL如何選擇join策略》中,針對目前Spark SQL支持的join方式,任何一種都不要將join語句中涉及的表全部加載到內存。即使是Broadcast Hash Join也只需將滿足條件的小表完整加載到內存。

推薦文章:

對Spark硬件配置的建議

Spark和MapReduce任務計算模型

Spark集群和任務執行

通過spark.default.parallelism談Spark談並行度

解析SparkStreaming和Kafka集成的兩種方式

Spark為什么只有在調用action時才會觸發任務執行呢(附算子優化和使用示例)?


關注微信公眾號:大數據學習與分享,獲取更對技術干貨


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM