用SBT編譯Spark的WordCount程序


問題導讀:
1.什么是sbt?
2.sbt項目環境如何建立?
3.如何使用sbt編譯打包scala?

 sbt介紹
sbt是一個代碼編譯工具,是scala界的mvn,可以編譯scala,java等,需要java1.6以上。

sbt項目環境建立
sbt編譯需要固定的目錄格式,並且需要聯網,sbt會將依賴的jar包下載到用戶home的.ivy2下面,目錄結構如下:

    |--build.sbt
    |--lib
    |--project
    |--src
    |   |--main
    |   |    |--scala
    |   |--test
    |         |--scala
    |--sbt
    |--target

以上建立目錄如下:

    mkdir -p ~/spark_wordcount/lib
    mkdir -p ~/spark_wordcount/project
    mkdir -p ~/spark_wordcount/src/main/scala
    mkdir -p ~/spark_wordcount/src/test/scala
    mkdir -p ~/spark_wordcount/target

然后拷貝spark安裝目錄的sbt目錄的 sbt腳本和sbt的jar包

cp /path/to/spark/sbt/sbt* ~/spark_wordcount/

由於spark的sbt腳本默認查找./sbt目錄,修改如下

    JAR=sbt/sbt-launch-${SBT_VERSION}.jar
    to
    JAR=sbt-launch-${SBT_VERSION}.jar

拷貝spark的jar包到,sbt的lib目錄

    cp /path/to/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar \
    > ~/spark_wordcount/lib/ 

建立build.sbt配置文件,各行需要有一個空行分割

1     name := "WordCount"
2     [this is bank line]
3     version := "1.0.0"
4     [this is bank line]
5     scalaVersion := "2.10.3"

由於spark的sbt腳本需要到project的build.properties文件找sbt的版本號,我們建立該文件,增加如下內容:

sbt.version=0.12.4

Spark WordCount程序編寫及編譯
建立WordCount.scala源文件,假設需要包為spark.example

    mkdir -p ~/spark_wordcount/src/main/scala/spark/example
    vi -p ~/spark_wordcount/src/main/scala/spark/example/WordCount.scala

添加具體的程序代碼,並保存

 1     package spark.example
 2 
 3     import org.apache.spark._
 4     import SparkContext._
 5 
 6     object WordCount {
 7       def main(args: Array[String]) {
 8         //命令行參數個數檢查
 9         if (args.length == 0) {
10           System.err.println("Usage: spark.example.WordCount <input> <output>")
11           System.exit(1)
12         }
13         //使用hdfs文件系統
14         val hdfsPathRoot = "hdfshost:9000"
15         //實例化spark的上下文環境
16         val spark = new SparkContext(args(0), "WordCount",
17           System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass))
18         //讀取輸入文件
19         val inputFile = spark.textFile(hdfsPathRoot + args(1))
20         //執行WordCount計數
21         //讀取inputFile執行方法flatMap,將每行通過空格分詞
22         //然后將該詞輸出該詞和計數的一個元組,並初始化計數
23         //為 1,然后執行reduceByKey方法,對相同的詞計數累
24         //
25         val countResult = inputFile.flatMap(line => line.split(" "))
26                           .map(word => (word, 1))
27                           .reduceByKey(_ + _)
28         //輸出WordCount結果到指定目錄
29         countResult.saveAsTextFile(hdfsPathRoot + args(2))
30       }
31     }

到spark_wordcount目錄,執行編譯:

    cd ~/spark_wordcount/
    ./sbt compile

打成jar包

./sbt package

編譯過程,sbt需要上網下載依賴工具包,jna,scala等。編譯完成后可以在target/scala-2.10/目錄找到打包好的jar

 

    [root@bd001 scala-2.10]# pwd
    /usr/local/hadoop/spark_wordcount/target/scala-2.10
    [root@bd001 scala-2.10]# ls
    cache  classes  wordcount_2.10-1.0.0.jar

 

WordCount執行
可以參考Spark分布式運行於hadoop的yarn上的方法,寫一個執行腳本

 1     #!/usr/bin/env bash
 2 
 3     SPARK_JAR=./assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar \
 4         ./bin/spark-class org.apache.spark.deploy.yarn.Client \
 5           --jar ~/spark_wordcount/target/scala-2.10/wordcount_2.10-1.0.0.jar \
 6           --class  spark.example.WordCount \
 7           --args yarn-standalone \
 8           --args /testWordCount.txt \
 9           --args /resultWordCount \
10           --num-workers 3 \
11           --master-memory 4g \
12           --worker-memory 2g \
13           --worker-cores 2

然后,拷貝一個名為testWordCount.txt的文件進hdfs

hdfs dfs -copyFromLocal ./testWordCount.txt /testWordCount.txt

執行腳本,過一會就可以看到結果了

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM