1) --conf spark.dynamicAllocation.enabled=false
如果正在使用的是CDH的Spark,修改這個配置為false;開源的Spark版本則默認是false。
當為true時,即使指定了num-executors個數,spark-streaming應用也會占用整個集群的資源。
2) --conf spark.streaming.concurrentJobs=10
這個配置項的默認值為1,代表着新的batch過來之后只能在隊列中等待之前的batch執行完之后再執行。
如果batch執行的時間超過了batch本身的時間,可以將該配置增大。
修改該配置的風險:kafka的單個partition只支持順序消費,如果排在后面的batch先執行完成,kafka consumer 在commit offset時會出現混亂。
建議,使用之前充分評估風險,否則盡量不修改該配置。
3) cache persist 和broadcast的選擇
使用spark-streaming的應用一般是實時或者准實時的應用,所以需要預加載的變量(如模型,矩陣等),一般不會選擇cache和persist,而是使用廣播變量broadcast(只讀,類似於全局變量,但是如果在spark中直接使用全局變量會大幅降低程序性能)。
另一方面,將rdd/df的cache改為map(key,value)形式后進行廣播,可以在需要對該rdd/df進行join的地方采用rdd.map{m=>get(m.key)}的形式來代替。減少了join帶來的開銷。
4)預加載broadcast變量
廣播變量是懶加載的,首次在dataDStream.foreachRDD中使用該廣播變量會導致第一批數據處理比較慢,廣播變量越大延遲也越大。
懶加載在Spark離線任務中是比較好的策略,但是對線上實時推薦來說,延遲10s以上的行為數據可能都已經沒有處理價值了。
所以可以在還沒有進入到foreachRDD中時,先讓廣播變量能夠預加載到每台服務器,設置kafka讀取的offset為latest,這樣能夠保證spark-streaming總是能夠處理到最新的數據。
預加載的方法利用了懶加載的性質,隨便新建一個df,按照executor的個數repartition之后,在每個partition中讀取廣播變量的value中的任意一個值(不存在的也可以),這樣就能保證每個executor都能加載到該廣播變量。
someDF.repartition(sparkSession.sparkContext.getConf.getInt("spark.executor.instances", 10)).foreachPartition {
p =>
bcVariables.value.get("_")
}
5)去掉所有不必要的join
join確實有很多可以優化的配置,但是沒必要把時間花在join的優化上,尤其是在可以用廣播變量來作為代替方案的情況下。
需要注意的是,廣播變量和broadcast join是不一樣的,前者效率在大部分時候要更高。
6)kafka partition個數和executor個數的關系
executor個數要能被partition個數整除。例如,如果partition個數為24個,那么12個executor和18個executor處理數據的性能差距不大。如果集群可以分配的executor個數為18個,那么partition數可以從24個調整為18個(或者36個等等)。
原因比較明顯,就不多提了。展示幾個實驗數據
下圖為性能測試實驗中3,6,12個executor下數據處理時間(縱坐標)和數據量(橫坐標)的關系,是明顯的線性關系。

下圖為性能測試實驗中在處理600條數據時,executor數(橫坐標)和時間(縱坐標)的關系(分區個數為executor的整數倍)

由圖一的三組數據也可以看出,每秒能處理的數據的條數和executor的個數約等於線性關系。即如果當前集群每3秒能處理x條數據,那么集群擴容一倍后,每3秒應該能處理2x條數據。
由圖二可看出,executor數和數據的處理時間不是簡單的線性關系,也就是說,如果當前集群處理100條數據耗時6秒,並不能保證將集群擴容一倍后100條數據的處理時間變為3秒。
7)kafka的hash分區
kafka的各個分區處理的數據應該保證盡量按照某一特征(比如用戶id)hash分區,這樣能夠保證某一用戶的所有記錄都在某一個partition,這樣spark-streaming在處理reduceByKey時會提升效率。
8)提交任務時指定sql shuffle partition ,否則默認是200
--conf spark.sql.shuffle.partitions=6