1、使用Sparkconf配置Spark
對Spark進行性能調優,通常就是修改Spark應用的運行時配置選項。
Spark中最主要的配置機制通過SparkConf類對Spark進行配置,當創建出一個SparkContext時,就需要創建出一個SparkConf實例。
Sparkconf實例包含用戶要重載的配置選項的鍵值對。調用set()方法來添加配置項的設置,然后把這個對象傳給SparkContext的構造方法。
調用setAppName()和setMaster()來分別設置spark.app.name和spark.master的值。
例如:
//創建一個conf對象
val conf = new SparkConf()
conf.set("spark.app.name","My Spark App")
conf.set("spark.master","local[4]")
conf.set("spark.ui.port","36000")
//使用這個配置對象創建一個SparkContext
val sc = new SparkContext(conf)
Spark運行通過spark-submit工具動態設置配置項。當應用被spark-submit腳本啟動時,腳本會把這些配置項設置到運行環境中。
例如:
$ bin/spark-submit \ --class com.example.MyAPP \ --master local[4] \ --name "My Spark App" --conf spark.ui.port=36000 \ myApp.jar
Spark有特定的優先級順序來選擇實際配置,優先級最高的是在用戶代碼中顯示調用set()方法設置的選項,其次是通過spark-submit傳遞的參數,再次是寫在配置文件中的值,最后是系統的默認值。
2、Spark執行的組成部分:作業、任務和步驟
通過Spark示例展示Spark執行的各個階段,以了解用戶代碼如何被編譯為下層的執行計划。
val input = sc.textFile("input.txt")
val tokenized = input.map(line => line.split(" ")).filter(words => words.size > 0)
val counts = tokenized.map(words => (words(0),1)).reduceByKey{(a,b) => a+b}
以上示例執行了三次轉化操作,最終生成一個叫做counts的RDD。程序定義了一個RDD對象的有向無環圖,每個RDD維護了其指向一個或多個父節點的引用,以及表示其與父節點之間關系的信息。
這里counts的譜系圖如下:

在調用行動操作之前,RDD都只是存儲着可以讓我們計算出具體數據的描述信息。要觸發實際計算,需要對counts調用一個行動操作,比如使用collect()將數據收集到驅動器程序。
counts.collect()
Spark調度器會創建出用於計算行動操作的RDD物理執行計划。Spark調度器從最終需要被調用行動操作的RDD出發,向上回溯所有必須計算的RDD。調度器會訪問RDD的父節點,父節點的父節點,以此類推,遞歸向上生成計算所有必要的祖先RDD的物理計划。如下:

流水線執行:當RDD不需要混洗數據就可以從父節點計算出來時,調度器就會自動進行流水線執行。在物理執行時,執行計划輸出的縮進等級與父節點相同的RDD會與父節點在同一個步驟中進行流水線執行。
除了流水線執行的優化,當一個RDD已經緩存在集群內存或磁盤上時,Spark的內部調度器也會自動截短RDD譜系圖。這種情況下,Spark會短路求值,直接基於緩存下來的RDD進行計算。
特定的行動操作所生成的步驟的集合被稱為一個作業。
一個物理步驟會啟動很多任務,每個任務都是在不同的數據分區上做同樣的事情。任務內部的流程是一樣的,包括:(1)從數據存儲或已有RDD或數據混洗的輸出中獲取輸入數據。(2)執行必要的操作來計算出這些操作所代表的RDD。(3)把輸出寫到一個數據混洗文件中,寫入外部存儲或者是發回驅動器程序。
3、Spark優化的關鍵性能
- 並行度
RDD的邏輯表示其實是一個對象集合。在物理執行期間,RDD會被分為一系列的分區,每個分區都是整個數據的子集。當Spark調度並運行任務時,Spark會為每個分區中的數據創建出一個任務。輸入RDD一般會根據其底層的存儲系統選擇並行度。
並行度會從兩方面影響程序的性能:當並行度過低時,Spark集群會出現資源閑置的情況,而當並行度過高時,每個分區產生的間接開銷累計起來就會更大。
Spark有兩種方法來對操作的並行度進行調優:一種是在數據混洗操作時,使用參數的方式為混洗后的RDD指定並行度。第二種方法是對於任何已有的RDD,可以進行重新分區來獲取更多或者更少的分區數。可以使用repartition()實現重新分區操作,該操作會把RDD隨機打亂並分成設定的分區數目。使用coalesce()操作沒有打亂數據,比repartition()更為高效。
- 序列化格式
當Spark需要通過網絡傳輸數據,或者將數據溢寫到磁盤上時,Spark需要把數據序列化為二進制格式。序列化會在數據進行混洗操作時發生,此時有可能需要通過網絡傳輸大量數據。
Spark默認會使用Java內建的序列化庫。Spark也支持第三方序列化庫Kryo,可以提供比Java的序列化工具更短的序列化時間和更高壓縮比的二進制表示。
使用Kryo序列化工具示例如下:
val conf = new SparkConf().setMaster("local").setAppName("partitions")
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
//嚴格要求注冊類 獲得最佳性能
conf.set("spark.kryo.registrationRequired","true")
conf.registerKryoClasses(Array(classOf[MyClass],classOf[MyotherClass]))
- 內存管理
Spark內存有以下用途:
- RDD存儲,默認占60% 當調用RDD的persist()或cache()方法時,這個RDD的分區會被存儲到緩存區中。
- 數據混洗與聚合的緩存區,默認占20% 當進行數據混洗操作時,Spark會創建出一些中間緩存區來存儲數據混洗的輸出數據。這些緩存區用來存儲聚合操作的中間結果以及數據混洗操作中直接輸出的部分緩存數據。
- 用戶代碼,默認占20% Spark可以執行任意的用戶代碼,用戶自行申請大量的內存。
可以通過調整調整內存各區域比例得到更好的性能表現。
其它優化:
Spark默認的cache()操作會以MEMORY_ONLY的存儲等級持久化數據,當緩存新的RDD時分區空間不夠,舊的分區會被刪除。當用到這些分取數據時,在進行重算。使用persist()方法以MEMORY_AND_DISK存儲等級存儲,內存中放不下的分區會被寫入磁盤,需要時再從磁盤讀取回來。這種方式會有更好的性能。
還有一種是緩存序列化后的對象而非直接緩存。通過MEMORY_ONLY_SER 或者 MEMORY_AND_DISK_SER的存儲等級實現。
- 硬件供給
提供給Spark的硬件資源會顯著影響應用的完成時間,影響集群規模的主要參數包括:分配給沒各執行器節點的內存大小,每個執行器節點占用的核心數,執行器節點總數,以及用來存儲臨時數據的本地磁盤數量。
