大數據技術之_19_Spark學習_01_Spark 基礎解析 + Spark 概述 + Spark 集群安裝 + 執行 Spark 程序


第1章 Spark 概述1.1 什么是 Spark1.2 Spark 特點1.3 Spark 的用戶和用途第2章 Spark 集群安裝2.1 集群角色2.2 機器准備2.3 下載 Spark 安裝包2.4 配置 Spark Standalone 模式2.5 配置 Spark History Server2.6 配置 Spark HA2.7 配置 Spark Yarn 模式第3章 執行 Spark 程序3.1 執行第一個 spark 程序3.2 Spark 應用提交3.3 Spark shell3.3.1 啟動 Spark shell3.3.2 在 Spark shell 中編寫 WordCount 程序3.4 在 IDEA 中編寫 WordCount 程序3.5 在 IDEA 中本地調試 WordCount 程序3.6 在 IDEA 中遠程調試 WordCount 程序3.7 Spark 核心概念


第1章 Spark 概述

1.1 什么是 Spark

  官網:http://spark.apache.org
  


  Spark 的產生背景
  
  Spark 是一種快速、通用、可擴展的大數據分析引擎,2009 年誕生於加州大學伯克利分校 AMPLab,2010 年開源,2013 年 6 月成為 Apache 孵化項目,2014 年 2 月成為 Apache 頂級項目。項目是用 Scala 進行編寫。
  目前,Spark生態系統已經發展成為一個包含多個子項目的集合,其中包含 SparkSQL、Spark Streaming、GraphX、MLib、SparkR 等子項目,Spark 是基於內存計算的大數據並行計算框架。除了擴展了廣泛使用的 MapReduce 計算模型,而且高效地支持更多計算模式,包括交互式查詢和流處理。Spark 適用於各種各樣原先需要多種不同的分布式平台的場景,包括批處理、迭代算法、交互式查詢、流處理。通過 在一個統一的框架下支持這些不同的計算,Spark 使我們可以簡單而低耗地把各種處理流程整合在一起。而這樣的組合,在實際的數據分析過程中是很有意義的。不僅如此,Spark 的這種特性還大大減輕了原先需要對各種平台分別管理的負擔。
  大一統的軟件棧,各個組件關系密切並且可以相互調用,這種設計有幾個好處:
  1、軟件棧中所有的程序庫和高級組件都可以從下層的改進中獲益。
  2、運行整個軟件棧的代價變小了。不需要運行 5 到 10 套獨立的軟件系統了,一個機構只需要運行一套軟件系統即可。系統的部署、維護、測試、支持等大大縮減。
  3、能夠構建出無縫整合不同處理模型的應用。

 

  Spark 的內置項目如下:
  


   Spark Core:實現了 Spark 的基本功能,包含任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。Spark Core 中還包含了對彈性分布式數據集(resilient distributed dataset,簡稱RDD)的 API 定義。
   Spark SQL:是 Spark 用來操作結構化數據的程序包。通過 Spark SQL,我們可以使用 SQL 或者 Apache Hive 版本的 SQL 方言(HQL)來查詢數據。Spark SQL 支持多種數據源,比 如 Hive 表、Parquet 以及 JSON 等。
   Spark Streaming:是 Spark 提供的對實時數據進行流式計算的組件。提供了用來操作數據流的 API,並且與 Spark Core 中的 RDD API 高度對應。
   Spark MLlib:提供常見的機器學習(ML)功能的程序庫。包括分類、回歸、聚類、協同過濾等,還提供了模型評估、數據導入等額外的支持功能。
   集群管理器:Spark 設計為可以高效地在一個計算節點到數千個計算節點之間伸縮計算。為了實現這樣的要求,同時獲得最大靈活性,Spark 支持在各種集群管理器(cluster manager)上運行,包括 Hadoop YARN、Apache Mesos,以及 Spark 自帶的一個簡易調度器,叫作獨立調度器。
  Spark 得到了眾多大數據公司的支持,這些公司包括 Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的 Spark 已應用於鳳巢、大搜索、直達號、百度大數據等業務;阿里利用 GraphX 構建了大規模的圖計算和圖挖掘系統,實現了很多生產系統的推薦算法;騰訊 Spark 集群達到 8000 台的規模,是當前已知的世界上最大的 Spark 集群。

 

1.2 Spark 特點


  • 與 Hadoop 的 MapReduce 相比,Spark 基於內存的運算要快 100 倍以上,基於硬盤的運算也要快 10 倍以上。Spark 實現了高效的 DAG 執行引擎,可以通過基於內存來高效處理數據流。計算的中間結果是存在於內存中的。

  • 易用
    Spark 支持 Java、Python、R 和 Scala 的 API,還支持超過 80 種高級算法,使用戶可以快速構建不同的應用。而且 Spark 支持交互式的 Python、R 和 Scala 的 shell,可以非常方便地在這些 shell 中使用 Spark 集群來驗證解決問題的方法。

  • 通用
    Spark 提供了統一的解決方案。Spark 可以用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不同類型的處理都可以在同一個應用中無縫使用。Spark 統一的解決方案非常具有吸引力,畢竟任何公司都想用統一的平台去處理遇到的問題,減少開發和維護的人力成本和部署平台的物力成本。

  • 兼容性
    Spark 可以非常方便地與其他的開源產品進行融合。比如,Spark 可以使用 Hadoop 的 YARN 和 Apache Mesos 作為它的資源管理和調度器器,並且可以處理所有 Hadoop 支持的數據,包括 HDFS、HBase 和 Cassandra 等。這對於已經部署 Hadoop 集群的用戶特別重要,因為不需要做任何數據遷移就可以使用 Spark 的強大處理能力。Spark 也可以不依賴於第三方的資源管理和調度器,它實現了 Standalone 作為其內置的資源管理和調度框架,這樣進一步降低了 Spark 的使用門檻,使得所有人都可以非常容易地部署和使用 Spark。此外,Spark 還提供了在 EC2 上部署 Standalone 的 Spark 集群的工具。

1.3 Spark 的用戶和用途

  我們大致把 Spark 的用例分為兩類:數據科學應用和數據處理應用。也就對應的有兩種人群:數據科學家和工程師。
  數據科學任務
  主要是數據分析領域,數據科學家要負責分析數據並建模,具備 SQL、統計、預測建模(機器學習)等方面的經驗,以及一定的使用 Python、Matlab 或 R 語言進行編程的能力。
  數據處理應用
  工程師定義為使用 Spark 開發生產環境中的數據處理應用的軟件開發者,通過對接 Spark 的 API 實現對處理的處理和轉換等任務。

第2章 Spark 集群安裝

2.1 集群角色


  從物理部署層面上來看,Spark 主要分為兩種類型的節點,Master 節點和 Worker 節點,Master 節點主要運行集群管理器的中心化部分,所承載的作用是分配 Application 到 Worker 節點,維護 Worker 節點 的 Driver、Application 的狀態。Worker 節點負責具體的業務運行。
  從 Spark 程序運行的層面來看,Spark 主要分為驅動器節點和執行器節點。

 

2.2 機器准備

  准備兩台以上 Linux 服務器,安裝好 JDK1.8。

2.3 下載 Spark 安裝包


Step0、使用下載命令
wget 下載地址

Step1、上傳 spark-2.1.1-bin-hadoop2.7.tgz 安裝包到 Linux 對應的目錄上,本人是上傳至 /opt/software 目錄下
Step2、解壓安裝包到指定位置

tar -zxf /opt/software/spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module

如下圖所示:

2.4 配置 Spark Standalone 模式

  Spark 的部署模式有Local、Local-Cluster、Standalone、Yarn、Mesos,我們選擇最具代表性的 Standalone 集群部署模式。

Step1、進入到 Spark 安裝目錄中的配置目錄 conf

cd /opt/module/spark-2.1.1-bin-hadoop2.7/conf

如下圖所示:


Step2、將 slaves.template 復制為 slaves
Step3、將 spark-env.sh.template 復制為 spark-env.sh

Step4、修改 slaves 文件,將 Worker 的 hostname 輸入:

Step5、修改 spark-env.sh 文件,添加如下配置:
SPARK_MASTER_HOST=hadoop102
SPARK_MASTER_PORT=7077

Step6、將配置好的 Spark 文件拷貝到其他節點上 或者 使用配置分發的腳本
scp -r /opt/module/spark-2.1.1-bin-hadoop2.7/ atguigu@hadoop103:/opt/module/
scp -r /opt/module/spark-2.1.1-bin-hadoop2.7/ atguigu@hadoop104:/opt/module/

或者
xsync /opt/module/spark-2.1.1-bin-hadoop2.7/

Step7、Spark 集群配置完畢,目前是 1 個 Master,2 個 Work,hadoop102 上啟動 Spark 集群

$ /opt/module/spark-2.1.1-bin-hadoop2.7/sbin/start-all.sh

如下圖所示:


啟動后執行 jps 命令,主節點上有 Master 進程,其他子節點上有 Worker 進行
登錄 Spark 管理界面查看集群狀態(主節點):http://hadoop102:8080/ 或者 http://192.168.25.102:8080/

到此為止,Spark 集群安裝完畢。

問題1:如果遇到 “JAVA_HOME not set” 異常,如下圖所示:


解決方案:可以在 sbin 目錄下的 spark-config.sh 文件中加入如下配置,然后配置分發到其他機器:
export JAVA_HOME=/opt/module/jdk1.8.0_144

如下圖所示:

問題2:如果遇到 Hadoop HDFS 的寫入權限異常:

org.apache.hadoop.security.AccessControlException

解決方案: 在 hdfs-site.xml 中添加如下配置,關閉權限驗證,然后配置分發到其他機器:

    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>  

2.5 配置 Spark History Server

Step1、進入到 Spark 安裝目錄

cd /opt/module/spark-2.1.1-bin-hadoop2.7/conf

Step2、將 spark-default.conf.template 復制為 spark-default.conf

$ cp spark-defaults.conf.template spark-defaults.conf

Step3、修改 spark-default.conf 文件,開啟 Log:

spark.master                     spark://hadoop102:7077
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop102:9000/directory

如下圖所示:


Step4、修改 spark-env.sh 文件,添加如下配置:
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000
-Dspark.history.retainedApplications=3
-Dspark.history.fs.logDirectory=hdfs://hadoop102:9000/directory"

如下圖所示:


Step5、啟動 HDFS 集群,在 HDFS 上創建好你所指定的 eventLog 日志目錄。
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir -p /directory

參數描述:

spark.eventLog.dir      Application 在運行過程中所有的信息均記錄在該屬性指定的路徑下

spark.history.ui.port=4000      調整 WEBUI 訪問的端口號為 4000
spark.history.retainedApplications=3        指定保存 Application 歷史記錄的個數,如果超過這個值,舊的應用程序信息將被刪除,這個是內存中的應用數,而不是頁面上顯示的應用數
spark.history.fs.logDirectory=hdfs://hadoop102:9000/directory       配置了該屬性后,在 start-history-server.sh 時就無需再顯式的指定路徑,Spark History Server 頁面只展示該指定路徑下的信息

Step6、將配置好的 Spark 文件拷貝到其他節點上或者配置分發。
Step7、重啟 Spark 集群。

$ /opt/module/spark-2.1.1-bin-hadoop2.7/sbin/start-all.sh

Step8、啟動后執行歷史服務器。

$ /opt/module/spark-2.1.1-bin-hadoop2.7/sbin/start-history-server.sh

網頁上查看


到此為止,Spark History Server 安裝完畢。

2.6 配置 Spark HA

集群部署完了,但是有一個很大的問題,那就是 Master 節點存在單點故障,要解決此問題,就要借助 zookeeper,並且啟動至少兩個 Master 節點來實現高可靠,配置方式比較簡單:


Step1、Spark 集群規划:hadoop102,hadoop103 是 Master;hadoop103,hadoop104 是 Worker。
Step2、安裝配置 Zookeeper 集群,並啟動 Zookeeper 集群。
Step3、停止 spark 所有服務,在 hadoop102 節點上修改配置文件 spark-env.sh,在該配置文件中刪掉 SPARK_MASTER_IP(即 SPARK_MASTER_HOST) 並添加如下配置:
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=hadoop102:2181,hadoop103:2181,hadoop104:2181
-Dspark.deploy.zookeeper.dir=/spark"

如下圖所示:

Step4、在 hadoop102 節點上修改 slaves 配置文件內容指定 worker 節點。

hadoop103
hadoop104

Step5、將配置文件同步到所有節點。
Step6、在 hadoop102 上執行 sbin/start-all.sh 腳本,啟動集群並啟動第一個 master 節點,然后在 hadoop103 上執行 sbin/start-master.sh 啟動第二個 master 節點。
Step7、程序中 spark 集群的訪問地址需要改成:

--master spark://hadoop102:7077,hadoop103:7077

我們干掉 hadoop102 上的 Master 進程,然后再次執行 WordCount 程序,看是否能夠執行成功:


由上圖可知,程序依舊可以運行。
同理:我們再干掉 hadoop103 上的 Master 進程,然后再次執行 WordCount 程序,看是否能夠執行成功,經過測試,程序依舊可以執行成功,到此為止,Spark 的高可用完成!

Step8、我們想知道 Zookeeper 中保存了什么?

[atguigu@hadoop102 zookeeper-3.4.10]$ pwd
/opt/module/zookeeper-3.4.10
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkCli.sh -server hadoop102:2181,hadoop103:2181,hadoop104:2181
Connecting to hadoop102:2181,hadoop103:2181,hadoop104:2181
......
......
[zk: hadoop102:2181,hadoop103:2181,hadoop104:2181(CONNECTED) 0] ls /spark
[leader_election, master_status]
[zk: hadoop102:2181,hadoop103:2181,hadoop104:2181(CONNECTED) 1] get /spark/master_status
192.168.25.102
cZxid = 0x4000000059
ctime = Mon Apr 22 10:10:11 CST 2019
mZxid = 0x4000000059
mtime = Mon Apr 22 10:10:11 CST 2019
pZxid = 0x4000000063
cversion = 3
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 14
numChildren = 3
[zk: hadoop102:2181,hadoop103:2181,hadoop104:2181(CONNECTED) 2] 

2.7 配置 Spark Yarn 模式

Step1、修改 hadoop 配置下的 /opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml 文件,然后分發到其他節點。
yarn-site.xml

<?xml version="1.0"?>
<configuration>
    <!-- Reducer獲取數據的方式 -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>

    <!-- 指定YARN的ResourceManager的地址 -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop103</value>
    </property>

    <!-- 日志聚集功能使能 -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>

    <!-- 日志保留時間設置7天 -->
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>604800</value>
    </property>

    <!-- 任務歷史服務器 -->
    <property>
        <name>yarn.log.server.url</name>
        <value>http://hadoop102:19888/jobhistory/logs/</value>
    </property>

    <!-- 指定yarn在啟動的時候的內存大小 -->
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>2048</value>
    </property>
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>2048</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>2.1</value>
    </property>
    <property>
        <name>mapred.child.java.opts</name>
        <value>-Xmx1024m</value>
    </property>

    <!--是否啟動一個線程檢查每個任務正使用的物理內存量,如果任務超出分配值,則直接將其殺掉,默認是 true,實際開發中設置成 true,學習階段設置成 false -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <!--是否啟動一個線程檢查每個任務正使用的虛擬內存量,如果任務超出分配值,則直接將其殺掉,默認是 true,實際開發中設置成 true,學習階段設置成 false -->
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>

Step2、修改 /opt/module/spark-2.1.1-bin-hadoop2.7/conf/spark-env.sh,添加以下內容,然后分發到其他節點。
spark-env.sh

# 讓 spark 能夠發現 hadoop 的配置文件
HADOOP_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop

如下圖所示:

Step3、提交應用進行測即可

$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class com.atguigu.sparkdemo.WordCountDemo \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--total-executor-cores 2 \
/opt/software/sparkdemo-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://hadoop102:9000/RELEASE \
hdfs://hadoop102:9000/out

或者

$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class com.atguigu.sparkdemo.WordCountDemo \
--master yarn-client \
--executor-memory 1G \
--total-executor-cores 2 \
/opt/software/sparkdemo-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://hadoop102:9000/RELEASE \
hdfs://hadoop102:9000/out

第3章 執行 Spark 程序

3.1 執行第一個 spark 程序

$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/opt
/module/spark-2.1.1-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.1.jar \
100

參數說明:

--master spark://hadoop102:7077     指定 Master 的地址
--executor-memory 1G                指定每個 executor 可用內存為 1G
--total-executor-cores 2            指定每個 executor 使用的 cup 核數為 2 個

該算法是利用蒙特·卡羅算法求 PI,結果如下圖:


網頁上查看 History Server

3.2 Spark 應用提交

一旦打包好,就可以使用 bin/spark-submit 腳本啟動應用了。 這個腳本負責設置 spark 使用的 classpath 和依賴,支持不同類型的集群管理器和發布模式:

$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

一些常用選項:

1--class: 你的應用的啟動類 (如 org.apache.spark.examples.SparkPi)。
2--master: 集群的 master URL (如 spark://192.168.25.102:7077)。
3--deploy-mode: 是否發布你的驅動到 Worker 節點(cluster) 或者作為一個本地客戶端 client)(默認是 client)。
4--conf: 任意的 Spark 配置屬性, 格式 key=value,如果值包含空格,可以加引號 "key=value",缺省的 Spark 配置。
5) application-jar: 打包好的應用 jar,包含依賴,這個 URL 在集群中全局可見。 比如 hdfs://共享存儲系統, 如果是 file://path, 那么所有的節點的 path 都包含同樣的 jar。
6) application-arguments: 傳給 main() 方法的參數。

--master 后面的 URL 可以是以下格式:


查看 Spark-submit 全部參數:

3.3 Spark shell

  spark-shell 是 Spark 自帶的交互式 Shell 程序,方便用戶進行交互式編程,用戶可以在該命令行下用 scala 編寫 spark 程序。

3.3.1 啟動 Spark shell

啟動 spark shell 時沒有指定 master 地址

$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-shell

啟動 spark shell 時指定 master 地址

$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-shell \
--master spark://hadoop102:7077 \
--executor-memory 2G \
--total-executor-cores 2

注意1:如果啟動 spark shell 時沒有指定 master 地址,但是也可以正常啟動 spark shell 和執行 spark shell 中的程序,其實是啟動了 spark 的 cluster 模式,如果 spark 是單節點,並且沒有指定 slave 文件,這個時候如果打開 spark-shell 默認是 local 模式。
  Local 模式是 master 和 worker 在同同一進程內。
  Cluster 模式是 master 和 worker 在不同進程內。
注意2:Spark Shell 中已經默認將 SparkContext 類初始化為對象 sc。用戶代碼如果需要用到,則直接應用 sc 即可。

3.3.2 在 Spark shell 中編寫 WordCount 程序

Step1、首先啟動 HDFS,在 HDFS 上創建一個 /RELEASE 目錄

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir -p /RELEASE

Step2、將 Spark 目錄下的 RELEASE 文件上傳一個文件到:hdfs://hadoop102:9000/RELEASE 上

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put /opt/module/spark-2.1.1-bin-hadoop2.7/RELEASE /RELEASE

如下圖所示:


Step3、在 Spark shell 中用 scala 語言編寫 spark 程序
scala> sc.textFile("hdfs://hadoop102:9000/RELEASE/RELEASE").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hadoop102:9000/out")

如下圖所示:


Step4、使用 hdfs 命令查看結果
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -cat hdfs://hadoop102:9000/out/p*

如下圖所示:


說明:
sc 是 SparkContext 對象,該對象是提交 spark 程序的入口。
textFile(hdfs://hadoop102:9000/RELEASE/RELEASE)     是 hdfs 中讀取數據
flatMap(_.split(" "))   先 map 在壓平
map((_,1))              將單詞和1構成元組
reduceByKey(_+_)        按照 key 進行 reduce,並將 value 累加
saveAsTextFile("hdfs://hadoop102:9000/out")         將結果寫入到 hdfs 中

如下圖所示:

3.4 在 IDEA 中編寫 WordCount 程序

spark shell 僅在測試和驗證我們的程序時使用的較多,在生產環境中,通常會在 IDE 中編制程序,然后打成 jar 包,然后提交到集群,最常用的是創建一個 Maven 項目,利用 Maven 來管理 jar 包的依賴。
Step1、創建一個項目
Step2、選擇 Maven 項目,然后點擊 next
Step3、填寫 maven 的 GAV,然后點擊 next
Step4、填寫項目名稱,然后點擊 finish
Step5、創建好 maven 項目后,點擊 Enable Auto-Import
Step6、配置 Maven 的 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>sparkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.1.1</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.atguigu.sparkdemo.WordCountDemo</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Step7、將 src/main/scala 設置成源代碼目錄。
Step8、添加 IDEA Scala(執行此操作后,pom 文件中不用添加 scala 依賴,因為已經以 lib 庫的方式加入)


選擇要添加的模塊

Step9、新建一個 Scala class,類型為 Object

Step10、編寫 spark 程序
示例代碼如下:
package com.atguigu.sparkdemo

import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object WordCountDemo {
  val logger = LoggerFactory.getLogger(WordCountDemo.getClass)

  def main(args: Array[String]): Unit = {
    // 創建 SparkConf() 並設置 App 名稱
    val sparkConf = new SparkConf().setAppName("WC")
    // 創建 SparkContext,該對象是提交 Spark App 的入口
    val sc = new SparkContext(sparkConf)
    // 使用 sc 創建 RDD 並執行相應的 transformation 和 action
    sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1).sortBy(_._2, false).saveAsTextFile(args(1))
    // 停止 sc,結束該任務
    logger.info("complete!")
    sc.stop()
  }
}

Step11、使用 Maven 打包:首先修改 pom.xml 中的 main class


Step12、點擊 idea 右側的 Maven Project 選項,點擊 “閃電”圖表,表示跳過測試,然后點擊 Lifecycle,再分別雙擊 clean 和 package

Step13、選擇編譯成功的 jar 包,並將該 jar 上傳到 Spark 集群中的某個節點上

Step14、首先啟動 hdfs 和 Spark 集群
啟動 hdfs
/opt/module/hadoop-2.7.3/sbin/start-dfs.sh

啟動 spark

/opt/module/spark-2.1.1-bin-hadoop2.7/sbin/start-all.sh

Step15、使用 spark-submit 命令提交 Spark 應用(注意參數的順序)

$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class com.atguigu.sparkdemo.WordCountDemo \
--master spark://hadoop102:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/opt
/software/sparkdemo-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://hadoop102:9000/RELEASE \
hdfs://hadoop102:9000/out1

Step16、查看程序執行結果

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -cat hdfs://hadoop102:9000/out1/p*

如下圖所示:


--master 后面跟的參數小結:

傳入參數說明:
hdfs://hadoop102:9000/RELEASE           輸入文件路徑
hdfs://hadoop102:9000/out1              輸出文件路徑

如果在 spark 程序中寫死了這兩處路徑,則這兩個參數就不需要了。

3.5 在 IDEA 中本地調試 WordCount 程序

本地 Spark 程序調試需要使用 local 提交模式,即將本機當做運行環境,Master 和 Worker 都為本機。運行時直接加斷點調試即可。如下:

如果本機操作系統是 windows,如果在程序中使用了 hadoop 相關的東西,比如寫入文件到 HDFS,則會遇到如下異常:


出現這個問題的原因,並不是程序的錯誤。在 windows 下調試 spark 的時候,用到了 hadoop 相關的服務。

解決辦法1:本項目生效,是將一個 hadoop 相關的服務 zip 包(hadoop-common-bin-2.7.3-x64.zip)解壓到任意目錄。
點擊 Run -> Run Configurations
然后在 IDEA 中配置 Run Configuration,添加 HADOOP_HOME 變量即可:

解決辦法2:所有項目生效,windows 系統中配置 hadoop 的環境變量,如下圖所示:

3.6 在 IDEA 中遠程調試 WordCount 程序

通過 IDEA 進行遠程調試,主要是將 IDEA 作為 Driver 來提交應用程序,配置過程如下:
修改 sparkConf,添加最終需要運行的 Jar 包、Driver 程序的地址,並設置 Master 的提交地址:

3.7 Spark 核心概念

  每個 Spark 應用都由一個驅動器程序(driver program)來發起集群上的各種並行操作。驅動器程序包含應用的 main 函數,並且定義了集群上的分布式數據集,還對這些分布式數據集應用了相關操作。
  驅動器程序通過一個 SparkContext 對象來訪問 Spark。這個對象代表對計算集群的一個連接。shell 啟動時已經自動創建了一個 SparkContext 對象,是一個叫作 sc 的變量。
  驅動器程序一般要管理多個執行器(executor)節點。
  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM