轉載自:https://www.cnblogs.com/qingyunzong/p/8886338.html
一:Spark簡介
(一)Spark介紹
spark是用於大規模數據處理的統一分析引擎。
(二)Spark組成
Spark組成(BDAS):全稱伯克利數據分析棧,通過大規模集成算法、機器、人之間展現大數據應用的一個平台。也是處理大數據、雲計算、通信的技術解決方案。
主要組件有:
SparkCore:
將分布式數據抽象為彈性分布式數據集(RDD),實現了應用任務調度、RPC、序列化和壓縮,並為運行在其上的上層組件提供API。
SparkSQL:
Spark Sql 是Spark來操作結構化數據的程序包,可以讓我使用SQL語句的方式來查詢數據,Spark支持 多種數據源,包含Hive表,parquest以及JSON等內容。
SparkStreaming:
是Spark提供的實時數據進行流式計算的組件。
MLlib:
提供常用機器學習算法的實現庫。
GraphX:
提供一個分布式圖計算框架,能高效進行圖計算。
BlinkDB:
用於在海量數據上進行交互式SQL的近似查詢引擎。
Tachyon:
以內存為中心高容錯的的分布式文件系統。
(三)Spark的特點
1.基於內存,所以速度快,但同時也是缺點,因為Spark沒有對內存進行管理,容易OOM(out of memory內存溢出),可以用Java Heap Dump對內存溢出問題進行分析
2.可以使用Scala、Java、Python、R等語言進行開發
3.兼容Hadoop
二:Spark與MapReduce對比
(一)IO輸出對比---mapreduce過程中
1.MapReduce最大的缺點,Shuffle過程中會有很多I/O開銷,可以看到這里有6個地方會產生IO,而Spark只會在1(寫入數據到內存)和6(輸出數據)的地方產生I/O,其他的過程都在內存中進行。
Hadoop基礎---shuffle機制(進一步理解Hadoop機制)
(二)中間結果輸出對比---ResouceManager資源分配中
2.基於MapReduce的計算引擎通常會將中間結果輸出到磁盤上,進行存儲和容錯。例如:mapreduce出現往往會產生多個Stage,而這些串聯的Stage又依賴於底層文件系統(如HDFS)來存儲每一個Stage的輸出結果。Spark可以避免中間結果的輸出。
Hadoop基礎---shuffle機制(進一步理解Hadoop機制)
(三)Spark是MapReduce的替代方案,兼容Hive、HDFS、融入到Hadoop
三:Spark四大特性
補充:DAG(有向無環圖)調度原理

有向無環圖對於構造一個任務必須發生在另一個任務之前的這種 依賴模型特別有效。
(一)高效性---相比較Hadoop,運行速度提高100倍
Apache Spark使用最先進的DAG調度程序,查詢優化程序和物理執行引擎,實現批量和流式數據的高性能。
(二)易用性---多種語言、算法
Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶可以快速構建不同的應用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark集群來驗證解決問題的方法。
(三)通用性---不同應用中
Spark提供了統一的解決方案。Spark可以用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不同類型的處理都可以在同一個應用中無縫使用。Spark統一的解決方案非常具有吸引力,畢竟任何公司都想用統一的平台去處理遇到的問題,減少開發和維護的人力成本和部署平台的物力成本。
(四)兼容性---其他平台
Spark可以非常方便地與其他的開源產品進行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,並且可以處理所有Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對於已經部署Hadoop集群的用戶特別重要,因為不需要做任何數據遷移就可以使用Spark的強大處理能力。Spark也可以不依賴於第三方的資源管理和調度器,它實現了Standalone作為其內置的資源管理和調度框架,這樣進一步降低了Spark的使用門檻,使得所有人都可以非常容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集群的工具。
Mesos:Spark可以運行在Mesos里面(Mesos 類似於yarn的一個資源調度框架)
standalone:Spark自己可以給自己分配資源(master,worker)
YARN:Spark可以運行在yarn上面
Kubernetes:Spark接收 Kubernetes的資源調度
四:Spark應用場景
騰訊大數據精准推薦借助Spark快速迭代的優勢,實現了在“數據實時采集、算法實時訓練、系統實時預測”的全流程實時並行高維算法,最終成功應用於廣點通pCTR投放系統上。
優酷土豆將Spark應用於視頻推薦(圖計算)、廣告業務,主要實現機器學習、圖計算等迭代計算。
五:Spark體系結構
(一)Spark體系結構圖
Driver Program可以理解為是客戶端,而右邊的可以理解為服務器端。 Cluster Manager是主節點,主節點並不負責真正任務的執行(負責任務調度),任務的執行由Worker Node完成。
(二)Spark體系結構詳細架構圖
1.我們提交一個任務,任務就叫Application 2.初始化程序的入口SparkContext, 2.1 初始化DAG Scheduler 2.2 初始化Task Scheduler 3.Task Scheduler向master去進行注冊並申請資源(CPU Core和Memory) 4.Master根據SparkContext的資源申請要求和Worker心跳周期內報告的信息決定在哪個Worker上分配資源,然后在該Worker上獲取資源,然后啟動StandaloneExecutorBackend;順便初 始化好了一個線程池 5.StandaloneExecutorBackend向Driver(SparkContext)注冊,這樣Driver就知道哪些Executor為他進行服務了。 到這個時候其實我們的初始化過程基本完成了,我們開始執行transformation的代碼,但是代碼並不會真正的運行,直到我們遇到一個action操作。生產一個job任務,進行stage的划分 6.SparkContext將Applicaiton代碼發送給StandaloneExecutorBackend; 7.並且SparkContext解析Applicaiton代碼,構建DAG圖,並提交給DAG Scheduler分解成Stage(當碰到Action操作 時,就會催生Job;每個Job中含有1個或多個Stage,Stage一般在獲取外部數據和shuffle之前產生)。 8.將Stage(或者稱為TaskSet)提交給Task Scheduler。Task Scheduler負責將Task分配到相應的Worker,最后提交給StandaloneExecutorBackend執行; 9.對task進行序列化,並根據task的分配算法,分配task 10.對接收過來的task進行反序列化,把task封裝成一個線程 11.開始執行Task,並向SparkContext報告,直至Task完成。 12.資源注銷
其他框架下spark運行流程:https://www.cnblogs.com/qingyunzong/p/8945933.html
六: 安裝Spark偽分布式環境
(一)下載Spark
根據Hadoop版本,選取spark版本下載
http://mirrors.hust.edu.cn/apache/spark/spark-2.3.4/
(二)偽分布式安裝
1.安裝java環境
2.安裝Hadoop環境
Hadoop的安裝(2)---Hadoop配置
3.安裝Scala
https://www.scala-lang.org/download/all.html
解壓,添加環境變量:(修改/etc/profile)文件
4.spark的安裝
tar解壓、添加環境變量
注意:由於Hadoop和Spark的腳本有沖突,設置環境變量的時候,只能設置一個
5.修改spark配置文件
cp spark-env.sh.template spark-env.sh #先進入spark根目錄下conf目錄中
vi spark-env.sh 修改配置文件,在文件末尾添加
export JAVA_HOME=/home/hadoop/App/jdk1.8.0_241 export SCALA_HOME=/home/hadoop/App/scala-2.11.8 export HADOOP_HOME=/home/hadoop/App/hadoop-2.7.1 export HADOOP_CONF_DIR=/home/hadoop/App/hadoop-2.7.1/etc/hadoop export SPARK_MASTER_IP=haddopH1 export SPARK_MASTER_PORT=7077
6.spark啟動測試
注意:spark命令,可能和Hadoop沖突,所以使用相對路徑或者絕對路徑進行啟動
7.web訪問
七:Spark HA集群環境安裝
全分布式的部署與偽分布式類似,在每個節點上都解壓壓縮包,修改conf/spark-env.sh(稍微有些不同)。在主節點上的slaves文件中填入從節點的主機名,然后在每個節點上啟動集群即可。
(一)安裝Zookeeper集群---spark Master依賴於zookeeper集群實現HA
(二)Spark安裝
1.除了配置文件,其他同偽分布式一致。
2.修改配置文件 spark-env.sh
export JAVA_HOME=/home/hadoop/App/jdk1.8.0_241
#如果沒有安裝scala,可以不進行配置,照樣可以啟動 export SCALA_HOME=/home/hadoop/App/scala-2.11.8 export HADOOP_HOME=/home/hadoop/App/hadoop-2.7.1 export HADOOP_CONF_DIR=/home/hadoop/App/hadoop-2.7.1/etc/hadoop export SPARK_WORKER_MEMORY=500m export SPARK_WORKER_CORES=1 export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoopH5:2181,hadoopH6:2181,hadoopH7:2181 -Dspark.deploy.zookeeper.dir=/spark"
-Dspark.deploy.zookeeper.dir是保存spark的元數據,保存了spark的作業運行狀態;
zookeeper會保存spark集群的所有的狀態信息,包括所有的Workers信息,所有的Applactions信息,所有的Driver信息,如果集群
3.修改配置文件 slaves
hadoopH5
hadoopH6
hadoopH7
(三)將安裝目錄拷貝到其他節點
拷貝文件目錄到hadoopH2,hadoopH3,hadoopH5,hadoopH6,hadoopH7。
(四)啟動zookeeper集群
(五)啟動HDFS集群
start-dfs.sh
(六)啟動Spark集群
start-all.sh
只啟動了本機Master和slaves下的worker進程
需要手動啟動其他幾個主機中的Master,其他節點都要執行:
(七)啟動驗證
1.web查看Master狀態
2.HA驗證。kill hadoopH1 后,hadoopH2變成alive狀態
八:Spark shell使用
(一)啟動Spark shell
spark-shell --master spark://hadoopH1:7077 --executor-memory 500m --total-executor-cores 1
至少分配450m內存大小,負責可能報錯!!
(二) 參數說明
--master spark://hadoop1:7077 指定Master的地址 --executor-memory 500m:指定每個worker可用內存為500m --total-executor-cores 1: 指定整個集群使用的cpu核數為1個
(三)重點補充
如果啟動spark shell時沒有指定master地址,但是也可以正常啟動spark shell和執行spark shell中的程序,其實是啟動了spark的local模式,該模式僅在本機啟動一個進程,沒有與集群建立聯系。
Spark Shell中已經默認將SparkContext類初始化為對象sc。用戶代碼如果需要用到,則直接應用sc即可 Spark Shell中已經默認將SparkSQl類初始化為對象spark。用戶代碼如果需要用到,則直接應用spark即可
(四)在spark shell中實現wordcount程序
1.上傳文件到hdfs中
補充:當hadoop無法對hdfs修改時,出現: Name node is in safe mode
hadoop dfsadmin -safemode leave
退出安全模式
2.使用scala語言,編寫spark程序
sc.textFile("/spark/input/c.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/spark/output")
參數說明:
sc是SparkContext對象,該對象是提交spark程序的入口 textFile("/spark/input/c.txt") 從hdfs中讀取數據 flatMap(_.split(" ")) 先字符串分割,后map map((_,1)) 將單詞和1構成map reduceByKey(_+_)按照key進行reduce,並將value累加 saveAsTextFile("/spark/out")將結果寫入到hdfs中
九:Spark在不同框架運行
(一)執行Spark程序on standalone
spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoopH1:7077 --executor-memory 500m --total-executor-cores 1 ../examples/jars/spark-examples_2.11-2.3.4.jar 100
后面100是指進行100次運算
(二)執行Spark程序on YARN
1.需要我們啟動hdfs、yarn、zookeeper集群
2.啟動Spark on YARN
spark-shell --master yarn --deploy-mode client
3.內存太小導致SparkContext初始化錯誤,問題解決:
先停止YARN服務,然后修改yarn-site.xml,增加如下內容:
<property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> <description>Whether virtual memory limits will be enforced for containers</description> </property> <property> <name>yarn.nodemanager.vmem-pmem-ratio</name> <value>4</value> <description>Ratio between virtual memory to physical memory when setting memory limits for containers</description> </property>
將新的yarn-site.xml文件分發到其他Hadoop節點對應的目錄下,最后在重新啟動YARN。
4. 打開YARN的web界面,查看Spark shell應用程序
5.在spark shell中運行程序
scala> val array = Array(1,2,3,4,5) array: Array[Int] = Array(1, 2, 3, 4, 5) scala> val rdd = sc.makeRDD(array) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:26 scala> val rdd = sc.makeRDD(array) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:26 scala> rdd.count res0: Long = 5 scala>
6. 使用示例程序求圓周率
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --driver-memory 500m --executor-memory 500m --executor-cores 1 /home/hadoop/App/spark-2.3.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.4.jar 10