[看圖說話] 基於Spark UI性能優化與調試——初級篇


Spark有幾種部署的模式,單機版、集群版等等,平時單機版在數據量不大的時候可以跟傳統的java程序一樣進行斷電調試、但是在集群上調試就比較麻煩了...遠程斷點不太方便,只能通過Log的形式進行數據分析,利用spark ui做性能調整和優化。

那么本篇就介紹下如何利用Ui做性能分析,因為本人的經驗也不是很豐富,所以只能作為一個入門的介紹。

大體上會按照下面的思路進行講解:

  • 怎么訪問Spark UI
  • SparkUI能看到什么東西?job,stage,storage,environment,excutors
  • 調優的一些經驗總結

Spark UI入口

如果是單機版本,在單機調試的時候輸出信息中已經提示了UI的入口:

17/02/26 13:55:48 INFO SparkEnv: Registering OutputCommitCoordinator
17/02/26 13:55:49 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/02/26 13:55:49 INFO SparkUI: Started SparkUI at http://192.168.1.104:4040
17/02/26 13:55:49 INFO Executor: Starting executor ID driver on host localhost

單機調試的時候,可以直接登陸:http://192.168.1.104:4040

如果是集群模式,可以通過Spark日志服務器xxxxx:18088者yarn的UI進入到應用xxxx:8088,進入相應的Spark UI界面。

主頁介紹

上面就是Spark的UI主頁,首先進來能看到的是Spark當前應用的job頁面,在上面的導航欄:

  • 1 代表job頁面,在里面可以看到當前應用分析出來的所有任務,以及所有的excutors中action的執行時間。
  • 2 代表stage頁面,在里面可以看到應用的所有stage,stage是按照寬依賴來區分的,因此粒度上要比job更細一些
  • 3 代表storage頁面,我們所做的cache persist等操作,都會在這里看到,可以看出來應用目前使用了多少緩存
  • 4 代表environment頁面,里面展示了當前spark所依賴的環境,比如jdk,lib等等
  • 5 代表executors頁面,這里可以看到執行者申請使用的內存以及shuffle中input和output等數據
  • 6 這是應用的名字,代碼中如果使用setAppName,就會顯示在這里
  • 7 是job的主頁面。

模塊講解

下面挨個介紹一下各個頁面的使用方法和實踐,為了方便分析,我這里直接使用了分布式計算里面最經典的helloworld程序——WordCount,這個程序用於統計某一段文本中一個單詞出現的次數。原始的文本如下:

for the shadow of lost knowledge at least protects you from many illusions

上面這句話是有一次逛知乎,一個標題為 讀那么多書,最后也沒記住多少,還為什么讀書?其中有一個回復,引用了上面的話,也是我最喜歡的一句。意思是:“知識,哪怕是知識的幻影,也會成為你的鎧甲,保護你不被愚昧反噬”(來自知乎——《為什么讀書?》)

程序代碼如下:

public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("test-for-spark-ui");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        //知識,哪怕是知識的幻影,也會成為你的鎧甲,保護你不被愚昧反噬。
        JavaPairRDD<String,Integer> counts = sc.textFile( "C:\\Users\\xinghailong\\Desktop\\你為什么要讀書.txt" )
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(s -> new Tuple2<String,Integer>(s,1))
                .reduceByKey((x,y) -> x+y);

        counts.cache();
        List<Tuple2<String,Integer>> result = counts.collect();
        for(Tuple2<String,Integer> t2 : result){
            System.out.println(t2._1+" : "+t2._2);
        }
        sc.stop();
}

這個程序首先創建了SparkContext,然后讀取文件,先使用 進行切分,再把每個單詞轉換成二元組,再根據key進行累加,最后輸出打印。為了測試storage的使用,我這對計算的結果添加了緩存。

job頁面

主頁可以分為兩部分,一部分是event timeline,另一部分是進行中和完成的job任務。

第一部分event timeline展開后,可以看到executor創建的時間點,以及某個action觸發的算子任務,執行的時間。通過這個時間圖,可以快速的發現應用的執行瓶頸,觸發了多少個action。

第二部分的圖表,顯示了觸發action的job名字,它通常是某個count,collect等操作。有spark基礎的人都應該知道,在spark中rdd的計算分為兩類,一類是transform轉換操作,一類是action操作,只有action操作才會觸發真正的rdd計算。具體的有哪些action可以觸發計算,可以參考api。collect at test2.java:27描述了action的名字和所在的行號,這里的行號是精准匹配到代碼的,所以通過它可以直接定位到任務所屬的代碼,這在調試分析的時候是非常有幫助的。Duration顯示了該action的耗時,通過它也可以對代碼進行專門的優化。最后的進度條,顯示了該任務失敗和成功的次數,如果有失敗的就需要引起注意,因為這種情況在生產環境可能會更普遍更嚴重。點擊能進入該action具體的分析頁面,可以看到DAG圖等詳細信息。

stage頁面

在Spark中job是根據action操作來區分的,另外任務還有一個級別是stage,它是根據寬窄依賴來區分的。

窄依賴是指前一個rdd計算能出一個唯一的rdd,比如map或者filter等;寬依賴則是指多個rdd生成一個或者多個rdd的操作,比如groupbykey reducebykey等,這種寬依賴通常會進行shuffle。

因此Spark會根據寬窄依賴區分stage,某個stage作為專門的計算,計算完成后,會等待其他的executor,然后再統一進行計算。

stage頁面的使用基本上跟job類似,不過多了一個DAG圖。這個DAG圖也叫作血統圖,標記了每個rdd從創建到應用的一個流程圖,也是我們進行分析和調優很重要的內容。比如上面的wordcount程序,就會觸發acton,然后生成一段DAG圖:

從這個圖可以看出,wordcount會生成兩個dag,一個是從讀數據到切分到生成二元組,第二個進行了reducebykey,產生shuffle。

點擊進去還可以看到詳細的DAG圖,鼠標移到上面,可以看到一些簡要的信息。

storage頁面

storage頁面能看出目前使用的緩存,點擊進去可以看到具體在每個機器上,使用的block的情況。

environment頁面

這個頁面一般不太用,因為環境基本上不會有太多差異的,不用時刻關注它。

excutors頁面

這個頁面比較常用了,一方面通過它可以看出來每個excutor是否發生了數據傾斜,另一方面可以具體分析目前的應用是否產生了大量的shuffle,是否可以通過數據的本地性或者減小數據的傳輸來減少shuffle的數據量。

調優的經驗總結

1 輸出信息

在Spark應用里面可以直接使用System.out.println把信息輸出出來,系統會直接攔截out輸出到spark的日志。像我們使用的yarn作為資源管理系統,在yarn的日志中就可以直接看到這些輸出信息了。這在數據量很大的時候,做一些show()(默認顯示20),count() 或者 take(10)的時候會很方便。

2 內存不夠

當任務失敗,收到sparkContext shutdown的信息時,基本都是執行者的內存不夠。這個時候,一方面可以調大--excutor-memory參數,另一方面還是得回去看看程序。如果受限於系統的硬件條件,無法加大內存,可以采用局部調試法,檢查是在哪里出現的內存問題。比如,你的程序分成幾個步驟,一步一步的打包運行,最后檢查出現問題的點就可以了。

3 ThreadPool

線程池不夠,這個是因為--excutor-core給的太少了,出現線程池不夠用的情況。這個時候就需要調整參數的配置了。

4 physical memory不夠

這種問題一般是driver memory不夠導致的,driver memory通常存儲了以一些調度方面的信息,這種情況很有可能是你的調度過於復雜,或者是內部死循環導致。

5 合理利用緩存

在Spark的計算中,不太建議直接使用cache,萬一cache的量很大,可能導致內存溢出。可以采用persist的方式,指定緩存的級別為MEMORY_AND_DISK,這樣在內存不夠的時候,可以把數據緩存到磁盤上。另外,要合理的設計代碼,恰當地使用廣播和緩存,廣播的數據量太大會對傳輸帶來壓力,緩存過多未及時釋放,也會導致內存占用。一般來說,你的代碼在需要重復使用某一個rdd的時候,才需要考慮進行緩存,並且在不使用的時候,要及時unpersist釋放。

6 盡量避免shuffle

這個點,在優化的過程中是很重要的。比如你需要把兩個rdd按照某個key進行groupby,然后在進行leftouterjoin,這個時候一定要考慮大小表的問題。如果把大表關聯到小表,那么性能很可能會很慘。而只需要簡單的調換一下位置,性能就可能提升好幾倍。

寫在最后

大數據計算總是充滿了各種神奇的色彩,節點之間的分布式,單節點內多線程的並行化,只有多去了解一些原理性的東西,才能用好這些工具。

最后還是獻上最喜歡的那句話——知識,哪怕是知識的幻影,也會成為你的鎧甲,保護你不被愚昧反噬。

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM