- 系統背景
--driver-memory 50G
- spark streaming + Kafka高級API receiver
- 目前資源分配(現在系統比較穩定的資源分配),獨立集群
--executor-memory 8G
--num-executors 11
--executor-cores 5
- 廣播變量
1. 廣播變量的初始化
1.1.executor端,存放廣播變量的對象使用非靜態,因為靜態變量是屬於類的,不能使用構造函數來初始化。在executor端使用靜態的時候,它只是定義的時候的一個狀態,而在初始化時設置的值取不到。而使用非靜態的對象,其構造函數的初始化在driver端執行,故在集群可以取到廣播變量的值。
2. 廣播變量的釋放
2.1.當filter增量為指定大小時,進行廣播,雖然廣播的是同一個對象,但是,廣播的ID是不一樣的,而且ID號越來越大,這說明對於廣播來說,它並不是一個對象,而只是名字一樣的不同對象,如果不對廣播變量進行釋放,將會導致executor端內存占用越來越大,而一直沒有使用的廣播變量,被進行GC,會導致GC開銷超過使用上線,導致程序失敗。
2.2.解決方案:這廣播之前,先調用unpersist()方法,釋放不用的廣播變量
- 使用Kafka 的高級API receiver
1. 在使用receiver高級API時,由於receiver、partition、executor的分配關系,經常會導致某個executor任務比較繁重,進而影響整體處理速度
1.1.最好是一個receiver對應一個executor
2. 由於前段時間數據延遲比較嚴重,就想,能不能讓所有executor的cores都去處理數據?所以調整receiver為原來的四倍,結果系統啟動時,就一下沖上來非常大的數據量,導致系統崩潰,可見,receiver不僅跟partition的分配有關,還跟數據接收量有關
3. 在實際處理數據中,由於消息延遲,可以看到,有的topic處理速度快有的慢,原因分析如下:
3.1.跟消息的格式有關,有的是序列化文件,有的事json格式,而json的解析相對於比較慢
3.2.有時候拖累整個集群處理速度的,除了大量數據,還跟單條數據的大小有關
以下是程序跑掛的一些異常,和原因分析
問題矯正:
第一張圖片的,解決方案的倒數第二個, spark.memory.storageFraction(動態內存的百分比設置),應該為spark.storage.memoryFraction(靜態內存分配的設置) (由於原文檔丟失,導致無法修改文檔。)
如果有什么問題,歡迎大家指出,共同探討,共同進步