Spark源碼解析 - Spark-shell淺析


1.准備工作

1.1 安裝spark,並配置spark-env.sh

使用spark-shell前需要安裝spark,詳情可以參考http://www.cnblogs.com/swordfall/p/7903678.html

如果只用一個節點,可以不用配置slaves文件,spark-env.sh文件只需配置為master_ip和local_ip兩個屬性

spark-env.sh添加如下配置:

export SPARK_MASTER_IP=hadoop1
export SPARK_LOCAL_IP=hadoop1

注意:hadoop1是這台虛擬機的ip地址,或者用127.0.0.1代替hadoop1也行。spark-shell淺析是基於spark-2.2.0-bin-hadoop2.7版本進行的。

1.2 啟動spark-shell

輸入spark安裝目錄的bin下,執行spark-shell命令

cd /opt/app/spark-2.2.0-bin-hadoop2.7/bin/
./spark-shell

最后我們會看到spark啟動的過程,如圖所示:

 

2. 執行word count 范例

通過word count例子來感受下spark任務的執行過程,啟動spark-shell后,會打開scala命令行,然后按照以下步驟輸入腳本。

1) 輸入val lines = sc.textFile("../README.md", 2)

2) 輸入val words = lines.flatMap(line => line.split(" "))

3) 輸入val ones = words.map(w => (w, 1))

4) 輸入val counts = ones.reduceByKey(_ + _)

5) 輸入counts.foreach(println)

3. 剖析spark-shell

通過word count在spark-shell中執行的過程,看看spark-shell做了什么。spark-shell中有以下一段腳本

我們看到腳本spark-shell里執行了spark-submit腳本,打開spark-submit腳本,發現其中包含以下腳本:

腳本spark-submit在執行spark-class腳本時,給它增加了參數SparkSubmit。打開spark-class腳本,其中包含以下腳本:

讀到這里,可知spark-class里面首先加載spark-env.sh里面的配置屬性,然后獲取jdk的java命令,接着拿到spark_home的jars目錄。至此,Spark啟動了以SparkSubmit為主類的jvm進程。

為便於在本地對Spark進程使用遠程監控,給SPARK_HOME目錄conf/spark-defaults.conf配置文件追加以下jmx配置:

#driver端監控
spark.driver.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false

#executor端監控,暫時注釋
#spark.executor.extraJavaOptions=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.port=0 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false

注意:使用遠程監控前,如果spark-shell在運行中需要先停止,配置好監控參數,再需要運行spark-shell命令,否則jvisualvm找不到該線程。上面的兩條命令都是一行的,不是兩行,兩行會導致jvisualvm連接不上,報“無法使用 service:jmx:rmi:///jndi/rmi://192.168.187.201:8009/jmxrmi 連接到 192.168.187.201:8009”錯誤。

在本地JAVA_HOME/bin目錄下打開jvisualvm,添加遠程主機,如圖;右擊已添加的遠程主機,添加JMX連接,如圖:

 

 

單擊右側的“線程”選項卡,選擇main線程,然后單擊“線程Dump”按鈕,如圖:

從dump的內容中找到線程main的信息,如圖:

 

main線程dump信息

   

從main線程的棧信息中可以看出程序的調用順序:SparkSubmit.main -> repl.Main -> ILoop.process。org.apache.spark.repl.SparkILoop類繼承ILoop類,ILoop的process方法調用SparkILoop的loadFiles(settings)與printWelcome()方法。SparkILoop的loadFiles(settings)方法中又調用了自身的initializeSpark方法,initializeSpark的實現如下:

initializationCommands是一個命令集合,見代碼:

從代碼中可以看到,命令集合中會調用org.apache.spark.repl.Main的createSparkSession()方法創建或者獲取sparkSession類,如圖:

從上述代碼可以看到builder是SparkSession里面的屬性,IDEA工具使用“ctrl+鼠標點擊”操作,可以進入到builder.getOrCreate()方法里面查看SparkSession如何創建,如圖:

 從上述代碼可以看到SparkContext首先創建,再創建SparkSession。SparkContext的創建代碼如下:

這里使用SparkConf、SparkContext和SparkSession來完成初始化,代碼分析中涉及的repl主要用於與Spark實時交互。

4.Spark-shell的整體流程

至此,Spark-shell解析完畢。

參考資料:

《深入理解Spark核心思想與源碼分析》

https://www.iteblog.com/archives/1349.html  使用jvisualvm監控Spark作業

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM