The previous two articles covered setting up the Spark environment. With that preparation done, the next step is a simple demo: counting how many times each word appears in a local file. The development environment is IntelliJ IDEA + Maven and the language is Scala, so the first thing to do is install the Scala plugin in IDEA, as follows:
I. Preparing the IDEA development environment
1. Download the Scala plugin
Before installing the plugin, make sure a JDK is already installed and configured in IDEA. Then open IDEA, choose File ---> Settings, select Plugins in the window that opens, type the keyword "scala" into the search box on the right, and in the search results click Install JetBrains plugin... below to install it.
Restart IDEA after the installation completes.
II. Creating a new project
Open IDEA and choose File ---> New ---> Project. In the new window select Maven, check Create from archetype on the right, expand scala-archetype-simple and select version 1.2, then click Next.
Enter the GroupId and ArtifactId and click Next again, then choose the Maven installation and repository paths and enter the project name.
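As an aside, if you prefer the command line to the IDEA wizard, roughly the same project skeleton can be generated with Maven directly. This is a sketch, with com.test and test standing in for your own GroupId and ArtifactId:

mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DarchetypeVersion=1.2 -DgroupId=com.test -DartifactId=test -DinteractiveMode=false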
The pom.xml is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.test</groupId>
  <artifactId>test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.3.2</spark.version>
    <!-- Scala binary version, used as the suffix of the Spark artifact ids -->
    <scala.version>2.11</scala.version>
    <hadoop.version>2.7.0</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <!-- use the hadoop.version property defined above rather than a second, conflicting hard-coded version -->
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <!-- the Kafka integration that matches Spark 2.x; the old spark-streaming-kafka 1.6.3 artifact belongs to Spark 1.x -->
      <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.39</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
  </dependencies>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- the compiler needs a full Scala version, not just the 2.11 binary suffix -->
          <scalaVersion>2.11.8</scalaVersion>
          <args>
            <!-- Spark 2.x runs on Java 8, so target 1.8 rather than the archetype's default 1.5 -->
            <arg>-target:jvm-1.8</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
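Before writing any code, it is worth confirming that these dependencies resolve and that the Scala sources compile; run the following from the project root, and it should finish with BUILD SUCCESS:

mvn clean compile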
Next we implement the analysis that counts the occurrences of each word in the file. The class file is as follows:
package com.test

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object WordCountLocal {
  def main(args: Array[String]) {
    /**
     * Initializing a SparkContext requires a SparkConf object,
     * which holds the various configuration parameters for the Spark cluster.
     */
    val conf = new SparkConf()
      .setMaster("local")    // run the computation locally
      .setAppName("testRdd") // set the name of this application
    // Every Spark program starts from a SparkContext
    val sc = new SparkContext(conf)
    // The statements above are equivalent to: val sc = new SparkContext("local", "testRdd")
    val data = sc.textFile("D:\\tmp\\hello.txt") // read the local file
    data.flatMap(_.split(" ")) // the underscore is a placeholder; flatMap operates per line, splitting the input into words
      .map((_, 1))             // turn each word into a key-value pair: the word is the key, the value is 1
      .reduceByKey(_ + _)      // merge pairs that share the same key by adding their values
      .collect()               // return the distributed RDD to the driver as a local Scala array, on which ordinary Scala functions can be applied
      .foreach(println)        // loop over the array and print each pair
  }
}
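Printing with collect() is fine for a demo, but if you would rather persist the counts, the same pipeline can end with saveAsTextFile instead. This is a minimal sketch; the object name WordCountToFile and the output path D:\tmp\wordcount_out are assumptions, and the output directory must not exist before the run:

package com.test

import org.apache.spark.{SparkConf, SparkContext}

object WordCountToFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("testRddToFile")
    val sc = new SparkContext(conf)
    sc.textFile("D:\\tmp\\hello.txt")        // same input file as above
      .flatMap(_.split(" "))                 // split each line into words
      .map((_, 1))                           // pair each word with a count of 1
      .reduceByKey(_ + _)                    // sum the counts per word
      .saveAsTextFile("D:\\tmp\\wordcount_out") // assumed path; writes part files into a new directory
    sc.stop()
  }
}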
The contents of hello.txt can be anything you like; mine are as follows:
hello scala hello world hello nihao i am scala this is a spark example running program is ok
III. Running the project
Right-click the WordCountLocal class and choose Run. If the run fails with java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries., check whether the file winutils.exe exists under your local hadoop-x.x.x/bin directory. If not, download it from GitHub,
address: https://github.com/srccodes/hadoop-common-2.2.0-bin
After downloading and extracting it, configure the environment variables: add a user variable HADOOP_HOME whose value is the directory the zip was extracted to, then append %HADOOP_HOME%\bin to the system Path variable. With that done, run the class again and this time it completes successfully.
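For the hello.txt above, the output should contain (word, count) pairs like the following; note that collect() makes no promise about the order in which they appear:

(hello,3)
(scala,2)
(is,2)
(world,1)
(nihao,1)
(i,1)
(am,1)
(this,1)
(a,1)
(spark,1)
(example,1)
(running,1)
(program,1)
(ok,1)

If you would rather not change the environment variables, a commonly used alternative is to call System.setProperty("hadoop.home.dir", "D:\\hadoop") at the top of main, before creating the SparkContext, where D:\hadoop is a placeholder for wherever you extracted the download.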
And with that, a simple word-count demo is complete.