Spark Official Documentation (4): Configuration


Spark can be configured in three ways:

  • Spark properties control application parameters and can be set through a SparkConf object or through Java system properties.
  • Environment variables, set in the conf/spark-env.sh script on each node, configure per-machine settings.
  • Logging is configured through log4j.properties.

Spark Properties

Spark properties can be configured separately for each application. They can be set directly on a SparkConf object, for example through its set methods, and then passed to the SparkContext.
The following code configures an application to run locally with two threads:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("CountingSheep")
val sc = new SparkContext(conf)

Some properties that describe a time duration or a byte size must include a unit; the accepted suffixes are listed below, followed by a short configuration sketch.

  • Time units: ms, s, m (or min), h, d, and y denote milliseconds, seconds, minutes, hours, days, and years respectively.
  • Size units:
    1b (bytes)
    1k or 1kb (kibibytes = 1024 bytes)
    1m or 1mb (mebibytes = 1024 kibibytes)
    1g or 1gb (gibibytes = 1024 mebibytes)
    1t or 1tb (tebibytes = 1024 gibibytes)
    1p or 1pb (pebibytes = 1024 tebibytes)
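
A hedged sketch of how such suffixed values are set on a SparkConf; the property names are standard Spark properties, but the specific values are arbitrary examples:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.timeout", "120s")     // duration: 120 seconds
  .set("spark.executor.memory", "4g")       // size: 4 gibibytes
  .set("spark.shuffle.file.buffer", "64k")  // size: 64 kibibytes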

Dynamically Loading Spark Properties

Sometimes you may want to avoid hard-coding configuration in the application. In that case you can create an empty SparkConf and specify the relevant parameters when the application is launched.
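
A minimal sketch of this pattern (only Spark's own classes are used; the conf is deliberately left empty):

import org.apache.spark.{SparkConf, SparkContext}

// All settings are expected to arrive from the launch command or the properties file.
val sc = new SparkContext(new SparkConf())

The parameters can then be supplied at submit time: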

./bin/spark-submit \
  --name "My app" \
  --master local[4] \
  --conf spark.eventLog.enabled=false \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  myApp.jar

The spark-shell and spark-submit tools support two ways of loading configurations dynamically:

  • Command-line options, such as --master and --conf shown above.
  • A properties file. By default, spark-submit reads conf/spark-defaults.conf, in which each line holds one property, for example:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

Values from these sources are merged at runtime. Properties set explicitly in code take the highest precedence, followed by flags passed on the command line, and finally values from the default properties file.

Viewing Spark Properties

The application web UI at http://<driver>:4040 lists Spark properties on the Environment tab (only values set through spark-defaults.conf, SparkConf, or the command line are shown). This is a useful place to check whether a property has taken effect.
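
The effective configuration can also be inspected programmatically; a small sketch, assuming an already-created SparkContext named sc:

// Print every explicitly-set property of the running application.
sc.getConf.getAll.sortBy(_._1).foreach { case (key, value) =>
  println(s"$key = $value")
}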

Available Properties

Most configuration properties have sensible defaults; the most commonly used ones are listed below.

Application Properties

Property Default Description
spark.app.name (none) The name of your application; it appears in the logs and in the web UI.
spark.driver.cores 1 Number of cores used by the driver process; only takes effect in cluster mode.
spark.driver.maxResultSize 1g Limit on the total size of serialized results of each Spark action. The minimum is 1M, and 0 means unlimited. Jobs whose results exceed the limit are aborted, while setting the limit too high may cause out-of-memory errors in the driver.
spark.driver.memory 1g Amount of memory available to the driver process. Note: this cannot be set through SparkConf in application code, because the driver JVM has already started at that point; use the --driver-memory command-line option or the properties file instead.
spark.executor.memory 1g Amount of memory available to each executor process (e.g. 2g, 8g).
spark.extraListeners (none) A comma-separated list of classes implementing SparkListener; Spark's listener bus creates an instance of each of them (see the sketch after this table).
spark.local.dir /tmp Directory used for scratch space, including map output files and RDDs cached on disk. It is best placed on fast storage such as an SSD; multiple directories can be given, separated by commas. Note: since Spark 1.0 this is overridden by the SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables.
spark.logConf false Log the effective SparkConf at INFO level when the SparkContext is started.
spark.master (none) The cluster manager (master URL) to connect to.
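
As an illustration of spark.extraListeners, a hedged sketch of a listener class; the name com.example.MyAppListener is hypothetical, and the class needs a zero-argument constructor so that Spark's listener bus can instantiate it:

package com.example

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobEnd}

// Instantiated by Spark when listed in spark.extraListeners.
class MyAppListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished")

  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit =
    println(s"Application ended at ${end.time}")
}

It would then be registered at submit time with, for example, --conf spark.extraListeners=com.example.MyAppListener.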

Runtime Environment

Property Default Description
spark.driver.userClassPathFirst false Give user-supplied jars precedence over Spark's own libraries when loading classes in the driver; used to resolve version conflicts between an application and its environment.
spark.executor.logs.rolling.maxRetainedFiles (none) Maximum number of rolled-over executor log files the system retains; the oldest logs are deleted when the limit is exceeded. Disabled by default.
spark.executor.logs.rolling.time.interval daily Time interval at which executor logs are rolled over. Rolling is disabled by default.
spark.executor.userClassPathFirst false Same as above, but for executors: user-supplied jars take precedence over Spark's libraries, which helps resolve version conflicts.
spark.python.worker.memory 512m Memory limit per Python worker process during aggregation; if the limit is exceeded, data spills to disk.

Shuffle Behavior

Property Default Description
spark.reducer.maxSizeInFlight 48m Maximum total size of map outputs fetched simultaneously by each reduce task. Since every reducer has to allocate a buffer to hold the data, do not increase this unless plenty of memory is available.
spark.shuffle.compress true Whether to compress map output files, using the codec configured by spark.io.compression.codec.
spark.shuffle.file.buffer 32k Size of the in-memory buffer for each shuffle file output stream. These buffers reduce the number of system I/O calls.
spark.shuffle.manager sort Implementation used for shuffling data, either sort or hash. Sort uses memory more efficiently and has been the default since version 1.2.
spark.shuffle.service.enabled false Enables the external shuffle service. The service preserves shuffle files written by executors so that the executors can be removed safely. It must be enabled when spark.dynamicAllocation.enabled is true, and the external shuffle service has to be set up separately.
spark.shuffle.service.port 7337 Port on which the external shuffle service runs.
spark.shuffle.sort.bypassMergeThreshold 200 When the number of reducer partitions is below this threshold, the sort-based shuffle does not merge-sort the data but instead writes each partition to its own file, much like the hash-based shuffle; the difference is that the files are finally concatenated into a single file, with an index file marking the position of each partition. From the reducer's point of view, the data and index files look the same whether or not a merge sort took place, so this can be seen as a compromise the sort-based shuffle makes toward the hash-based shuffle when the shuffle is small. Like the hash-based shuffle, it keeps many files open at once and increases memory usage, so consider lowering this value if GC pressure is severe or memory is tight.
spark.shuffle.spill.compress true Whether to compress intermediate results when they spill to local disk; they are decompressed again when fetched back for merging. Setting this to true tends to help when disk I/O is the bottleneck, while false may be preferable when the local disks are SSDs.

Spark UI

Property Default Description
spark.eventLog.compress false Whether to compress logged events; only effective when spark.eventLog.enabled is true.
spark.eventLog.dir file:///tmp/spark-events Directory in which event logs are recorded; it can also be an HDFS path.
spark.eventLog.enabled false Whether to log Spark events, which is useful for reconstructing the web UI after the application has finished.
spark.ui.killEnabled true Whether stages and their corresponding jobs can be killed from the web UI.
spark.ui.port 4040 Port for the application's web UI, which shows memory and workload statistics.
spark.ui.retainedJobs 1000 How many jobs the Spark UI and status APIs remember before garbage collecting.
spark.ui.retainedStages 1000 How many stages the Spark UI and status APIs remember before garbage collecting.
spark.worker.ui.retainedExecutors 1000 How many finished executors the Spark UI and status APIs remember before garbage collecting.
spark.worker.ui.retainedDrivers 1000 How many finished drivers the Spark UI and status APIs remember before garbage collecting.
spark.sql.ui.retainedExecutions 1000 How many finished SQL executions the Spark UI and status APIs remember before garbage collecting.
spark.streaming.ui.retainedBatches 1000 How many finished streaming batches the Spark UI and status APIs remember before garbage collecting.

Compression and Serialization

Property Default Description
spark.broadcast.compress true Whether to compress broadcast variables before sending them.
spark.closure.serializer org.apache.spark.serializer.JavaSerializer Serializer class used for closures; currently only the Java serializer is supported.
spark.io.compression.codec snappy Codec used to compress internal data such as RDD partitions, broadcast variables, and shuffle outputs. Three codecs are supported: lz4, lzf, and snappy.
spark.io.compression.lz4.blockSize 32k Block size used by the LZ4 codec.
spark.io.compression.snappy.blockSize 32k Block size used by the Snappy codec.
spark.kryo.classesToRegister (none) When Kryo serialization is used, a comma-separated list of custom classes to register with Kryo (see the sketch after this table).
spark.kryo.referenceTracking true (false when using Spark SQL Thrift Server) Whether to track references to the same object during serialization. This is necessary when the object graph contains multiple copies of the same object and improves performance in that case; if you know that is not the case, it can be disabled for extra performance.
spark.kryo.registrationRequired false Whether Kryo registration is required. If true, Kryo throws an exception when serializing an unregistered class; if false, Kryo writes the class name alongside every object of an unregistered class, which degrades performance.
spark.kryo.registrator (none) A custom class used to register classes with Kryo.
spark.kryoserializer.buffer.max 64m Maximum size of Kryo's serialization buffer; it must be larger than any object you attempt to serialize.
spark.kryoserializer.buffer 64k Initial size of Kryo's serialization buffer; it can grow up to spark.kryoserializer.buffer.max as needed.
spark.rdd.compress false Whether to compress serialized RDD partitions, trading substantial extra CPU time for reduced storage space.
spark.serializer org.apache.spark.serializer.JavaSerializer Class used to serialize objects; org.apache.spark.serializer.KryoSerializer is recommended.
spark.serializer.objectStreamReset 100 When serializing with the Java serializer, the stream caches objects to reduce I/O, which keeps them from being garbage collected; this setting resets the stream, flushing that cache, after every 100 objects.
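
Putting several of the serialization properties above together, a minimal Kryo sketch (MyCaseClass is a hypothetical application class, and the buffer size is just an example):

import org.apache.spark.SparkConf

// A hypothetical application class used only for illustration.
case class MyCaseClass(id: Long, label: String)

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "128m")
  // Registering classes avoids writing full class names with every serialized object.
  .registerKryoClasses(Array(classOf[MyCaseClass]))

Classes can equivalently be listed in spark.kryo.classesToRegister, or registered through a custom spark.kryo.registrator.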

Memory Management

Property Default Description
spark.memory.fraction 0.75 Fraction of heap space used for execution and storage. The smaller it is, the more often spills occur and cached data gets evicted. The remainder is reserved for Spark's internal metadata and user data structures; the default value is recommended.
spark.memory.storageFraction 0.5 Within the memory set aside by spark.memory.fraction, the fraction reserved for storage (caching); the larger it is, the less memory is available for execution.
spark.memory.offHeap.enabled false If true, Spark attempts to use off-heap memory; spark.memory.offHeap.size must then be positive (see the sketch after this table).
spark.memory.offHeap.size 0 Number of bytes of off-heap memory that may be used.
spark.memory.useLegacyMode false Whether to use the legacy memory management model. Only when this is true do the following deprecated parameters take effect: spark.shuffle.memoryFraction, spark.storage.memoryFraction, spark.storage.unrollFraction.
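
A short sketch combining the two off-heap properties above (the 1g value is arbitrary; as noted, the size must be positive whenever off-heap memory is enabled):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "1g")  // must be positive when off-heap memory is enabled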

Execution Behavior

Property Default Description
spark.broadcast.blockSize 4m Size of each block of a broadcast in TorrentBroadcastFactory. Too large a value reduces broadcast parallelism; too small a value can make the BlockManager a bottleneck.
spark.broadcast.factory org.apache.spark.broadcast.TorrentBroadcastFactory The broadcast implementation to use.
spark.cleaner.ttl (infinite) How long Spark remembers metadata before it is cleaned up. Useful for long-running applications such as Spark Streaming; note that RDDs cached for longer than this are cleaned up as well.
spark.executor.cores 1 in YARN mode; all available cores on the worker in standalone mode Number of cores to use on each executor. In standalone mode, setting this allows one worker to run several executors for the same application, provided it has enough cores; otherwise each application gets a single executor per worker.
spark.default.parallelism For reduceByKey and join, the largest number of partitions in a parent RDD; for parallelize with no parent RDDs it depends on the cluster manager: local mode uses the number of cores, Mesos fine-grained mode uses 8, and other modes use the larger of 2 and the total number of cores on all executors The default number of partitions to use when not set by the user.
spark.executor.heartbeatInterval 10s Interval between heartbeats from each executor to the driver.
spark.files.fetchTimeout 60s Timeout for fetching files added through SparkContext.addFile.
spark.files.useFetchCache true If true, executors of the same application share a local cache when fetching files; if false, each executor fetches its own copy.
spark.files.overwrite false Whether to overwrite the target file when it already exists.
spark.hadoop.cloneConf false If true, a copy of the Hadoop Configuration object is made for every task.
spark.hadoop.validateOutputSpecs true If true, saveAsHadoopFile validates that the output directory does not already exist. Although setting it to false silences the exception for existing output, it is recommended to delete the output directory manually through the Hadoop file system API instead. This setting is ignored when recovering from a Spark Streaming StreamingContext checkpoint, because existing files have to be rewritten during recovery.
spark.storage.memoryMapThreshold 2m Minimum block size above which Spark memory-maps files when reading blocks from disk. Memory mapping has high overhead for blocks close to or smaller than the operating system's page size.
spark.externalBlockStore.blockManager org.apache.spark.storage.TachyonBlockManager External block manager used to store RDDs. The file system URL is set by spark.externalBlockStore.url.
spark.externalBlockStore.baseDir System.getProperty("java.io.tmpdir") Directories in the external block store used for storing RDDs. The file system URL is set by spark.externalBlockStore.url.
spark.externalBlockStore.url tachyon://localhost:19998 for Tachyon URL of the external block store file system.

Networking

Property Default Description
spark.akka.frameSize 128 Maximum message size in MB. This mainly limits the size of messages between executors and the driver; consider increasing it when running jobs with many thousands of map and reduce tasks.
spark.akka.heartbeat.interval 1000s Can be set very large to effectively disable Akka's built-in transport failure detector. A larger interval reduces network load; a smaller interval lets Akka detect failures sooner.
spark.akka.heartbeat.pauses 6000s Acceptable pause between heartbeats; used together with spark.akka.heartbeat.interval.
spark.akka.threads 4 Number of actor threads used for communication. Worth increasing when the driver has many CPU cores.
spark.akka.timeout 100s Communication timeout between Spark nodes.
spark.blockManager.port random Port for the block managers to listen on.
spark.broadcast.port random Port for the driver's HTTP broadcast server to listen on.
spark.driver.host (local hostname) Hostname or IP address the driver listens on; used for communication with the master and the executors.
spark.driver.port random Port the driver listens on.
spark.executor.port random Port the executor listens on.
spark.fileserver.port random Port the driver's HTTP file server listens on.
spark.network.timeout 120s Default timeout for all network interactions.
spark.port.maxRetries 16 Maximum number of retries when binding to a port before giving up.
spark.replClassServer.port random Port the driver's HTTP class server listens on.
spark.rpc.numRetries 3 Number of times an RPC task retries before giving up.
spark.rpc.retry.wait 3s How long an RPC ask operation waits between retries.
spark.rpc.askTimeout 120s Timeout for RPC ask operations.
spark.rpc.lookupTimeout 120s Timeout for RPC remote endpoint lookup operations.

Scheduling

Property Default Description
spark.cores.max (not set) Maximum total number of CPU cores a Spark application may use across the cluster. If not set, standalone clusters fall back to spark.deploy.defaultCores and Mesos uses all available cores.
spark.locality.wait 3s How long to wait to launch a data-local task before giving up and launching it on a less-local node. If tasks are long and data locality is poor, it is usually better to increase this value.
spark.locality.wait.node spark.locality.wait Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).
spark.locality.wait.process spark.locality.wait Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.
spark.locality.wait.rack spark.locality.wait Customize the locality wait for rack locality.
spark.scheduler.maxRegisteredResourcesWaitingTime 30s Maximum amount of time to wait for resources to register before scheduling begins.
spark.scheduler.mode FIFO Scheduling mode between jobs: FAIR for fair sharing or FIFO for first-in-first-out.
spark.scheduler.revive.interval 1s The interval length for the scheduler to revive the worker resource offers to run tasks.
spark.speculation false If true, slow-running tasks in a stage are speculatively re-launched, and the first copy to finish wins (see the sketch after this table).
spark.speculation.interval 100ms How often Spark checks for tasks to speculate.
spark.speculation.multiplier 1.5 How many times slower than the median task duration a task must be before it is considered for speculation.
spark.speculation.quantile 0.75 Fraction of tasks that must complete before speculation is enabled for a stage.
spark.task.cpus 1 Number of CPU cores allocated to each task.
spark.task.maxFailures 4 Maximum number of task failures before giving up on a job; it is one more than the number of allowed retries.
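
As an illustration of the speculation knobs above, a hedged sketch (the values are examples, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")          // re-launch slow tasks speculatively
  .set("spark.speculation.quantile", "0.9")  // wait until 90% of tasks in a stage have finished
  .set("spark.speculation.multiplier", "2")  // a task must be 2x slower than the median to qualify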

Dynamic Allocation

Property Default Description
spark.dynamicAllocation.enabled false Whether to use dynamic resource allocation, which scales the number of executors with the workload (see the sketch after this table).
spark.dynamicAllocation.executorIdleTimeout 60s If dynamic allocation is enabled and an executor has been idle for longer than this, the executor is removed.
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity If dynamic allocation is enabled and an executor holding cached data has been idle for longer than this, the executor is removed.
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors Initial number of executors when dynamic allocation is enabled.
spark.dynamicAllocation.maxExecutors infinity Upper bound on the number of executors.
spark.dynamicAllocation.minExecutors 0 Lower bound on the number of executors.
spark.dynamicAllocation.schedulerBacklogTimeout 1s If dynamic allocation is enabled and there have been pending tasks backlogged for more than this duration, new executors will be requested.
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout Same as spark.dynamicAllocation.schedulerBacklogTimeout, but used only for subsequent executor requests.
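
These settings usually live in spark-defaults.conf, but they can equally be set on a SparkConf; a hedged sketch tying them to the external shuffle service they require (the executor counts are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  // Dynamic allocation relies on the external shuffle service (see the Shuffle Behavior table).
  .set("spark.shuffle.service.enabled", "true")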

Security

Property Default Description
spark.acls.enable false Whether Spark acls should be enabled. If enabled, this checks to see if the user has access permissions to view or modify the job. Note this requires the user to be known, so if the user comes across as null no checks are done. Filters can be used with the UI to authenticate and set the user.
spark.admin.acls Empty Comma separated list of users/administrators that have view and modify access to all Spark jobs. This can be used if you run on a shared cluster and have a set of administrators or devs who help debug when things do not work. Putting a "*" in the list means any user can have the privilege of admin.
spark.authenticate false Whether Spark authenticates its internal connections. See spark.authenticate.secret if not running on YARN.
spark.authenticate.secret None Set the secret key used for Spark to authenticate between components. This needs to be set if not running on YARN and authentication is enabled.
spark.authenticate.enableSaslEncryption false Enable encrypted communication when authentication is enabled. This option is currently only supported by the block transfer service.
spark.network.sasl.serverAlwaysEncrypt false Disable unencrypted connections for services that support SASL authentication. This is currently supported by the external shuffle service.
spark.core.connection.ack.wait.timeout 60s How long for the connection to wait for ack to occur before timing out and giving up. To avoid unwilling timeout caused by long pause like GC, you can set larger value.
spark.core.connection.auth.wait.timeout 30s How long for the connection to wait for authentication to occur before timing out and giving up.
spark.modify.acls Empty Comma separated list of users that have modify access to the Spark job. By default only the user that started the Spark job has access to modify it (kill it for example). Putting a “*” in the list means any user can have access to modify it.
spark.ui.filters None Comma separated list of filter class names to apply to the Spark web UI. The filter should be a standard javax servlet Filter. Parameters to each filter can also be specified by setting a java system property of spark.<class name of filter>.params='param1=value1,param2=value2'. For example: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing'
spark.ui.view.acls Empty Comma separated list of users that have view access to the Spark web ui. By default only the user that started the Spark job has view access. Putting a “*” in the list means any user can have view access to this Spark job.

Encryption

Property Default Description
spark.ssl.enabled false Whether to enable SSL connections on all supported protocols.
spark.ssl.enabledAlgorithms Empty A comma-separated list of ciphers; the specified ciphers must be supported by the JVM.
spark.ssl.keyPassword None The password to the private key in the key-store.
spark.ssl.keyStore None A path to a key-store file. The path can be absolute or relative to the directory in which the component is started.
spark.ssl.keyStorePassword None A password to the key-store
spark.ssl.protocol None A protocol name; the protocol must be supported by the JVM.
spark.ssl.trustStore None A path to a trust-store file. The path can be absolute or relative to the directory where the component is started in.
spark.ssl.trustStorePassword None A password to the trust-store.

Spark Streaming

Property Default Description
spark.streaming.backpressure.enabled false Enables or disables Spark Streaming’s internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below).
spark.streaming.blockInterval 200ms Interval at which data received by Spark Streaming receivers is chunked into blocks of data before storing them in Spark. Minimum recommended - 50 ms. See the performance tuning section in the Spark Streaming programming guide for more details.
spark.streaming.receiver.maxRate not set Maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide in the Spark Streaming programming guide for more details (see the sketch after this table).
spark.streaming.receiver.writeAheadLog.enable false Enable write ahead logs for receivers. All the input data received through receivers will be saved to write ahead logs that will allow it to be recovered after driver failures. See the deployment guide in the Spark Streaming programming guide for more details.
spark.streaming.unpersist true Force RDDs generated and persisted by Spark Streaming to be automatically unpersisted from Spark’s memory. The raw input data received by Spark Streaming is also automatically cleared. Setting this to false will allow the raw data and persisted RDDs to be accessible outside the streaming application as they will not be cleared automatically. But it comes at the cost of higher memory usage in Spark.
spark.streaming.stopGracefullyOnShutdown false If true, Spark shuts down the StreamingContext gracefully on JVM shutdown rather than immediately.
spark.streaming.kafka.maxRatePerPartition not set Maximum rate (number of records per second) at which data will be read from each Kafka partition when using the new Kafka direct stream API. See the Kafka Integration guide for more details.
spark.streaming.kafka.maxRetries 1 Maximum number of consecutive retries the driver will make in order to find the latest offsets on the leader of each partition (a default value of 1 means that the driver will make a maximum of 2 attempts). Only applies to the new Kafka direct stream API.
spark.streaming.ui.retainedBatches 1000 How many batches the Spark Streaming UI and status APIs remember before garbage collecting.
spark.streaming.driver.writeAheadLog.closeFileAfterWrite false Whether to close the file after writing a write ahead log record on the driver. Set this to ‘true’ when you want to use S3 (or any file system that does not support flushing) for the metadata WAL on the driver.
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite false Whether to close the file after writing a write ahead log record on the receivers. Set this to ‘true’ when you want to use S3 (or any file system that does not support flushing) for the data WAL on the receivers.
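
A minimal Spark Streaming sketch showing how the backpressure and rate-limit properties above would be applied (the local master, 5-second batch interval, and rate of 1000 records per second are arbitrary choices for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("StreamingConfigSketch")
  .set("spark.streaming.backpressure.enabled", "true")           // adapt the ingest rate automatically
  .set("spark.streaming.receiver.maxRate", "1000")               // upper bound, in records per second
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // recoverable input after driver failure

val ssc = new StreamingContext(conf, Seconds(5))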

SparkR

Property Default Description
spark.r.numRBackendThreads 2 Number of RPC handler threads maintained by RBackend.
spark.r.command Rscript Executable for executing R scripts in cluster modes for both driver and workers.
spark.r.driver.command spark.r.command Executable for executing R scripts in client modes for driver. Ignored in cluster modes

For the remaining properties, see https://spark.apache.org/docs/latest/configuration.html.

Environment Variables

Certain Spark settings can be configured through environment variables, which are read from the conf/spark-env.sh script. In standalone and Mesos modes, this file can also carry machine-specific information such as hostnames. Since spark-env.sh does not exist in a fresh installation, you can create it by copying conf/spark-env.sh.template; make sure the copy is executable.
The most commonly used variables in spark-env.sh are:

Environment Variable Description
JAVA_HOME Location of the Java installation.
PYSPARK_PYTHON Python executable to use for PySpark (default is python2.7).
SPARKR_DRIVER_R R executable to use for the SparkR shell (default is R).
SPARK_LOCAL_IP IP address of the machine to bind to.
SPARK_PUBLIC_DNS Hostname your Spark program advertises to other machines.

In addition, there are options for setting up Spark standalone clusters, such as the maximum amount of memory and the number of cores to use on each machine.

Configuring Logging

Spark uses log4j for logging. It can be configured by editing the conf/log4j.properties file.

Overriding the Configuration Directory

By setting the SPARK_CONF_DIR environment variable, you can point Spark at a configuration directory other than the default SPARK_HOME/conf; Spark then reads spark-defaults.conf, spark-env.sh, log4j.properties, and so on from that directory.

Inheriting Hadoop Cluster Configuration

If you plan to read from and write to HDFS with Spark, two Hadoop configuration files need to be on Spark's classpath:
+ hdfs-site.xml, which provides default behaviors for the HDFS client.
+ core-site.xml, which sets the default file system name.

Although the location of these files varies across Hadoop distributions, a common location is /etc/hadoop/conf. To make them visible to Spark, set the HADOOP_CONF_DIR variable in spark-env.sh to the directory that contains them.
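
Once HADOOP_CONF_DIR points at those files, HDFS paths resolve against the configured cluster. A small sketch, assuming the application is launched through spark-submit with a master set; the input path is a hypothetical placeholder:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("HdfsReadSketch"))
// core-site.xml supplies the default file system, so a bare hdfs:/// URI is enough here.
val lines = sc.textFile("hdfs:///user/example/input.txt")
println(s"line count: ${lines.count()}")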

