為開發和調試SPark應用程序設置的完整的開發環境。這里,我們將使用Java,其實SPark還支持使用Scala, Python和R。我們將使用IntelliJ作為IDE,因為我們對於eclipse再熟悉不過了,這里就使用IntelliJ進行練練手,所以我們也將使用Maven作為構建的管理器。在本篇結束時,您將了解如何設置IntelliJ,如何使用Maven管理依賴項,如何將SPark應用程序打包並部署到集群中,以及如何將活動程序連接到調試器。
一、創建一個新的IntelliJ項目
(注意:指令可能因操作系統而異。在本教程中,我們將使用IntelliJ版本:2018.2.1上的MAC OSX High Sierra. )
通過選擇File > New > Project: ,再選擇 Maven > 單擊Next
將項目命名如下:
- GroupId:
Hortonworks
- ArtifactId:
SparkTutorial
- 版本:
1.0-SNAPSHOT
然后單擊下一個繼續。
最后,選擇項目名稱和位置。這些字段應該是自動填充的,所以讓我們保持默認值即可:
IntelliJ應該創建一個具有默認目錄結構的新項目。生成所有文件夾可能需要一兩分鍾。
讓我們分解一下項目結構。
- .IDEA:這些是IntelliJ配置文件。
- SRC:源代碼。大部分代碼應該進入主目錄。應該為測試腳本保留測試文件夾。
- 目標:當您編譯您的項目時,它將位於這里。
- xml:Maven配置文件。我們將向您展示如何使用該文件導入第三方庫和文檔。
在繼續之前,讓我們驗證幾個IntelliJ設置:
1.核實import Maven projects automatically是ON.
-
- Preferences > Build, Execution, Deployment > Build Tools > Maven > Importing
2.核實Project SDK和Project language level設置為Java版本:
-
- File > Project Structure > Project
3.核實Language level 設置為Java版本:
-
- File > Project Structure > Module
二、Maven配置
在開始編寫SPark應用程序之前,我們需要將SPark庫和文檔導入IntelliJ。為此,我們將使用Maven。這是必要的,如果我們想要IntelliJ識別Spark代碼。要導入SPark庫,我們將使用依賴關系管理器Maven。將以下行添加到文件put.xml中:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>hortonworks</groupId> <artifactId>SparkTutorial</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.1</version> </dependency> </dependencies> </project>
保存文件后,IntelliJ將自動導入運行SPark所需的庫和文檔。
三、創建SPark應用程序
本地部署
對於我們的第一個應用程序,我們將構建一個簡單的程序,進行單詞計數。Download the file,將文件保存為shakespeare.txt.
稍后,我們希望SPark從HDFS(HadoopDistributedFileSystem)檢索這個文件,所以現在讓我們把它放在那里。要上傳到HDFS,首先要確保我們的Ambari 沙箱已經啟動並運行。
- 導航到sandbox-hdp.hortonworks.com:8080
- 使用用戶名/密碼登錄
- 一旦您登錄到Ambari Manager,鼠標在右上角的下拉菜單上並單擊文件視圖.
- 打開TMP文件夾並單擊上傳按鈕在右上角上傳文件。確保它的名字shakespeare.txt.
現在我們已經准備好創建應用程序了。在IDE中打開文件夾src/main/resources,這應該是自動為您生成的。
接下來,選擇文件夾SRC/Main/java:
- 右鍵單擊文件夾並選擇New > Java Class
- 新建類文件:Main.java
將其復制到您的新文件中:
public class Main { public static void main(String[] args) { System.out.println("Hello World"); } }
現在轉到IDE頂部的“Run”下拉菜單,然后選擇Run。然后選擇Main。如果所有設置都正確,IDE應該打印“HelloWorld”。現在我們已經知道環境已正確設置,請用以下代碼替換該文件:

package Hortonworks.SparkTutorial; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import scala.Tuple2; import java.util.Arrays; public class Main { public static void main(String[] args){ //Create a SparkContext to initialize SparkConf conf = new SparkConf().setMaster("local").setAppName("Word Count"); // Create a Java version of the Spark Context JavaSparkContext sc = new JavaSparkContext(conf); // Load the text into a Spark RDD, which is a distributed representation of each line of text JavaRDD<String> textFile = sc.textFile("src/main/resources/shakespeare.txt"); JavaPairRDD<String, Integer> counts = textFile .flatMap(s -> Arrays.asList(s.split("[ ,]")).iterator()) .mapToPair(word -> new Tuple2<>(word, 1)) .reduceByKey((a, b) -> a + b); counts.foreach(p -> System.out.println(p)); System.out.println("Total words: " + counts.count()); counts.saveAsTextFile("/tmp/shakespeareWordCount"); } }
如前所述,單擊Run>Run以運行文件。這應該運行Spark進行作業,並打印在莎士比亞shakespeare.txt中出現的每個單詞的頻率。
由您的程序創建的文件可以在上面代碼中指定的目錄中找到,在我們的例子中,我們使用/tmp/shakspeareWordCount.
注意,我們已經設置了這一行:
setMaster("local")
這告訴SPark使用此計算機在本地運行,而不是在分布式模式下運行。要在多台機器上運行Spark,我們需要將此值更改為 YARN。我們稍后再看看怎么做。
現在我們已經了解了如何在IDE中直接部署應用程序。這是一種快速構建和測試應用程序的好方法,但這有點不切實際,因為SPark只在一台機器上運行。在生產中,SPark通常會處理存儲在分布式文件系統(如HDFS)上的數據(如果運行在雲中,則可能是S3或Azure博客存儲)。SPark也通常在集群模式下運行(即,分布在許多機器上)。
那么,首先,我們將學習如何在Hortonworks沙箱(這是一個單節點Hadoop環境)上部署SPark,然后我們將學習如何在雲中部署SPark。
部署到Sandbox
雖然我們仍然在一台機器上運行SPark,但是我們將使用HDFS和SEAR(集群資源管理器)。這將比我們之前所做的更接近於運行一個完整的分布式集群。我們要做的第一件事是更改這一行:
JavaRDD<String> textFile = sc.textFile("src/main/resources/shakespeare.txt");
更改為:
JavaRDD<String> textFile = sc.textFile("hdfs:///tmp/shakespeare.txt");
將:
counts.saveAsTextFile("/tmp/shakespeareWordCount");
更改為:
counts.saveAsTextFile("hdfs:///tmp/shakespeareWordCount");
這告訴 Spark 要讀取和寫入HDFS,而不是本地。確保保存文件。
接下來,我們將把這些代碼打包到一個編譯好的JAR文件中,這個JAR文件可以部署在 sandbox沙箱上。了簡化我們的生活,我們將創建一個程序集JAR:一個包含我們的代碼和我們的代碼所依賴的所有JAR的JAR文件。通過將我們的代碼打包成一個程序集,我們保證在代碼運行時,所有依賴的JAR(在pu.xml中定義的)都會出現。
打開終端和CD到包含 pom.xml. 運行mvn package. 這將在文件夾中創建一個名為“SparkTutver-1.0-SNAPSHOT.jar”的編譯JAR。
(注:如果MVN命令不起作用-請確保已成功安裝maven。)
將程序集復制到沙箱:
scp -P 2222 ./target/SparkTutorial-1.0-SNAPSHOT.jar root@sandbox-hdp.hortonworks.com:/root
打開第二個終端窗口,將ssh放入沙箱:
ssh -p 2222 root@sandbox-hdp.hortonworks.com
使用spark-submit 運行我們的代碼。我們需要指定主類、要運行的JAR和運行模式(本地或集群):
spark-submit --class "Hortonworks.SparkTutorial.Main" --master local ./SparkTutorial-1.0-SNAPSHOT.jar
您的控制台應該可以看到出現的每個單詞的頻率,如下所示:

... (comutual,1) (ban-dogs,1) (rut-time,1) (ORLANDO],4) (Deceitful,1) (commits,3) (GENTLEWOMAN,4) (honors,10) (returnest,1) (topp'd?,1) (compass?,1) (toothache?,1) (miserably,1) (hen?,1) (luck?,2) (call'd,162) (lecherous,2) ...
此外,如果在Ambari中打開“文件視圖”,您應該會看到/tmp/shakspeareWordCount下的結果。這表明結果也存儲在HDFS中。
四、部署到雲端
在設置集群之后,部署代碼的過程類似於部署到沙箱。我們需要對集群的JAR進行SCP:
scp -P 2222 -i "key.pem" ./target/SparkTutorial-1.0-SNAPSHOT.jar root@[ip address of a master node]:root
然后打開第二個終端窗口並將ssh插入主節點:
ssh -p 2222 -i "key.pem" root@[ip address of a master node]
然后使用 spark-submit運行我們的代碼:
spark-submit --class "Hortonworks.SparkTutorial.Main" --master yarn --deploy-mode client ./SparkTutorial-1.0-SNAPSHOT.jar
注意,我們指定了參數--master yarn ,而不是--master local. 這意味着我們希望SPark在分布式模式下運行,而不是在一台機器上運行,並且我們希望依賴SEARY(集群資源管理器)來獲取可用的機器來運行作業。如果您不熟悉YARN,那么如果您想要在同一個集群上同時運行多個作業,則這一點尤為重要。如果配置正確,YARN隊列將提供不同的用戶或處理允許使用的群集資源配額。它還提供了允許作業在資源可用時充分利用集群的機制,以及在其他用戶或作業開始提交作業時縮小現有作業的范圍。
參數--deploy-mode client ,指示要將當前計算機用作Spark的驅動程序機器。驅動程序機器是一台啟動Spark作業的機器,也是工作完成后收集匯總結果的地方。或者,我們可以指定 --deploy-mode cluster ,這將允許YARN選擇驅動機。
重要的是要注意的是,一個寫得不好的SPark程序可能會意外地試圖將許多兆字節的數據帶回到驅動程序機器中,從而導致其崩潰。因此,不應該使用集群的主節點作為驅動程序機器。許多組織從所謂的邊緣節點提交火花作業,這是一個單獨的機器,不用於存儲數據或執行計算。由於邊緣節點與集群是分開的,所以它可以在不影響集群其余部分的情況下降。邊緣節點還用於從集群中檢索的聚合數據的數據科學工作。
例如,數據科學家可能從邊緣節點提交一份SPark作業,將10 TB數據集轉換為1GB聚合數據集,然后使用R和Python等工具對邊緣節點進行分析。如果計划設置邊緣節點,請確保機器沒有安裝DataNode或HostManager組件,因為它們是集群的數據存儲和計算組件。
五、現場調試
將正在運行的SPark程序連接到調試器,這將允許我們設置斷點並逐行遍歷代碼。在直接從IDE運行時,調試SPark就像其他程序一樣,但是調試遠程集群需要一些配置。
在計划提交Spark作業的機器上,從終端運行這一行:
export SPARK_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8086
這將使您可以在端口8086上附加調試器。您需要確保端口8086能夠接收入站連接。然后在IntelliJ中 Run > Edit Configurations:
然后單擊左上角的+按鈕並添加一個新的遠程配置。用主機IP地址填充主機,以及使用端口8086.
如果在提交SPark作業后立即從IDE運行此調試配置,則調試器將附加,SPark將在斷點處停止。
注意:要重新提交單詞計數代碼,我們必須首先刪除前面創建的目錄。使用命令:
hdfs dfs -rm -r /tmp/shakespeareWordCount
在沙箱外殼上刪除舊目錄。再次在沙箱外殼上提交火花作業,並在執行spark-submit命令。請記住,為了運行您的代碼,我們使用了以下命令:
spark-submit --class "Hortonworks.SparkTutorial.Main" --master yarn --deploy-mode client ./SparkTutorial-1.0-SNAPSHOT.jar