Spark的基本說明


1、關於Application

   用戶程序,一個Application由一個在Driver運行的功能代碼和多個Executor上運行的代碼組成(工作在不同的節點上)。

又分成多個Job,每個Job由多個RDD和一些Action操作組成、job本分多個task組,每個task組稱為:stage。

每個task又被分到多個節點,由Executor執行:

在程序中RDD轉化其實還未真正運行,真正運行的是操作的時候。

 

2、程序執行過程

1)構建Spark Application的運行環境,就是啟動SparkContext,啟動后,向資源管理器

(standalone--spark自己的Master管理資源、Mesos或Yarn)注冊且申請運行Executor資源。

2)資源管理器分配Executor資源,並且在各個節點上啟動StandaloneExecutorBackend(對Standalone來說),Executor將運行情況隨着心跳發送到資源管理器上。

3)SparkContext根據用戶程序,構建DAG圖,將DAG分解成Stage,划分原則是寬依賴時候划分,把Stage(TaskSet)發送給TaskScheduler。Stage

根據RDD的Partition數量來決定Task的數量;Executor向SparkContext申請Task。Task Scheduler將Task發送給Executor運行,且同時把代碼發送給Executor(好像是Master開啟HTTP服務,Executor去取代碼)。

4)Task在Executor【此程序專屬】上運行,多線程運行,線程數看可以運行的核數。

 

5)Spark Context運行地點和Worker不要分隔太遠,中間過程有數據交換。

 3、DAG Scheduler

1)根據RDD的依賴關系來划分Stage,簡單來說,如果一個子RDD只依賴一個父RDD,則在一個Stage中,否則在多個Stage中,只依賴一個父RDD稱為窄依賴,依賴多個父RDD為寬依賴,

發生寬依賴稱為Shuffle。

2)當Shuffle數據處理失敗的時候,它重新處理之前的數據。

3)它根據RDD構建DAG(有向無環圖),然后再進一步找出開銷最小的調度方法。將Stage發送給Task Scheduler。

 

4、Task Scheduler

1)保存維護所有的TaskSet。

2)當Executor向Driver發送心跳的時候,TaskScheduler會根據其資源使用情況分配相應Task,如果允許失敗,重試失敗的Task。

5、RDD的運行原理

1)根據Spark內部對象或者Hadoop等外部對象創建RDD。

2)構建DAG。

3)划分為Task,分別在多個節點上執行后匯總。

舉例:第一個字母排序:

sc.textFile("hdfs://names")

.map(name => (name.charAt(0),name))

.groupByKey()

.mapValues(names =>names.toSet.size)

.collect()

假設文件內容為按行的姓名:

Ah                                    (A,Ah)                                (A,(Ah,Anlly)                             [ (A,2),

PPT        ---> map---->      (P,PPT) ----->groupByKey--->(P,(PPT))-------->mapValues--->   (P,1)]

Anlly                                 (A,Anlly)

 

1)創建RDD、最后的collect為動作不會創建RDD,其他的操作都會創建新的RDD。

2)創建DAG,groupBy()會進行依賴多條上一個RDD的數據,所以多划分為一個階段。

如圖:

3)執行任務,每個階段必須等上一個階段執行完成。每個Stage又分成不同的Task執行,每個Task都包含代碼+數據。

假設例子中的names下面有四個文件塊,那么HadoopRDD中的Partitions自動划分為四個分區對應這四塊數據。

就會創建四個Task執行相關任務。

每個Task操作一塊數據再執行,以上例子的簡單模擬:

import org.apache.spark.{SparkConf, SparkContext}

object NameCountCh {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage:<File>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("NameCountCh")
    val sc = new SparkContext(conf)

    sc.textFile(args(0))
      .map(name => (name.charAt(0), name))
      .groupByKey()
      .mapValues(names => names.toSet.size)
      .collect().foreach(println)

  }
}

 實際執行過程截圖:

執行命令: ./spark-submit --master  spark://xxxx:7077 --class NameCountCh --executor-memory 512m --total-executor-cores 2  /data/spark/miaohq/scalaTestApp/scalatest4.jar  hdfs://spark29:9000/home/miaohq/testName.txt

1、啟動一個HTTP端口:

 

2、按照提交的文件將文件放到這個Web服務器上

3、創建程序生成兩個Executor

4、DAG調度

完成第一stage:

調度第二stage:

完成第二個stage輸出結果:

疑惑:

1、小文件看不出來文件分區的過程,另外設置了幾個執行核,就會有幾個Executor,如果超過總數可能要多線程了??

2、為什么一個stage是兩個task,按照原理應該是文件分為幾個partition就幾個task,目前測試文件很小,只能分1個partition,也不是和Executor相關的,

設置了3個執行核心仍然只是兩個task?

3、為什么從mapValues划分第二個stage不應該是 groupByKey()???

 6、Standalone架構下Spark的執行

1、standalone是Spark實現的資源調度框架,有:Client節點、Master節點、Worker節點。

2、Driver即可運行在Master節點,也可以運行在本地的Client端。

 用spark-shell交互工具提交Spark的job的時候,運行在Master節點;

 用spark-submit 提交或者用sparkConf.setManager("Spark://master:7077")是運行在Client端。

3、運行在Client端的執行過程如下:

說明:

1)sparkContext連接到Master,注冊並申請資源(cpu 和內存)

2)Master根據申請信息和Worker心跳報告決定在哪個主機上分配資源,然后獲取資源,啟動StandaloneExecutorBackend。

3)StandaloneExecutorBackend向sparkContext注冊。

4)sparkContext發送代碼給StandaloneExecutorBackend且根據代碼,構建DAG。

遇到Action動作會生成一個Job,然后根據Job內部根據RDD依賴關系生成多個Stage,Stage提交給TaskScheduler,

5)StandaloneExecutorBackend在匯報狀態時候獲取Task信息調用Executor多線程執行task,且向sparkContext匯報,

直到任務完成。

6)所有Task完成后,SparkContext向Master注銷,釋放資源。

 說明:

文章中圖片和內容來自:http://www.cnblogs.com/shishanyuan


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM