對於Spark的初學者,往往會有一個疑問:Spark(如SparkRDD、SparkSQL)在處理數據的時候,會將數據都加載到內存再做處理嗎?
很顯然,答案是否定的!
對該問題產生疑問的根源還是對Spark計算模型理解不透徹。
對於Spark RDD,它是一個分布式的彈性數據集,不真正存儲數據。如果你沒有在代碼中調用persist或者cache算子,Spark是不會真正將數據都放到內存里的。
此外,還要考慮persist/cache的緩存級別,以及對什么進行緩存(比如是對整張表生成的DataSet緩存還是列裁剪之后生成的DataSet緩存)(關於Spark RDD的特性解析參考《Spark RDD詳解》
既然Spark RDD不存儲數據,那么它內部是如何讀取數據的呢?其實Spark內部也實現了一套存儲系統:BlockManager。為了更深刻的理解Spark RDD數據的處理流程,先拋開BlockManager本身原理,從源碼角度闡述RDD內部函數的迭代體系。
我們都知道RDD算子最終會被轉化為shuffle map task和result task,這些task通過調用RDD的iterator方法獲取對應partition數據,而這個iterator方法又會逐層調用父RDD的iterator方法獲取數據(通過重寫scala.collection.iterator的hasNext和next方法實現)。主要過程如下:
首先看ShuffleMapTask和ResultTask中runTask方法的源碼:
關鍵看這部分處理邏輯:
rdd.iterator(partition, context)
getOrCompute方法會先通過當前executor上的BlockManager獲取指定blockId的block,如果block不存在則調用computeOrReadCheckpoint,如果要處理的RDD沒有被checkpoint或者materialized,則接着調用compute方法進行計算。
compute方法是RDD的抽象方法,由繼承RDD的子類具體實現。
以WordCount為例:
sc.textFile(input) .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) .saveAsTextFile(output)
-
textFile會構建一個HadoopRDD
-
flatMap/map會構建一個MapPartitionsRDD
-
reduceByKey觸發shuffle時會構建一個ShuffledRDD
-
saveAsTextFile作為action算子會觸發整個任務的執行
以flatMap/map產生的MapPartitionsRDD實現的compute方法為例:
override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))
底層調用了parent RDD的iterator方法,然后作為參數傳入到了當前的MapPartitionsRDD。而f函數就是對parent RDD的iterator調用了相同的map類函數以執行用戶給定的函數。
所以,這是一個逐層嵌套的rdd.iterator方法調用,子RDD調用父RDD的iterator方法並在其結果之上調用Iterator的map函數以執行用戶給定的函數,逐層調用直到調用到最初的iterator(比如上述WordCount示例中HadoopRDD partition的iterator)。
而scala.collection.Iterator的map/flatMap方法返回的Iterator就是基於當前Iterator重寫了next和hasNext方法的Iterator實例。比如,對於map函數,結果Iterator的hasNext就是直接調用了self iterator的hasNext,next方法就是在self iterator的next方法的結果上調用了指定的map函數。
flatMap和filter函數稍微復雜些,但本質上一樣,都是通過調用self iterator的hasNext和next方法對數據進行遍歷和處理。
所以,當我們調用最終結果iterator的hasNext和next方法進行遍歷時,每遍歷一個數據元素都會逐層調用父層iterator的hasNext和next方法。各層的map函數組成一個pipeline,每個數據元素都經過這個pipeline的處理得到最終結果。
這也是Spark的優勢之一,map類算子整個形成類似流式處理的pipeline管道,一條數據被該鏈條上的各個RDD所包裹的函數處理。
再回到WordCount例子。HadoopRDD直接跟數據源關聯,內存中存儲多少數據跟讀取文件的buffer和該RDD的分區數相關(比如buffer*partitionNum,當然這是一個理論值),saveAsTextFile與此類似。MapPartitionsRDD里實際在內存里的數據也跟partition數有關系。ShuffledRDD稍微復雜些,因為牽扯到shuffle,但是RDD本身的特性仍然滿足(記錄文件的存儲位置)。
說完了Spark RDD,再來看另一個問題:Spark SQL對於多表之間join操作,會先把所有表中數據加載到內存再做處理嗎?
當然,肯定也不需要!
具體可以查看Spark SQL針對相應的Join SQL的查詢計划,以及在之前的文章《Spark SQL如何選擇join策略》中,針對目前Spark SQL支持的join方式,任何一種都不要將join語句中涉及的表全部加載到內存。即使是Broadcast Hash Join也只需將滿足條件的小表完整加載到內存。
推薦文章:
通過spark.default.parallelism談Spark談並行度
Spark為什么只有在調用action時才會觸發任務執行呢(附算子優化和使用示例)?
關注微信公眾號:大數據學習與分享,獲取更對技術干貨