spark基於不同模式下搭建集群及spark資源請求任務調度,廣播變量和累加器


spark環境搭建

standalone模式搭建

1、上傳解壓,配置環境變量 配置bin目錄 2、修改配置文件 conf mv spark-env.sh.template spark-env.sh添加以下代碼

export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077

export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=2g
export JAVA_HOME=/usr/local/soft/jdk1.8.0_171

master相當於RM worker相當於NM,增加從節點配置

mv slaves.template slaves
node1
node2

3、復制到其它節點

scp -r spark-2.4.5 node1:`pwd`
scp -r spark-2.4.5 node2:`pwd`

4、在主節點執行啟動命令 啟動集群,在master中執行

./sbin/start-all.sh
http://master:8080/  訪問spark ui
  1. standalone client模式 日志在本地輸出,一班用於上線前測試(bin/下執行)

需要進入到spark-examples_2.11-2.4.5.jar 包所在的目錄下執行

cd /usr/local/soft/spark-2.4.5/examples/jars
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-examples_2.11-2.4.5.jar 100
  1. standalone cluster模式 上線使用,不會再本地打印日志

    spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --driver-memory 512m --deploy-mode cluster --supervise --executor-memory 512M --total-executor-cores 1 spark-examples_2.11-2.4.5.jar 100

spark-shell spark 提供的一個交互式的命令行,可以直接寫代碼

spark-shell --master spark://master:7077

運行自己寫的代碼 1、注釋掉setMaster 2、將項目打包 3、上擦到服務器 4、提交任務 spark-submit --class com.shujia.spark.core.Demo18PI --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-1.0.jar

package sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object Demo18PI {

def main(args: Array[String]): Unit = {

  val conf: SparkConf = new SparkConf().setAppName("PI").setMaster("local")

  val sc = new SparkContext(conf)

  val list: Range = 0 until 10000000

  //構建一個很大的RDD
  val listRDD: RDD[Int] = sc.parallelize(list)

  //模擬生成點
  val pointRDD: RDD[(Double, Double)] = listRDD.map(i => {
    //模擬點
    val x: Double = Random.nextDouble() * 2 - 1
    val y: Double = Random.nextDouble() * 2 - 1
    (x, y)

  })
  //取出園內的點
  val yuanPointRDD: RDD[(Double, Double)] = pointRDD.filter {
    case (x: Double, y: Double) =>
      //計算點到圓心的距離
      x * x + y * y <= 1
  }

  //計算PI值
  val PI: Double = yuanPointRDD.count().toDouble / list.length * 4.0

  println("PI:"+PI)
}

}

整合yarn搭建Spark集群

在公司一般不適用standalone模式,因為公司一般已經有yarn 不需要搞兩個資源管理框架停止spark集群 在spark sbin目錄下執行 ./stop-all.sh

spark整合yarn只需要在一個節點整合, 可以刪除node1 和node2中所有的spark 文件

1、增加hadoop 配置文件地址

vim spark-env.sh
增加
export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop

2、往yarn提交任務需要增加兩個配置 yarn-site.xml(/usr/local/soft/hadoop-2.7.6/etc/hadoop/yarn-site.xml)

先關閉yarn stop-yarn.sh

<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>

<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

4、同步到其他節點,重啟yarn

scp -r yarn-site.xml node1:`pwd`
scp -r yarn-site.xml node2:`pwd`

啟動yarn start-yarn.sh

cd /usr/local/soft/spark-2.4.5/examples/jars

3.spark on yarn client模式 日志在本地輸出,一班用於上線前測試

spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 512M --num-executors 2 spark-examples_2.11-2.4.5.jar 100

4.spark on yarn cluster模式 上線使用,不會再本地打印日志 減少io

spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --executor-memory 512m --num-executors 2 --executor-cores 1 spark-examples_2.11-2.4.5.jar 100

獲取yarn程序執行日志 執行成功之后才能獲取到 yarn logs -applicationId application_1626695992877_0001

image.png

運行自己寫的代碼 1、注釋掉setMaster, 修改輸入輸出路徑 2、將項目打包 3、上擦到服務器 4、提交任務

spark-submit --class com.shujia.spark.core.Demo20Submit --master yarn-cluster --executor-memory 512m --total-executor-cores 1 spark-1.0.jar

殺死yarn 任務 yarn application -kill application_1626660789491_0012

