性能調優
(1)數據接收並行度調優(一)
通過網絡接收數據時(比如Kafka、Flume),會將數據反序列化,並存儲在Spark的內存中。如果數據接收稱為系統的瓶頸,那么可以考慮並行化數據接收。每一個輸入DStream都會在某個Worker的Executor上啟動一個Receiver,該Receiver接收一個數據流。因此可以通過創建多個輸入DStream,並且配置它們接收數據源不同的分區數據,達到接收多個數據流的效果。比如說,一個接收兩個Kafka Topic的輸入DStream,可以被拆分為兩個輸入DStream,每個分別接收一個topic的數據。這樣就會創建兩個Receiver,從而並行地接收數據,進而提升吞吐量。多個DStream可以使用union算子進行聚合,從而形成一個DStream。然后后續的transformation算子操作都針對該一個聚合后的DStream即可。
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
(2)數據接收並行度調優(二)
數據接收並行度調優,除了創建更多輸入DStream和Receiver以外,還可以考慮調節block interval。通過參數,spark.streaming.blockInterval,可以設置block interval,默認是200ms。對於大多數Receiver來說,在將接收到的數據保存到Spark的BlockManager之前,都會將數據切分為一個一個的block。而每個batch中的block數量,則決定了該batch對應的RDD的partition的數量,以及針對該RDD執行transformation操作時,創建的task的數量。每個batch對應的task數量是大約估計的,即batch interval / block interval。
例如說,batch interval為2s,block interval為200ms,會創建10個task。如果你認為每個batch的task數量太少,即低於每台機器的cpu core數量,那么就說明batch的task數量是不夠的,因為所有的cpu資源無法完全被利用起來。要為batch增加block的數量,那么就減小block interval。然而,推薦的block interval最小值是50ms,如果低於這個數值,那么大量task的啟動時間,可能會變成一個性能開銷點。
(3)數據接收並行度調優(三)
除了上述說的兩個提升數據接收並行度的方式,還有一種方法,就是顯式地對輸入數據流進行重分區。使用inputStream.repartition(<number of partitions>)即可。這樣就可以將接收到的batch,分布到指定數量的機器上,然后再進行進一步的操作。
(4)任務啟動調優
如果每秒鍾啟動的task過於多,比如每秒鍾啟動50個,那么發送這些task去Worker節點上的Executor的性能開銷,會比較大,而且此時基本就很難達到毫秒級的延遲了。使用下述操作可以減少這方面的性能開銷:
1、Task序列化:
使用Kryo序列化機制來序列化task,可以減小task的大小,從而減少發送這些task到各個Worker節點上的Executor的時間。
2、執行模式:
在Standalone模式下運行Spark,可以達到更少的task啟動時間。
上述方式,也許可以將每個batch的處理時間減少100毫秒。從而從秒級降到毫秒級。
(5)數據處理並行度調優
如果在計算的任何stage中使用的並行task的數量沒有足夠多,那么集群資源是無法被充分利用的。舉例來說,對於分布式的reduce操作,比如reduceByKey和reduceByKeyAndWindow,默認的並行task的數量是由spark.default.parallelism參數決定的。你可以在reduceByKey等操作中,傳入第二個參數,手動指定該操作的並行度,也可以調節全局的spark.default.parallelism參數。
(6)數據序列化調優(一)
數據序列化造成的系統開銷可以由序列化格式的優化來減小。在流式計算的場景下,有兩種類型的數據需要序列化。
1、輸入數據:
默認情況下,接收到的輸入數據,是存儲在Executor的內存中的,使用的持久化級別是StorageLevel.MEMORY_AND_DISK_SER_2。這意味着,數據被序列化為字節從而減小GC開銷,並且會復制以進行executor失敗的容錯。因此,數據首先會存儲在內存中,然后在內存不足時會溢寫到磁盤上,從而為流式計算來保存所有需要的數據。這里的序列化有明顯的性能開銷——Receiver必須反序列化從網絡接收到的數據,然后再使用Spark的序列化格式序列化數據。
2、流式計算操作生成的持久化RDD:
流式計算操作生成的持久化RDD,可能會持久化到內存中。例如,窗口操作默認就會將數據持久化在內存中,因為這些數據后面可能會在多個窗口中被使用,並被處理多次。然而,不像Spark Core的默認持久化級別,StorageLevel.MEMORY_ONLY,流式計算操作生成的RDD的默認持久化級別是StorageLevel.MEMORY_ONLY_SER ,默認就會減小GC開銷。
(7)數據序列化調優(二)
在上述的場景中,使用Kryo序列化類庫可以減小CPU和內存的性能開銷。使用Kryo時,一定要考慮注冊自定義的類,並且禁用對應引用的tracking(spark.kryo.referenceTracking)。
在一些特殊的場景中,比如需要為流式應用保持的數據總量並不是很多,也許可以將數據以非序列化的方式進行持久化,從而減少序列化和反序列化的CPU開銷,而且又不會有太昂貴的GC開銷。舉例來說,如果你數秒的batch interval,並且沒有使用window操作,那么你可以考慮通過顯式地設置持久化級別,來禁止持久化時對數據進行序列化。這樣就可以減少用於序列化和反序列化的CPU性能開銷,並且不用承擔太多的GC開銷。
(8)batch interval調優(最重要)
如果想讓一個運行在集群上的Spark Streaming應用程序可以穩定,它就必須盡可能快地處理接收到的數據。換句話說,batch應該在生成之后,就盡可能快地處理掉。對於一個應用來說,這個是不是一個問題,可以通過觀察Spark UI上的batch處理時間來定。batch處理時間必須小於batch interval時間。
基於流式計算的本質,batch interval對於,在固定集群資源條件下,應用能保持的數據接收速率,會有巨大的影響。例如,在WordCount例子中,對於一個特定的數據接收速率,應用業務可以保證每2秒打印一次單詞計數,而不是每500ms。因此batch interval需要被設置得,讓預期的數據接收速率可以在生產環境中保持住。
為你的應用計算正確的batch大小的比較好的方法,是在一個很保守的batch interval,比如5~10s,以很慢的數據接收速率進行測試。要檢查應用是否跟得上這個數據速率,可以檢查每個batch的處理時間的延遲,如果處理時間與batch interval基本吻合,那么應用就是穩定的。否則,如果batch調度的延遲持續增長,那么就意味應用無法跟得上這個速率,也就是不穩定的。因此你要想有一個穩定的配置,可以嘗試提升數據處理的速度,或者增加batch interval。記住,由於臨時性的數據增長導致的暫時的延遲增長,可以合理的,只要延遲情況可以在短時間內恢復即可。
(9)內存調優(一)
優化Spark應用的內存使用和GC行為,在Spark Core的調優中,已經講過了。這里講一下與Spark Streaming應用相關的調優參數。
Spark Streaming應用需要的集群內存資源,是由使用的transformation操作類型決定的。舉例來說,如果想要使用一個窗口長度為10分鍾的window操作,那么集群就必須有足夠的內存來保存10分鍾內的數據。如果想要使用updateStateByKey來維護許多key的state,那么你的內存資源就必須足夠大。反過來說,如果想要做一個簡單的map-filter-store操作,那么需要使用的內存就很少。
通常來說,通過Receiver接收到的數據,會使用StorageLevel.MEMORY_AND_DISK_SER_2持久化級別來進行存儲,因此無法保存在內存中的數據會溢寫到磁盤上。而溢寫到磁盤上,是會降低應用的性能的。因此,通常是建議為應用提供它需要的足夠的內存資源。建議在一個小規模的場景下測試內存的使用量,並進行評估。
(10)內存調優(二)
內存調優的另外一個方面是垃圾回收。對於流式應用來說,如果要獲得低延遲,肯定不想要有因為JVM垃圾回收導致的長時間延遲。有很多參數可以幫助降低內存使用和GC開銷:
1、DStream的持久化:
正如在“數據序列化調優”一節中提到的,輸入數據和某些操作生產的中間RDD,默認持久化時都會序列化為字節。與非序列化的方式相比,這會降低內存和GC開銷。使用Kryo序列化機制可以進一步減少內存使用和GC開銷。進一步降低內存使用率,可以對數據進行壓縮,由spark.rdd.compress參數控制(默認false)。
2、清理舊數據:
默認情況下,所有輸入數據和通過DStream transformation操作生成的持久化RDD,會自動被清理。Spark Streaming會決定何時清理這些數據,取決於transformation操作類型。例如,你在使用窗口長度為10分鍾內的window操作,Spark會保持10分鍾以內的數據,時間過了以后就會清理舊數據。但是在某些特殊場景下,比如Spark SQL和Spark Streaming整合使用時,在異步開啟的線程中,使用Spark SQL針對batch RDD進行執行查詢。那么就需要讓Spark保存更長時間的數據,直到Spark SQL查詢結束。可以使用streamingContext.remember()方法來實現。
3、CMS垃圾回收器:
使用並行的mark-sweep垃圾回收機制,被推薦使用,用來保持GC低開銷。雖然並行的GC會降低吞吐量,但是還是建議使用它,來減少batch的處理時間(降低處理過程中的gc開銷)。如果要使用,那么要在driver端和executor端都開啟。在spark-submit中使用--driver-java-options設置;使用spark.executor.extraJavaOptions參數設置。-XX:+UseConcMarkSweepGC。