Spark簡介及安裝


轉載自:https://www.cnblogs.com/qingyunzong/p/8886338.html

一:Spark簡介

(一)Spark介紹

spark是用於大規模數據處理統一分析引擎

spark是一個實現快速通用的集群計算平台。它是由加州大學伯克利分校AMP實驗室開發的 通用內存並行計算框架,用來構建 大型的、低延遲的數據分析應用程序。它擴展了廣泛使用的MapReduce計算模型。
高效的支撐更多計算模式,包括交互式查詢和流處理。 spark的一個主要特點是能夠在內存中進行計算,即使依賴磁盤進行復雜的運算,Spark依然比MapReduce更加高效。

(二)Spark組成

Spark組成(BDAS):全稱伯克利數據分析棧,通過大規模集成算法、機器、人之間展現大數據應用的一個平台。也是處理大數據、雲計算、通信的技術解決方案。

主要組件有:

SparkCore

將分布式數據抽象為彈性分布式數據集(RDD),實現了應用任務調度、RPC、序列化和壓縮,並為運行在其上的上層組件提供API。

SparkSQL

Spark Sql 是Spark來操作結構化數據的程序包,可以讓我使用SQL語句的方式來查詢數據,Spark支持 多種數據源,包含Hive表,parquest以及JSON等內容。

SparkStreaming: 

是Spark提供的實時數據進行流式計算的組件。

MLlib

提供常用機器學習算法的實現庫。

GraphX

提供一個分布式圖計算框架,能高效進行圖計算。

BlinkDB

用於在海量數據上進行交互式SQL的近似查詢引擎。

Tachyon

以內存為中心高容錯的的分布式文件系統。

(三)Spark的特點

1.基於內存,所以速度快,但同時也是缺點,因為Spark沒有對內存進行管理,容易OOM(out of memory內存溢出),可以用Java Heap Dump對內存溢出問題進行分析
2.可以使用Scala、Java、Python、R等語言進行開發
3.兼容Hadoop

二:Spark與MapReduce對比

(一)IO輸出對比---mapreduce過程中

1.MapReduce最大的缺點,Shuffle過程中會有很多I/O開銷,可以看到這里有6個地方會產生IO,而Spark只會在1(寫入數據到內存)和6(輸出數據)的地方產生I/O,其他的過程都在內存中進行。

