1:Spark1.0.0屬性配置方式
Spark屬性提供了大部分應用程序的控制項,並且可以單獨為每個應用程序進行配置。
在Spark1.0.0提供了3種方式的屬性配置:
- SparkConf方式
- SparkConf方式可以直接將屬性值傳遞到SparkContext;
- SparkConf可以對某些通用屬性直接配置,如master使用setMaster,appname使用setAppName;
- 也可以使用set()方法對屬性進行鍵-值對配置,如set("spark.executor.memory", "1g") 。
- 命令行參數方式
- 這種方式是在使用spark-submit或spark-shell提交應用程序的時候,用命令行參數提交;
- 這種方式可以比較靈活的配置各個應用程序的運行環境;
- 可以通過spark-submit --help 或 spark-shell –help顯示出屬性的完整列表。
- 文件配置方式
- 該方式是將屬性配置項以鍵值對方式寫入文本文件中,一個配置項占一行;
- 該文件默認為conf/spark-defaults.conf,spark-submit在提交應用程序的時候會檢查是否存在該文件,有的話就將相關的屬性配置載入;
- 該文件可以在 spark-submit的命令參數--properties-file定義不同的位置。
- 優先權
- SparkConf方式 > 命令行參數方式 >文件配置方式
- 查看Spark屬性配置
- 通過應用程序的webUI(地址http://<driver>:4040)可以查看Spark屬性配置,從而檢查屬性配置是否正確;
- 只是顯示通過上面三種方式顯式指定的屬性配置,對於其他屬性可以假定使用默認配置;
- 對於大多數內部控制屬性,系統已經提供了合理的默認配置。
2:Spark1.0.0中通用屬性
A:應用程序屬性
| 屬性名稱 | 默認 | 含義 |
| spark.app.name | 無 | 應用程序名稱 |
| spark.master | 無 | 要連接的群集管理器 |
| spark.executor.memory | 512 m | 每個executor使用的內存總量 |
| spark.serializer | org.apache.spark.serializer. JavaSerializer |
在網絡數據傳送或緩存時使用的序化 器 ,默認的序 化 器 是Java序化 器 ,雖然這種序 化 器 對任何Java對象可以使用,兼容性好,但是處理速度相當的慢;如果要追求處理速度的話,建議使用org.apache.spark.serializer.KryoSerializer序化 器 。當然也可以任意是定義為org.apache.spark.Serializer 子類的 序化器 。 |
| spark.kryo.registrator | 無 | 如果要使用 Kryo 序化 器 ,需要創建一個繼承 KryoRegistrator的類並設置系統屬性spark.kryo.registrator指向該類。 |
| spark.local.dir | /tmp | 用於暫存 空間的目錄,該目錄用於保存map輸出文件或者 轉儲 RDD。該目錄位於高速的本地磁盤上,或者位於使用逗號分隔的多個不同磁盤上的目錄。注意: 在Spark 1.0 及更高版本這屬性將被 群集管理器配置的環境變量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 代替。 |
| spark.logConf | false | SparkContext 啟動時記錄有效 SparkConf信息。 |
B:運行時環境
| 屬性名稱 | 默認 | 含義 |
| spark.executor.memory | 512 m | 分配給每個executor進程總內存(使用類似512m、2g格式) |
| spark.executor.extraJavaOptions | 無 | 要傳遞給executor的額外 JVM 選項,注意不能使用它來設置Spark屬性或堆大小設置。 |
| spark.executor.extraClassPath | 無 | 追加到executor類路徑中的附加類路徑,主要為了兼容舊版本的Spark,通常不需要用戶設置。 |
| spark.executor.extraLibraryPath | 無 | 啟動executor JVM 時要用到的特殊庫路徑。 |
| spark.files.userClassPathFirst | false | executor在加載類的時候是否優先使用用戶自定義的JAR包,而不是Spark帶有的JAR包。此功能可以用於解決Spark依賴包和用戶依賴包之間的沖突。目前,該屬性只是一項試驗功能。 |
C:Shuffle 操作
| 屬性名稱 | 默認 | 含義 |
| spark.shuff le.consolidateFiles | false | 如果設置為true,在shuffle時就合並中間文件,對於有大量Reduce任務的shuffle來說,合並文件可以提高文件系統性能。如果使用的是 ext4 或 xfs 文件系統, 建議設置為true;對於ext3, 由於文件系統的限制,設置為true 反而 會使內核>8的機器 降低性能。 |
| spark.shuffle.spill | true | 如果設置為true,在 shuffle 期間 通過溢出數據到磁盤來降低 了 內存 使用總 量,溢出閾值是由spark.shuffle.memoryFraction指定的。 |
| spark.shuffle.spill.compress | true | 是否壓縮在 shuffle 期間溢出的數據,如果壓縮將使用spark.io.compression.codec。 |
| spark.shuffle.compress | true | 是否壓縮map輸出文件,壓縮將使用spark.io.compression.codec。 |
| spark.shuffle.file.buffer.kb | 100 | 每個shuffle的文件輸出流 內存緩沖區 的 大小 ,以KB為單位。這些緩沖區可以減少磁盤尋道的次數,也減少創建shuffle中間文件時的系統調用。 |
| spark.reducer.maxMbInFlight | 48 | 每個reduce任務同時獲取map輸出的最大大小 (以兆字節為單位)。由於每個map輸出都需要一個緩沖區來接收它,這代表着 每個 reduce 任務有 固定的內存開銷,所以要設置小點,除非有很大內存。 |
D:Spark UI
| 屬性名稱 | 默認 | 含義 |
| spark.ui.port | 4040 | 應用程序webUI的端口 |
| spark.ui.retainedStages | 1000 | 在GC之前webUI保留的stage數量 |
| spark.ui.killEnabled | true | 允許在 webUI將stage和相應的job殺死 |
| spark.eventLog.enabled | false | 是否記錄 Spark事件,用於 應用程序在完成后 重構 webUI。 |
| spark.eventLog.compress | false | 是否壓縮記錄Spark事件,前提spark.eventLog.enabled為true。 |
| spark.eventLog.dir | file:///tmp/spark-events | 如果spark.eventLog.enabled為 true,該屬性為記錄spark 事件的根目錄。在此根目錄中,Spark為 每個應用程序 創建分目錄,並將 應用程序的 事件記錄到在此目錄中。用戶可以將此屬性設置為HDFS目錄,以便history server讀取歷史記錄文件。 |
E:壓縮和序化
| 屬性名稱 | 默認 | 含義 |
| spark.broadcast.compress | true | 是否在發送之前壓縮廣播變量。 |
| spark.rdd.compress | false | 是否壓縮序化的RDD分區 ,可以節省大量空間,但會消耗一些額外的CPU時間。 |
| spark.io.compression.codec | org.apache.spark.io. LZFCompressionCodec |
用於壓縮內部數據如 RDD 分區和shuffle輸出的 編碼 解碼器 。 Spark提供兩個編解碼器: org.apache.spark.io.LZFCompressionCodec和org.apache.spark.io.SnappyCompressionCodec。其中,Snappy提供更快速的壓縮和解壓縮,而LZF提供了更好的壓縮比。 |
| spark.io.compression.snappy .block.size |
32768 | 使用 Snappy 編 碼 解碼器時, 編 碼 解碼器使用的 塊大小 (以字節為單位) 。 |
| spark. closure .serializer | org.apache.spark.serializer. JavaSerializer |
用於閉包 的序化器,目前只有支持Java序化器。 |
| spark.serializer. objectStreamReset |
10000 | 使用 org.apache.spark.serializer.JavaSerializer序化時,序化器緩存對象以防止寫入冗余數據,這時停止這些對象的垃圾收集。通過調用重置序化器,刷新該信息就可以收集舊對象。若要關閉這重定期重置功能將其設置為 < = 0 。默認情況下 每10000個對象將 重置序化器。 |
| spark.kryo.referenceTracking | true | 當使用 Kryo 序化數據時, 是否跟蹤對同一對象的引用。如果你的對象圖有回路或者同一對象有多個副本,有必要設置為true;其他情況下 可以禁用以提高性能。 |
| spark.kryoserializer.buffer.mb | 2 | 在Kryo 里 允許的最大對象大小 ( Kryo 會創建一個緩沖區,至少和序化的最大單個對象一樣大) 。如果Kryo 出現緩沖區限制超出異常報錯,增加這個值。注意,每個worker的每個core只有一個緩沖區。 |
F:執行操作
| 屬性名稱 | 默認 | 含義 |
| spark.default.parallelism | 本地模式: 本地機器內核數 Mesos精細模式: 8 其他: 所有executor的core總數 或者2,以較大者為准 |
如果用戶沒設置,系統使用集群中運行shuffle操作的默認任務數(groupByKey、 reduceByKey等)。 |
| spark.broadcast.factory | org.apache.spark.broadcast. HttpBroadcastFactory |
廣播的實現類 |
| spark.broadcast.blockSize | 4096 | TorrentBroadcastFactory塊 大小(以kb為單位)。太大值在廣播時降低並行性 (使速度變慢);太小值, BlockManager性能 可能會受到沖擊。 |
| spark.files.overwrite | false | 通過 SparkContext.addFile() 添加的文件在目標中已經存在並且內容不匹配時,是否覆蓋目標文件。 |
| spark.files.fetchTimeout | false | 在 獲取由driver 通過SparkContext.addFile() 添加的文件時, 是否使用 通信時間 超時。 |
| spark.storage.memoryFraction | 0.6 | Java堆用於cache的比例 |
| spark.tachyonStore.baseDir | System.getProperty("java.io.tmpdir") | 用於存儲RDD的techyon目錄,tachyon文件系統的URL由spark.tachyonStore.url設置。也可以是逗號分隔的多個techyon目錄。 |
| spark.storage. memoryMapThreshold |
8192 | 以字節為單位的塊大小,用於磁盤讀取一個塊大小進行內存映射。這可以防止Spark在內存映射時使用很小塊,一般情況下,對塊進行內存映射的開銷接近或低於操作系統的頁大小。 |
| spark.tachyonStore.url | tachyon://localhost:19998 | 基於techyon文件的URL。 |
| spark.cleaner.ttl | 無限 | spark記錄任何元數據(stages生成、task生成等)的持續時間。定期清理可以確保將超期的元數據遺忘,這在運行長時間任務是很有用的,如運行24/7的sparkstreaming任務。注意RDD持久化在內存中的超期數據也會被清理。 |
G:網絡通信
| 屬性名稱 | 默認 | 含義 |
| spark.driver.host | 本地主機名 | 運行driver的主機名或 IP 地址。 |
| spark.driver.port | 隨機 | driver偵聽的端口。 |
| spark.akka.frameSize | 10 | 以MB為單位的driver和executor之間通信信息的大小,設置值越大,driver可以接受更大的計算結果。 |
| spark.akka.threads | 4 | 用於通信的actor線程數,在大型集群中擁有更多CPU內核的driver可以增加actor線程數。 |
| spark.akka.timeout | 100 | 以秒為單位的 Spark節點 之間通信超時時間。 |
| spark.akka.heartbeat.pauses | 600 | 下面3個參數是用於設置akka自帶的故障探測器,設置很大值的話,可以停用故障探測器。如果想啟用故障探測器,以秒為單位設置這3個參數。通常是在特殊需要的情況下開啟故障探測器,一個敏感的故障探測器有助於惡意的executor的定位,而對於由於GC暫停或網絡滯后引起的情況下,不需要開啟故障探測器;另外故障探測器的開啟會導致由於心跳信息的頻繁交換而引起的網絡泛濫。 本參數是設置可接受的心跳停頓時間。 |
| spark.akka.failure-detector.threshold | 300.0 | 對應AKKA的akka.remote.transport-failure-detector.threshold |
| spark.akka.heartbeat.interval | 1000 | 心跳間隔時間 |
H:調度
| 屬性名稱 | 默認 | 含義 |
| spark.task.cpus | 1 | 為每個任務分配的 內核 數。 |
| spark.task.maxFailures | 4 | job放棄task前該task的失敗次數,該值>=1 |
| spark.scheduler.mode | FIFO | SparkContext對job進行調度所采用的模式。 對於多用戶可采用FAIR模式。 |
| spark.cores.max | 未設置 | 當應用程序運行在Standalone集群或者粗粒度共享模式Mesos集群時,應用程序向集群請求的最大CPU內核總數(不是指每台機器,而是整個集群)。如果不設置,對於Standalone集群將使用spark.deploy.defaultCores中數值,而Mesos將使用集群中可用的內核。 |
| spark.mesos.coarse | false | 如果設置為true,在Mesos集群中運行時使用粗粒度共享模式。 |
| spark.speculation | false | 以下幾個參數是關於Spark推測執行機制的相關參數 。此參數設定是否使用推測執行 機制,如果設置為true則spark使用推測 執行 機制,對於Stage中拖后腿的Task在其他節點中重新啟動,並將最先完成的Task的計算結果最為最終結果。 |
| spark.speculation.interval | 100 | Spark多長時間進行檢查task運行狀態用以推測,以毫秒為單位。 |
| spark.speculation.quantile | 0.75 | 推測啟動前,Stage必須要完成總Task的百分比。 |
| spark.speculation.multiplier | 1.5 | 比已完成Task的運行速度中位數慢多少倍才啟用推測 |
| spark.locality.wait | 3000 | 以下幾個參數是關於Spark數據本地性的相關參數。 本參數是以毫秒為單位啟動本地數據task的等待時間,如果超出就啟動下一本地優先級別的task。該設置同樣可以應用到各優先級別的本地性之間(本地進程 -> 本地節點 -> 本地機架 -> 任意節點 ),當然,也可以通過spark.locality.wait.node等參數設置不同優先級別的本地性。 |
| spark.locality.wait.process | spark.locality.wait | 本地進程級別的本地等待時間 |
| spark.locality.wait.node | spark.locality.wait | 本地節點級別的本地等待時間 |
| spark.locality.wait.rack | spark.locality.wait | 本地機架級別的本地等待時間 |
| spark.scheduler.revive.interval | 1000 | 復活重新獲取資源的Task的最長時間間隔(毫秒),發生在Task因為本地資源不足而將資源分配給其他Task運行后進入等待時間,如果這個等待時間內重新獲取足夠的資源就繼續計算。 |
I:安全
| 屬性名稱 | 默認 | 含義 |
| spark.authenticate | false | Spark是否啟用內部身份驗證。 |
| spark.authenticate.secret | 無 | 設置Spark用於組件之間進行身份驗證的密鑰。如果不是YARN上運行並且spark.authenticate為true時,需要設置密鑰。 |
| spark.core.connection. auth.wait.timeout |
30 | Spark用於組件時間進行身份認證的超時時間。 |
| spark.ui.filters | 無 | Spark web UI 要使用的 以逗號分隔的篩選器名稱列表。 篩選器要符合javax servlet Filter標准, 每個篩選器的參數可以通過設置java系統屬性來指定: spark.<class name of filter>.params='param1=value1,param2=value2' 例如: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing' |
| spark.ui.acls.enable | false | Spark webUI存取權限是否啟用。如果啟用,在用戶瀏覽web界面的時候會檢查用戶是否有訪問權限。 |
| spark.ui.view.acls | 空 | 以逗號分隔Spark webUI訪問用戶的列表。默認情況下只有啟動Spark job的用戶才有訪問權限。 |
J:Spark Streaming
| 屬性名稱 | 默認 | 含義 |
| spark.streaming.blockInterval | 200 | 在時間間隔內 (毫秒)Spark Streaming接收器將接收數據合並成數據塊並存儲在Spark。 |
| spark.streaming.unpersist | true | 如果設置為true,強迫將SparkStreaming持久化的RDD數據從Spark內存中清理,同樣的,SparkStreaming接收的原始輸入數據也會自動被清理;如果設置為false,則允許原始輸入數據和持久化的RDD數據可被外部的Streaming應用程序訪問,因為這些數據不會自動清理。 |
3:集群特有的屬性
A:Standalone特有屬性
Standalone還可以通過環境變量文件conf/spark-env.sh來設置屬性,相關的配置項是:
- SPARK_MASTER_OPTS 配置master使用的屬性
- SPARK_WORKER_OPTS 配置worker使用的屬性
- SPARK_DAEMON_JAVA_OPTS 配置master和work都使用的屬性
配置的時候,使用類似的語句:
export SPARK_MASTER_OPTS="-Dx1=y1 -Dx2=y2"
# - 其中x代表屬性,y代表屬性值
其中SPARK_MASTER_OPTS所支持的屬性有:
| 屬性名稱 | 默認 | 含義 |
| spark.deploy.spreadOut | true | Standalone集群管理器是否自由選擇節點還是固定到盡可能少的節點,前者會有更好的數據本地性,后者對於計算密集型工作負載更有效 |
| spark.deploy.defaultCores | 無限 | 如果沒有設置spark.cores.max,該參數設置Standalone集群分配給應用程序的最大內核數,如果不設置,應用程序獲取所有的有效內核。注意在一個共享的集群中,設置一個低值防止攫取了所有的內核,影響他人的使用。 |
| spark.worker.timeout | 60 | master因為沒有收到心跳信息而認為worker丟失的時間(秒) |
其中SPARK_WORKER_OPTS所支持的屬性有:
| 屬性名稱 | 默認 | 含義 |
| spark.worker.cleanup.enabled | false | 是否定期清理worker的應用程序工作目錄,只適用於Standalone模式,不適用於YARN模式。清理的時候將無視應用程序是否在運行。 |
| spark.worker.cleanup.interval | 1800 | 清理worker本地過期的應用程序工作目錄的時間間隔(秒) |
| spark.worker.cleanup.appDataTtl | 7*24*3600 | worker保留應用程序工作目錄的有效時間。該時間由磁盤空間、應用程序日志、應用程序的jar包以及應用程序的提交頻率來設定。 |
其中SPARK_DAEMON_JAVA_OPTS所支持的屬性有:
| 屬性名稱 | 含義 |
| spark.deploy.recoveryMode | 下面3個參數是用於配置zookeeper模式的master HA。 設置為ZOOKEEPER表示啟用master備用恢復模式,默認為NONE。 |
| spark.deploy.zookeeper.url | zookeeper集群URL |
| spark.deploy.zookeeper.dir | zooKeeper保存恢復狀態的目錄,缺省為/spark |
| spark.deploy.recoveryMode | 設成FILESYSTEM啟用master單節點恢復模式,缺省值為NONE |
| spark.deploy.recoveryDirectory | Spark保存恢復狀態的目錄 |
B:YARN特有屬性
YARN特有屬性的配置,應該是支持SparkConf方式和conf/spark-defaults.conf文件配置方式,
。
| 屬性名稱 | 默認 | 含義 |
| spark.yarn.applicationMaster.waitTries | 10 | RM等待Spark AppMaster啟動次數,也就是SparkContext初始化次數。超過這個數值,啟動失敗。 |
| spark.yarn.submit.file.replication | 3 | 應用程序上載到HDFS的文件的復制因子 |
| spark.yarn.preserve.staging.files | false | 設置為true,在job結束后,將stage相關的文件保留而不是刪除。 |
| spark.yarn.scheduler.heartbeat.interval-ms | 5000 | Spark AppMaster發送心跳信息給YARN RM的時間間隔 |
| spark.yarn.max.executor.failures | 2倍於executor數 | 導致應用程序宣告失敗的最大executor失敗數 |
| spark.yarn.historyServer.address | 無 | Spark history server的地址(要含有http://)。這個地址會在應用程序完成后提交給YARN RM,使得將信息從RM UI連接到history server UI上。 |
