本文轉之Pivotal的一個工程師的博客。覺得極好。
作者本人經常在StackOverflow上回答一個關系Spark架構的問題,發現整個互聯網都沒有一篇文章能對Spark總體架構進行很好的描述,作者可憐我們這些菜鳥,寫了這篇文章,太感動了。
本文讀者需要一定的Spark的基礎知識,至少了解Spark的RDD和DAG。

上圖引入了很多術語:"Executor","Task","Cache","Worker Node"等等,當我開始學習Spark的時候,這幾乎是整個互聯網上唯一一張關於Spark架構的圖了,我個人覺得該圖缺失了一些很重要的概念或者是描述的
任何Spark的進程都是一個JVM進程,既然是一個JVM進程,那么就可以配置它的堆大小(-Xmx和-Xms),但是進程怎么使用堆內存和為什么需要它呢?下面是一個JVM堆空間下Spark的內存分配情況

默認情況下,Spark進程的堆空間是512mb,為了安全考慮同時避免OOM,Spark只允許利用90%的堆空間,spark中使用spark.storage.safetyFraction用來配置該值(默認是0.9). Spark作為一個內存計算工具,Spark可以在內存中存儲數據,如果讀過
http://0x0fff.com/spark-misconceptions/
, 就會明白Spark不是一個真的內存工具,它只是把內存作為他的LRU緩存,這樣大量的內存被用來緩存正在計算的數據,該部分占safe堆的60%,Spark使用spark.storage.memoryFraction控制該值,如果想知道Spark中能緩存了多少數據,可以統計所有Executor的堆大小,乘上safeFraction和memoryFraction,默認是54%,這就是Spark可用緩存數據使用的堆大小,
該部分介紹shuffle的內存使用情況,它通過 堆大小 * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction。
spark.shuffle.safetyFraction的默認值是0.8,
spark.shuffle.memoryFraction的默認值是0.2,所以最終只能最多使堆空間的16%用於shuffle,關於怎么使用這塊內存,參考https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala ,但是通常spark會使用這塊內存用於shuffle中一些別的任務,當執行shuffle時,有時對數據進行排序,當進行排序時,需要緩沖排完序后的數據(注意不能改變LRU緩沖中的數據,因為后面可能要重用),這樣就需要大量的RAM存儲排完序后的數據塊,當沒有足夠的內存用於排序,參考外排的實現,可以一塊一塊的排序,然后最終合並。
最后要講到的一塊內存是"unroll",該快內存用於unroll計算如下:spark.storage.unrollFraction * spark.storage.memoryFraction * spark.storage.safetyFraction 。當我們需要在內存展開數據塊的時候使用,那么為什么需要展開呢?因為spark允許以序列化和非序列化兩種方式存儲數據,序列化后的數據無法直接使用,所以使用時必須要展開。該部分內存占用緩存的內存,所以如果需要內存用於展開數據時,如果這個時候內存不夠,那么Spark LRU緩存中的數據會刪除一些快。
此時應該清楚知道spark怎么使用JVM中堆內存了,現在切換到集群模式,當你啟動一個spark集群,如何看待它,下圖是YARN模式下的

當運行在yarn集群上時,Yarn的ResourceMananger用來管理集群資源,集群上每個節點上的NodeManager用來管控所在節點的資源,從yarn的角度來看,每個節點看做可分配的資源池,當向ResourceManager請求資源時,它返回一些NodeManager信息,這些NodeManager將會提供execution container給你,每個
execution container就是滿足請求的堆大小的JVM進程,JVM進程的位置是由ResourceMananger管理的,不能自己控制,如果一個節點有64GB的內存被yarn管理(通過yarn.nodemanager.resource.memory-mb配置),當請求10個4G內存的executors時,這些
executors可能運行在同一個節點上。
當在yarn上啟動spark集群上,可以指定executors的數量(-num-executors或者spark.executor.instances),可以指定每個executor使用的內存(-executor-memory或者spark.executor.memory),可以指定每個executor使用的cpu核數(-executor-cores或者spark.executor.cores),指定每個task執行使用的core數(spark.task.cpus),也可以指定driver應用使用的內存(-driver-memory和spark.driver.memory)
當在集群上執行應用時,job會被切分成stages,每個stage切分成task,每個task單獨調度,可以把executor的jvm進程看做task執行池,每個executor有 spark.executor.cores / spark.task.cpus execution 個執行槽,這里有個例子:集群有12個節點運行Yarn的NodeManager,每個節點有64G內存和32的cpu核,每個節點可以啟動2個executor,每個executor的使用26G內存,剩下的內用系統和別的服務使用,每個executor有12個cpu核用於執行task,這樣整個集群有12 machines * 2 executors per machine * 12 cores per executor / 1 core = 288 個task執行槽,這意味着spark集群可以同時跑288個task,整個集群用戶緩存數據的內存有0.9 spark.storage.safetyFraction * 0.6 spark.storage.memoryFraction * 12 machines * 2 executors per machine * 26 GB per executor = 336.96 GB.
到目前為止,我們已經了解了spark怎么使用JVM的內存以及集群上執行槽是什么,目前為止還沒有談到task的一些細節,這將在另一個文章中提高,基本上就是spark的一個工作單元,作為exector的jvm進程中的一個線程執行,這也是為什么spark的job啟動時間快的原因,在jvm中啟動一個線程比啟動一個單獨的jvm進程塊(在hadoop中執行mapreduce應用會啟動多個jvm進程)
下面將關注spark的另一個抽象:partition, spark處理的所有數據都會切分成partion,一個parition是什么以及怎么確定,partition的大小完全依賴數據源,spark中大部分用於讀取數據的方法都可以指定生成的RDD中partition的個數,當從hdfs上讀取一個文件時,會使用Hadoop的InputFormat來處理,默認情況下InputFormat返回每個InputSplit都會映射RDD中的一個Partition,大部分存儲在HDFS上的文件每個數據塊會生成一個InputSplit,每個數據塊大小為64mb和128mb,因為HDFS上面的數據的塊邊界是按字節來算的(64mb一個塊),但是當數據被處理是,它又要按記錄進行切分,對於文本文件來說切分的字符就是換行符,對於sequence文件來說,他是塊結束,如果是壓縮文件,整個文件都被壓縮了,它不能按行進行切分了,整個文件只有一個inputsplit,這樣spark中也會只有一個parition,在處理的時候需要手動的repatition。