本文翻譯之cloudera的博客,本系列有兩篇,第二篇看心情了
概論
當我們理解了
transformation,action和rdd后,我們就可以寫一些基礎的spark的應用了,但是如果需要對應用進行調優就需要了解spark的底層執行模型,理解job,stage,task等概念。
本文你將會了解spark程序是怎么在機器上執行的,同時也學到一些實用的建議關於什么樣的執行模型可以提高程序效率
Spark如何執行應用
一個spark程序包括一個driver進程和多個分散在集群節點上的executor進程,driver負責工作流程,executor負責以task方式執行工作同時也會存儲一些數據,在程序運行的整個生命周期里driver和executor會一直存活,盡管后期動態資源分配可能會改變這種情況,每個executor都有一些任務槽用於並發運行task,部署這些進程到集群中依賴所選的集群部署方式(standalone,mesos,yarn),但是driver和executor存在每一個spark程序中
當在spark中執行一個action會觸發一個spark的job的提交,為了了解該job是什么樣的,spark會通過RDD的DAG來確定該action依賴什么,同時生成一個執行計划,該計划從一個最遠的RDD開始,通常該RDD不依賴別的RDD或者是一個緩存的數據到一個生成Action結果的最終的RDD
執行計划包含了一些stages,stage是由tranformation組裝生成的,一個stage對應一組執行相同代碼的task,每一個task處理不同的數據,每個stage包含一系列的transformation,這些transformation執行的時候不需要shuffler數據
什么決定哪些數據需要進行shuffle, 一個RDD由固定數量的partition組成,每個partition是由許多記錄組成的,對於哪些通過窄依賴的transformation(比如map,filter)返回的RDD,它的partition中的一個記錄是從
它父RDD中一個parition的一個記錄計算得到的,子RDD一個partition只依賴父RDD中的一個partition,像coalesce這種的操作會導致一個task計算父RDD中多個partition,但是仍把他看做窄依賴,因為只依賴一部分的partition.
spark還支持像groupbyke和reducebykey這樣的寬依賴,這樣計算子RDD中一個partition的一條記錄依賴父RDD中許多
partition中的記錄,具有相同key的所有tuple最終會放到在同一個partition中,然后被同一個task處理,為了滿足這種操作,spark必須進行shuffle,通過在集群的節點間傳輸數據最終生成一個新的stage和新的partition集合。(同一個stage中所有的RDD的partition數據應該不一致)
例如
sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()
該代碼最后執行了一個action的操作,它依賴對數據源是一個文本文件的RDD一系列的轉換,該代碼會在一個stage中執行,因為他們只窄依賴。
val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))val wordCounts = tokenized.map((_,1)).reduceByKey(_ + _)val filtered = wordCounts.filter(_._2 >=1000)val charCounts = filtered.flatMap(_._1.toCharArray).map((_,1)).reduceByKey(_ + _)charCounts.collect()
上面代碼將會分成3個stage進行執行,reduceByKey操作會產生一個stage邊界,因為計算它的輸出需要數據按key進行重新分區。下圖是一個RDD的轉換圖。
下面粉色的線划分出需要執行的stage
在每一個stage的邊界,父stage中的task會把數據寫入到磁盤上,子stage中的task會通過網絡去獲取數據,因為這樣會加重磁盤和網絡的負擔,stage邊界代價太高應該盡可能的避免,父stage中數據的partition可能和子stage中的partition的數據不一致。那么會導致stage邊界的轉換函數都會接受一個numPartition的參數用於確定子stage中數據的partition個數。
對於mapreduce應用來說reducer的個數是一個重要的參數,調整stage邊界中partition的個數通常可以提高或降低spark應用的性能,下面講下如何有效調整該參數。
選擇正確的操作
當使用spark編寫應用時,開發者可以組裝不同的transformation和action生成相同的結果,但是不是所有組裝的性能都是一樣的,選擇正常的組裝方式可以大大提供應用的性能,一些規則和觀點可以幫助做出正確的選擇。
ShemaRDD正慢慢穩定下,它將開放Spark的catalyst優化器給使用spark core的開發者使用,它將spark可以做一些高級的選擇:哪些操作可以使用,當ShemaRDD變成一個穩定模塊后,用戶就可以避免做這樣的選擇了。
選擇操作的組裝方式的目標為了減少shuffle的數量和shuffler數據的數量,這是因為shuffler是一個很昂貴的操作,所有的數據必須落盤然后再通過網絡傳輸,repatition,join,cogroup和別的一些以*key和*bykey結尾的transformation操作都會導致shuffle, 但是性能是不一樣的,初學者遇到的最大問題就是不明白他們的操作成本,
- 當執行一個associative reductive 操作時不要使用groupbykey,例如。 rdd.groupbykey().mapValues(_.sum)和rdd.reduceBykey(_+_)的結果一樣,但是前面的操作會導致所有的數據進行網絡傳輸,后者只會先在本地計算每個patition相同key的和,然后通過shuffler合並所有本地計算的和(都會有shuffle,但是傳輸的數據減少了很多)
- 當輸入和輸出的類型不一樣時不要使用reduceByKey,例如
當寫一個transformation用來找到每一個key對應唯一的一個字符串是,一種方式如下:rdd.map(kv => (kv._1, new Set[String]() + kv._2)).reduceByKey(_ ++ _),該操作會導致大量的不必要的set對象,每個key都會創建一個,這里最好使用aggregateBykey,它會執行map端的聚集更有效val zero =new collection.mutable.Set[String]()rdd.aggregateByKey(zero)((set, v)=> set += v,(set1, set2)=> set1 ++= set2)
3. 不要使用flatMap-join-groupBy的模式。當兩個數據集已經groupbykey后,如果想join后繼續分組,可以使用cogroup,這將建少很多That avoids all the overhead associated with unpacking and repacking the groups.
什么時候shuffle不會發生
需要注意哪些場景下上面的transformation不會有shuffle,spark知道怎么避免shuffle當前面的transformation時數據被同一個patitioner分區過。
rdd1 = someRdd.reduceByKey(...)rdd2 = someOtherRdd.reduceByKey(...)rdd3 = rdd1.join(rdd2)
該例子中reduceByKey使用默認的patitioner,所有rdd1和rdd2都是哈希分區的,兩個RDD會有兩次的shuffler,如果兩個RDD的parttion的個數相同,join就不需要額外的shuffle了,因為兩個RDD被同一個partitioner分區,相同key的數據都落在兩個RDD上的單獨的一個parition上。所有rdd3的一個partition只依賴rdd1和rdd2中的單獨的partition
注意,這里是任務的執行模型,不是parition的模型,
如果兩個RDD的parition使用不同的partitioner,那么join可能需要shuffle
當兩個數據集做join時避免shuffle的另一種方式就是使用廣播變量,當一個數據集小的足夠塞到一個executor內存中,它可以被加載到driver的一個hashtable中然后廣播到每一個executor中。一個map的transformation引用該hashtable做join
什么時候需要更多的shuffle
多數情況下shuffle對性能有消耗,但是有時候額外的shuffle可以增大並發而提高性能,例如,數據存儲在一個不可切分的文件中(壓縮文件),InputFormat進行分區是可能會導致大量的記錄在一個partition中,parition的個數少不能有效利用集群計算資源,這種情況下,當數據加載完成后執行repatition操作,它會導致shuffle但是可以利用集群更多的計算資源
還一種情況就是使用reduce和aggregate等action在driver上進行數據聚集,當聚集大量數目的partition時,
driver上由於使用一個線程用來合並這些結果將成為性能的瓶頸,為了減輕driver的壓力,可以調用reduceBykey和aggregateByKey執行一個分布式聚集操作,把數據切分中更小的parition, 每個partion並行的合並數據,然后把結果返回給driver執行最終的聚集,
二次排序
還一個需要注意的就是repartitionAndSortWithinPartitions這個transformation,該transformation在shuffler的時候就行排序,這樣大量數據落盤的時候可以進行排序然后方便后面的操作
