[轉] Spark sql 內置配置(V2.2)


【From】 https://blog.csdn.net/u010990043/article/details/82842995

 

 最近整理了一下spark SQL內置配。加粗配置項是對sparkSQL 調優性能影響比較大的項,小伙伴們按需酌情配置。后續會挑出一些通用調優配置,共大家參考。有不正確的地方,歡迎大家在留言區留言討論。 

 

配置項 默認值 概述
spark.sql.optimizer.maxIterations 100 sql優化器最大迭代次數
spark.sql.optimizer.inSetConversionThreshold 10 插入轉換的集合大小閾值
spark.sql.inMemoryColumnarStorage.compressed TRUE 當設置為true時,SCAPK SQL將根據數據的統計自動為每個列選擇壓縮編解碼器
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制用於列緩存的批處理的大小。較大的批處理大小可以提高內存利用率和壓縮率,但緩存數據時會出現OOM風險
spark.sql.inMemoryColumnarStorage.partitionPruning  TRUE 啟用內存中的列表分區剪枝
spark.sql.join.preferSortMergeJoin TRUE When true, 使用sort merge join 代替 shuffle hash join
spark.sql.sort.enableRadixSort TRUE 使用基數排序,基數排序性能非常快,但是會額外使用over heap.當排序比較小的Row時,overheap 需要提高50%
spark.sql.autoBroadcastJoinThreshold 10L * 1024 * 1024 當執行join時,被廣播到worker節點上表最大字節。當被設置為-1,則禁用廣播。當前僅僅支持 Hive Metastore tables,表大小的統計直接基於hive表的源文件大小
spark.sql.limit.scaleUpFactor 4 在執行查詢時,兩次嘗試之間讀取partation數目的增量。較高的值會導致讀取過多分區,較少的值會導致執行時間過長,因為浙江運行更多的作業
spark.sql.statistics.fallBackToHdfs FALSE 當不能從table metadata中獲取表的統計信息,返回到hdfs。這否有用取決與表是否足夠小到能夠使用auto broadcast joins
spark.sql.defaultSizeInBytes Long.MaxValue 在查詢計划中表默認大小,默認被設置成Long.MaxValue 大於spark.sql.autoBroadcastJoinThreshold的值,也就意味着默認情況下不會廣播一個表,除非他足夠小
spark.sql.shuffle.partitions 200 當為join/aggregation  shuffle數據時,默認partition的數量
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 64 * 1024 * 1024byte The target post-shuffle input size in bytes of a task.
spark.sql.adaptive.enabled FALSE 是否開啟adaptive query execution(自適應查詢執行)
spark.sql.adaptive.minNumPostShufflePartitions -1 測試用
spark.sql.subexpressionElimination.enabled TRUE When true, common subexpressions will be eliminated
當為真時,將刪除公共子表達式
spark.sql.caseSensitive FALSE 查詢分析器是否區分大小寫,默認情況下不區分。強烈建議不區分大小寫
spark.sql.constraintPropagation.enabled   是否開啟優化,在查詢優化器期間推斷和傳播查詢計划中的數據約束。對於某種類型的查詢計划(例如有大量謂語和別名的查詢),約束傳播是昂貴的,會對整個運行時間產生負面影響。
spark.sql.parser.escapedStringLiterals FALSE 2.0之前默認值為true,知否默認是否。正常文字能否包含在正則表達式中。
spark.sql.parquet.mergeSchema FALSE 若為true,在讀取parquet數據源時,schema從所有文件中合並出來。否則如果沒有可用的摘要文件,則從概要文件或隨機文件中選擇模式
spark.sql.parquet.respectSummaryFiles FALSE 若為ture,假設parquet的所有部分文件和概要文件一致,在合並模式時會忽略他們。否則將會合並所有的部分文件
spark.sql.parquet.binaryAsString FALSE 是否向下兼容其他parquet生產系統(eg impala or older version spark sql ),不區分字節數據和string數據寫到parquet schema,這個配置促使spark sql將二進制數據作為string達到兼容
spark.sql.parquet.int96AsTimestamp TRUE 是否使用Int96作為timestamp的存儲格式,可以避免精度損失丟失納秒部分,為其他parquet系統提供兼容(impala)
spark.sql.parquet.int64AsTimestampMillis FALSE 當為true,timestamp值將以Int64作為mlibs的存儲擴展類型,這種模式微秒將被丟棄
spark.sql.parquet.cacheMetadata TRUE 是否緩存parquet的schema數據元,可以提升靜態數據的查詢性能
spark.sql.parquet.compression.codec snappy 支持類型:uncompressed", "snappy", "gzip", "lzo"。 指定parquet寫文件的壓縮編碼方式
spark.sql.parquet.filterPushdown TRUE 是否開啟parquet過濾條件下推
spark.sql.parquet.writeLegacyFormat FALSE spark sql在拼接schema時是否遵循parquet的schema的規范
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter parquet輸出提交器類,同城必須是org.apache.hadoop.mapreduce.OutputCommitter的子類,如果不是將不會創建數據源摘要,即使配置開啟了parquet.enable.summary-metadata
spark.sql.parquet.enableVectorizedReader TRUE 開啟parquet向量解碼
spark.sql.orc.filterPushdown FALSE 是否開啟條件下推到orc文件寫
spark.sql.hive.verifyPartitionPath FALSE 當為true時,在讀取HDFS中存儲的數據時,檢查表根目錄下的所有分區路徑
spark.sql.hive.metastorePartitionPruning TRUE 當為true,spark sql的謂語將被下推到hive metastore中,更早的消除不匹配的分區,會影響到違背轉換成文件源關系的hive表
spark.sql.hive.manageFilesourcePartitions TRUE 是否使用hive metastore管理spark sql的 dataSource表分區,若為true,dataSource表會在執行計划期間使用分區剪枝 
spark.sql.hive.filesourcePartitionFileCacheSize 250 * 1024 * 1024 當非0時,開啟將分區文件數據元緩存到內存中,所有表共享一個緩存,當開啟 hive filesource partition management(spark.sql.hive.manageFilesourcePartitions)時才會生效
spark.sql.hive.caseSensitiveInferenceMode INFER_AND_SAVE 設置無法從hive表屬性讀取分區大小寫模式時所采取的操作,雖然Spice SQL本身不區分大小寫,但hive兼容的文件格式如parquet。Spark sql必須使用一個保持情況的模式,當查詢由包含區分大小寫字段名或查詢的文件支持的任何表可能無法返回准確的結果時。有效選項包括INFER_AND_SAVE(默認模式——從基礎數據文件推斷出區分大小寫的模式,並將其寫入表屬性),INFER_ONLY(推斷schema但不嘗試將其寫入表屬性)和NEVER_INFER(回退到使用區分大小寫間接轉移模式代替推斷)
spark.sql.optimizer.metadataOnly TRUE 當為true時,啟用僅使用表的元數據的元數據查詢優化來生成分區列,而不是表掃描。當掃描的所有列都是分區列,並且查詢具有滿足不同語義的聚合運算符時,它適用。
spark.sql.columnNameOfCorruptRecord _corrupt_record 當json/csv數據內部列解析失敗時,失敗列的名稱
spark.sql.broadcastTimeout" 5*60 在broadCast join時 ,廣播等待的超時時間
spark.sql.thriftserver.scheduler.pool   為JDBC客戶端會話設置公平調度程序池
spark.sql.thriftServer.incrementalCollect FALSE 當TRUE時,啟用增量集合以在thrift server中執行
spark.sql.thriftserver.ui.retainedStatements 200 JDBC/ODBC Web用戶界面歷史記錄中SQL語句的數量
spark.sql.thriftserver.ui.retainedSessions 200 JDBC/ODBC Web UI歷史中保存的SQL客戶端會話數
spark.sql.sources.default parquet 輸入輸出默認數據元
spark.sql.hive.convertCTAS FALSE 如果時true,將使用spark.sql.sources.default.設置數據源,不指定任何存儲屬性到hive ctas語句
spark.sql.hive.gatherFastStats TRUE 在修復表分區時,將快速收集STATS(文件數量和所有文件的總大小),以避免HIVE轉移子中的順序列表。
spark.sql.sources.partitionColumnTypeInference.enabled TRUE 是否自動推斷分區列的數據類型
spark.sql.sources.bucketing.enabled TRUE 當false時,分桶表當作普通表處理
spark.sql.crossJoin.enabled FALSE 當false時,如果查詢中語法笛卡兒積 卻語法中沒有顯示join,將會拋出異常
spark.sql.orderByOrdinal TRUE 當為true時,排序字段放置到seleect List,否則被忽略
spark.sql.groupByOrdinal TRUE 當為true時,按組子句的序號被視為選擇列表中的位置。當為false時,序數被忽略。
spark.sql.groupByAliases TRUE group by后的別名是否能夠被用到 select list中,若為否將拋出分析異常
spark.sql.sources.parallelPartitionDiscovery.threshold 32 允許在driver端列出文件的最大路徑數。如果在分區發現期間檢測到的路徑的數量超過該值,則嘗試用另一個SCAPLE分布式作業來列出文件。這適用於parquet、ORC、CSV、JSON和LIbSVM數據源。
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 遞歸地列出路徑集合的並行數,設置阻止文件列表生成太多任務的序號
spark.sql.selfJoinAutoResolveAmbiguity TRUE 自動解決子鏈接中的連接條件歧義,修復bug SPARK-6231
spark.sql.retainGroupColumns TRUE 是否保留分組列
spark.sql.pivotMaxValues 10000  
spark.sql.runSQLOnFiles TRUE 當為true,在sql查詢時,能夠使用dataSource.path作為表(eg:"select a,b from hdfs://xx/xx/*")
spark.sql.codegen.wholeStage TRUE 當為true,多個算子的整個stage將被便宜到一個java方法中
spark.sql.codegen.maxFields 100 在激活整個stage codegen之前支持的最大字段(包括嵌套字段)
spark.sql.codegen.fallback TRUE 當為true,在整個stage的codegen,對於編譯generated code 失敗的query 部分,將會暫時關閉
spark.sql.codegen.maxCaseBranches 20 支持最大的codegen
spark.sql.files.maxPartitionBytes 128 * 1024 * 1024 在讀取文件時,一個分區最大被讀取的數量,默認值=parquet.block.size
spark.sql.files.openCostInBytes 4 * 1024 * 1024 為了測定打開一個文件的耗時,通過同時掃描配置的字節數來測定,最好是過度估計,那么小文件的分區將比具有較大文件的分區更快(首先調度
spark.sql.files.ignoreCorruptFiles FALSE 是否自動跳過不正確的文件
spark.sql.files.maxRecordsPerFile 0 寫入單個文件的最大條數,如果時0或者負數,則無限制
spark.sql.exchange.reuse TRUE planer是否嘗試找出重復的 exchanges並復用
spark.sql.streaming.stateStore.minDeltasForSnapshot 10 在合並成快照之前需要生成的狀態存儲增量文件的最小數目
spark.sql.streaming.checkpointLocation   檢查點數據流的查詢的默認存儲位置
spark.sql.streaming.minBatchesToRetain 100 流式計算最小批次長度
spark.sql.streaming.unsupportedOperationCheck TRUE streaming query的logical plan 檢查不支持的操作
spark.sql.variable.substitute TRUE  
spark.sql.codegen.aggregate.map.twolevel.enable   啟用兩級聚合哈希映射。當啟用時,記錄將首先“插入/查找第一級、小、快的映射,然后在第一級滿或無法找到鍵時回落到第二級、更大、較慢的映射。當禁用時,記錄直接進入第二級。默認為真
spark.sql.view.maxNestedViewDepth 100 嵌套視圖中視圖引用的最大深度。嵌套視圖可以引用其他嵌套視圖,依賴關系被組織在有向無環圖(DAG)中。然而,DAG深度可能變得太大,導致意外的行為。此配置限制了這一點:當分析期間視圖深度超過該值時,我們終止分辨率以避免潛在錯誤。
spark.sql.objectHashAggregate.sortBased.fallbackThreshold 128 在ObjectHashAggregateExec的情況下,當內存中哈希映射的大小增長過大時,我們將回落到基於排序的聚合。此選項為哈希映射的大小設置行計數閾值。
spark.sql.execution.useObjectHashAggregateExec TRUE 是否使用 ObjectHashAggregateExec
spark.sql.streaming.fileSink.log.deletion TRUE 是否刪除文件流接收器中的過期日志文件
spark.sql.streaming.fileSink.log.compactInterval 10 日志文件合並閾值,然后將所有以前的文件壓縮到下一個日志文件中
spark.sql.streaming.fileSink.log.cleanupDelay 10min 保證一個日志文件被所有用戶可見的時長
spark.sql.streaming.fileSource.log.deletion TRUE 是否刪除文件流源中過期的日志文件
spark.sql.streaming.fileSource.log.compactInterval 10 日志文件合並閾值,然后將所有以前的文件壓縮到下一個日志文件中
spark.sql.streaming.fileSource.log.cleanupDelay 10min 保證一個日志文件被所有用戶可見的時長
spark.sql.streaming.schemaInference FALSE 基於文件的流,是否推斷它的模式
spark.sql.streaming.pollingDelay 10L(MILLISECONDS) 在沒有數據可用時延遲查詢新數據多長時間
spark.sql.streaming.noDataProgressEventInterval 10000L(MILLISECONDS) 在沒有數據的情況下,在兩個進度事件之間等待時間
spark.sql.streaming.metricsEnabled FALSE 是否為活動流查詢報告DoopWalth/CODAHALE度量
spark.sql.streaming.numRecentProgressUpdates 100 streaming query 保留的進度更新數量
spark.sql.statistics.ndv.maxError 0.05 生成列級統計量時超對數G+++算法允許的最大估計誤差
spark.sql.cbo.enabled FALSE 在設定true時啟用CBO來估計計划統計信息
spark.sql.cbo.joinReorder.enabled FALSE Enables join reorder in CBO.
spark.sql.cbo.joinReorder.dp.threshold 12 The maximum number of joined nodes allowed in the dynamic programming algorithm
spark.sql.cbo.joinReorder.card.weight 0.07 The weight of cardinality (number of rows) for plan cost comparison in join reorder:  rows * weight + size * (1 - weight)
spark.sql.cbo.joinReorder.dp.star.filter FALSE Applies star-join filter heuristics to cost based join enumeration
spark.sql.cbo.starSchemaDetection FALSE When true, it enables join reordering based on star schema detection
spark.sql.cbo.starJoinFTRatio 0.9 Specifies the upper limit of the ratio between the largest fact tables  for a star join to be considered
spark.sql.session.timeZone TimeZone.getDefault.getID  時間時區
spark.sql.windowExec.buffer.in.memory.threshold 4096 窗口操作符保證存儲在內存中的行數的閾值
spark.sql.windowExec.buffer.spill.threshold spark.sql.windowExec.buffer.in.memory.threshold 窗口操作符溢出的行數的閾值
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold Int.MaxValue 由sortMergeJoin運算符保證存儲在內存中的行數的閾值
spark.sql.sortMergeJoinExec.buffer.spill.threshold spark.sql.sortMergeJoinExec.buffer.in.memory.threshold 由排序合並連接運算符溢出的行數的閾值
spark.sql.cartesianProductExec.buffer.in.memory.threshold 4096 笛卡爾乘積算子保證存儲在內存中的行數的閾值
spark.sql.cartesianProductExec.buffer.spill.threshold spark.sql.cartesianProductExec.buffer.in.memory.threshold 笛卡爾乘積算子溢出的行數閾值
spark.sql.redaction.options.regex "(?i)url".r  

 

 

即便join的hive表沒有10M,也沒有觸發 mapjoin[解決方案]
spark在join的時候,用來判斷一個表的大小是否達到了10M這個限制,是不會去計算這個表在hdfs上的具體的文件大小的,而是使用hive metadata中的信息,具體如下圖:

 

explain出來spark的執行計划如下:

== Physical Plan ==

*Project [device#57, pkg#58]

+- *BroadcastHashJoin [pkg#58], [apppkg#62], Inner, BuildRight

   :- *Filter isnotnull(pkg#58)

   :  +- HiveTableScan [device#57, pkg#58], MetastoreRelation dm_sdk_mapping, device_applist, [isnotnull(day#56), (cast(day#56 as double) = 2.0180501E7)]

   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))

      +- *Filter isnotnull(apppkg#62)

         +- HiveTableScan [apppkg#62], MetastoreRelation dm_sdk_mapping, app_category_mapping

當有些hive沒有totalSize這個信息的時候,spark就會用sortMergeJoin來做join了,可以使用下面的命令重新生成metadata信息:

ANALYZE TABLE dm_sdk_mapping.app_category_mapping COMPUTE STATISTICS


---------------------
作者:sunkl_
來源:CSDN
原文:https://blog.csdn.net/u010990043/article/details/82842995
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM