大數據Hadoop之——計算引擎Spark


一、概述

Apache Spark 是專為大規模數據處理而設計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,Spark,擁有Hadoop MapReduce所具有的優點;但不同於MapReduce的是——Job中間輸出結果可以保存在內存中,從而不再需要讀寫HDFS,因此Spark能更好地適用於數據挖掘與機器學習等需要迭代的MapReduce的算法。官方地址

1)Spark特點

  • 高效性:不同於MapReduce將中間計算結果放入磁盤中,Spark采用內存存儲中間計算結果,減少了迭代運算的磁盤IO,並通過並行計算DAG圖的優化,減少了不同任務之間的依賴,降低了延遲等待時間。內存計算下,Spark 比 MapReduce 快100倍。
  • 通用性:Spark提供了統一的解決方案。Spark可以用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。
  • 易用性:不同於MapReduce僅支持Map和Reduce兩種編程算子,Spark提供了超過80種不同的Transformation和Action算子,如map,reduce,filter,groupByKey,sortByKey,foreach等,並且采用函數式編程風格,實現相同的功能需要的代碼量極大縮小。
  • 兼容性:Spark能夠跟很多開源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,並且Spark可以讀取多種數據源,如HDFS、HBase、MySQL等。
  • 容錯性高:Spark引進了彈性分布式數據集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一組節點中的只讀對象集合,這些集合是彈性的,如果數據集一部分丟失,則可以根據“血統”(即充許基於數據衍生過程)對它們進行重建。另外在RDD計算時可以通過CheckPoint來實現容錯,而CheckPoint有兩種方式:CheckPoint Data,和Logging The Updates,用戶可以控制采用哪種方式來實現容錯。
  • 適用場景廣泛:大數據分析統計,實時數據處理,圖計算及機器學習。

2)Spark適用場景

  • 復雜的批量處理(Batch Data Processing),偏重點在於處理海量數據的能力,至於處理速度可忍受,通常的時間可能是在數十分鍾到數小時。
  • 基於歷史數據的交互式查詢(Interactive Query),通常的時間在數十秒到數十分鍾之間。
  • 基於實時數據流的數據處理(Streaming Data Processing),通常在數百毫秒到數秒之間。

二、Spark核心組件

  • Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構建在RDD和Spark Core之上的。
  • Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每個數據庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。Spark提供的sql形式的對接Hive、JDBC、HBase等各種數據渠道的API,用Java開發人員的思想來講就是面向接口、解耦合,ORMapping、Spring Cloud Stream等都是類似的思想。
  • Spark Streaming:基於SparkCore實現的可擴展、高吞吐、高可靠性的實時數據流處理。支持從Kafka、Flume等數據源處理后存儲到HDFS、DataBase、Dashboard中。對實時數據流進行處理和控制。Spark Streaming允許程序能夠像普通RDD一樣處理實時數據。
  • MLlib:一個常用機器學習算法庫,算法被實現為對RDD的Spark操作。這個庫包含可擴展的學習算法,比如分類、回歸等需要對大量數據集進行迭代的操作。

三、Spark專業術語詳解

1)Application:Spark應用程序

指的是用戶編寫的Spark應用程序,包含了Driver功能代碼和分布在集群中多個節點上運行的Executor代碼。Spark應用程序,由一個或多個作業JOB組成,如下圖所示:

2)Driver:驅動程序

Spark中的Driver即運行上述Application的Main()函數並且創建SparkContext,其中創建SparkContext的目的是為了准備Spark應用程序的運行環境。在Spark中由SparkContext負責和ClusterManager通信,進行資源的申請、任務的分配和監控等;當Executor部分運行完畢后,Driver負責將SparkContext關閉。通常SparkContext代表Driver,如下圖所示:

3)Cluster Manager:資源管理器

指的是在集群上獲取資源的外部服務,常用的有:StandaloneSpark原生的資源管理器,由Master負責資源的分配;Haddop Yarn,由Yarn中的ResearchManager負責資源的分配;Messos,由Messos中的Messos Master負責資源管理。

4)Executor:執行器

Application運行在Worker節點上的一個進程,該進程負責運行Task,並且負責將數據存在內存或者磁盤上,每個Application都有各自獨立的一批Executor,如下圖所示:

5)Worker:計算節點

集群中任何可以運行Application代碼的節點,類似於Yarn中的NodeManager節點。在Standalone模式中指的就是通過Slave文件配置的Worker節點,在Spark on Yarn模式中指的就是NodeManager節點,在Spark on Messos模式中指的就是Messos Slave節點,如下圖所示:

6)RDD:彈性分布式數據集

Resillient Distributed Dataset,Spark的基本計算單元,可以通過一系列算子進行操作(主要有Transformation和Action操作),如下圖所示:

7)窄依賴

父RDD每一個分區最多被一個子RDD的分區所用;表現為一個父RDD的分區對應於一個子RDD的分區,或兩個父RDD的分區對應於一個子RDD 的分區。如圖所示:

8)寬依賴

父RDD的每個分區都可能被多個子RDD分區所使用,子RDD分區通常對應所有的父RDD分區。如圖所示:

  • 常見的窄依賴有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned :如果JoinAPI之前被調用的RDD API是寬依賴(存在shuffle), 而且兩個join的RDD的分區數量一致,join結果的rdd分區數量也一樣,這個時候join api是窄依賴)。
  • 常見的寬依賴有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned :除此之外的,rdd 的join api是寬依賴)。

9)DAG:有向無環圖

Directed Acycle graph,反應RDD之間的依賴關系,如圖所示:

10)DAGScheduler:有向無環圖調度器

基於DAG划分Stage 並以TaskSet的形勢提交Stage給TaskScheduler;負責將作業拆分成不同階段的具有依賴關系的多批任務;最重要的任務之一就是:計算作業和任務的依賴關系,制定調度邏輯。在SparkContext初始化的過程中被實例化,一個SparkContext對應創建一個DAGScheduler。如圖所示:

11)TaskScheduler:任務調度器

將Taskset提交給worker(集群)運行並回報結果;負責每個具體任務的實際物理調度。如圖所示:

12)Job:作業

由一個或多個調度階段所組成的一次計算作業;包含多個Task組成的並行計算,往往由Spark Action催生,一個JOB包含多個RDD及作用於相應RDD上的各種Operation。如圖所示:

13)Stage:調度階段

一個任務集對應的調度階段;每個Job會被拆分很多組Task,每組任務被稱為Stage,也可稱TaskSet,一個作業分為多個階段;Stage分成兩種類型ShuffleMapStage、ResultStage。如圖所示:

14)TaskSet:任務集

由一組關聯的,但相互之間沒有Shuffle依賴關系的任務所組成的任務集。如圖所示:

15)Task:任務

被送到某個Executor上的工作任務;單個分區數據集上的最小處理流程單元。如圖所示:

總體如圖所示:

四、Spark運行基本流程

計算流程:

七,Spark支持的資源管理器

Spark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了,Spark支持資源管理器包含: Standalone(Spark)、On Mesos、On YARN、Or On K8S,當然還有local模式。

模式 含義
local 在本地運行,只有一個工作進程,無並行計算能力
local[K] 在本地運行,有 K 個工作進程,通常設置 K 為機器的CPU 核心數量
local[*] 在本地運行,工作進程數量等於機器的 CPU 核心數量。
spark://HOST:PORT 以 Standalone 模式運行,這是 Spark 自身提供的集群運行模式,默認端口號: 7077
mesos://HOST:PORT 在 Mesos 集群上運行,Driver 進程和 Worker 進程運行在 Mesos 集群上,部署模式必須使用固定值:--deploy-mode cluster
yarn 在yarn集群上運行,依賴於hadoop集群,yarn資源調度框架,將應用提交給yarn,在ApplactionMaster(相當於Stand alone模式中的Master)中運行driver,在集群上調度資源,開啟excutor執行任務。
k8s 在k8s集群上運行

七、Spark環境搭建(Spark on Yarn)

1)下載

Spark下載地址:http://spark.apache.org/downloads.html

這里需要注意版本,我的hadoop版本是3.3.1,這里spark就下載最新版本的3.2.0,而Spark3.2.0依賴的Scala的2.13,所以后面用到Scala編程時注意Scala的版本。

$ cd /opt/bigdata/hadoop/software
# 下載
$ wget https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
# 解壓
$ tar -zxvf spark-3.2.0-bin-hadoop3.2.tgz -C /opt/bigdata/hadoop/server/

2)修改配置文件

# 進入spark配置目錄
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf
# copy 一個模板配置
$ cp spark-env.sh.template spark-env.sh

在spark-env.sh下加入如下配置

# Hadoop 的配置文件目錄
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
# YARN 的配置文件目錄
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
# SPARK 的目錄
export SPARK_HOME=/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
# SPARK 執行文件目錄
export PATH=$SPARK_HOME/bin:$PATH

復制/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 到其它節點

$ scp -r /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2 hadoop-node3:/opt/bigdata/hadoop/server/

3)配置環境變量

在/etc/profile文件中追加如下內容:

export SPARK_HOME=/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
export PATH=$SPARK_HOME/bin:$PATH

source 加載生效

$ source /etc/profile

4)運行SparkPi(圓周率) 測試驗證

spark-submit 詳細參數說明

參數名 參數說明
--master master 的地址,提交任務到哪里執行,例如 spark://host:port, yarn, local
--deploy-mode 在本地 (client) 啟動 driver 或在 cluster 上啟動,默認是 client
--class 應用程序的主類,僅針對 java 或 scala 應用
--name 應用程序的名稱
--jars 用逗號分隔的本地 jar 包,設置后,這些 jar 將包含在 driver 和 executor 的 classpath 下
--packages 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐標
--exclude-packages 為了避免沖突 而指定不包含的 package
--repositories 遠程 repository
--conf PROP=VALUE 指定 spark 配置屬性的值, 例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"
--properties-file 加載的配置文件,默認為 conf/spark-defaults.conf
--driver-memory Driver內存,默認 1G
--driver-java-options 傳給 driver 的額外的 Java 選項
--driver-library-path 傳給 driver 的額外的庫路徑
--driver-class-path 傳給 driver 的額外的類路徑
--driver-cores Driver 的核數,默認是1。在 yarn 或者 standalone 下使用
--executor-memory 每個 executor 的內存,默認是1G
--total-executor-cores 所有 executor 總共的核數。僅僅在 mesos 或者 standalone 下使用
--num-executors 啟動的 executor 數量。默認為2。在 yarn 下使用
--executor-core 每個 executor 的核數。在yarn或者standalone下使用
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--num-executors 3 \
--executor-memory 1G \
--executor-cores 1 \
/opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.0.jar 100

如果看到控制台出現這個,說明運行成功。

查看yarn任務

查看任務日志


【注意】默認情況下,Hadoop歷史服務historyserver是沒有啟動的,我們可以通過下面的命令來啟動Hadoop歷史服務器。查看日志依賴於historyserver服務

#啟動JobHistoryServer服務
$ mapred --daemon start historyserver
#查看進程
$ jps
#停止JobHistoryServer服務
$ mapred --daemon stop historyserver


至此已經完成的Spark on Yarn 的環境搭建,並通過測試SparkPi的運行成功了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM