【From】 https://blog.csdn.net/u010990043/article/details/82842995
最近整理了一下spark SQL內置配。加粗配置項是對sparkSQL 調優性能影響比較大的項,小伙伴們按需酌情配置。后續會挑出一些通用調優配置,共大家參考。有不正確的地方,歡迎大家在留言區留言討論。
配置項 | 默認值 | 概述 |
spark.sql.optimizer.maxIterations | 100 | sql優化器最大迭代次數 |
spark.sql.optimizer.inSetConversionThreshold | 10 | 插入轉換的集合大小閾值 |
spark.sql.inMemoryColumnarStorage.compressed | TRUE | 當設置為true時,SCAPK SQL將根據數據的統計自動為每個列選擇壓縮編解碼器 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制用於列緩存的批處理的大小。較大的批處理大小可以提高內存利用率和壓縮率,但緩存數據時會出現OOM風險 |
spark.sql.inMemoryColumnarStorage.partitionPruning | TRUE | 啟用內存中的列表分區剪枝 |
spark.sql.join.preferSortMergeJoin | TRUE | When true, 使用sort merge join 代替 shuffle hash join |
spark.sql.sort.enableRadixSort | TRUE | 使用基數排序,基數排序性能非常快,但是會額外使用over heap.當排序比較小的Row時,overheap 需要提高50% |
spark.sql.autoBroadcastJoinThreshold | 10L * 1024 * 1024 | 當執行join時,被廣播到worker節點上表最大字節。當被設置為-1,則禁用廣播。當前僅僅支持 Hive Metastore tables,表大小的統計直接基於hive表的源文件大小 |
spark.sql.limit.scaleUpFactor | 4 | 在執行查詢時,兩次嘗試之間讀取partation數目的增量。較高的值會導致讀取過多分區,較少的值會導致執行時間過長,因為浙江運行更多的作業 |
spark.sql.statistics.fallBackToHdfs | FALSE | 當不能從table metadata中獲取表的統計信息,返回到hdfs。這否有用取決與表是否足夠小到能夠使用auto broadcast joins |
spark.sql.defaultSizeInBytes | Long.MaxValue | 在查詢計划中表默認大小,默認被設置成Long.MaxValue 大於spark.sql.autoBroadcastJoinThreshold的值,也就意味着默認情況下不會廣播一個表,除非他足夠小 |
spark.sql.shuffle.partitions | 200 | 當為join/aggregation shuffle數據時,默認partition的數量 |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 64 * 1024 * 1024byte | The target post-shuffle input size in bytes of a task. |
spark.sql.adaptive.enabled | FALSE | 是否開啟adaptive query execution(自適應查詢執行) |
spark.sql.adaptive.minNumPostShufflePartitions | -1 | 測試用 |
spark.sql.subexpressionElimination.enabled | TRUE | When true, common subexpressions will be eliminated 當為真時,將刪除公共子表達式 |
spark.sql.caseSensitive | FALSE | 查詢分析器是否區分大小寫,默認情況下不區分。強烈建議不區分大小寫 |
spark.sql.constraintPropagation.enabled | 是否開啟優化,在查詢優化器期間推斷和傳播查詢計划中的數據約束。對於某種類型的查詢計划(例如有大量謂語和別名的查詢),約束傳播是昂貴的,會對整個運行時間產生負面影響。 | |
spark.sql.parser.escapedStringLiterals | FALSE | 2.0之前默認值為true,知否默認是否。正常文字能否包含在正則表達式中。 |
spark.sql.parquet.mergeSchema | FALSE | 若為true,在讀取parquet數據源時,schema從所有文件中合並出來。否則如果沒有可用的摘要文件,則從概要文件或隨機文件中選擇模式 |
spark.sql.parquet.respectSummaryFiles | FALSE | 若為ture,假設parquet的所有部分文件和概要文件一致,在合並模式時會忽略他們。否則將會合並所有的部分文件 |
spark.sql.parquet.binaryAsString | FALSE | 是否向下兼容其他parquet生產系統(eg impala or older version spark sql ),不區分字節數據和string數據寫到parquet schema,這個配置促使spark sql將二進制數據作為string達到兼容 |
spark.sql.parquet.int96AsTimestamp | TRUE | 是否使用Int96作為timestamp的存儲格式,可以避免精度損失丟失納秒部分,為其他parquet系統提供兼容(impala) |
spark.sql.parquet.int64AsTimestampMillis | FALSE | 當為true,timestamp值將以Int64作為mlibs的存儲擴展類型,這種模式微秒將被丟棄 |
spark.sql.parquet.cacheMetadata | TRUE | 是否緩存parquet的schema數據元,可以提升靜態數據的查詢性能 |
spark.sql.parquet.compression.codec | snappy | 支持類型:uncompressed", "snappy", "gzip", "lzo"。 指定parquet寫文件的壓縮編碼方式 |
spark.sql.parquet.filterPushdown | TRUE | 是否開啟parquet過濾條件下推 |
spark.sql.parquet.writeLegacyFormat | FALSE | spark sql在拼接schema時是否遵循parquet的schema的規范 |
spark.sql.parquet.output.committer.class | org.apache.parquet.hadoop.ParquetOutputCommitter | parquet輸出提交器類,同城必須是org.apache.hadoop.mapreduce.OutputCommitter的子類,如果不是將不會創建數據源摘要,即使配置開啟了parquet.enable.summary-metadata |
spark.sql.parquet.enableVectorizedReader | TRUE | 開啟parquet向量解碼 |
spark.sql.orc.filterPushdown | FALSE | 是否開啟條件下推到orc文件寫 |
spark.sql.hive.verifyPartitionPath | FALSE | 當為true時,在讀取HDFS中存儲的數據時,檢查表根目錄下的所有分區路徑 |
spark.sql.hive.metastorePartitionPruning | TRUE | 當為true,spark sql的謂語將被下推到hive metastore中,更早的消除不匹配的分區,會影響到違背轉換成文件源關系的hive表 |
spark.sql.hive.manageFilesourcePartitions | TRUE | 是否使用hive metastore管理spark sql的 dataSource表分區,若為true,dataSource表會在執行計划期間使用分區剪枝 |
spark.sql.hive.filesourcePartitionFileCacheSize | 250 * 1024 * 1024 | 當非0時,開啟將分區文件數據元緩存到內存中,所有表共享一個緩存,當開啟 hive filesource partition management(spark.sql.hive.manageFilesourcePartitions)時才會生效 |
spark.sql.hive.caseSensitiveInferenceMode | INFER_AND_SAVE | 設置無法從hive表屬性讀取分區大小寫模式時所采取的操作,雖然Spice SQL本身不區分大小寫,但hive兼容的文件格式如parquet。Spark sql必須使用一個保持情況的模式,當查詢由包含區分大小寫字段名或查詢的文件支持的任何表可能無法返回准確的結果時。有效選項包括INFER_AND_SAVE(默認模式——從基礎數據文件推斷出區分大小寫的模式,並將其寫入表屬性),INFER_ONLY(推斷schema但不嘗試將其寫入表屬性)和NEVER_INFER(回退到使用區分大小寫間接轉移模式代替推斷) |
spark.sql.optimizer.metadataOnly | TRUE | 當為true時,啟用僅使用表的元數據的元數據查詢優化來生成分區列,而不是表掃描。當掃描的所有列都是分區列,並且查詢具有滿足不同語義的聚合運算符時,它適用。 |
spark.sql.columnNameOfCorruptRecord | _corrupt_record | 當json/csv數據內部列解析失敗時,失敗列的名稱 |
spark.sql.broadcastTimeout" | 5*60 | 在broadCast join時 ,廣播等待的超時時間 |
spark.sql.thriftserver.scheduler.pool | 為JDBC客戶端會話設置公平調度程序池 | |
spark.sql.thriftServer.incrementalCollect | FALSE | 當TRUE時,啟用增量集合以在thrift server中執行 |
spark.sql.thriftserver.ui.retainedStatements | 200 | JDBC/ODBC Web用戶界面歷史記錄中SQL語句的數量 |
spark.sql.thriftserver.ui.retainedSessions | 200 | JDBC/ODBC Web UI歷史中保存的SQL客戶端會話數 |
spark.sql.sources.default | parquet | 輸入輸出默認數據元 |
spark.sql.hive.convertCTAS | FALSE | 如果時true,將使用spark.sql.sources.default.設置數據源,不指定任何存儲屬性到hive ctas語句 |
spark.sql.hive.gatherFastStats | TRUE | 在修復表分區時,將快速收集STATS(文件數量和所有文件的總大小),以避免HIVE轉移子中的順序列表。 |
spark.sql.sources.partitionColumnTypeInference.enabled | TRUE | 是否自動推斷分區列的數據類型 |
spark.sql.sources.bucketing.enabled | TRUE | 當false時,分桶表當作普通表處理 |
spark.sql.crossJoin.enabled | FALSE | 當false時,如果查詢中語法笛卡兒積 卻語法中沒有顯示join,將會拋出異常 |
spark.sql.orderByOrdinal | TRUE | 當為true時,排序字段放置到seleect List,否則被忽略 |
spark.sql.groupByOrdinal | TRUE | 當為true時,按組子句的序號被視為選擇列表中的位置。當為false時,序數被忽略。 |
spark.sql.groupByAliases | TRUE | group by后的別名是否能夠被用到 select list中,若為否將拋出分析異常 |
spark.sql.sources.parallelPartitionDiscovery.threshold | 32 | 允許在driver端列出文件的最大路徑數。如果在分區發現期間檢測到的路徑的數量超過該值,則嘗試用另一個SCAPLE分布式作業來列出文件。這適用於parquet、ORC、CSV、JSON和LIbSVM數據源。 |
spark.sql.sources.parallelPartitionDiscovery.parallelism | 10000 | 遞歸地列出路徑集合的並行數,設置阻止文件列表生成太多任務的序號 |
spark.sql.selfJoinAutoResolveAmbiguity | TRUE | 自動解決子鏈接中的連接條件歧義,修復bug SPARK-6231 |
spark.sql.retainGroupColumns | TRUE | 是否保留分組列 |
spark.sql.pivotMaxValues | 10000 | |
spark.sql.runSQLOnFiles | TRUE | 當為true,在sql查詢時,能夠使用dataSource.path作為表(eg:"select a,b from hdfs://xx/xx/*") |
spark.sql.codegen.wholeStage | TRUE | 當為true,多個算子的整個stage將被便宜到一個java方法中 |
spark.sql.codegen.maxFields | 100 | 在激活整個stage codegen之前支持的最大字段(包括嵌套字段) |
spark.sql.codegen.fallback | TRUE | 當為true,在整個stage的codegen,對於編譯generated code 失敗的query 部分,將會暫時關閉 |
spark.sql.codegen.maxCaseBranches | 20 | 支持最大的codegen |
spark.sql.files.maxPartitionBytes | 128 * 1024 * 1024 | 在讀取文件時,一個分區最大被讀取的數量,默認值=parquet.block.size |
spark.sql.files.openCostInBytes | 4 * 1024 * 1024 | 為了測定打開一個文件的耗時,通過同時掃描配置的字節數來測定,最好是過度估計,那么小文件的分區將比具有較大文件的分區更快(首先調度 |
spark.sql.files.ignoreCorruptFiles | FALSE | 是否自動跳過不正確的文件 |
spark.sql.files.maxRecordsPerFile | 0 | 寫入單個文件的最大條數,如果時0或者負數,則無限制 |
spark.sql.exchange.reuse | TRUE | planer是否嘗試找出重復的 exchanges並復用 |
spark.sql.streaming.stateStore.minDeltasForSnapshot | 10 | 在合並成快照之前需要生成的狀態存儲增量文件的最小數目 |
spark.sql.streaming.checkpointLocation | 檢查點數據流的查詢的默認存儲位置 | |
spark.sql.streaming.minBatchesToRetain | 100 | 流式計算最小批次長度 |
spark.sql.streaming.unsupportedOperationCheck | TRUE | streaming query的logical plan 檢查不支持的操作 |
spark.sql.variable.substitute | TRUE | |
spark.sql.codegen.aggregate.map.twolevel.enable | 啟用兩級聚合哈希映射。當啟用時,記錄將首先“插入/查找第一級、小、快的映射,然后在第一級滿或無法找到鍵時回落到第二級、更大、較慢的映射。當禁用時,記錄直接進入第二級。默認為真 | |
spark.sql.view.maxNestedViewDepth | 100 | 嵌套視圖中視圖引用的最大深度。嵌套視圖可以引用其他嵌套視圖,依賴關系被組織在有向無環圖(DAG)中。然而,DAG深度可能變得太大,導致意外的行為。此配置限制了這一點:當分析期間視圖深度超過該值時,我們終止分辨率以避免潛在錯誤。 |
spark.sql.objectHashAggregate.sortBased.fallbackThreshold | 128 | 在ObjectHashAggregateExec的情況下,當內存中哈希映射的大小增長過大時,我們將回落到基於排序的聚合。此選項為哈希映射的大小設置行計數閾值。 |
spark.sql.execution.useObjectHashAggregateExec | TRUE | 是否使用 ObjectHashAggregateExec |
spark.sql.streaming.fileSink.log.deletion | TRUE | 是否刪除文件流接收器中的過期日志文件 |
spark.sql.streaming.fileSink.log.compactInterval | 10 | 日志文件合並閾值,然后將所有以前的文件壓縮到下一個日志文件中 |
spark.sql.streaming.fileSink.log.cleanupDelay | 10min | 保證一個日志文件被所有用戶可見的時長 |
spark.sql.streaming.fileSource.log.deletion | TRUE | 是否刪除文件流源中過期的日志文件 |
spark.sql.streaming.fileSource.log.compactInterval | 10 | 日志文件合並閾值,然后將所有以前的文件壓縮到下一個日志文件中 |
spark.sql.streaming.fileSource.log.cleanupDelay | 10min | 保證一個日志文件被所有用戶可見的時長 |
spark.sql.streaming.schemaInference | FALSE | 基於文件的流,是否推斷它的模式 |
spark.sql.streaming.pollingDelay | 10L(MILLISECONDS) | 在沒有數據可用時延遲查詢新數據多長時間 |
spark.sql.streaming.noDataProgressEventInterval | 10000L(MILLISECONDS) | 在沒有數據的情況下,在兩個進度事件之間等待時間 |
spark.sql.streaming.metricsEnabled | FALSE | 是否為活動流查詢報告DoopWalth/CODAHALE度量 |
spark.sql.streaming.numRecentProgressUpdates | 100 | streaming query 保留的進度更新數量 |
spark.sql.statistics.ndv.maxError | 0.05 | 生成列級統計量時超對數G+++算法允許的最大估計誤差 |
spark.sql.cbo.enabled | FALSE | 在設定true時啟用CBO來估計計划統計信息 |
spark.sql.cbo.joinReorder.enabled | FALSE | Enables join reorder in CBO. |
spark.sql.cbo.joinReorder.dp.threshold | 12 | The maximum number of joined nodes allowed in the dynamic programming algorithm |
spark.sql.cbo.joinReorder.card.weight | 0.07 | The weight of cardinality (number of rows) for plan cost comparison in join reorder: rows * weight + size * (1 - weight) |
spark.sql.cbo.joinReorder.dp.star.filter | FALSE | Applies star-join filter heuristics to cost based join enumeration |
spark.sql.cbo.starSchemaDetection | FALSE | When true, it enables join reordering based on star schema detection |
spark.sql.cbo.starJoinFTRatio | 0.9 | Specifies the upper limit of the ratio between the largest fact tables for a star join to be considered |
spark.sql.session.timeZone | TimeZone.getDefault.getID | 時間時區 |
spark.sql.windowExec.buffer.in.memory.threshold | 4096 | 窗口操作符保證存儲在內存中的行數的閾值 |
spark.sql.windowExec.buffer.spill.threshold | spark.sql.windowExec.buffer.in.memory.threshold | 窗口操作符溢出的行數的閾值 |
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | Int.MaxValue | 由sortMergeJoin運算符保證存儲在內存中的行數的閾值 |
spark.sql.sortMergeJoinExec.buffer.spill.threshold | spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | 由排序合並連接運算符溢出的行數的閾值 |
spark.sql.cartesianProductExec.buffer.in.memory.threshold | 4096 | 笛卡爾乘積算子保證存儲在內存中的行數的閾值 |
spark.sql.cartesianProductExec.buffer.spill.threshold | spark.sql.cartesianProductExec.buffer.in.memory.threshold | 笛卡爾乘積算子溢出的行數閾值 |
spark.sql.redaction.options.regex | "(?i)url".r |
即便join的hive表沒有10M,也沒有觸發 mapjoin[解決方案]
spark在join的時候,用來判斷一個表的大小是否達到了10M這個限制,是不會去計算這個表在hdfs上的具體的文件大小的,而是使用hive metadata中的信息,具體如下圖:
explain出來spark的執行計划如下:
== Physical Plan ==
*Project [device#57, pkg#58]
+- *BroadcastHashJoin [pkg#58], [apppkg#62], Inner, BuildRight
:- *Filter isnotnull(pkg#58)
: +- HiveTableScan [device#57, pkg#58], MetastoreRelation dm_sdk_mapping, device_applist, [isnotnull(day#56), (cast(day#56 as double) = 2.0180501E7)]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *Filter isnotnull(apppkg#62)
+- HiveTableScan [apppkg#62], MetastoreRelation dm_sdk_mapping, app_category_mapping
當有些hive沒有totalSize這個信息的時候,spark就會用sortMergeJoin來做join了,可以使用下面的命令重新生成metadata信息:
ANALYZE TABLE dm_sdk_mapping.app_category_mapping COMPUTE STATISTICS
---------------------
作者:sunkl_
來源:CSDN
原文:https://blog.csdn.net/u010990043/article/details/82842995
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!