1、在內存中緩存數據
性能調優主要是將數據放入內存中操作,spark緩存注冊表的方法
版本 | 緩存 | 釋放緩存 |
spark2.+ | spark.catalog.cacheTable("tableName")緩存表 | spark.catalog.uncacheTable("tableName")解除緩存 |
spark1.+ | sqlContext.cacheTable("tableName")緩存 | sqlContext.uncacheTable("tableName") 解除緩存 |
Demo案例:
(1)從Oracle數據庫中讀取數據,生成DataFrame
val oracleDF = spark.read.format("jdbc") .option("url","jdbc:oracle:thin:@192.168.88.101:1521/orcl.example.com") .option("dbtable","scott.emp") .option("user","scott") .option("password","tiger").load
(2)將DataFrame注冊成表:
oracleDF.registerTempTable("emp")
(3)執行查詢,並通過Web Console監控執行的時間
spark.sql("select * from emp").show
(4)將表進行緩存,並查詢兩次,並通過Web Console監控執行的時間
spark.catalog.cacheTable("emp")
(5)清空緩存:
spark.catalog.uncacheTable("emp")
2、性能優化相關參數
Sparksql僅僅會緩存必要的列,並且自動調整壓縮算法來減少內存和GC壓力。
屬性 | 默認值 | 描述 |
spark.sql.inMemoryColumnarStorage.compressed | true | Spark SQL 將會基於統計信息自動地為每一列選擇一種壓縮編碼方式。 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 緩存批處理大小。緩存數據時, 較大的批處理大小可以提高內存利用率和壓縮率,但同時也會帶來 OOM(Out Of Memory)的風險。 |
spark.sql.files.maxPartitionBytes | 128 MB | 讀取文件時單個分區可容納的最大字節數(不過不推薦手動修改,可能在后續版本自動的自適應修改) |
spark.sql.files.openCostInBytes | 4M | 打開文件的估算成本, 按照同一時間能夠掃描的字節數來測量。當往一個分區寫入多個文件的時候會使用。高估更好, 這樣的話小文件分區將比大文件分區更快 (先被調度)。 |
3. 廣播
在進行表join的時候,將小表廣播可以提高性能,spark2.+中可以調整以下參數、
屬性 | 默認值 | 描述 |
spark.sql.broadcastTimeout | 300 | 廣播等待超時時間,單位秒 |
spark.sql.autoBroadcastJoinThreshold | 10M | 用於配置一個表在執行 join 操作時能夠廣播給所有 worker 節點的最大字節大小。通過將這個值設置為 -1 可以禁用廣播。注意,當前數據統計僅支持已經運行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive Metastore 表。 |
注:在任務超多,廣播變量在跨stage使用數據的時候才能凸顯其真正作用。
4. 分區數據的調控
spark任務並行度的設置中,spark有兩個參數可以設置
屬性 | 默認值 | 描述 |
spark.sql.shuffle.partitions | 200 | 用於配置 join 或aggregate混洗(shuffle)數據時使用的分區數。 |
spark.default.parallelism | 對於分布式shuffle操作像reduceByKey和join,父RDD中分區的最大數目。 對於無父RDD的並行化等操作,它取決於群集管理器: -本地模式:本地計算機上的核心數 -Mesos fine grained mode:8 -其他:所有執行節點上的核心總數或2,以較大者為准 |
分布式shuffle操作的分區數 |
看起來它們的定義似乎也很相似,但在實際測試中,
- spark.default.parallelism只有在處理RDD時才會起作用,對Spark SQL的無效。
- spark.sql.shuffle.partitions則是對sparks SQL專用的設置
5. 文件與分區
這個總共有兩個參數可以調整:
- 讀取文件的時候一個分區接受多少數據;
- 文件打開的開銷,通俗理解就是小文件合並的閾值。
文件打開是有開銷的,開銷的衡量,Spark 采用了一個比較好的方式就是打開文件的開銷用,相同時間能掃描的數據的字節數來衡量。
參數介紹如下:
屬性 | 默認值 | 描述 |
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | 打包傳入一個分區的最大字節,在讀取文件的時候 |
spark.sql.files.openCostInBytes | 4194304 (4 MB) | 用相同時間內可以掃描的數據的大小來衡量打開一個文件的開銷。當將多個文件寫入同一個分區的時候該參數有用。該值設置大一點有好處,有小文件的分區會比大文件分區處理速度更快(優先調度) |
spark.sql.files.maxPartitionBytes該值的調整要結合你想要的並發度及內存的大小來進行。
spark.sql.files.openCostInBytes說直白一些這個參數就是合並小文件的閾值,小於這個閾值的文件將會合並
6.文件格式
建議使用parquet或者orc。
parquet已經可以達到很大的性能了