轉載自:https://www.iteblog.com/archives/1648
前言:
很多初學者其實對於Spark的編程模式還是RDD這個概念理解不到位,就會產生一些誤解。比如,很多時候我們常常以為一個文件是會被完整讀入到內存,然后做各種變換,這很可能是受兩個概念誤導所致:
1.RDD的定義,RDD是一個分布式的不可變數據集合;
2.Spark是一個內存處理引擎;
如果你沒有主動對RDD進行Cache/Persist等相關操作,它不過是一個概念上存在的虛擬機數據集,
你實際上是看不到這個RDD的數據的全集的(他不會真的都放在內存里)。
RDD的本質是什么
一個RDD本質上是一個函數,而RDD的變換不過是函數的嵌套,RDD我認為有兩種:
1.輸入RDD,典型如KafkaRDD,JdbcRDD以及HadoopRDD等
2.轉換RDD,如MapPartitionsRDD
我們以下面的代碼為例做分析:
sc.textFile("abc.log").map().saveAsTextFile("")
textFile中間會構建出一個HadoopRDD,然后返回了MapPartitionsRDD,map函數運行后會構建出一個MapPartitionsRDD,saveAsTextFile觸發了實際流程代碼的執行
所以RDD不過是對一個函數的封裝,當一個函數對數據處理完成后,我們就得到一個RDD的數據集(是一個虛擬的,后續會解釋)。
HadoopRDD是數據來源,每個partition負責獲取數據,獲得過程是通過iterator.next獲得一條一條的數據,假設某個時刻拿到了一條數據A,這個A會立刻被map里的函數處理得到B(完成了轉換),然后開始寫入到其他數據重復如此。所以整個過程:
1、理論上某個MapPartitionsRDD里實際在內存里的數據等於其Partition的數目,是個非常小的數值。
2、HadoopRDD則會略多些,因為屬於數據源,讀取文件,假設讀取文件的buffer是1M,那么最多也就是partitionNum*1M數據在內存里
3、saveAsTextFile也是一樣的,往HDFS寫文件,需要buffer,最多數據量為buffer*partitionNum,
所以整個過程其實是流式的過程,一條數據被各個RDD所包裹的函數處理。
剛才我反復提到了嵌套函數,怎么知道他是嵌套的呢?
如果你寫了這樣一個代碼:
sc.textFile("abc.log").map().map().........map().saveAsTextFile("")
有成千上萬個map,很可能就堆棧溢出了,為啥?實際上是函數嵌套的太深了。
按照上面的邏輯,內存使用其實是非常小的,10G內存跑100T數據也不是難事。但是為什么Spark常常因為內存問題掛掉呢?我們接着往下看:
Shuffle的本質是什么
這就是為什么要分Stage了。每個Stage其實就是我上面說的那樣,一套數據被N個嵌套的函數處理(也就是你的transform動作)。遇到了Shuffle,就被切開來,所謂的sHUffle,本質上是把數據按規則臨時都落到磁盤上,相當於完成了一個saveAsTextFile的動作,不過是存本地磁盤。然后被切開的下一個Stage則以本地磁盤的這些數據作為數據源,重新走上面描述的流程。
我們再做一次描述:
所謂Shuffle不過是把處理流程切分,給切分的上一段(我們稱為Stage M)加個存儲到磁盤的動作,
把切分的下一段(Stage M+1)數據源變成Stage M存儲的磁盤文件。每個Stage都可以走我上面的
描述,讓每條數據都可以被N個嵌套的函數處理,最后通過用戶指定的動作進行存儲。
為什么Shuffle容易導致Spark掛掉
前面我們提到,Shuffle不過是偷偷的幫你加上了個類似saveAsLocalDiskFile的動作。然而,寫磁盤是一個昂貴的動作。所以我們盡可能的把數據先放到內存,再批量寫到文件里。還有讀磁盤文件也是挺費內存的動作。把數據放內存,就有這個問題,比如10000條數據,到底會占用多少內存?這個其實很難預估的,所以一不小心,就容易導致內存溢出了,這其實也是一個很無奈的事情。
我們做Cache/Persist意味着什么
其實就是給某個Stage加上一個saveAsMemoryBlockFile的動作,然后下次再要數據的時候,就不用算了。這些存在內存的數據就表示了某個RDD處理后的結果。這個才是說為啥Spark是內存計算引擎的地方。在MR里,你要是放在HDFS里的。但Spark允許你把中間結果放在內存里。