hdfs webui http://node1:50070

yarn ui http://node1:8088

Spark資源申請和任務調度

image.png

Spark運行流程

image.png

1) 任務提交后,都會先啟動 Driver 程序;

2) 隨后 Driver 向集群管理器注冊應用程序;

3) 之后集群管理器根據此任務的配置文件分配 Executor 並啟動;

4) Driver 開始執行 main 函數,Spark 查詢為懶執行,當執行到 Action 算子時開始反向推算,根據寬依賴進行 Stage 的划分,隨后每一個 Stage 對應一個 Taskset,Taskset 中有多個 Task,查找可用資源 Executor 進行調度;

5) 根據本地化原則,Task 會被分發到指定的 Executor 去執行,在任務執行的過程中,Executor 也會不斷與 Driver 進行通信,報告任務運行情況。

累加器

package sparkcore

import java.lang

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object Demo21Accumulator {
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Accumulator")

  val sc = new SparkContext(conf)

  val rdd: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9))

  /**
    * 累加器,只能累加
    *
    * 累加器只能在Driver中定義
    * 累加器只能在Executor累加
    * 了假期只能在Driver讀取
    */
  //1、在Driver端定義累加器
  val accumulator: LongAccumulator = sc.longAccumulator

  rdd.foreach(i=>
  //2、在Executor端進行累加
    accumulator.add(i)


  )
  //3、在Driver帶你去累加結果
  val count: lang.Long = accumulator.value

  println(count)

  /**
    * 累加器的使用
    * 如果不使用累加器需要單獨啟動一個job計算總人數
    * 使用了假期,樂嘉計算和班級人數的計算在一起計算出來
    */

  val students: RDD[String] = sc.textFile("data/students.txt")

  //定義累加器
  val studentsNum: LongAccumulator = sc.longAccumulator

  val KvRDD: RDD[(String, Int)] =students.map(stu=>{

    //累加
    studentsNum.add(1)

    val clazz: String = stu.split(",")(4)
    (clazz,1)
  })

  val clazzNumRDD: RDD[(String, Int)] = KvRDD.reduceByKey(_+_)
  //學生總人數
  val stuNum: lang.Long = studentsNum.value

  clazzNumRDD.foreach(println)
}

}

package sparkcore

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo22Broadcast {
 def main(args: Array[String]): Unit = {

   val conf: SparkConf = new SparkConf().setAppName("Broadcast").setMaster("local")

   val sc = new SparkContext(conf)

   val students: RDD[String] = sc.textFile("data/students.txt")

   //   val ids = List("1500100983","1500100911","1500100932","1500100915","1500100961")
   //
   //   val studentfilter: RDD[String] =students.filter(stu=>{
   //     val id: String = stu.split(",")(0)
   //     ids.contains(id)
   //   })
   //   studentfilter.foreach(println)

   /**
     * 廣播變量
     */


   val ids = List("1500100983", "1500100911", "1500100932", "1500100915", "1500100961")

   //1、在Driver端將一個變量廣播出去
   val broId: Broadcast[List[String]] = sc.broadcast(ids)


   val filterRDD: RDD[String] = students.filter(student => {
     val id: String = student.split(",")(0)

     //2、在Executor使用廣播變量
     val value: List[String] = broId.value
     value.contains(id)
  })
   filterRDD.foreach(println)


   /**
     * 廣播變量的應用
     *
     * 實現map join
     * 將小表加載到內存中,在map端進行關聯
     */

   val students1: RDD[String] = sc.textFile("data/students.txt")
   val scores: RDD[String] = sc.textFile("data/score.txt")


   /**
     * collect:將RDD的數據拉取到RDriver端的內存中
     */

   val list: Array[String] = students1.collect()


   val studentMap: Map[String, String] = list.map(stu => {
     val id: String = stu.split(",")(0)
    (id, stu)
  }).toMap

   //將小表廣播
   val broStudentMap: Broadcast[Map[String, String]] = sc.broadcast(studentMap)

   val stucoInfo: RDD[String] =scores.map(sco => {
     val id: String = sco.split(",")(0)

     //讀取廣播變量
     val value: Map[String, String] = broStudentMap.value

     //使用id到學生表的map中獲取學生信息
     val studentInfo: String = value.getOrElse(id, "默認值")

     studentInfo + "\t" + sco
  })
   stucoInfo.foreach(println)

}

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM