standalone模式搭建
1、上傳解壓,配置環境變量 配置bin目錄 2、修改配置文件 conf mv spark-env.sh.template spark-env.sh添加以下代碼
export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=2g
export JAVA_HOME=/usr/local/soft/jdk1.8.0_171
master相當於RM worker相當於NM,增加從節點配置
mv slaves.template slaves
node1
node2
3、復制到其它節點
scp -r spark-2.4.5 node1:`pwd`
scp -r spark-2.4.5 node2:`pwd`
4、在主節點執行啟動命令 啟動集群,在master中執行
./sbin/start-all.sh
http://master:8080/ 訪問spark ui
-
standalone client模式 日志在本地輸出,一班用於上線前測試(bin/下執行)
需要進入到spark-examples_2.11-2.4.5.jar 包所在的目錄下執行
cd /usr/local/soft/spark-2.4.5/examples/jars
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-examples_2.11-2.4.5.jar 100
-
standalone cluster模式 上線使用,不會再本地打印日志
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --driver-memory 512m --deploy-mode cluster --supervise --executor-memory 512M --total-executor-cores 1 spark-examples_2.11-2.4.5.jar 100
spark-shell spark 提供的一個交互式的命令行,可以直接寫代碼
spark-shell --master spark://master:7077
運行自己寫的代碼 1、注釋掉setMaster 2、將項目打包 3、上擦到服務器 4、提交任務 spark-submit --class com.shujia.spark.core.Demo18PI --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-1.0.jar
package sparkcore
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
object Demo18PI {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("PI").setMaster("local")
val sc = new SparkContext(conf)
val list: Range = 0 until 10000000
//構建一個很大的RDD
val listRDD: RDD[Int] = sc.parallelize(list)
//模擬生成點
val pointRDD: RDD[(Double, Double)] = listRDD.map(i => {
//模擬點
val x: Double = Random.nextDouble() * 2 - 1
val y: Double = Random.nextDouble() * 2 - 1
(x, y)
})
//取出園內的點
val yuanPointRDD: RDD[(Double, Double)] = pointRDD.filter {
case (x: Double, y: Double) =>
//計算點到圓心的距離
x * x + y * y <= 1
}
//計算PI值
val PI: Double = yuanPointRDD.count().toDouble / list.length * 4.0
println("PI:"+PI)
}
}
整合yarn搭建Spark集群
在公司一般不適用standalone模式,因為公司一般已經有yarn 不需要搞兩個資源管理框架停止spark集群 在spark sbin目錄下執行 ./stop-all.sh
spark整合yarn只需要在一個節點整合, 可以刪除node1 和node2中所有的spark 文件
1、增加hadoop 配置文件地址
vim spark-env.sh
增加
export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop
2、往yarn提交任務需要增加兩個配置 yarn-site.xml(/usr/local/soft/hadoop-2.7.6/etc/hadoop/yarn-site.xml)
先關閉yarn stop-yarn.sh
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
4、同步到其他節點,重啟yarn
scp -r yarn-site.xml node1:`pwd`
scp -r yarn-site.xml node2:`pwd`
啟動yarn start-yarn.sh
cd /usr/local/soft/spark-2.4.5/examples/jars
3.spark on yarn client模式 日志在本地輸出,一班用於上線前測試
spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 512M --num-executors 2 spark-examples_2.11-2.4.5.jar 100
4.spark on yarn cluster模式 上線使用,不會再本地打印日志 減少io
spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --executor-memory 512m --num-executors 2 --executor-cores 1 spark-examples_2.11-2.4.5.jar 100
獲取yarn程序執行日志 執行成功之后才能獲取到 yarn logs -applicationId application_1626695992877_0001
運行自己寫的代碼 1、注釋掉setMaster, 修改輸入輸出路徑 2、將項目打包 3、上擦到服務器 4、提交任務
spark-submit --class com.shujia.spark.core.Demo20Submit --master yarn-cluster --executor-memory 512m --total-executor-cores 1 spark-1.0.jar
殺死yarn 任務 yarn application -kill application_1626660789491_0012
hdfs webui http://node1:50070
yarn ui http://node1:8088
Spark資源申請和任務調度
Spark運行流程
1) 任務提交后,都會先啟動 Driver 程序;
2) 隨后 Driver 向集群管理器注冊應用程序;
3) 之后集群管理器根據此任務的配置文件分配 Executor 並啟動;
4) Driver 開始執行 main 函數,Spark 查詢為懶執行,當執行到 Action 算子時開始反向推算,根據寬依賴進行 Stage 的划分,隨后每一個 Stage 對應一個 Taskset,Taskset 中有多個 Task,查找可用資源 Executor 進行調度;
5) 根據本地化原則,Task 會被分發到指定的 Executor 去執行,在任務執行的過程中,Executor 也會不斷與 Driver 進行通信,報告任務運行情況。
累加器
package sparkcore
import java.lang
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object Demo21Accumulator {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Accumulator")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
/**
* 累加器,只能累加
*
* 累加器只能在Driver中定義
* 累加器只能在Executor累加
* 了假期只能在Driver讀取
*/
//1、在Driver端定義累加器
val accumulator: LongAccumulator = sc.longAccumulator
rdd.foreach(i=>
//2、在Executor端進行累加
accumulator.add(i)
)
//3、在Driver帶你去累加結果
val count: lang.Long = accumulator.value
println(count)
/**
* 累加器的使用
* 如果不使用累加器需要單獨啟動一個job計算總人數
* 使用了假期,樂嘉計算和班級人數的計算在一起計算出來
*/
val students: RDD[String] = sc.textFile("data/students.txt")
//定義累加器
val studentsNum: LongAccumulator = sc.longAccumulator
val KvRDD: RDD[(String, Int)] =students.map(stu=>{
//累加
studentsNum.add(1)
val clazz: String = stu.split(",")(4)
(clazz,1)
})
val clazzNumRDD: RDD[(String, Int)] = KvRDD.reduceByKey(_+_)
//學生總人數
val stuNum: lang.Long = studentsNum.value
clazzNumRDD.foreach(println)
}
}
package sparkcore
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo22Broadcast {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Broadcast").setMaster("local")
val sc = new SparkContext(conf)
val students: RDD[String] = sc.textFile("data/students.txt")
// val ids = List("1500100983","1500100911","1500100932","1500100915","1500100961")
//
// val studentfilter: RDD[String] =students.filter(stu=>{
// val id: String = stu.split(",")(0)
// ids.contains(id)
// })
// studentfilter.foreach(println)
/**
* 廣播變量
*/
val ids = List("1500100983", "1500100911", "1500100932", "1500100915", "1500100961")
//1、在Driver端將一個變量廣播出去
val broId: Broadcast[List[String]] = sc.broadcast(ids)
val filterRDD: RDD[String] = students.filter(student => {
val id: String = student.split(",")(0)
//2、在Executor使用廣播變量
val value: List[String] = broId.value
value.contains(id)
})
filterRDD.foreach(println)
/**
* 廣播變量的應用
*
* 實現map join
* 將小表加載到內存中,在map端進行關聯
*/
val students1: RDD[String] = sc.textFile("data/students.txt")
val scores: RDD[String] = sc.textFile("data/score.txt")
/**
* collect:將RDD的數據拉取到RDriver端的內存中
*/
val list: Array[String] = students1.collect()
val studentMap: Map[String, String] = list.map(stu => {
val id: String = stu.split(",")(0)