A Spark Optimization Tip - Reducing Shuffle
After watching A Deeper Understanding of Spark Internals from Spark Summit 2014 (the video, which needs a VPN to access from mainland China, explains how Spark works in detail; page 45 of the slides gives an original algorithm and an optimized algorithm), 破砂鍋 tried the optimized algorithm on his own 3-node Spark cluster and went on to find an even faster one. The test data is the first 10,000,000 lines of a log file from Sogou Labs. The goal is to take the second column of the log, group it by its first character, and count how many records there are for each leading character.
Every approach starts from a freshly restarted Spark shell. The code below first caches the second column of the log in memory; the Spark GUI shows the cache has 8 partitions and takes about 1 GB of memory.
val rdd = sc.textFile("hdfs://hadoop1:8000/input/SogouQ3.txt").map(_.split("\t")).map(_(1))
rdd.cache()
rdd.count() // res1: Long = 10000000
Spark GUI
| RDD Name | Storage Level | Cached Partitions | Fraction Cached | Size in Memory | Size in Tachyon | Size on Disk |
|---|---|---|---|---|---|---|
| 3 | Memory Deserialized 1x Replicated | 8 | 100% | 1089.4 MB | 0.0 B | 0.0 B |
Original approach from the slides
rdd.map(x => (x.charAt(0), x)).groupByKey().mapValues({x => x.toSet.size}).collect()
// res2: Array[(Char, Int)] = Array((8,168189), (0,168338), (a,168228), (9,168018), (1,167647), (b,168404), (2,168731), (3,168206), (c,168991), (d,168095), (4,167523), (e,168179), (5,167967), (6,167907), (f,168174), (7,168718))
The Spark stage GUI shows the relevant stages are Ids 1 and 2, taking 5 s in total and producing 140 MB of shuffle read and 208 MB of shuffle write.
| Stage Id | Description | Submitted | Duration | Tasks: Succeeded/Total | Shuffle Read | Shuffle Write |
|---|---|---|---|---|---|---|
| 1 | | 2014/09/03 20:51:58 | 3 s | 8/8 | 140.2 MB | |
| 2 | | 2014/09/03 20:51:55 | 2 s | 8/8 | | 208.4 MB |
| 0 | | 2014/09/03 20:51:46 | 8 s | 8/8 | | |
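A note on where the cost comes from: groupByKey does no map-side combining, so every full second-column string is written to the shuffle files and fetched across the network, and only the toSet.size that runs after the shuffle removes duplicates. The result is therefore the number of distinct second-column values per leading character. A toy sketch with made-up values (not from the test data) makes the semantics explicit:

val toy = sc.parallelize(Seq("abc", "abc", "a12", "b77")) // hypothetical sample values
toy.map(x => (x.charAt(0), x)).groupByKey().mapValues(_.toSet.size).collect()
// expected: Array((a,2), (b,1)) -- the duplicated "abc" is only counted once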
Optimized approach from the slides
rdd.distinct(numPartitions = 6).map(x => (x.charAt(0), 1)).reduceByKey(_+_).collect()
// res2: Array[(Char, Int)] = Array((6,167907), (0,168338), (f,168174), (7,168718), (a,168228), (1,167647), (8,168189), (b,168404), (2,168731), (9,168018), (3,168206), (c,168991), (d,168095), (4,167523), (e,168179), (5,167967))
The Spark stage GUI shows the relevant stages are Ids 1 to 3, taking 4.2 s in total and generating 50 MB of shuffle read and 75 MB of shuffle write. Although there is one more stage, shuffle read/write drops by more than 60% compared with the original approach, making the job 16% faster.
| Stage Id | Description | Submitted | Duration | Tasks: Succeeded/Total | Shuffle Read | Shuffle Write |
|---|---|---|---|---|---|---|
| 1 | | 2014/09/03 20:24:17 | 0.2 s | 6/6 | 4.9 KB | |
| 2 | | 2014/09/03 20:24:15 | 2 s | 6/6 | 50.4 MB | 7.4 KB |
| 3 | | 2014/09/03 20:24:13 | 2 s | 8/8 | | 75.6 MB |
| 0 | | 2014/09/03 20:23:55 | 7 s | 8/8 | | |
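The saving comes from map-side combining. In the standard RDD implementation (an assumption about the Spark source of this era, not something stated in the slides), distinct is itself built on reduceByKey, so each partition emits each second-column value at most once before the shuffle, and the follow-up reduceByKey on (Char, 1) pairs shuffles only one small pre-combined pair per key per partition. A sketch of that equivalence:

// Roughly what distinct(6) does internally: duplicates within a partition are
// removed by reduceByKey's map-side combine before anything reaches the network.
val deduped = rdd.map(x => (x, null)).reduceByKey((x, _) => x, 6).map(_._1)
// The second shuffle then moves only tiny (Char, Int) pairs, also combined per partition.
deduped.map(x => (x.charAt(0), 1)).reduceByKey(_ + _).collect()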
Zero Shuffle optimization
Since reducing shuffle makes things faster, 破砂鍋 came up with the following Zero Shuffle approach.
rdd.map(x => (x.charAt(0), x)).countByKey()
// res2: scala.collection.Map[Char,Long] = Map(e -> 623689, 2 -> 623914, 5 -> 619840, b -> 626111, 8 -> 620738, d -> 623515, 7 -> 620222, 1 -> 616184, 4 -> 616628, a -> 641623, c -> 630514, 6 -> 621346, f -> 624447, 0 -> 632735, 9 -> 637770, 3 -> 620724)
The Spark stage GUI shows the only relevant stage is Id 1, taking just 0.3 s in total with no shuffle read or write. The RDDs involved in this approach have only narrow dependencies, so there is a single stage.
| Stage Id | Description | Submitted | Duration | Tasks: Succeeded/Total | Shuffle Read | Shuffle Write |
|---|---|---|---|---|---|---|
| 1 | | 2014/09/03 20:45:02 | 0.3 s | 8/8 | | |
| 0 | | 2014/09/03 20:44:32 | 8 s | | | |
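Two notes on this approach. countByKey is an action rather than a transformation: in the Spark version used here (an assumption about the 1.x internals of the time), it builds a local count map per partition and merges those maps at the driver, which is why the lineage has only narrow dependencies and no shuffle stage appears. Also, unlike the two approaches above, it counts every record per leading character rather than distinct second-column values, which is why the numbers in its result are larger. A minimal sketch of the same driver-side merge, reusing the rdd defined earlier:

import scala.collection.mutable
// Build one local count map per partition, then merge the maps at the driver
// with reduce, an action; nothing here introduces a wide dependency.
val counts: Map[Char, Long] = rdd
  .mapPartitions { iter =>
    val m = mutable.Map.empty[Char, Long].withDefaultValue(0L)
    iter.foreach(x => m(x.charAt(0)) += 1L)
    Iterator(m.toMap)
  }
  .reduce { (a, b) =>
    (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap
  }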
Summary
Comparing the three approaches:
| Approach | Shuffle Read | Shuffle Write | Time |
|---|---|---|---|
| Original approach from the slides | 140.2 MB | 208.4 MB | 5 s |
| Optimized approach from the slides | 50.4 MB | 75.6 MB | 4.2 s |
| Zero Shuffle optimization | 0 | 0 | 0.3 s |
One of the key Spark optimizations is to reduce shuffle as much as possible, since that sharply cuts slow network transfer. Being familiar with the RDD API helps a great deal when optimizing Spark jobs.
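As a practical follow-up (a suggestion beyond the slides), the lineage of a chain of transformations can be inspected before running it: toDebugString prints the RDD tree, and a ShuffledRDD in the output marks a wide dependency, i.e. a shuffle boundary.

// Sketch: print the lineage without triggering a job; the exact output format
// depends on the Spark version, but a ShuffledRDD line indicates a shuffle.
println(rdd.map(x => (x.charAt(0), x)).groupByKey().toDebugString)
println(rdd.map(x => (x.charAt(0), 1)).reduceByKey(_ + _).toDebugString)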