Hadoop基礎---shuffle機制(進一步理解Hadoop機制

(二)中間結果輸出對比---ResouceManager資源分配中

2.基於MapReduce的計算引擎通常會將中間結果輸出到磁盤上,進行存儲和容錯。例如:mapreduce出現往往會產生多個Stage,而這些串聯的Stage又依賴於底層文件系統(如HDFS)來存儲每一個Stage的輸出結果。Spark可以避免中間結果的輸出。

Hadoop基礎---shuffle機制(進一步理解Hadoop機制

(三)Spark是MapReduce的替代方案,兼容Hive、HDFS、融入到Hadoop

三:Spark四大特性

補充:DAG(有向無環圖)調度原理

有向無環圖(Directed Acyclic Graph, DAG)是有向圖的一種,特點是圖中沒有環。常常被用來表示事件之間的驅動依賴關系,管理任務之間的調度。拓撲排序是對DAG的頂點進行排序,使得對每一條有向邊(u, v),均有u(在排序記錄中)比v先出現。亦可理解為對某點v而言,只有當v的所有源點均出現了,v才能出現。
為了 描述一個Job內所有Task相互依賴關系可以將Job中的每個Task對應為一個節點,將一個Job描述為一張有向無環圖DAG
有向無環圖對於構造一個任務必須發生在另一個任務之前的這種 依賴模型特別有效。

(一)高效性---相比較Hadoop,運行速度提高100倍

Apache Spark使用最先進的DAG調度程序,查詢優化程序和物理執行引擎,實現批量和流式數據的高性能。

 (二)易用性---多種語言、算法

Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶可以快速構建不同的應用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark集群來驗證解決問題的方法。

(三)通用性---不同應用中

Spark提供了統一的解決方案。Spark可以用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不同類型的處理都可以在同一個應用中無縫使用。Spark統一的解決方案非常具有吸引力,畢竟任何公司都想用統一的平台去處理遇到的問題,減少開發和維護的人力成本和部署平台的物力成本。

(四)兼容性---其他平台

Spark可以非常方便地與其他的開源產品進行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,並且可以處理所有Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對於已經部署Hadoop集群的用戶特別重要,因為不需要做任何數據遷移就可以使用Spark的強大處理能力。Spark也可以不依賴於第三方的資源管理和調度器,它實現了Standalone作為其內置的資源管理和調度框架,這樣進一步降低了Spark的使用門檻,使得所有人都可以非常容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集群的工具。

 

Mesos:Spark可以運行在Mesos里面(Mesos 類似於yarn的一個資源調度框架)

standaloneSpark自己可以給自己分配資源(master,worker)

YARN:Spark可以運行在yarn上面

 Kubernetes:Spark接收 Kubernetes的資源調度

四:Spark應用場景

Yahoo將Spark用在Audience Expansion中的應用,進行點擊預測和即席查詢等
淘寶技術團隊使用了Spark來解決多次迭代的機器學習算法、高計算復雜度的算法等。應用於內容推薦、社區發現等
騰訊大數據精准推薦借助Spark快速迭代的優勢,實現了在“數據實時采集、算法實時訓練、系統實時預測”的全流程實時並行高維算法,最終成功應用於廣點通pCTR投放系統上。
優酷土豆將Spark應用於視頻推薦(圖計算)、廣告業務,主要實現機器學習、圖計算等迭代計算。

五:Spark體系結構 

(一)Spark體系結構圖

Driver Program可以理解為是客戶端,而右邊的可以理解為服務器端。 Cluster Manager是主節點,主節點並不負責真正任務的執行(負責任務調度),任務的執行由Worker Node完成。

(二)Spark體系結構詳細架構圖

1.我們提交一個任務,任務就叫Application
2.初始化程序的入口SparkContext,
  2.1 初始化DAG Scheduler
  2.2 初始化Task Scheduler
3.Task Scheduler向master去進行注冊並申請資源(CPU Core和Memory)
4.Master根據SparkContext的資源申請要求和Worker心跳周期內報告的信息決定在哪個Worker上分配資源,然后在該Worker上獲取資源,然后啟動StandaloneExecutorBackend;順便初
      始化好了一個線程池
5.StandaloneExecutorBackend向Driver(SparkContext)注冊,這樣Driver就知道哪些Executor為他進行服務了。
   到這個時候其實我們的初始化過程基本完成了,我們開始執行transformation的代碼,但是代碼並不會真正的運行,直到我們遇到一個action操作。生產一個job任務,進行stage的划分
6.SparkContext將Applicaiton代碼發送給StandaloneExecutorBackend;
7.並且SparkContext解析Applicaiton代碼,構建DAG圖,並提交給DAG Scheduler分解成Stage(當碰到Action操作        時,就會催生Job;每個Job中含有1個或多個Stage,Stage一般在獲取外部數據和shuffle之前產生)。
8.將Stage(或者稱為TaskSet)提交給Task Scheduler。Task Scheduler負責將Task分配到相應的Worker,最后提交給StandaloneExecutorBackend執行;
9.對task進行序列化,並根據task的分配算法,分配task
10.對接收過來的task進行反序列化,把task封裝成一個線程
11.開始執行Task,並向SparkContext報告,直至Task完成。
12.資源注銷

其他框架下spark運行流程:https://www.cnblogs.com/qingyunzong/p/8945933.html

六: 安裝Spark偽分布式環境

(一)下載Spark

根據Hadoop版本,選取spark版本下載 

http://mirrors.hust.edu.cn/apache/spark/spark-2.3.4/

(二)偽分布式安裝

1.安裝java環境

2.安裝Hadoop環境

Hadoop的安裝(2)---Hadoop配置

3.安裝Scala

https://www.scala-lang.org/download/all.html

解壓,添加環境變量:(修改/etc/profile)文件

4.spark的安裝

tar解壓、添加環境變量

注意:由於Hadoop和Spark的腳本有沖突,設置環境變量的時候,只能設置一個

5.修改spark配置文件

cp spark-env.sh.template spark-env.sh  #先進入spark根目錄下conf目錄中
vi spark-env.sh    修改配置文件,在文件末尾添加
export JAVA_HOME=/home/hadoop/App/jdk1.8.0_241
export SCALA_HOME=/home/hadoop/App/scala-2.11.8
export HADOOP_HOME=/home/hadoop/App/hadoop-2.7.1
export HADOOP_CONF_DIR=/home/hadoop/App/hadoop-2.7.1/etc/hadoop
export SPARK_MASTER_IP=haddopH1
export SPARK_MASTER_PORT=7077

6.spark啟動測試

注意:spark命令,可能和Hadoop沖突,所以使用相對路徑或者絕對路徑進行啟動

7.web訪問

七:Spark HA集群環境安裝

全分布式的部署與偽分布式類似,在每個節點上都解壓壓縮包,修改conf/spark-env.sh(稍微有些不同)。在主節點上的slaves文件中填入從節點的主機名,然后在每個節點上啟動集群即可。

(一)安裝Zookeeper集群---spark Master依賴於zookeeper集群實現HA

(二)Spark安裝

1.除了配置文件,其他同偽分布式一致。

2.修改配置文件 spark-env.sh

export JAVA_HOME=/home/hadoop/App/jdk1.8.0_241
#如果沒有安裝scala,可以不進行配置,照樣可以啟動 export SCALA_HOME
=/home/hadoop/App/scala-2.11.8       export HADOOP_HOME=/home/hadoop/App/hadoop-2.7.1 export HADOOP_CONF_DIR=/home/hadoop/App/hadoop-2.7.1/etc/hadoop export SPARK_WORKER_MEMORY=500m export SPARK_WORKER_CORES=1 export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoopH5:2181,hadoopH6:2181,hadoopH7:2181 -Dspark.deploy.zookeeper.dir=/spark"
-Dspark.deploy.zookeeper.dir是保存spark的元數據,保存了spark的作業運行狀態; 
zookeeper會保存spark集群的所有的狀態信息,包括所有的Workers信息,所有的Applactions信息,所有的Driver信息,如果集群 

3.修改配置文件 slaves

hadoopH5
hadoopH6
hadoopH7

(三)將安裝目錄拷貝到其他節點

拷貝文件目錄到hadoopH2,hadoopH3,hadoopH5,hadoopH6,hadoopH7。

(四)啟動zookeeper集群

(五)啟動HDFS集群

start-dfs.sh

(六)啟動Spark集群

start-all.sh

只啟動了本機Master和slaves下的worker進程

需要手動啟動其他幾個主機中的Master,其他節點都要執行:

(七)啟動驗證

1.web查看Master狀態

2.HA驗證。kill hadoopH1 后,hadoopH2變成alive狀態

八:Spark shell使用

(一)啟動Spark shell

spark-shell --master spark://hadoopH1:7077 --executor-memory 500m --total-executor-cores 1

至少分配450m內存大小,負責可能報錯!!

(二) 參數說明

--master spark://hadoop1:7077 指定Master的地址

--executor-memory 500m:指定每個worker可用內存為500m

--total-executor-cores 1: 指定整個集群使用的cpu核數為1個

(三)重點補充

如果啟動spark shell時沒有指定master地址,但是也可以正常啟動spark shell和執行spark shell中的程序,其實是啟動了spark的local模式,該模式僅在本機啟動一個進程,沒有與集群建立聯系。

Spark Shell中已經默認將SparkContext類初始化為對象sc。用戶代碼如果需要用到,則直接應用sc即可 Spark Shell中已經默認將SparkSQl類初始化為對象spark。用戶代碼如果需要用到,則直接應用spark即可

(四)在spark shell中實現wordcount程序

1.上傳文件到hdfs中

補充:當hadoop無法對hdfs修改時,出現: Name node is in safe mode

hadoop dfsadmin -safemode leave

退出安全模式

2.使用scala語言,編寫spark程序

sc.textFile("/spark/input/c.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/spark/output")

參數說明:

sc是SparkContext對象,該對象是提交spark程序的入口

textFile("/spark/input/c.txt")  從hdfs中讀取數據

flatMap(_.split(" "))   先字符串分割,后map

map((_,1))  將單詞和1構成map

reduceByKey(_+_)按照key進行reduce,並將value累加

saveAsTextFile("/spark/out")將結果寫入到hdfs中

九:Spark在不同框架運行

(一)執行Spark程序on standalone

spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoopH1:7077 --executor-memory 500m --total-executor-cores 1 ../examples/jars/spark-examples_2.11-2.3.4.jar 100

后面100是指進行100次運算

(二)執行Spark程序on YARN

1.需要我們啟動hdfs、yarn、zookeeper集群

2.啟動Spark on YARN

spark-shell --master yarn --deploy-mode client

3.內存太小導致SparkContext初始化錯誤,問題解決: 

先停止YARN服務,然后修改yarn-site.xml,增加如下內容:

        <property>
                <name>yarn.nodemanager.vmem-check-enabled</name>
                <value>false</value>
                <description>Whether virtual memory limits will be enforced for containers</description>
        </property>
        <property>
                <name>yarn.nodemanager.vmem-pmem-ratio</name>
                <value>4</value>
                <description>Ratio between virtual memory to physical memory when setting memory limits for containers</description>
        </property> 

 

將新的yarn-site.xml文件分發到其他Hadoop節點對應的目錄下,最后在重新啟動YARN。

4. 打開YARN的web界面,查看Spark shell應用程序

5.在spark shell中運行程序

scala> val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.makeRDD(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:26

scala> val rdd = sc.makeRDD(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:26

scala> rdd.count
res0: Long = 5                                                                  

scala> 

6. 使用示例程序求圓周率

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --driver-memory 500m --executor-memory 500m --executor-cores 1 /home/hadoop/App/spark-2.3.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.4.jar 10

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM