【數據傾斜及調優概述】
大數據分布式計算中一個常見的棘手問題——數據傾斜:
在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的數據量特別大的話,就會發生數據傾斜。比如大部分key對應10條數據,但是個別key卻對應了百萬條數據,那么大部分task可能就只會分配到10條數據,然后1秒鍾就運行完了;但是個別task可能分配到了百萬數據,要運行一兩個小時。木桶原理,整個作業的運行進度是由運行時間最長的那個task決定的。
出現數據傾斜的時候,絕大多數task執行得都非常快,但個別task執行極慢。例如,總共有1000個task,997個task都在1分鍾之內執行完了,但是剩余兩三個task卻要一兩個小時。這種情況很常見。原本能夠正常執行的Spark作業,某天突然報出OOM(內存溢出)異常,觀察異常棧,是我們寫的業務代碼造成的。這種情況比較少見。
此時Spark作業的性能會比期望差很多。數據傾斜調優,就是使用各種技術方案解決不同類型的數據傾斜問題,以保證Spark作業的性能。
【定位發生數據傾斜的代碼】
1) 數據傾斜只會發生在shuffle過程中。所以關注一些常用的並且可能會觸發shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現數據傾斜時,可能就是代碼中使用了這些算子中的某一個所導致的。
2)通過觀察spark UI的界面,定位數據傾斜發生在第幾個stage中。
如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到當前運行到了第幾個stage;如果是用yarn-cluster模式提交,則可以通過Spark Web UI來查看當前運行到了第幾個stage。此外,無論是使用yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI上深入看一下當前這個stage各個task分配的數據量,從而進一步確定是不是task分配的數據不均勻導致了數據傾斜。
- 1段提交代碼是1個Application
- 1個action算子是1個job
- 1個job中,以寬依賴為分割線,划分成不同stage,stage編號從0開始
- 1個stage中,划分出參數指定數量的task,注意觀察Locality Level和Duration列
- Executor數量是配置參數指定的
- 看結果文件---自己統計代碼中println的打印
3)根據 【Spark工作原理】stage划分原理理解 中的stage的划分算法定位到極有可能發生數據傾斜的代碼
【查看導致數據傾斜的key的分布情況】
1. 如果是Spark SQL中的group by、join語句導致的數據傾斜,那么就查詢一下SQL中使用的表的key分布情況。
2. 如果是對Spark RDD執行shuffle算子導致的數據傾斜,那么可以在Spark作業中加入查看key分布的代碼,比如RDD.countByKey()。然后對統計出來的各個key出現的次數,collect/take到客戶端打印一下,就可以看到key的分布情況。
不放回sample+countByKey查看key分布,是否數據傾斜
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))