調優
Spark Streaming集成Kafka時,當數據量較小時默認配置一般都能滿足我們的需要,但是當數據量大的時候,就需要進行一定的調整和優化。
-
合理的批處理時間(batchDuration)
幾乎所有的Spark Streaming調優文檔都會提及批處理時間的調整,在StreamingContext初始化的時候,有一個參數便是批處理時間的設定。如果這個值設置的過短,即個batchDuration所產生的Job並不能在這期間完成處理,那么就會造成數據不斷堆積,最終導致Spark Streaming發生阻塞。而且,一般對於batchDuration的設置不會小於500ms,因為過小會導致SparkStreaming頻繁的提交作業,對整個streaming造成額外的負擔。在平時的應用中,根據不同的應用場景和硬件配置,我設在1~10s之間,我們可以根據SparkStreaming的可視化監控界面,觀察Total Delay來進行batchDuration的調整,如下圖:調整批處理時間 -
合理的Kafka拉取量(maxRatePerPartition重要)
對於Spark Streaming消費kafka中數據的應用場景,這個配置是非常關鍵的,配置參數為:spark.streaming.kafka.maxRatePerPartition。這個參數默認是沒有上線的,即kafka當中有多少數據它就會直接全部拉出。而根據生產者寫入Kafka的速率以及消費者本身處理數據的速度,同時這個參數需要結合上面的batchDuration,使得每個partition拉取在每個batchDuration期間拉取的數據能夠順利的處理完畢,做到盡可能高的吞吐量,而這個參數的調整可以參考可視化監控界面中的Input Rate和Processing Time,如下圖:maxRatePerPartition1maxRatePerPartition2spark.streaming.kafka.maxRatePerPartition這個參數是控制吞吐量的,一般和spark.streaming.backpressure.enabled=true一起使用。那么應該怎么算這個值呢。
如例要10分鍾的吞吐量控制在5000,0000,kafka分區是10個。
spark.streaming.kafka.maxRatePerPartition=8400這個值是怎么算的呢。如下是公式
spark.streaming.kafka.maxRatePerPartition的值 * kafka分區數 * (10 *60)(每秒時間)
-
緩存反復使用的Dstream(RDD)
Spark中的RDD和SparkStreaming中的Dstream,如果被反復的使用,最好利用cache(),將該數據流緩存起來,防止過度的調度資源造成的網絡開銷。可以參考觀察Scheduling Delay參數,如下圖:Dstream - 設置合理的GC
長期使用Java的小伙伴都知道,JVM中的垃圾回收機制,可以讓我們不過多的關注與內存的分配回收,更加專注於業務邏輯,JVM都會為我們搞定。對JVM有些了解的小伙伴應該知道,在Java虛擬機中,將內存分為了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費一定時間的,尤其是老年代的GC回收,需要對內存碎片進行整理,通常采用標記-清楚的做法。同樣的在Spark程序中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在通常的使用中建議:--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
- 設置合理的CPU資源數
CPU的core數量,每個executor可以占用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor占用了多個core,但是總的CPU使用率卻不高(因為一個executor並不總能充分利用多核的能力),這個時候可以考慮讓么個executor占用更少的core,同時worker下面增加更多的executor,或者一台host上面增加更多的worker來增加並行執行的executor的數量,從而增加CPU利用率。但是增加executor的時候需要考慮好內存消耗,因為一台機器的內存分配給越多的executor,每個executor的內存就越小,以致出現過多的數據spill over甚至out of memory的情況。 - 設置合理的parallelism
partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值太小了會導致每片數據量太大,導致內存壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action類型操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,默認返回數據的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的參數沒有影響)。所以說,這兩個概念密切相關,都是涉及到數據分片的,作用方式其實是統一的。通過spark.default.parallelism可以設置默認的分片數量,而很多RDD的操作都可以指定一個partition參數來顯式控制具體的分片數量。
在SparkStreaming+kafka的使用中,我們采用了Direct連接方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,我們一般默認設置為Kafka中Partition的數量。 - 使用高性能的算子
這里參考了美團技術團隊的博文,並沒有做過具體的性能測試,其建議如下:
- 使用reduceByKey/aggregateByKey替代groupByKey
- 使用mapPartitions替代普通map
- 使用foreachPartitions替代foreach
- 使用filter之后進行coalesce操作
- 使用repartitionAndSortWithinPartitions替代repartition與sort類操作
- 使用Kryo優化序列化性能
這個優化原則我本身也沒有經過測試,但是好多優化文檔有提到,這里也記錄下來。
在Spark中,主要有三個地方涉及到了序列化:
在算子函數中使用到外部變量時,該變量會被序列化后進行網絡傳輸(見“原則七:廣播大變量”中的講解)。
將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable接口。
使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節數組。
對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要注冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。
以下是使用Kryo的代碼示例,我們只要設置序列化類,再注冊要序列化的自定義類型即可(比如算子函數中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):
// 創建SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器為KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
-
結果
經過種種調試優化,我們最終要達到的目的是,Spark Streaming能夠實時的拉取Kafka當中的數據,並且能夠保持穩定,如下圖所示:結果當然不同的應用場景會有不同的圖形,這是本文詞頻統計優化穩定后的監控圖,我們可以看到Processing Time這一柱形圖中有一Stable的虛線,而大多數Batch都能夠在這一虛線下處理完畢,說明整體Spark Streaming是運行穩定的。
附:
使用SparkStreaming集成kafka時有幾個比較重要的參數:
(1)spark.streaming.stopGracefullyOnShutdown (true / false)默認fasle
確保在kill任務時,能夠處理完最后一批數據,再關閉程序,不會發生強制kill導致數據處理中斷,沒處理完的數據丟失
(2)spark.streaming.backpressure.enabled (true / false) 默認false
開啟后spark自動根據系統負載選擇最優消費速率
(3)spark.streaming.backpressure.initialRate (整數) 默認直接讀取所有
在(2)開啟的情況下,限制第一次批處理應該消費的數據,因為程序冷啟動 隊列里面有大量積壓,防止第一次全部讀取,造成系統阻塞
(4)spark.streaming.kafka.maxRatePerPartition (整數) 默認直接讀取所有
限制每秒每個消費線程讀取每個kafka分區最大的數據量
注意:
只有(4)激活的時候,每次消費的最大數據量,就是設置的數據量,如果不足這個數,就有多少讀多少,如果超過這個數字,就讀取這個數字的設置的值
只有(2)+(4)激活的時候,每次消費讀取的數量最大會等於(4)設置的值,最小是spark根據系統負載自動推斷的值,消費的數據量會在這兩個范圍之內變化根據系統情況,但第一次啟動會有多少讀多少數據。此后按(2)+(4)設置規則運行
(2)+(3)+(4)同時激活的時候,跟上一個消費情況基本一樣,但第一次消費會得到限制,因為我們設置第一次消費的頻率了。
參考: