注重版權,尊重他人勞動
Spark主要提供三種位置配置系統:
- 環境變量:用來啟動Spark workers,可以設置在你的驅動程序或者conf/spark-env.sh 腳本中;
- java系統性能:可以控制內部的配置參數,兩種設置方法:
- 編程的方式(程序中在創建SparkContext之前,使用System.setProperty(“xx”,“xxx”)語句設置相應系統屬性值);
- 在conf/spark-env.sh中配置環境變量SPARK_JAVA_OPTS。
- 日志配置:通過log4j.properties實習
一、環境變量
spark安裝目錄下的conf/spark-env.sh腳本決定了如何初始化worker nodes的JVM,甚至決定了你在本地如何運行spark-shell。在Git庫中這個腳本默認是不存在的,但是你可以自己創建它並通過復制con/spark-env.sh.template中的內容來配置,最后要確保你創建的文件可執行。
在spark-env.sh中你至少有兩個變量要設置:
- SCALA_HOME,指向你的scala安裝路徑;或者是SCALA_LIBRARY_PATH指向scala library JARS所在的目錄(如果你是通過DEB或者RPM安裝的scala,他們是沒有SCALA_HOME的,但是他們的libraries是分離的,默認在/usr/share/java中查找scala-library.jar)
- MESOS_NATIVE_LIBRARY,如果你要在Mesos上運行集群的話
另外,還有其他四個變量來控制執行。應該將他們設置在啟動驅動程序的環境中來取代設置在spark-env.sh,因為這樣這些設置可以自動傳遞給workers。將他們設置在每個作業中而不是spark-env.sh中,這樣確保了每個作業有他們自己的配置。
- SPARK_JAVA_ORTS,添加JVM選項。你可以通過-D來獲取任何系統屬性;
- SPARK_CLASS_PATH,添加元素到Spark的classpth中;
- SPARK_LIBARAT_OATH,添加本地libraries的查找目錄;
- SPARK_MEM,設置每個節點所能使用的內存總量。他們應該和JVM‘s -Xmx選項的格式保持一致(e.g.300m或1g)。注意:這個選項將很快被棄用支持系統屬性spark.executor.memory,所以我們推薦將它使用在新代碼中。
注意:如果你將他們設置在spark-env.sh中,他們將覆蓋用戶程序設定的值,這是不可取的。如果你喜歡,你可以選擇在spark-env.sh設置他們僅當用戶程序沒有做任何設置時,例如:
if [ -z "$SPARK_JAVA_OPTS" ] ; then SPARK_JAVA_OPTS="-verbose:gc" fi
二、系統屬性
通過設置系統屬性來配置Spark,你必須通過以下兩種方式中的任意一個來達到目的:
- 在JVM中通過-D標志(例如:java -Dspark.cores.max=5 MyProgram)
- 在你的程序中創建SparkContext之前調用System.setProperty,如下:
System.setProperty("spark.cores.max", "5")
val sc = new SparkContext(...)
更多可配置的控制內部設置的系統屬性已經有了合理的默認屬性值。然而,有五個屬性通常是你想要去控制的:
屬性名稱 | 默認值 | 含義 |
---|---|---|
spark.executor.memory | 512m | 每個處理器可以使用的內存大小,跟JVM的內存表示的字符串格式是一樣的(比如: '512m','2g') |
spark.serializer | spark.JavaSerializer | 一個類名,用於序列化網絡傳輸或者以序列化形式緩存起來的各種對象。默認情況下Java的序列化機制可以序列化任何實現了Serializable接口的對象,但是速度是很慢的,因此當你在意運行速度的時候我們建議你使用spark.KryoSerializer 並且配置 Kryo serialization。可以是任何 spark.Serializer的子類。 |
spark.kryo.registrator | (none) | 如果你使用的是Kryo序列化,就要為Kryo設置這個類去注冊你自定義的類。這個類需要繼承spark.KryoRegistrator。 可以參考 調優指南 獲取更多的信息。 |
spark.local.dir | /tmp | 設置Spark的暫存目錄,包括映射輸出文件盒需要存儲在磁盤上的RDDs。這個磁盤目錄在你的系統上面訪問速度越快越好。可以用逗號隔開來設置多個目錄。 |
spark.cores.max | (infinite) | 當運行在一個獨立部署集群上或者是一個粗粒度共享模式的Mesos集群上的時候,最多可以請求多少個CPU核心。默認是所有的都能用。 |
除了這些,在某些情況下以下屬性可能也是需要設置的:
屬性名 | 默認值 | 含義 |
---|---|---|
spark.mesos.coarse | false | 如果設置為了"true",將以粗粒度共享模式運行在Mesos集群上, 這時候Spark會在每台機器上面獲得一個長期運行的Mesos任務,而不是對每個Spark任務都要產生一個Mesos任務。對於很多短查詢,這個可能會有些許的延遲,但是會大大提高Spark工作時的資源利用率。 |
spark.default.parallelism | 8 | 在用戶沒有指定時,用於分布式隨機操作(groupByKey,reduceByKey等等)的默認的任務數。 |
spark.storage.memoryFraction | 0.66 | Spark用於緩存的內存大小所占用的Java堆的比率。這個不應該大於JVM中老年代所分配的內存大小,默認情況下老年代大小是堆大小的2/3,但是你可以通過配置你的老年代的大小,然后再去增加這個比率。 |
spark.ui.port | (random) | 你的應用程序控制面板端口號,控制面板中可以顯示每個RDD的內存使用情況。 |
spark.shuffle.compress | true | 是否壓縮映射輸出文件,通常設置為true是個不錯的選擇。 |
spark.broadcast.compress | true | 廣播變量在發送之前是否先要被壓縮,通常設置為true是個不錯的選擇。 |
spark.rdd.compress | false | 是否要壓縮序列化的RDD分區(比如,StorageLevel.MEMORY_ONLY_SER)。在消耗一點額外的CPU時間的代價下,可以極大的提高減少空間的使用。 |
spark.reducer.maxMbInFlight | 48 | 同時獲取每一個分解任務的時候,映射輸出文件的最大的尺寸(以兆為單位)。由於對每個輸出都需要我們去創建一個緩沖區去接受它,這個屬性值代表了對每個分解任務所使用的內存的一個上限值,因此除非你機器內存很大,最好還是配置一下這個值。 |
spark.closure.serializer | spark.JavaSerializer | 用於閉包的序列化類。通常Java是可以勝任的,除非在你的驅動程序中分布式函數(比如map函數)引用了大量的對象。 |
spark.kryoserializer.buffer.mb | 32 | Kryo中運行的對象的最大尺寸(Kryo庫需要創建一個不小於最大的單個序列化對象的緩存區)。如果在Kryo中出現"buffer limit exceeded"異常,你就需要去增加這個值了。注意,對每個worker而言,一個核心就會有一個緩沖。 |
spark.broadcast.factory | spark.broadcast.HttpBroadcastFactory | 使用哪一個廣播實現 |
spark.locality.wait | 3000 | 在發布一個本地數據任務時候,放棄並發布到一個非本地數據的地方前,需要等待的時間。如果你的很多任務都是長時間運行的任務,並且看到了很多的臟數據的話,你就該增加這個值了。但是一般情況下缺省值就可以很好的工作了。 |
spark.worker.timeout | 60 | 如果超過這個時間,獨立部署master還沒有收到worker的心跳回復,那么就認為這個worker已經丟失了。 |
spark.akka.frameSize | 10 | 在控制面板通信(序列化任務和任務結果)的時候消息尺寸的最大值,單位是MB。如果你需要給驅動器發回大尺寸的結果(比如使用在一個大的數據集上面使用collect()方法),那么你就該增加這個值了。 |
spark.akka.threads | 4 | 用於通信的actor線程數量。如果驅動器有很多CPU核心,那么在大集群上可以增大這個值。 |
spark.akka.timeout | 20 | Spark節點之間通信的超時時間,以秒為單位 |
spark.driver.host | (local hostname) | 驅動器監聽主機名或者IP地址. |
spark.driver.port | (random) | 驅動器監聽端口號 |
spark.cleaner.ttl | (disable) | Spark記憶任何元數據(stages生成,任務生成等等)的時間(秒)。周期性清除保證在這個時間之前的元數據會被遺忘。當長時間幾小時,幾天的運行Spark的時候設置這個是很有用的。注意:任何內存中的RDD只要過了這個時間就會被清除掉。 |
spark.streaming.blockInterval | 200 | 從網絡中批量接受對象時的持續時間。 |
spark.task.maxFailures | 4 | task失敗重試次數 |
三、配置日志
Spark使用log4j來記錄。你可以在conf目錄中添加log4j.properties文件來配置。一種方法是復制本地已存在的log4j.properties.template