Setting Up a Spark Development Environment with IntelliJ IDEA
Setting Up a Spark Development Environment with IntelliJ IDEA: Reference Documentation
● Reference documentation: http://spark.apache.org/docs/latest/programming-guide.html
● Steps
a) Create a Maven project
b) Add dependencies (the Spark dependency, packaging plugins, and so on)
Setting Up a Spark Development Environment with IntelliJ IDEA: Maven vs. sbt
● Use whichever one you are more familiar with
● Maven can build Scala projects too
Setting Up a Spark Development Environment with IntelliJ IDEA: Building a Scala Project with Maven
● Reference documentation: http://docs.scala-lang.org/tutorials/scala-with-maven.html
● Steps
a) Generate a Scala project with Maven (based on the net.alchim31.maven:scala-archetype-simple archetype); a command-line sketch is shown below





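If you prefer the command line to IDEA's New Project wizard, the project can also be generated directly from that archetype. This is only a rough sketch: the groupId com.zimo.spark and artifactId scala-spark are taken from the build log further down, so substitute your own values:

mvn archetype:generate -DarchetypeGroupId=net.alchim31.maven -DarchetypeArtifactId=scala-archetype-simple -DgroupId=com.zimo.spark -DartifactId=scala-spark -Dversion=1.0-SNAPSHOT -DinteractiveMode=false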
b) Add the dependencies in pom.xml (the Spark dependency, packaging plugins, and so on)
Add the following in the appropriate places of the pom.xml file:
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
    <!-- "provided" scope keeps the Spark dependency out of the final packaged jar -->
    <scope>provided</scope>
  </dependency>
</dependencies>
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"></transformer>
            </transformers>
            <createDependencyReducedPom>false</createDependencyReducedPom>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
Run a package build once to check that everything works.

Enter the following command in the Terminal:
mvn clean package

The output is as follows:

D:\Code\JavaCode\sparkMaven>mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] ---------------------< com.zimo.spark:scala-spark >---------------------
[INFO] Building scala-spark 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ scala-spark ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ scala-spark ---
[WARNING] Using platform encoding (GBK actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory D:\Code\JavaCode\sparkMaven\src\main\resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ scala-spark ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ scala-spark ---
[WARNING] Using platform encoding (GBK actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory D:\Code\JavaCode\sparkMaven\src\test\resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ scala-spark ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ scala-spark ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ scala-spark ---
[WARNING] JAR will be empty - no content was marked for inclusion!
[INFO] Building jar: D:\Code\JavaCode\sparkMaven\target\scala-spark-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-shade-plugin:2.4.1:shade (default) @ scala-spark ---
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing D:\Code\JavaCode\sparkMaven\target\scala-spark-1.0-SNAPSHOT.jar with D:\Code\JavaCode\sparkMaven\target\scala-spark-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.675 s
[INFO] Finished at: 2018-09-11T15:33:53+08:00
[INFO] ------------------------------------------------------------------------
Seeing BUILD SUCCESS means everything is working. Next I will walk through the general workflow of programming in Scala, and then show how to implement the same thing in Java within this framework.
Implementing WordCount in Scala


Note: you must select Object here, otherwise there is no main method!
Then enter the following code and run the package build:
def main(args: Array[String]): Unit = {
  println("hello spark")
}
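For completeness, this main method has to live inside the Scala object you just created. A minimal sketch, assuming the object is named HelloSpark (use whatever name you actually chose):

object HelloSpark {
  def main(args: Array[String]): Unit = {
    println("hello spark")
  }
}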

When it finishes, you will see a new target directory under the project root. That is the general workflow for programming in Scala; next let's write a WordCount program. (A Java version is also provided further down.)
First, create the following directory and test file on the cluster:
[hadoop@masternode ~]$ cd /home/hadoop/
[hadoop@masternode ~]$ ll
total 68
drwxr-xr-x. 9 hadoop hadoop 4096 Sep 10 22:15 app
drwxrwxr-x. 6 hadoop hadoop 4096 Aug 17 10:42 data
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Desktop
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Documents
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Downloads
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Music
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Pictures
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Public
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Templates
drwxrwxr-x. 3 hadoop hadoop 4096 Apr 18 10:11 tools
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Videos
-rw-rw-r--. 1 hadoop hadoop 20876 Apr 20 18:03 zookeeper.out
[hadoop@masternode ~]$ mkdir testSpark/
[hadoop@masternode ~]$ ll
total 72
drwxr-xr-x. 9 hadoop hadoop 4096 Sep 10 22:15 app
drwxrwxr-x. 6 hadoop hadoop 4096 Aug 17 10:42 data
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Desktop
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Documents
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Downloads
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Music
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Pictures
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Public
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Templates
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 12 10:23 testSpark
drwxrwxr-x. 3 hadoop hadoop 4096 Apr 18 10:11 tools
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 17 10:03 Videos
-rw-rw-r--. 1 hadoop hadoop 20876 Apr 20 18:03 zookeeper.out
[hadoop@masternode ~]$ cd testSpark/
[hadoop@masternode testSpark]$ vi word.txt
apache hadoop spark scala
apache hadoop spark scala
apache hadoop spark scala
apache hadoop spark scala
The code of MyWordCount.scala is as follows (if the "Scala Class" option does not appear under right-click > New, check whether the Scala plugin has been added to IDEA):
package com.zimo.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Zimo on 2018/9/11
  */
object MyWordCount {
  def main(args: Array[String]): Unit = {
    // check the arguments
    if (args.length < 2) {
      System.err.println("Usage: myWordCount <input> <output>")
      System.exit(1)
    }
    // read the arguments
    val input = args(0)
    val output = args(1)
    // create the Scala version of the SparkContext
    val conf = new SparkConf().setAppName("myWordCount")
    val sc = new SparkContext(conf)
    // read the data
    val lines = sc.textFile(input)
    // do the word counting and print the result
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
    // stop the SparkContext (the output path is not used yet at this point)
    sc.stop()
  }
}
As the code shows, Scala's strength is its conciseness, although that can hurt readability; when studying it, it helps to compare it with the Java code further down.
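To make the one-line pipeline easier to follow, here is an equivalent, more spelled-out version of the counting step. It is a readability sketch only, not part of the original program:

// split each line into individual words
val words = lines.flatMap(line => line.split(" "))
// pair every word with an initial count of 1
val pairs = words.map(word => (word, 1))
// sum the counts for each distinct word
val counts = pairs.reduceByKey((a, b) => a + b)
// bring the results back to the driver and print them
counts.collect().foreach(println)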
Then package the project.

After packaging, upload the jar file shown in the figure above to the Spark cluster and run it.
[hadoop@masternode testSpark]$ rz
[hadoop@masternode testSpark]$ ll
total 8
-rw-r--r--. 1 hadoop hadoop 1936 Sep 12 10:59 scala-spark-1.0-SNAPSHOT.jar
-rw-rw-r--. 1 hadoop hadoop 104 Sep 12 10:26 word.txt
[hadoop@masternode testSpark]$ cd ../app/spark-2.2.0/
[hadoop@masternode spark-2.2.0]$ cd bin/
[hadoop@masternode bin]$ ll
total 92
-rwxr-xr-x. 1 hadoop hadoop 1089 Jul 1 2017 beeline
-rw-r--r--. 1 hadoop hadoop 899 Jul 1 2017 beeline.cmd
-rwxr-xr-x. 1 hadoop hadoop 1933 Jul 1 2017 find-spark-home
-rw-r--r--. 1 hadoop hadoop 1909 Jul 1 2017 load-spark-env.cmd
-rw-r--r--. 1 hadoop hadoop 2133 Jul 1 2017 load-spark-env.sh
-rwxr-xr-x. 1 hadoop hadoop 2989 Jul 1 2017 pyspark
-rw-r--r--. 1 hadoop hadoop 1493 Jul 1 2017 pyspark2.cmd
-rw-r--r--. 1 hadoop hadoop 1002 Jul 1 2017 pyspark.cmd
-rwxr-xr-x. 1 hadoop hadoop 1030 Jul 1 2017 run-example
-rw-r--r--. 1 hadoop hadoop 988 Jul 1 2017 run-example.cmd
-rwxr-xr-x. 1 hadoop hadoop 3196 Jul 1 2017 spark-class
-rw-r--r--. 1 hadoop hadoop 2467 Jul 1 2017 spark-class2.cmd
-rw-r--r--. 1 hadoop hadoop 1012 Jul 1 2017 spark-class.cmd
-rwxr-xr-x. 1 hadoop hadoop 1039 Jul 1 2017 sparkR
-rw-r--r--. 1 hadoop hadoop 1014 Jul 1 2017 sparkR2.cmd
-rw-r--r--. 1 hadoop hadoop 1000 Jul 1 2017 sparkR.cmd
-rwxr-xr-x. 1 hadoop hadoop 3017 Jul 1 2017 spark-shell
-rw-r--r--. 1 hadoop hadoop 1530 Jul 1 2017 spark-shell2.cmd
-rw-r--r--. 1 hadoop hadoop 1010 Jul 1 2017 spark-shell.cmd
-rwxr-xr-x. 1 hadoop hadoop 1065 Jul 1 2017 spark-sql
-rwxr-xr-x. 1 hadoop hadoop 1040 Jul 1 2017 spark-submit
-rw-r--r--. 1 hadoop hadoop 1128 Jul 1 2017 spark-submit2.cmd
-rw-r--r--. 1 hadoop hadoop 1012 Jul 1 2017 spark-submit.cmd
[hadoop@masternode bin]$ ./spark-submit --class com.zimo.spark.MyWordCount ~/testSpark/scala-spark-1.0-SNAPSHOT.jar ~/testSpark/word.txt ~/testSpark/
The result of the run is shown in the figure below:

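The screenshot is not reproduced here, but given the contents of word.txt above, the word counts printed to the console (in between Spark's INFO log lines) should look roughly like this:

(scala,4)
(spark,4)
(hadoop,4)
(apache,4)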
The above prints the results directly. Next, let's try saving the results to a text file instead. Modify the code as follows:
// do the word counting
//lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
val resultRDD = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// save the result
resultRDD.saveAsTextFile(output)
Run it again:
./spark-submit --class com.zimo.spark.MyWordCount ~/testSpark/scala-spark-1.0-SNAPSHOT.jar ~/testSpark/word.txt ~/testSpark/result
Note: the output directory must be a directory that does not exist yet!
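If you forget and point it at an existing directory, the job typically aborts with an error along these lines (message abbreviated, path omitted):

org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory ... already exists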
The result is as follows:
[hadoop@masternode testSpark]$ ll
total 5460
drwxrwxr-x. 2 hadoop hadoop 4096 Sep 12 16:02 result
-rw-r--r--. 1 hadoop hadoop 5582827 Sep 12 16:00 scala-spark-1.0-SNAPSHOT.jar
-rw-rw-r--. 1 hadoop hadoop 104 Sep 12 15:52 word.txt
[hadoop@masternode testSpark]$ cd result/
[hadoop@masternode result]$ ll
total 4
-rw-r--r--. 1 hadoop hadoop 42 Sep 12 16:02 part-00000
-rw-r--r--. 1 hadoop hadoop 0 Sep 12 16:02 _SUCCESS
[hadoop@masternode result]$ cat part-00000
(scala,4)
(spark,4)
(hadoop,4)
(apache,4)
Implementing WordCount in Java
In the same directory, create a new java directory and mark it as "Sources Root".

Under the unit-test directory "test", likewise create a java folder.

Mark it as "Test Sources Root" in the same way. Then create resources directories (for configuration files) and mark them as "Resources Root" and "Test Resources Root" respectively.

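After these steps, the project layout should look roughly like the sketch below (an assumption based on the steps above; the project root sparkMaven comes from the earlier build log):

sparkMaven
├── pom.xml
└── src
    ├── main
    │   ├── java          (Sources Root, newly created)
    │   ├── resources     (Resources Root, newly created)
    │   └── scala         (Sources Root, generated by the archetype)
    └── test
        ├── java          (Test Sources Root, newly created)
        ├── resources     (Test Resources Root, newly created)
        └── scala         (Test Sources Root, generated by the archetype)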
Finally, create a package named "com.zimo.spark" and a new MyJavaWordCount class inside it (if the "Java Class" option does not appear under right-click > New, see the detailed explanation at https://www.cnblogs.com/zimo-jing/p/9628784.html). Its code is as follows:
package com.zimo.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by Zimo on 2018/9/12
 */
public class MyJavaWordCount {
    public static void main(String[] args) {
        // check the arguments
        if (args.length < 2) {
            System.err.println("Usage: MyJavaWordCount <input> <output>");
            System.exit(1);
        }
        // read the arguments
        String input = args[0];
        String output = args[1];
        // create the Java version of the SparkContext
        SparkConf conf = new SparkConf().setAppName("MyJavaWordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // read the data
        JavaRDD<String> inputRDD = sc.textFile(input);
        // do the word counting
        JavaRDD<String> words = inputRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                // in Spark 2.x, FlatMapFunction.call must return an Iterator
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        JavaPairRDD<String, Integer> result = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });
        // save the result
        result.saveAsTextFile(output);
        // stop the SparkContext
        sc.stop();
    }
}
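For comparison with the Scala version's conciseness, the same flatMap / mapToPair / reduceByKey chain can also be written with Java 8 lambdas instead of anonymous inner classes. This is an optional sketch (not what the screenshots show) that would replace the pipeline in the class above:

// lambda-based equivalent of the chain above, using the same inputRDD
JavaPairRDD<String, Integer> result = inputRDD
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
        .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
        .reduceByKey((x, y) -> x + y);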
Note: one small modification is needed here. Comment out the following section in the pom.xml file.

That section pins the default source root path, which means only the code under the scala directory would be packaged and the java directory we just created would be ignored; once it is commented out, the directory configuration we set up above takes over.
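The screenshot of the section to comment out is not included here, but in a project generated from scala-archetype-simple it is most likely the source-directory settings inside the <build> section, which look something like the following (verify against your own pom.xml before commenting anything out):

<!--
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
-->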
After that you can package the project and run it on the cluster. Running it works exactly the same way as for the Scala program, so I won't repeat the steps; just refer to the section above (a sample submit command is sketched below). Just note one thing: the output directory must not already exist, so remember to change it before every run!
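For reference, a submit command for the Java version would look roughly like the Scala one, with only the class name and the output directory changed (the directory name result_java below is just an example of a not-yet-existing path):

./spark-submit --class com.zimo.spark.MyJavaWordCount ~/testSpark/scala-spark-1.0-SNAPSHOT.jar ~/testSpark/word.txt ~/testSpark/result_java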
That is the main content of this part. It reflects my own learning process, and I hope it gives you some guidance. If you found it useful, please give it a like; if not, I hope you will bear with me, and please point out any mistakes. If you are interested, follow me to get updates as soon as they are published. Thanks!
