Spark terminology
---------------
1.RDD
    Resilient Distributed Dataset, a lightweight collection of data.
    Internally defined by five properties:
        a.list of partitions
        b.compute function
        c.list of dependencies
        d.partitioner (for KV RDDs)
        e.preferred locations
    Ways to create an RDD:
        a.textFile()
        b.makeRDD()/parallelize()
        c.RDD transformations

2.Stage
    A division of the RDD chain, cut at shuffle boundaries.
        ShuffleMapStage
        ResultStage
    A Stage is associated with an RDD; the ResultStage is associated with the last RDD.
    Stages are created by walking shuffle dependencies: a shuffleDep holds a reference
    to its parent RDD, so the stage before it is a ShuffleMapStage, and the RDD that a
    ShuffleMapStage is associated with is the last RDD of that stage.
    Summary: the RDD each Stage is associated with is its last RDD.

3.Dependency
    Dependency: the mapping between each partition of a child RDD and the set of
    partitions of its parent RDD. Dependencies are created when the RDD is created.
        NarrowDependency
            OneToOneDependency
            RangeDependency
            PruneDependency
        ShuffleDependency

4.Partition
    The list of partitions inside an RDD; each partition corresponds to a slice of the data.
        textFile(path, n);

5.Task
    The concrete unit of execution. Each partition corresponds to one task.
    A task carries its partition and a broadcast variable (the serialized RDD and dependency).
        ShuffleMapTask
        ResultTask

6.Master URLs for local mode
-----------------
    local
    local[4]
    local[*]
    local[4,5]              //4: number of threads, 5: max task failures
    local-cluster[1,2,3]    //1: number of workers, 2: cores per worker, 3: memory per worker
    spark://...

Standalone job submission flow
---------------------
First a SparkContext is created; on the client it creates the three scheduling
components in turn (DAG scheduler + task scheduler + backend scheduler). Starting the
task scheduler starts the backend scheduler, which creates an AppClient object. When
the AppClient starts it creates a ClientEndpoint, and that endpoint sends a
"RegisterApplication" message to the master. The master registers the application,
replies to the ClientEndpoint with a RegisteredApplication message, and then begins
scheduling resources, sending LaunchDriver and LaunchExecutor messages to the workers.
The workers then start the Driver and the executors; the driver also runs inside an
Executor process.
When an RDD is executed, the backend scheduler sends a message to the DriverEndpoint,
and the driver endpoint sends LaunchTask messages to the executors; the executors on
each worker node receive the command and launch the tasks.

    ClientEndpoint                  DriverEndpoint
    ---------------------------------------------
    SubmitDriverResponse            StatusUpdate
    KillDriverResponse              ReviveOffers
    RegisteredApplication           KillTask
                                    RegisterExecutor
                                    StopDriver
                                    StopExecutors
                                    RemoveExecutor
                                    RetrieveSparkAppConfig

Spark job deploy modes
--------------------
    [test code]
    import java.lang.management.ManagementFactory
    import java.net.InetAddress
    import org.apache.spark.{SparkConf, SparkContext}

    /**
      * Created by Administrator on 2018/5/8.
      */
    object WCAppScala {

        def sendInfo(msg: String) = {
            //get the local ip
            val ip = InetAddress.getLocalHost.getHostAddress
            //get the pid
            val rr = ManagementFactory.getRuntimeMXBean()
            val pid = rr.getName().split("@")(0)
            //thread name
            val tname = Thread.currentThread().getName
            //object id
            val oid = this.toString

            val sock = new java.net.Socket("s101", 8888)
            val out = sock.getOutputStream
            val m = ip + "\t:" + pid + "\t:" + tname + "\t:" + oid + "\t:" + msg + "\r\n"
            out.write(m.getBytes)
            out.flush()
            out.close()
        }

        def main(args: Array[String]): Unit = {
            //1.create the spark configuration object
            val conf = new SparkConf()
            conf.setAppName("wcApp")
            conf.setMaster("spark://s101:7077")
            sendInfo("before new sc! ")

            //2.create the spark context object
            val sc = new SparkContext(conf)
            sendInfo("after new sc! ")

            //3.load the file
            val rdd1 = sc.textFile("hdfs://mycluster/user/centos/1.txt", 3)
            sendInfo("load file! ")

            //4.flatten into words
            val rdd2 = rdd1.flatMap(line => {
                sendInfo("flatMap : " + line)
                line.split(" ")
            })

            //5.pair each word with 1
            val rdd3 = rdd2.map(w => {
                sendInfo("map : " + w)
                (w, 1)
            })

            //6.reduce
            val rdd4 = rdd3.reduceByKey((a, b) => {
                sendInfo("reduceByKey() : " + a + "/" + b)
                a + b
            })

            //collect the results
            val arr = rdd4.collect()
            arr.foreach(println)
        }
    }

1.client
    The driver runs on the client host. This is the default mode.
    spark-submit --class WCAppScala --master spark://s101:7077 --deploy-mode client

2.cluster
    The driver runs on one of the workers.
    Upload the jar to hdfs first:
        hdfs dfs -put myspark.jar .
    spark-submit --class WCAppScala --master spark://s101:7077 --deploy-mode cluster

Spark resource management
--------------------
1.Processes involved in Spark
    Master              //spark daemon
    Worker              //spark daemon
                        //"core + memory" refers to the resources the worker may hand out.
    Driver              //
    ExecutorBackend     //

2.Configuring the resources Spark may use
[spark/conf/spark-env.sh]
    # Options read when launching programs locally with
    # ./bin/run-example or ./bin/spark-submit
    # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
    # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
    # - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
    # - SPARK_CLASSPATH, default classpath entries to append

    # Options read by executors and drivers running inside the cluster
    # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
    # - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
    # - SPARK_CLASSPATH, default classpath entries to append
    # - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
    # - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

    # Options read in YARN mode
    # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
    # - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2)
    # - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
    # - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
    # - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)

    ###################################################################
    #################### Standalone-mode daemon configuration #########
    ###################################################################
    # ip or hostname the spark master binds to
    # - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
    # master rpc port, default 7077
    # - SPARK_MASTER_PORT
    # master webui port, default 8080
    # - SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
    # - SPARK_MASTER_OPTS, to set config properties only for the master (e.g.
    "-Dx=y")
    # number of cores the worker may hand out, default: all available cores
    # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
    # worker memory, default 1g
    # - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
    # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
    # number of worker processes per node, default 1
    # - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
    # - SPARK_WORKER_DIR, to set the working directory of worker processes
    # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")

    # memory allocated to the master, worker and history server daemons themselves, default 1g (max heap size)
    # - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
    # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
    # - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
    # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
    # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

    ###################################################################
    #################### Generic daemon options (standalone mode) #####
    ###################################################################
    # Generic options for the daemons used in the standalone deploy mode
    # - SPARK_CONF_DIR      Alternate conf dir. (Default: ${SPARK_HOME}/conf)
    # - SPARK_LOG_DIR       Where log files are stored. (Default: ${SPARK_HOME}/logs)
    # - SPARK_PID_DIR       Where the pid file is stored. (Default: /tmp)
    # - SPARK_IDENT_STRING  A string representing this instance of spark. (Default: $USER)
    # - SPARK_NICENESS      The scheduling priority for daemons. (Default: 0)
    # - SPARK_NO_DAEMONIZE  Run the proposed command in the foreground. It will not output a PID file.
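Putting the standalone-mode variables above together, a minimal spark-env.sh for one node might look like this (the hostname matches the examples in these notes; the core and memory sizes are illustrative assumptions, not values from a real cluster):

```shell
# spark/conf/spark-env.sh -- minimal standalone sketch (illustrative values)

# host the master binds to; clients and workers connect to spark://s101:7077
SPARK_MASTER_HOST=s101
SPARK_MASTER_PORT=7077

# resources each worker may hand out to executors (assumed sizes)
SPARK_WORKER_CORES=4
SPARK_WORKER_MEMORY=4g

# one worker process per node (the default)
SPARK_WORKER_INSTANCES=1

# heap for the master/worker daemons themselves
SPARK_DAEMON_MEMORY=1g
```

With this file distributed to every node, `sbin/start-all.sh` starts the master on s101 and a worker on each node listed in conf/slaves.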
3.Specifying resources per job at submit time.
    spark-submit
        //driver memory, default 1g
        --driver-memory MEM
        //memory per executor, default 1g
        --executor-memory MEM

        //standalone + cluster only: total number of cores used by the Driver
        --driver-cores NUM
        //standalone | mesos: total number of cores the job may use
        --total-executor-cores NUM
        //standalone | yarn: number of cores per executor
        --executor-cores NUM

        //YARN only:
        --driver-cores NUM          //total driver cores
        --num-executors NUM         //number of executors to launch

    //the memory flags are uniform across cluster managers; the core flags differ:

    cluster mode | deploy mode | flag
    --------------------------------------------------
    standalone   | cluster     | --driver-cores NUM
                 |--------------------------------------
                 |             | --total-executor-cores NUM   //total executor cores
                 |             | --executor-cores NUM         //cores per executor
    ---------------------------------------------------
    yarn         |             | --executor-cores NUM
                 |             | --driver-cores NUM
                 |             | --num-executors NUM
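Combining these flags, a standalone cluster-mode submission of the WCAppScala job above might look like this (the jar path and the memory/core sizes are illustrative assumptions):

```shell
# submit to the standalone master in cluster mode;
# in cluster mode the jar must be reachable from the workers, e.g. on hdfs
spark-submit \
  --class WCAppScala \
  --master spark://s101:7077 \
  --deploy-mode cluster \
  --driver-memory 1g \
  --driver-cores 1 \
  --executor-memory 2g \
  --total-executor-cores 8 \
  hdfs://mycluster/user/centos/myspark.jar
```

Here the scheduler spreads the 8 executor cores across the workers; with --executor-cores 2 added, it would cap each executor at 2 of those cores.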
