Spark job submission internals and resource configuration


Spark terminology
-----------------
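    1.RDD
        Resilient Distributed Dataset: a lightweight, distributed data collection.
        Internally it has 5 properties:
        a.list of partitions
        b.compute function (computes each partition)
        c.list of dependencies (on parent RDDs)
        d.partitioner (only for key-value RDDs)
        e.preferred locations (for each partition)

        Ways to create an RDD:
        a.textFile
        b.makeRDD/parallelize()
        c.transforming an existing RDD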
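The five properties can be pictured with a toy model. This is a minimal sketch in plain Scala and does not reproduce Spark's real `RDD` class. `MiniRDD`, `MiniPartition`, `MiniDependency` and `ParallelRDD` are hypothetical names. The slicing in `ParallelRDD` uses the same start/end formula that `parallelize` uses to split a local sequence.

```scala
// Toy model of the five RDD properties (hypothetical names, not Spark classes).
final case class MiniPartition(index: Int)

// Here a dependency only records its parent RDD.
final case class MiniDependency(parent: MiniRDD[_])

trait MiniRDD[T] {
  def partitions: Seq[MiniPartition]                          // a. list of partitions
  def compute(p: MiniPartition): Iterator[T]                  // b. computes one partition
  def dependencies: Seq[MiniDependency]                       // c. parent dependencies
  def partitioner: Option[Any => Int] = None                  // d. key -> partition (KV only)
  def preferredLocations(p: MiniPartition): Seq[String] = Nil // e. preferred hosts

  // A narrow transformation: same partitions, with this RDD as the only parent.
  def map[U](f: T => U): MiniRDD[U] = {
    val parent = this
    new MiniRDD[U] {
      def partitions: Seq[MiniPartition] = parent.partitions
      def compute(p: MiniPartition): Iterator[U] = parent.compute(p).map(f)
      def dependencies: Seq[MiniDependency] = Seq(MiniDependency(parent))
    }
  }

  // Runs compute() for every partition, one after another, and concatenates the results.
  def collect(): Seq[T] = partitions.flatMap(p => compute(p))
}

// Rough analogue of parallelize/makeRDD: slices a local Seq into numSlices partitions.
final class ParallelRDD[T](data: Seq[T], numSlices: Int) extends MiniRDD[T] {
  def partitions: Seq[MiniPartition] = (0 until numSlices).map(i => MiniPartition(i))
  def compute(p: MiniPartition): Iterator[T] = {
    val start = (p.index.toLong * data.length / numSlices).toInt
    val end = ((p.index + 1).toLong * data.length / numSlices).toInt
    data.slice(start, end).iterator
  }
  def dependencies: Seq[MiniDependency] = Nil // a source RDD has no parents
}
```

For example, `new ParallelRDD(1 to 10, 3)` produces the partitions [1,2,3], [4,5,6] and [7,8,9,10].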

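    2.Stage
        A division of the RDD chain (lineage), split at shuffle boundaries.
        ShuffleMapStage
        ResultStage

        Each stage is associated with one RDD. The ResultStage is associated with the final RDD.
        Stages are created from shuffle dependencies. A ShuffleDependency holds a reference
        to its parent RDD, so the stage before it is a ShuffleMapStage. The RDD associated
        with that ShuffleMapStage is the last RDD of that stage.
        Summary:
        every stage is associated with its last RDD.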
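The rule "cut the lineage at every shuffle dependency" can be sketched as a small recursive function. `Node`, `Narrow`, `Shuffle`, `Stage` and `stagesFor` are hypothetical names. Spark's DAGScheduler reuses a stage when two RDDs share the same shuffle dependency. This toy does not.

```scala
// Toy model of splitting a lineage into stages (hypothetical names).
sealed trait Dep { def parent: Node }
final case class Narrow(parent: Node) extends Dep
final case class Shuffle(parent: Node) extends Dep

final case class Node(name: String, deps: List[Dep] = Nil)

// `rdd` is the last RDD of the stage; `members` lists every RDD computed in it.
final case class Stage(kind: String, rdd: Node, members: List[String])

def stagesFor(last: Node, kind: String): List[Stage] = {
  // Walk back through narrow deps. A shuffle dep ends the walk, and its parent
  // becomes the last RDD of an earlier ShuffleMapStage.
  def walk(n: Node): (List[String], List[Node]) = {
    val parts: List[(List[String], List[Node])] = n.deps.map {
      case Narrow(p)  => walk(p)
      case Shuffle(p) => (Nil, List(p))
    }
    val (members, shuffleParents) = parts.unzip
    (members.flatten :+ n.name, shuffleParents.flatten)
  }
  val (members, shuffleParents) = walk(last)
  // Parent stages come first, then the stage ending at `last`.
  shuffleParents.flatMap(p => stagesFor(p, "ShuffleMapStage")) :+ Stage(kind, last, members)
}
```

Consider the word-count chain used in the deploy-mode test program (textFile -> flatMap -> map -> reduceByKey). `stagesFor(rdd4, "ResultStage")` yields a ShuffleMapStage whose RDD is rdd3, followed by a ResultStage whose RDD is rdd4.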


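    3.Dependency
        Dependency:
        the mapping between each partition of a child RDD and the set of parent-RDD
        partitions it depends on. Dependencies are created together with the RDD.

        NarrowDependency
            OneToOne    (OneToOneDependency)
            Range       (RangeDependency)
            Prune       (PruneDependency)

        ShuffleDependency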
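For narrow dependencies, the partition mapping is a function from a child partition index to parent partition indices. The sketch below re-implements the `getParents` logic of Spark's `OneToOneDependency` and `RangeDependency` as standalone Scala. `union` uses `RangeDependency`. `PartitionMapping` and `RangeMapping` are hypothetical names. A ShuffleDependency has no such per-partition list, because any child partition may need data from every parent partition.

```scala
// Standalone re-implementation of narrow-dependency partition mappings (hypothetical names).
trait PartitionMapping { def getParents(childPartition: Int): List[Int] }

// Child partition i depends on exactly parent partition i.
object OneToOne extends PartitionMapping {
  def getParents(childPartition: Int): List[Int] = List(childPartition)
}

// Child partitions [outStart, outStart + length) map to parent partitions starting at inStart.
final case class RangeMapping(inStart: Int, outStart: Int, length: Int) extends PartitionMapping {
  def getParents(childPartition: Int): List[Int] =
    if (childPartition >= outStart && childPartition < outStart + length)
      List(childPartition - outStart + inStart)
    else Nil
}

// union of A (3 partitions) and B (2 partitions): child 0-2 come from A, child 3-4 from B.
val fromA = RangeMapping(inStart = 0, outStart = 0, length = 3)
val fromB = RangeMapping(inStart = 0, outStart = 3, length = 2)
```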
            
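    4.Partition
        The RDD's partition list. Each partition corresponds to a slice of the data.
        textFile(path, n)            //n: minimum number of partitions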
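The `n` in `textFile(path, n)` is only a minimum. The actual partition count comes from Hadoop's `FileInputFormat.getSplits`. The function below is a simplified model of that calculation for a single uncompressed file. It uses split size = max(minSize, min(totalSize / n, blockSize)) and allows the last split to be up to 10% larger than the split size. `estimateTextFileSplits` is a hypothetical helper. The 128 MB block size is an assumed HDFS default.

```scala
// Simplified model of Hadoop's old-API FileInputFormat.getSplits for ONE
// uncompressed (splittable) file. estimateTextFileSplits is hypothetical.
def estimateTextFileSplits(fileSize: Long, minPartitions: Int,
                           blockSize: Long = 128L * 1024 * 1024,
                           minSplitSize: Long = 1L): Int =
  if (fileSize == 0) 1 // an empty file still produces one (empty) split
  else {
    val goalSize = fileSize / math.max(minPartitions, 1)
    val splitSize = math.max(minSplitSize, math.min(goalSize, blockSize))
    val splitSlop = 1.1 // the last split may be up to 10% larger than splitSize
    var remaining = fileSize
    var splits = 0
    while (remaining.toDouble / splitSize > splitSlop) {
      splits += 1
      remaining -= splitSize
    }
    if (remaining != 0) splits += 1 // leftover bytes form the final split
    splits
  }
```

Under this model, a 10-byte file read with `textFile(path, 3)` gives 4 partitions. The goal size rounds down to 3 bytes, so the file is cut into 3 + 3 + 3 + 1 bytes.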
        
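    5.Task
        A task is the concrete unit of execution.
        Each partition corresponds to one task.
        A task carries its partition and a broadcast variable holding the serialized RDD plus
        its shuffle dependency (ShuffleMapTask) or plus the result function (ResultTask).
        ShuffleMapTask
        ResultTask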
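The one-task-per-partition rule can be sketched as follows. `TaskDesc`, `ShuffleMapTaskDesc`, `ResultTaskDesc` and `makeTasks` are hypothetical stand-ins for Spark's task classes, and they leave out the broadcast task binary.

```scala
// Toy task descriptors: one per partition of the stage's last RDD (hypothetical names).
sealed trait TaskDesc { def stageId: Int; def partition: Int }
final case class ShuffleMapTaskDesc(stageId: Int, partition: Int) extends TaskDesc
final case class ResultTaskDesc(stageId: Int, partition: Int) extends TaskDesc

def makeTasks(stageId: Int, isResultStage: Boolean, numPartitions: Int): Seq[TaskDesc] =
  (0 until numPartitions).map { p =>
    val task: TaskDesc =
      if (isResultStage) ResultTaskDesc(stageId, p) else ShuffleMapTaskDesc(stageId, p)
    task
  }
```

Take the word-count program as an example. Suppose the input file yields 3 partitions and `spark.default.parallelism` is unset, so `reduceByKey` keeps 3 partitions. The job would then run 3 ShuffleMapTasks followed by 3 ResultTasks.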



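Master URL formats
------------------
    local                           //1 thread
    local[4]                        //4 threads
    local[*]                        //one thread per available core
    local[4,5]                      //4 threads, maxFailures = 5 (task failures allowed)
    local-cluster[1,2,3]            //1: number of workers, 2: cores per worker, 3: memory per worker (MB)
    spark://...                     //standalone cluster master, e.g. spark://s101:7077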
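The sketch below shows how these master strings are told apart. The regular expressions are modelled on the ones Spark uses when `SparkContext` parses its master URL. `MasterUrls.describe` is a hypothetical helper that only returns a description and does not create a scheduler.

```scala
// Classifies a master string. The regexes are modelled on Spark's master-URL patterns.
object MasterUrls {
  private val LocalN         = """local\[([0-9]+|\*)\]""".r
  private val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  private val LocalCluster   = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r
  private val Standalone     = """spark://(.*)""".r

  private def threads(n: String): String =
    if (n == "*") "one thread per available core" else s"$n threads"

  // Scala regex patterns must match the whole string, so "local[4,5]" does not hit LocalN.
  def describe(master: String): String = master match {
    case "local"               => "local: 1 thread"
    case LocalN(n)             => s"local: ${threads(n)}"
    case LocalNFailures(n, f)  => s"local: ${threads(n)}, maxFailures = $f"
    case LocalCluster(w, c, m) => s"local-cluster: workers=$w, coresPerWorker=$c, memoryPerWorkerMB=$m"
    case Standalone(hostPort)  => s"standalone master at $hostPort"
    case other                 => s"unrecognized master: $other"
  }
}
```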

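Job submission flow in Standalone mode
--------------------------------------
    First a SparkContext is created. It creates three scheduling components in the driver
    (DAGScheduler + TaskScheduler + SchedulerBackend) and starts the task scheduler, which in
    turn starts the scheduler backend. The scheduler backend creates an AppClient object.
    Once started, the AppClient creates a ClientEndpoint, which sends a "RegisterApplication"
    message to the master. The master registers the application and replies to the
    ClientEndpoint with "RegisteredApplication". It then schedules resources by sending
    LaunchExecutor messages to the workers, which start the executor processes.
    In cluster deploy mode, the master first sends a LaunchDriver message to one worker,
    which starts the driver in its own process. The sequence above then runs inside
    that driver. The driver never runs inside an executor process.


    When an RDD job runs, the scheduler backend sends a message (e.g. ReviveOffers) to the
    DriverEndpoint, and the DriverEndpoint sends LaunchTask messages to the executors. The
    executor backend on each worker receives the command and runs the task through its Executor.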


    Messages handled by each endpoint:

    ClientEndpoint                DriverEndpoint
---------------------------------------------
    SubmitDriverResponse        StatusUpdate
    KillDriverResponse            ReviveOffers
    RegisteredApplication        KillTask
                                RegisterExecutor
                                StopDriver
                                StopExecutors
                                RemoveExecutor
                                RetrieveSparkAppConfig
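The ReviveOffers -> LaunchTask step can be modelled as a single-threaded toy. The message names ReviveOffers, StatusUpdate and LaunchTask come from the notes above. Their fields and the `ToyDriverEndpoint` class are hypothetical, with no RPC and one core per task by default. The toy hands free executor cores to pending tasks. When a task finishes, its cores are freed and offered again.

```scala
import scala.collection.mutable

// Toy messages: names follow the DriverEndpoint column, fields are hypothetical.
sealed trait DriverMsg
case object ReviveOffers extends DriverMsg
final case class StatusUpdate(executorId: String, taskId: Int, finished: Boolean) extends DriverMsg

// What the driver would send to an executor.
final case class LaunchTask(executorId: String, taskId: Int)

final class ToyDriverEndpoint(freeCores: Map[String, Int], cpusPerTask: Int = 1) {
  private val cores = mutable.Map[String, Int]() ++= freeCores
  private val pending = mutable.Queue.empty[Int]

  def submit(taskIds: Int*): Unit = pending ++= taskIds

  def receive(msg: DriverMsg): Seq[LaunchTask] = msg match {
    case ReviveOffers => makeOffers()
    case StatusUpdate(execId, _, true) =>
      cores(execId) += cpusPerTask // a finished task frees its cores
      makeOffers()
    case StatusUpdate(_, _, false) => Nil
  }

  // Offers each executor's free cores (in executor-id order) to pending tasks.
  private def makeOffers(): Seq[LaunchTask] = {
    val launched = mutable.ArrayBuffer.empty[LaunchTask]
    for (execId <- cores.keys.toSeq.sorted) {
      while (pending.nonEmpty && cores(execId) >= cpusPerTask) {
        cores(execId) -= cpusPerTask
        launched += LaunchTask(execId, pending.dequeue())
      }
    }
    launched.toList
  }
}
```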

Spark job deploy modes
----------------------
    [Test code]
    import java.lang.management.ManagementFactory
    import java.net.{InetAddress, Socket}

    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Created by Administrator on 2018/5/8.
      */
    object WCAppScala {

        //Sends "ip : pid : thread : object id : msg" to a listener on s101:8888, showing
        //on which host/process/thread each step runs (assumes a listener on s101:8888).
        def sendInfo(msg: String): Unit = {
            //IP address of the current host
            val ip = InetAddress.getLocalHost.getHostAddress

            //pid: the runtime name has the form "pid@hostname"
            val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)

            //current thread name
            val tname = Thread.currentThread().getName

            //object identity (shows which JVM's copy of this object is used)
            val oid = this.toString

            val sock = new Socket("s101", 8888)
            try {
                val out = sock.getOutputStream
                val m = ip + "\t:" + pid + "\t:" + tname + "\t:" + oid + "\t:" + msg + "\r\n"
                out.write(m.getBytes)
                out.flush()
            } finally {
                sock.close()
            }
        }

        def main(args: Array[String]): Unit = {
            //1.create the Spark configuration object
            val conf = new SparkConf()
            conf.setAppName("wcApp")
            conf.setMaster("spark://s101:7077")

            sendInfo("before new sc! ")
            //2.create the Spark context object
            val sc = new SparkContext(conf)
            sendInfo("after new sc! ")

            //3.load the file (3 = minimum number of partitions)
            val rdd1 = sc.textFile("hdfs://mycluster/user/centos/1.txt", 3)
            sendInfo("load file! ")

            //4.flatten lines into words
            val rdd2 = rdd1.flatMap(line => {
                sendInfo("flatMap : " + line)
                line.split(" ")
            })

            //5.pair each word with 1
            val rdd3 = rdd2.map(w => {
                sendInfo("map : " + w)
                (w, 1)
            })

            //6.reduce: sum the counts per word
            val rdd4 = rdd3.reduceByKey((a, b) => {
                sendInfo("reduceByKey() : " + a + "/" + b)
                a + b
            })

            //collect the results to the driver and print them
            val arr = rdd4.collect()
            arr.foreach(println)
            sc.stop()
        }
    }
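The test program only works if something is listening on s101:8888. Otherwise `new Socket("s101", 8888)` throws a `ConnectException`. Below is a minimal sketch of such a listener. It accepts one connection at a time and prints every line received. `InfoCollector` is a hypothetical name, and the UTF-8 decoding assumes the senders' default charset is UTF-8.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Minimal line collector for sendInfo() messages (hypothetical helper).
object InfoCollector {
  // Accepts `connections` connections one after another and returns all lines read.
  def collect(server: ServerSocket, connections: Int): Seq[String] =
    (1 to connections).flatMap { _ =>
      val sock = server.accept()
      try {
        val in = new BufferedReader(
          new InputStreamReader(sock.getInputStream, StandardCharsets.UTF_8))
        Iterator.continually(in.readLine()).takeWhile(_ != null).toList
      } finally sock.close()
    }

  // Listens on port 8888 (the port sendInfo() uses) and prints messages forever.
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8888)
    try {
      while (true) collect(server, 1).foreach(println)
    } finally server.close()
  }
}
```

Start `InfoCollector` on s101 before submitting the job. The printed lines then show which host, pid and thread ran each step on the driver and on the executors.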


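    1.client
        The driver runs on the client host. This is the default mode.
        spark-submit --class WCAppScala --master spark://s101:7077 --deploy-mode client myspark.jar

    2.cluster
        The driver runs on one of the workers.
        Upload the jar to HDFS so that every worker can reach it:
        hdfs dfs -put myspark.jar .

        spark-submit --class WCAppScala --master spark://s101:7077 --deploy-mode cluster hdfs://mycluster/user/centos/myspark.jar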



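Spark resource management
-------------------------
    1.Processes involved in Spark
        Master                //Spark daemon
        Worker                //Spark daemon
                            //core + memory: the resources the worker can hand out.

        Driver                //runs the application's main() and the SparkContext
        BackExecutor        //executor backend process that runs tasks (CoarseGrainedExecutorBackend)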
    
    2.Configuring the resources Spark can use
        [spark/conf/spark-env.sh]
        # Options read when launching programs locally with
        # ./bin/run-example or ./bin/spark-submit
        # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
        # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
        # - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
        # - SPARK_CLASSPATH, default classpath entries to append

        # Options read by executors and drivers running inside the cluster
        # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
        # - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
        # - SPARK_CLASSPATH, default classpath entries to append
        # - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
        # - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

        # Options read in YARN mode
        # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
        # - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2)
        # - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
        # - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
        # - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)

        ###################################################################
        ############## Daemon configuration for standalone mode ############
        ###################################################################

        # IP address or hostname the Spark master binds to
        # - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname

        # master RPC port, default 7077
        # - SPARK_MASTER_PORT 
        
        # master web UI port, default 8080
        # - SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
        # - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")

        # number of cores the worker can hand out, default: all available cores
        # - SPARK_WORKER_CORES, to set the number of cores to use on this machine

        # total memory the worker can give to executors, default: total memory minus 1g
        # - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
        # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker

        # number of worker processes per node, default 1
        # - SPARK_WORKER_INSTANCES, to set the number of worker processes per node

        # worker working directory (default: ${SPARK_HOME}/work) and worker-only options
        # - SPARK_WORKER_DIR, to set the working directory of worker processes
        # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")

        # memory for the master, worker and history server daemons themselves, default 1g (their max heap size)
        # - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
        # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
        # - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
        # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
        # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

        ###################################################################
        ########## Generic daemon options for standalone mode ##############
        ###################################################################
        # Generic options for the daemons used in the standalone deploy mode
        # - SPARK_CONF_DIR      Alternate conf dir. (Default: ${SPARK_HOME}/conf)
        # - SPARK_LOG_DIR       Where log files are stored.  (Default: ${SPARK_HOME}/logs)
        # - SPARK_PID_DIR       Where the pid file is stored. (Default: /tmp)
        # - SPARK_IDENT_STRING  A string representing this instance of spark. (Default: $USER)
        # - SPARK_NICENESS      The scheduling priority for daemons. (Default: 0)
        # - SPARK_NO_DAEMONIZE  Run the proposed command in the foreground. It will not output a PID file.
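SPARK_WORKER_CORES and SPARK_WORKER_MEMORY together decide how many executors of a given size one worker can host. The sketch below is a simplified view of that check. The standalone master only places another executor on a worker that still has enough free cores and enough free memory for it. `executorsPerWorker` is a hypothetical helper that ignores other applications and the spreading of executors across workers. If `--executor-cores` is not set, a standalone application gets one executor per worker that takes all of the worker's available cores.

```scala
// Hypothetical helper: executors of one app that fit on one standalone worker,
// limited by whichever of cores or memory runs out first.
def executorsPerWorker(workerCores: Int, workerMemoryMb: Int,
                       executorCores: Int, executorMemoryMb: Int): Int =
  math.min(workerCores / executorCores, workerMemoryMb / executorMemoryMb)

// SPARK_WORKER_CORES=8, SPARK_WORKER_MEMORY=6g, --executor-cores 2, --executor-memory 2g
val memoryBound = executorsPerWorker(8, 6 * 1024, 2, 2 * 1024) // 3: memory runs out first
// SPARK_WORKER_CORES=8, SPARK_WORKER_MEMORY=16g, --executor-cores 2, --executor-memory 1g
val coreBound = executorsPerWorker(8, 16 * 1024, 2, 1024)      // 4: cores run out first
```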

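    3.Specifying resources for a job at submit time
        spark-submit

        //driver memory, default 1g
        --driver-memory MEM

        //memory per executor, default 1g
        --executor-memory MEM


        //standalone + cluster deploy mode only: number of cores used by the driver
        --driver-cores NUM

        //standalone | mesos: total number of cores used by the job's executors
        --total-executor-cores NUM

        //standalone | yarn: number of cores per executor
        //(default: 1 on YARN, all available cores of the worker in standalone mode)
        --executor-cores NUM

        //YARN only
        --driver-cores NUM                //number of cores for the driver (cluster deploy mode)
        --num-executors NUM                //number of executors to launch (default 2)

        //the memory options are the same for every cluster manager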

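        Core configuration

        Cluster manager | Deploy mode | Option
        ----------------+-------------+------------------------------------------------------------
        standalone      | cluster     | --driver-cores NUM          //driver cores
        standalone      | any         | --total-executor-cores NUM  //total cores for all executors
        standalone      | any         | --executor-cores NUM        //cores per executor
        ----------------+-------------+------------------------------------------------------------
        yarn            | any         | --executor-cores NUM        //cores per executor
        yarn            | cluster     | --driver-cores NUM          //driver cores
        yarn            | any         | --num-executors NUM         //number of executors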
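The arithmetic behind the table can be sketched as follows. In standalone mode, `--total-executor-cores` caps the job's cores and `--executor-cores` sizes each executor, so only whole executors are launched. On YARN with dynamic allocation off, the job gets `--num-executors` executors of `--executor-cores` cores each. `ExecutorPlan`, `standalonePlan` and `yarnPlan` are hypothetical helpers. They assume the cluster has enough free resources and ignore the extra per-container memory overhead that YARN adds.

```scala
// Hypothetical helpers turning submit options into an executor layout.
final case class ExecutorPlan(executors: Int, coresPerExecutor: Int, memoryPerExecutorMb: Int) {
  def totalCores: Int = executors * coresPerExecutor
  def totalMemoryMb: Int = executors * memoryPerExecutorMb
}

// standalone: only whole executors fit under --total-executor-cores
def standalonePlan(totalExecutorCores: Int, executorCores: Int, executorMemoryMb: Int): ExecutorPlan =
  ExecutorPlan(totalExecutorCores / executorCores, executorCores, executorMemoryMb)

// YARN (dynamic allocation off): exactly --num-executors executors
def yarnPlan(numExecutors: Int, executorCores: Int, executorMemoryMb: Int): ExecutorPlan =
  ExecutorPlan(numExecutors, executorCores, executorMemoryMb)

// --total-executor-cores 10 --executor-cores 3 --executor-memory 1g
val standalone = standalonePlan(10, 3, 1024) // 3 executors, 9 cores; 1 core stays unused
// YARN defaults: --num-executors 2 --executor-cores 1 --executor-memory 1g
val yarn = yarnPlan(2, 1, 1024)              // 2 executors, 2 cores, 2048 MB
```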

 

