sparkStreaming實時數據處理的優化方面


1.並行度

在direct方式下,sparkStreaming的task數量是等於kafka的分區數,kakfa單個分區的一般吞吐量為10M/s

常規設計下:kafka的分區數一般為broken節點的3,6,9倍比較合理

比如我的集群有6個broken節點,創建kafka的分區為18個,sparkStreaming的task也為18個,當然也可以適當放大分區,根據自己的數據量來合理規划集群及分區數

2.序列化

java的序列化很沉重,會序列化好多無關的(時間長)

舉例:100000個簡單的對象,序列化時間對比

java原生序列化時間:8637 ms
java原生反序列化時間:5851 ms

Kryo 序列化時間:455 ms
Kryo 反序列化時間:207 ms

 對對象進行序列化注冊
sparkConf.registerKryoClasses( Array( classOf[OrderInfo], classOf[Opt_alliance_business], classOf[DriverInfo], classOf[RegisterUsers] , classOf[Reservation] )

3.限流與背壓

不開啟背壓:每秒鍾從kafka每一個分區拉取的數據量是無限制--剛啟動程序時kafka堆積的數大量據都會直接被短時間進行消費,消費不及時,可能會發生內存溢出
開啟限流:spark.streaming.kafka.maxRatePerPartition
開啟背壓:流啟動之后 --》checkpoint --》metastore 流信息無法更改
舉例:

sparkConf.set("spark.streaming.backpressure.initialRate" , "500") 初始速率500條/s
sparkConf.set("spark.streaming.backpressure.enabled" , "true") 開啟壓背
sparkConf.set("spark.streaming.kafka.maxRatePerPartition" , "5000") 最大速度不超過5000條
4.cpu空轉-->task 如果task沒拿到數據,也會進行資源消耗
spark.locality.wait 3s

 

5.不要在代碼中判斷這個表是否存在不要在實時代碼里面判斷表是否存在,耗時

6、推測執行

推測執行:就是把執行失敗task的轉移到另一個executor
 場景:task失敗造成重試(task啟動、壓縮、序列化),如果每次task執行3秒失敗重試8次需要消耗24秒
sparkConf.set("spark.speculation.interval", "300") 推測執行間隔
sparkConf.set("spark.speculation.quantile","0.9") 推測執行完成的task的閾值

7.關於某個task的執行的任務運行兩個小時都運行不完
場景:yarn日志報錯:shuffle落地文件找不到、shuffle文件打不開  也會造成task失敗 ,spark 4105 shuffle fetch 錯誤

原因:shuffle1 過程 writeshuffle=》落地(默認lz4)=》readshuffle,寫的匯聚shuffle文件被下游的節點打不開或者讀取不到,可能是壓縮的原因,壓縮文件打不開

spark4105錯誤地址:https://issues.apache.org/jira/browse/SPARK-4105
解決:開啟推測執行 =》轉移任務,關閉shuffle壓縮設置(也就是增加了節點直接傳輸的文件大小,加大了IO),重新跑數據


8.hashshuffle與sortshuffle

https://www.jianshu.com/p/fafef67c203c

------------恢復內容結束------------


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM