spark2+的sql 性能調優


1、在內存中緩存數據

性能調優主要是將數據放入內存中操作,spark緩存注冊表的方法

版本 緩存 釋放緩存
spark2.+ spark.catalog.cacheTable("tableName")緩存表 spark.catalog.uncacheTable("tableName")解除緩存
spark1.+ sqlContext.cacheTable("tableName")緩存 sqlContext.uncacheTable("tableName") 解除緩存

 

Demo案例:

(1)從Oracle數據庫中讀取數據,生成DataFrame

val oracleDF = spark.read.format("jdbc")
.option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com")
.option("dbtable","scott.emp")
.option("user","scott")
.option("password","tiger").load

(2)將DataFrame注冊成表:   

 oracleDF.registerTempTable("emp")

(3)執行查詢,並通過Web Console監控執行的時間

spark.sql("select * from emp").show

(4)將表進行緩存,並查詢兩次,並通過Web Console監控執行的時間

spark.catalog.cacheTable("emp")

(5)清空緩存:

spark.catalog.uncacheTable("emp")

2、性能優化相關參數

Sparksql僅僅會緩存必要的列,並且自動調整壓縮算法來減少內存和GC壓力。

屬性 默認值 描述
spark.sql.inMemoryColumnarStorage.compressed true Spark SQL 將會基於統計信息自動地為每一列選擇一種壓縮編碼方式。
spark.sql.inMemoryColumnarStorage.batchSize 10000 緩存批處理大小。緩存數據時, 較大的批處理大小可以提高內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。
spark.sql.files.maxPartitionBytes 128 MB 讀取文件時單個分區可容納的最大字節數(不過不推薦手動修改,可能在后續版本自動的自適應修改)
spark.sql.files.openCostInBytes 4M 打開文件的估算成本, 按照同一時間能夠掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。

3. 廣播

在進行表join的時候,將小表廣播可以提高性能,spark2.+中可以調整以下參數、

屬性 默認值 描述
spark.sql.broadcastTimeout 300 廣播等待超時時間,單位秒
spark.sql.autoBroadcastJoinThreshold 10M 用於配置一個表在執行 join 操作時能夠廣播給所有 worker 節點的最大字節大小。通過將這個值設置為 -1 可以禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。

注:在任務超多,廣播變量在跨stage使用數據的時候才能凸顯其真正作用。

4. 分區數據的調控

spark任務並行度的設置中,spark有兩個參數可以設置

屬性 默認值 描述
spark.sql.shuffle.partitions 200 用於配置 join 或aggregate混洗(shuffle)數據時使用的分區數。
spark.default.parallelism

對於分布式shuffle操作像reduceByKey和join,父RDD中分區的最大數目。

對於無父RDD的並行化等操作,它取決於群集管理器:

-本地模式:本地計算機上的核心數

-Mesos fine grained mode:8

-其他:所有執行節點上的核心總數或2,以較大者為准

分布式shuffle操作的分區數

 

看起來它們的定義似乎也很相似,但在實際測試中,

  • spark.default.parallelism只有在處理RDD時才會起作用,對Spark SQL的無效。
  • spark.sql.shuffle.partitions則是對sparks SQL專用的設置

5. 文件與分區

這個總共有兩個參數可以調整:

  • 讀取文件的時候一個分區接受多少數據;
  • 文件打開的開銷,通俗理解就是小文件合並的閾值。

文件打開是有開銷的,開銷的衡量,Spark 采用了一個比較好的方式就是打開文件的開銷用,相同時間能掃描的數據的字節數來衡量。

參數介紹如下:

屬性 默認值 描述
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 打包傳入一個分區的最大字節,在讀取文件的時候
spark.sql.files.openCostInBytes 4194304 (4 MB) 用相同時間內可以掃描的數據的大小來衡量打開一個文件的開銷。當將多個文件寫入同一個分區的時候該參數有用。該值設置大一點有好處,有小文件的分區會比大文件分區處理速度更快(優先調度)

spark.sql.files.maxPartitionBytes該值的調整要結合你想要的並發度及內存的大小來進行。

spark.sql.files.openCostInBytes說直白一些這個參數就是合並小文件的閾值,小於這個閾值的文件將會合並

6.文件格式

建議使用parquet或者orc。

parquet已經可以達到很大的性能了

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM