一.算子調優之MapPartitions提升Map類操作性能
1.MapPartitions操作的優點:
如果是普通的map,比如一個partition中有1萬條數據;ok,那么你的function要執行和計算1萬次。
但是,使用MapPartitions操作之后,一個task僅僅會執行一次function,function一次接收所有的partition數據。只要執行一次就可以了,性能比較高。
2.MapPartitions的缺點:一定是有的。
如果是普通的map操作,一次function的執行就處理一條數據;那么如果內存不夠用的情況下,比如處理了1千條數據了,那么這個時候內存不夠了,那么就可以將已經處理完的1千條數據從內存里面垃圾回收掉,或者用其他方法,騰出空間來吧。
所以說普通的map操作通常不會導致內存的OOM異常。
但是MapPartitions操作,對於大量數據來說,比如甚至一個partition,100萬數據,一次傳入一個function以后,那么可能一下子內存不夠,但是又沒有辦法去騰出內存空間來,可能就OOM,內存溢出。
3.什么時候比較適合用MapPartitions系列操作?
就是說,數據量不是特別大的時候,都可以用這種MapPartitions系列操作,性能還是非常不錯的,是有提升的。比如原來是15分鍾,(曾經有一次性能調優),12分鍾。10分鍾->9分鍾。
但是也有過出問題的經驗,MapPartitions只要一用,直接OOM,內存溢出,崩潰。
在項目中,自己先去估算一下RDD的數據量,以及每個partition的量,還有自己分配給每個executor的內存資源。看看一下子內存容納所有的partition數據,行不行。如果行,可以試一下,能跑通就好。性能肯定是有提升的。
但是試了一下以后,發現,不行,OOM了,那就放棄吧。
二.算子調優之filter過后使用coalesce減少分區數量
(1)在經過filter之后,通常會造成各個partition中的數據數量相差過大
默認情況下,經過了這種filter之后,RDD中的每個partition的數據量,可能都不太一樣了。(原本每個partition的數據量可能是差不多的)
問題:
1、每個partition數據量變少了,但是在后面進行處理的時候,還是要跟partition數量一樣數量的task,來進行處理;有點浪費task計算資源。
2、每個partition的數據量不一樣,會導致后面的每個task處理每個partition的時候,每個task要處理的數據量就不同,這個時候很容易發生什么問題?數據傾斜。。。。
比如說,第二個partition的數據量才100;但是第三個partition的數據量是900;那么在后面的task處理邏輯一樣的情況下,不同的task要處理的數據量可能差別達到了9倍,甚至10倍以上;同樣也就導致了速度的差別在9倍,甚至10倍以上。
這樣的話呢,就會導致有些task運行的速度很快;有些task運行的速度很慢。這,就是數據傾斜。
(2)針對上述的兩個問題,我們希望應該能夠怎么樣?
1、針對第一個問題,我們希望可以進行partition的壓縮吧,因為數據量變少了,那么partition其實也完全可以對應的變少。比如原來是4個partition,現在完全可以變成2個partition。那么就只要用后面的2個task來處理即可。就不會造成task計算資源的浪費。(不必要,針對只有一點點數據的partition,還去啟動一個task來計算)
2、針對第二個問題,其實解決方案跟第一個問題是一樣的;也是去壓縮partition,盡量讓每個partition的數據量差不多。那么這樣的話,后面的task分配到的partition的數據量也就差不多。不會造成有的task運行速度特別慢,有的task運行速度特別快。避免了數據傾斜的問題。
有了解決問題的思路之后,接下來,我們該怎么來做呢?實現?
(3)coalesce算子
主要就是用於在filter操作之后,針對每個partition的數據量各不相同的情況,來壓縮partition的數量。減少partition的數量,而且讓每個partition的數據量都盡量均勻緊湊。
從而便於后面的task進行計算操作,在某種程度上,能夠一定程度的提升性能。
三.算子調優之使用foreachPartition優化寫數據庫性能
(1)傳統的foreach寫數據庫過程
默認的foreach的性能缺陷在哪里?
首先,對於每條數據,都要單獨去調用一次function,task為每個數據,都要去執行一次function函數。
如果100萬條數據,(一個partition),調用100萬次。性能比較差。
另外一個非常非常重要的一點
如果每個數據,你都去創建一個數據庫連接的話,那么你就得創建100萬次數據庫連接。
但是要注意的是,數據庫連接的創建和銷毀,都是非常非常消耗性能的。雖然我們之前已經用了數據庫連接池,只是創建了固定數量的數據庫連接。
你還是得多次通過數據庫連接,往數據庫(MySQL)發送一條SQL語句,然后MySQL需要去執行這條SQL語句。如果有100萬條數據,那么就是100萬次發送SQL語句。
以上兩點(數據庫連接,多次發送SQL語句),都是非常消耗性能的。
(2)使用foreachPartition
(3)用了foreachPartition算子之后,好處在哪里?
1、對於我們寫的function函數,就調用一次,一次傳入一個partition所有的數據
2、主要創建或者獲取一個數據庫連接就可以
3、只要向數據庫發送一次SQL語句和多組參數即可
在實際生產環境中,清一色,都是使用foreachPartition操作;但是有個問題,跟mapPartitions操作一樣,如果一個partition的數量真的特別特別大,比如真的是100萬,那基本上就不太靠譜了。
一下子進來,很有可能會發生OOM,內存溢出的問題。
一組數據的對比:生產環境
一個partition大概是1千條左右
用foreach,跟用foreachPartition,性能的提升達到了2~3分鍾。
四.算子調優之使用repartition解決Spark SQL低並行度的性能問題
並行度:之前說過,並行度是自己可以調節,或者說是設置的。
1、spark.default.parallelism
2、textFile(),傳入第二個參數,指定partition數量(比較少用)
咱們的項目代碼中,沒有設置並行度,實際上,在生產環境中,是最好自己設置一下的。官網有推薦的設置方式,你的spark-submit腳本中,會指定你的application總共要啟動多少個executor,100個;每個executor多少個cpu core,2~3個;總共application,有cpu core,200個。
官方推薦,根據你的application的總cpu core數量(在spark-submit中可以指定,200個),自己手動設置spark.default.parallelism參數,指定為cpu core總數的2~3倍。400~600個並行度。600。
承上啟下
你設置的這個並行度,在哪些情況下會生效?哪些情況下,不會生效?
如果你壓根兒沒有使用Spark SQL(DataFrame),那么你整個spark application默認所有stage的並行度都是你設置的那個參數。(除非你使用coalesce算子縮減過partition數量)
問題來了,Spark SQL,用了。用Spark SQL的那個stage的並行度,你沒法自己指定。Spark SQL自己會默認根據hive表對應的hdfs文件的block,自動設置Spark SQL查詢所在的那個stage的並行度。你自己通過spark.default.parallelism參數指定的並行度,只會在沒有Spark SQL的stage中生效。
比如你第一個stage,用了Spark SQL從hive表中查詢出了一些數據,然后做了一些transformation操作,接着做了一個shuffle操作(groupByKey);下一個stage,在shuffle操作之后,做了一些transformation操作。hive表,對應了一個hdfs文件,有20個block;你自己設置了spark.default.parallelism參數為100。
你的第一個stage的並行度,是不受你的控制的,就只有20個task;第二個stage,才會變成你自己設置的那個並行度,100。
問題在哪里?
Spark SQL默認情況下,它的那個並行度,咱們沒法設置。可能導致的問題,也許沒什么問題,也許很有問題。Spark SQL所在的那個stage中,后面的那些transformation操作,可能會有非常復雜的業務邏輯,甚至說復雜的算法。如果你的Spark SQL默認把task數量設置的很少,20個,然后每個task要處理為數不少的數據量,然后還要執行特別復雜的算法。
這個時候,就會導致第一個stage的速度,特別慢。第二個stage,1000個task,刷刷刷,非常快。
解決上述Spark SQL無法設置並行度和task數量的辦法,是什么呢?
repartition算子,你用Spark SQL這一步的並行度和task數量,肯定是沒有辦法去改變了。但是呢,可以將你用Spark SQL查詢出來的RDD,使用repartition算子,去重新進行分區,此時可以分區成多個partition,比如從20個partition,分區成100個。
然后呢,從repartition以后的RDD,再往后,並行度和task數量,就會按照你預期的來了。就可以避免跟Spark SQL綁定在一個stage中的算子,只能使用少量的task去處理大量數據以及復雜的算法邏輯。
五.算子調優之reduceByKey本地聚合介紹
reduceByKey,相較於普通的shuffle操作(比如groupByKey),它的一個特點,就是說,會進行map端的本地聚合。
對map端給下個stage每個task創建的輸出文件中,寫數據之前,就會進行本地的combiner操作,也就是說對每一個key,對應的values,都會執行你的算子函數() + _)
(1)用reduceByKey對性能的提升:
1、在本地進行聚合以后,在map端的數據量就變少了,減少磁盤IO。而且可以減少磁盤空間的占用。
2、下一個stage,拉取數據的量,也就變少了。減少網絡的數據傳輸的性能消耗。
3、在reduce端進行數據緩存的內存占用變少了。
4、reduce端,要進行聚合的數據量也變少了。
(2)總結:
reduceByKey在什么情況下使用呢?
1、非常普通的,比如說,就是要實現類似於wordcount程序一樣的,對每個key對應的值,進行某種數據公式或者算法的計算(累加、類乘)
2、對於一些類似於要對每個key進行一些字符串拼接的這種較為復雜的操作,可以自己衡量一下,其實有時,也是可以使用reduceByKey來實現的。但是不太好實現。如果真能夠實現出來,對性能絕對是有幫助的。(shuffle基本上就占了整個spark作業的90%以上的性能消耗,主要能對shuffle進行一定的調優,都是有價值的)