Spark的Java開發環境構建


  為開發和調試SPark應用程序設置的完整的開發環境。這里,我們將使用Java,其實SPark還支持使用Scala, Python和R。我們將使用IntelliJ作為IDE,因為我們對於eclipse再熟悉不過了,這里就使用IntelliJ進行練練手,所以我們也將使用Maven作為構建的管理器。在本篇結束時,您將了解如何設置IntelliJ,如何使用Maven管理依賴項,如何將SPark應用程序打包並部署到集群中,以及如何將活動程序連接到調試器。

一、創建一個新的IntelliJ項目

  (注意:指令可能因操作系統而異。在本教程中,我們將使用IntelliJ版本:2018.2.1上的MAC OSX High Sierra. )

  通過選擇File > New > Project:         ,再選擇 Maven > 單擊Next

 

將項目命名如下:

  • GroupId:Hortonworks
  • ArtifactId:SparkTutorial
  • 版本:1.0-SNAPSHOT

  然后單擊下一個繼續。

  最后,選擇項目名稱和位置。這些字段應該是自動填充的,所以讓我們保持默認值即可:

  IntelliJ應該創建一個具有默認目錄結構的新項目。生成所有文件夾可能需要一兩分鍾。

  讓我們分解一下項目結構。

  • .IDEA:這些是IntelliJ配置文件。
  • SRC:源代碼。大部分代碼應該進入主目錄。應該為測試腳本保留測試文件夾。
  • 目標:當您編譯您的項目時,它將位於這里。
  • xml:Maven配置文件。我們將向您展示如何使用該文件導入第三方庫和文檔。

  在繼續之前,讓我們驗證幾個IntelliJ設置:

    1.核實import Maven projects automatically是ON.

    •   Preferences > Build, Execution, Deployment > Build Tools > Maven > Importing

    2.核實Project SDK和Project language level設置為Java版本:

    •   File > Project Structure > Project

 

 

    3.核實Language level 設置為Java版本:

    •   File > Project Structure > Module

 

二、Maven配置

  在開始編寫SPark應用程序之前,我們需要將SPark庫和文檔導入IntelliJ。為此,我們將使用Maven。這是必要的,如果我們想要IntelliJ識別Spark代碼。要導入SPark庫,我們將使用依賴關系管理器Maven。將以下行添加到文件put.xml中:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>hortonworks</groupId>
    <artifactId>SparkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>

</project>
需要導入put.xml的代碼

  保存文件后,IntelliJ將自動導入運行SPark所需的庫和文檔。

 

三、創建SPark應用程序

  本地部署

  對於我們的第一個應用程序,我們將構建一個簡單的程序,進行單詞計數。Download the file,將文件保存為shakespeare.txt.

  稍后,我們希望SPark從HDFS(HadoopDistributedFileSystem)檢索這個文件,所以現在讓我們把它放在那里。要上傳到HDFS,首先要確保我們的Ambari 沙箱已經啟動並運行。

  • 導航到sandbox-hdp.hortonworks.com:8080
  • 使用用戶名/密碼登錄
  • 一旦您登錄到Ambari Manager,鼠標在右上角的下拉菜單上並單擊文件視圖.
  • 打開TMP文件夾並單擊上傳按鈕在右上角上傳文件。確保它的名字shakespeare.txt.

 

  現在我們已經准備好創建應用程序了。在IDE中打開文件夾src/main/resources,這應該是自動為您生成的。

  接下來,選擇文件夾SRC/Main/java:

  • 右鍵單擊文件夾並選擇New > Java Class
  • 新建類文件:Main.java

 

  將其復制到您的新文件中:

public class Main {

    public static void main(String[] args) {
        System.out.println("Hello World");
    }
}

 

  現在轉到IDE頂部的“Run”下拉菜單,然后選擇Run。然后選擇Main。如果所有設置都正確,IDE應該打印“HelloWorld”。現在我們已經知道環境已正確設置,請用以下代碼替換該文件:

package Hortonworks.SparkTutorial;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import scala.Tuple2;

import java.util.Arrays;

public class Main {

    public static void main(String[] args){

        //Create a SparkContext to initialize
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Word Count");

        // Create a Java version of the Spark Context
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the text into a Spark RDD, which is a distributed representation of each line of text
        JavaRDD<String> textFile = sc.textFile("src/main/resources/shakespeare.txt");
        JavaPairRDD<String, Integer> counts = textFile
                .flatMap(s -> Arrays.asList(s.split("[ ,]")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);
        counts.foreach(p -> System.out.println(p));
        System.out.println("Total words: " + counts.count());
        counts.saveAsTextFile("/tmp/shakespeareWordCount");
    }

}
替換代碼

  

  如前所述,單擊Run>Run以運行文件。這應該運行Spark進行作業,並打印在莎士比亞shakespeare.txt中出現的每個單詞的頻率。

  由您的程序創建的文件可以在上面代碼中指定的目錄中找到,在我們的例子中,我們使用/tmp/shakspeareWordCount.

注意,我們已經設置了這一行:

setMaster("local")

  這告訴SPark使用此計算機在本地運行,而不是在分布式模式下運行。要在多台機器上運行Spark,我們需要將此值更改為 YARN。我們稍后再看看怎么做。

  現在我們已經了解了如何在IDE中直接部署應用程序。這是一種快速構建和測試應用程序的好方法,但這有點不切實際,因為SPark只在一台機器上運行。在生產中,SPark通常會處理存儲在分布式文件系統(如HDFS)上的數據(如果運行在雲中,則可能是S3或Azure博客存儲)。SPark也通常在集群模式下運行(即,分布在許多機器上)。

  那么,首先,我們將學習如何在Hortonworks沙箱(這是一個單節點Hadoop環境)上部署SPark,然后我們將學習如何在雲中部署SPark。

   部署到Sandbox

  雖然我們仍然在一台機器上運行SPark,但是我們將使用HDFS和SEAR(集群資源管理器)。這將比我們之前所做的更接近於運行一個完整的分布式集群。我們要做的第一件事是更改這一行:

JavaRDD<String> textFile = sc.textFile("src/main/resources/shakespeare.txt");

  更改為:

JavaRDD<String> textFile = sc.textFile("hdfs:///tmp/shakespeare.txt");

 

將:

counts.saveAsTextFile("/tmp/shakespeareWordCount");

  更改為:

counts.saveAsTextFile("hdfs:///tmp/shakespeareWordCount");

    這告訴 Spark 要讀取和寫入HDFS,而不是本地。確保保存文件。

  接下來,我們將把這些代碼打包到一個編譯好的JAR文件中,這個JAR文件可以部署在 sandbox沙箱上。了簡化我們的生活,我們將創建一個程序集JAR:一個包含我們的代碼和我們的代碼所依賴的所有JAR的JAR文件。通過將我們的代碼打包成一個程序集,我們保證在代碼運行時,所有依賴的JAR(在pu.xml中定義的)都會出現。

  打開終端和CD到包含 pom.xml. 運行mvn package. 這將在文件夾中創建一個名為“SparkTutver-1.0-SNAPSHOT.jar”的編譯JAR。

      (注:如果MVN命令不起作用-請確保已成功安裝maven。)

將程序集復制到沙箱:

scp -P 2222 ./target/SparkTutorial-1.0-SNAPSHOT.jar root@sandbox-hdp.hortonworks.com:/root

打開第二個終端窗口,將ssh放入沙箱:

ssh -p 2222 root@sandbox-hdp.hortonworks.com

使用spark-submit 運行我們的代碼。我們需要指定主類、要運行的JAR和運行模式(本地或集群):

spark-submit --class "Hortonworks.SparkTutorial.Main" --master local ./SparkTutorial-1.0-SNAPSHOT.jar

您的控制台應該可以看到出現的每個單詞的頻率,如下所示:

...
(comutual,1)
(ban-dogs,1)
(rut-time,1)
(ORLANDO],4)
(Deceitful,1)
(commits,3)
(GENTLEWOMAN,4)
(honors,10)
(returnest,1)
(topp'd?,1)
(compass?,1)
(toothache?,1)
(miserably,1)
(hen?,1)
(luck?,2)
(call'd,162)
(lecherous,2)
...
控制台出現的內容

  此外,如果在Ambari中打開“文件視圖”,您應該會看到/tmp/shakspeareWordCount下的結果。這表明結果也存儲在HDFS中。

 

四、部署到雲端

  在設置集群之后,部署代碼的過程類似於部署到沙箱。我們需要對集群的JAR進行SCP:

scp -P 2222 -i "key.pem" ./target/SparkTutorial-1.0-SNAPSHOT.jar root@[ip address of a master node]:root

  然后打開第二個終端窗口並將ssh插入主節點:

ssh -p 2222 -i "key.pem" root@[ip address of a master node]

  然后使用 spark-submit運行我們的代碼:

spark-submit --class "Hortonworks.SparkTutorial.Main"  --master yarn --deploy-mode client ./SparkTutorial-1.0-SNAPSHOT.jar

  注意,我們指定了參數--master yarn ,而不是--master local. 這意味着我們希望SPark在分布式模式下運行,而不是在一台機器上運行,並且我們希望依賴SEARY(集群資源管理器)來獲取可用的機器來運行作業。如果您不熟悉YARN,那么如果您想要在同一個集群上同時運行多個作業,則這一點尤為重要。如果配置正確,YARN隊列將提供不同的用戶或處理允許使用的群集資源配額。它還提供了允許作業在資源可用時充分利用集群的機制,以及在其他用戶或作業開始提交作業時縮小現有作業的范圍。

  參數--deploy-mode client  ,指示要將當前計算機用作Spark的驅動程序機器。驅動程序機器是一台啟動Spark作業的機器,也是工作完成后收集匯總結果的地方。或者,我們可以指定 --deploy-mode cluster ,這將允許YARN選擇驅動機。

  重要的是要注意的是,一個寫得不好的SPark程序可能會意外地試圖將許多兆字節的數據帶回到驅動程序機器中,從而導致其崩潰。因此,不應該使用集群的主節點作為驅動程序機器。許多組織從所謂的邊緣節點提交火花作業,這是一個單獨的機器,不用於存儲數據或執行計算。由於邊緣節點與集群是分開的,所以它可以在不影響集群其余部分的情況下降。邊緣節點還用於從集群中檢索的聚合數據的數據科學工作。

  例如,數據科學家可能從邊緣節點提交一份SPark作業,將10 TB數據集轉換為1GB聚合數據集,然后使用R和Python等工具對邊緣節點進行分析。如果計划設置邊緣節點,請確保機器沒有安裝DataNode或HostManager組件,因為它們是集群的數據存儲和計算組件。

 

五、現場調試

  將正在運行的SPark程序連接到調試器,這將允許我們設置斷點並逐行遍歷代碼。在直接從IDE運行時,調試SPark就像其他程序一樣,但是調試遠程集群需要一些配置。

  在計划提交Spark作業的機器上,從終端運行這一行:

export SPARK_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8086

  這將使您可以在端口8086上附加調試器。您需要確保端口8086能夠接收入站連接。然后在IntelliJ中 Run > Edit Configurations:

 

  然后單擊左上角的+按鈕並添加一個新的遠程配置。用主機IP地址填充主機,以及使用端口8086.

  

  如果在提交SPark作業后立即從IDE運行此調試配置,則調試器將附加,SPark將在斷點處停止。

  注意:要重新提交單詞計數代碼,我們必須首先刪除前面創建的目錄。使用命令:

hdfs dfs -rm -r /tmp/shakespeareWordCount

  在沙箱外殼上刪除舊目錄。再次在沙箱外殼上提交火花作業,並在執行spark-submit命令。請記住,為了運行您的代碼,我們使用了以下命令:

spark-submit --class "Hortonworks.SparkTutorial.Main" --master yarn --deploy-mode client ./SparkTutorial-1.0-SNAPSHOT.jar

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM