Spark入門(三)--Spark經典的單詞統計


spark經典之單詞統計

准備數據

既然要統計單詞我們就需要一個包含一定數量的文本,我們這里選擇了英文原著《GoneWithTheWind》(《飄》)的文本來做一個數據統計,看看文章中各個單詞出現頻次如何。為了便於大家下載文本。可以到GitHub上下載文本以及對應的代碼。我將文本放在項目的目錄下。

 

 

首先我們要讀取該文件,就要用到SparkContext中的textFile的方法,我們嘗試先讀取第一行。

scala實現

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(conf) println(sc.textFile("./GoneWithTheWind").first()) } } 

java實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class WordCountJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava"); JavaSparkContext sc = new JavaSparkContext(conf); System.out.println(sc.textFile("./GoneWithTheWind").first()); } } 

python實現

from pyspark import SparkConf,SparkContext


conf = SparkConf().setMaster("local").setAppName("HelloWorld") sc = SparkContext(conf=conf) print(sc.textFile("./GoneWithTheWind").first()) 

得到輸出

 

 

Chapter 1

以scala為例,其余兩種語言也差不多。第一步我們創建了一個SparkConf

val conf = new SparkConf().setMaster("local").setAppName("WordCount") 

這里我們設置Master為local,該程序名稱為WordCount,當然程序名稱可以任意取,和類名不同也無妨。但是這個Master則不能亂寫,當我們在集群上運行,用spark-submit的時候,則要注意。我們現在只討論本地的寫法,因此,這里只寫local。

接着一句我們創建了一個SparkContext,這是spark的核心,我們將conf配置傳入初始化

 val sc = new SparkContext(conf)

最后我們將文本路徑告訴SparkContext,然后輸出第一行內容

println(sc.textFile("./GoneWithTheWind").first()) 

開始統計

接着我們就可以開始統計文本的單詞數了,因為單詞是以空格划分,所以我們可以把空格作為單詞的標記。

scala實現

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(conf) //設置數據路徑 val text = sc.textFile("./GoneWithTheWind") //將文本數據按行處理,每行按空格拆成一個數組 // flatMap會將各個數組中元素合成一個大的集合 val textSplit = text.flatMap(line =>line.split(" ")) //處理合並后的集合中的元素,每個元素的值為1,返回一個元組(key,value) //其中key為單詞,value這里是1,即該單詞出現一次 val textSplitFlag = textSplit.map(word => (word,1)) //reduceByKey會將textSplitFlag中的key相同的放在一起處理 //傳入的(x,y)中,x是上一次統計后的value,y是本次單詞中的value,即每一次是x+1 val countWord = textSplitFlag.reduceByKey((x,y)=>x+y) //將計算后的結果存在項目目錄下的result目錄中 countWord.saveAsTextFile("./result") } } 

java實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCountJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava"); JavaSparkContext sc = new JavaSparkContext(conf); //設置數據的路徑 JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind"); //將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合 //這里需要注意的是FlatMapFunction中<String, String>,第一個表示輸入,第二個表示輸出 //與Hadoop中的map-reduce非常相似 JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //處理合並后的集合中的元素,每個元素的值為1,返回一個Tuple2,Tuple2表示兩個元素的元組 //值得注意的是上面是JavaRDD,這里是JavaPairRDD,在返回的是元組時需要注意這個區別 //PairFunction中<String, String, Integer>,第一個String是輸入值類型 //第二第三個,String, Integer是返回值類型 //這里返回的是一個word和一個數值1,表示這個單詞出現一次 JavaPairRDD<String, Integer> splitFlagRDD = splitRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s,1); } }); //reduceByKey會將splitFlagRDD中的key相同的放在一起處理 //傳入的(x,y)中,x是上一次統計后的value,y是本次單詞中的value,即每一次是x+1 JavaPairRDD<String, Integer> countRDD = splitFlagRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); //將計算后的結果存在項目目錄下的result目錄中 countRDD.saveAsTextFile("./resultJava"); } } 

python實現

from pyspark import SparkConf,SparkContext


conf = SparkConf().setMaster("local").setAppName("HelloWorld") sc = SparkContext(conf=conf) # 設置數據的路徑 textData = sc.textFile("./GoneWithTheWind") # 將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合 splitData = textData.flatMap(lambda line:line.split(" ")) # 處理合並后的集合中的元素,每個元素的值為1,返回一個元組(key,value) # 其中key為單詞,value這里是1,即該單詞出現一次 flagData = splitData.map(lambda word:(word,1)) # reduceByKey會將textSplitFlag中的key相同的放在一起處理 # 傳入的(x,y)中,x是上一次統計后的value,y是本次單詞中的value,即每一次是x+1 countData = flagData.reduceByKey(lambda x,y:x+y) #輸出文件 countData.saveAsTextFile("./result") 

運行后在住目錄下得到一個名為result的目錄,該目錄如下圖,SUCCESS表示生成文件成功,文件內容存儲在part-00000中

 

 

 


 

我們可以查看文件的部分內容:

('Chapter', 1) ('1', 1) ('SCARLETT', 1) ('O’HARA', 1) ('was', 74) ('not', 33) ('beautiful,', 1) ('but', 32) ('men', 4) ('seldom', 3) ('realized', 2) ('it', 37) ('when', 19) ('caught', 1) ('by', 20) ('her', 65) ('charmas', 1) ('the', 336) ('Tarleton', 7) ('twins', 16) ('were.', 1) ('In', 1) ('face', 6) ('were', 49) ... ... ... ... 

 

 

 


 

這樣就完成了一個spark的真正HelloWorld程序--單詞計數。對比三個語言版本的程序,發現一個事實那就是,用scala和python寫的代碼非常簡潔而且易懂,而Java實現的則相對復雜,難懂。當然這個易懂和難懂是相對而言的。如果你只會Java無論如何你都應該從中能看懂java的程序,而簡潔的scala和python對你來說根本看不懂。這也無妨,語言只是工具,重點看你怎么用。況且,我們使用java8的特性也可以寫出簡潔的代碼。

java8實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCountJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava"); JavaSparkContext sc = new JavaSparkContext(conf); countJava8(sc); } public static void countJava8(JavaSparkContext sc){ sc.textFile("./GoneWithTheWind") .flatMap(s->Arrays.asList(s.split(" ")).iterator()) .mapToPair(s->new Tuple2<>(s,1)) .reduceByKey((x,y)->x+y) .saveAsTextFile("./resultJava8"); } } 

spark的優越性在這個小小的程序中已經有所體現,計算一本書的每個單詞出現的次數,spark在單機上運行(讀取文件、生成臨時文件、將結果寫到硬盤),加載-運行-結束只花費了2秒時間。

對程序進行優化

程序是否還能再簡單高效呢?當然是可以的,我們可以用countByValue這個函數,這個函數正是常用的計數的方法。

scala實現


import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount") val sc = new SparkContext(conf) //設置數據路徑 val text = sc.textFile("./GoneWithTheWind") //將文本數據按行處理,每行按空格拆成一個數組 // flatMap會將各個數組中元素合成一個大的集合 val textSplit = text.flatMap(line =>line.split(" ")) println(textSplit.countByValue()) } } 

運行得到結果

Map(Heknew -> 1, &emsp;&emsp;“Ashley -> 1, “Let’s -> 1, anarresting -> 1, of. -> 1, pasture -> 1, war’s -> 1, wall. -> 1, looks -> 2, ain’t -> 7,.......

 

 

 

java實現

public class WordCountJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava"); JavaSparkContext sc = new JavaSparkContext(conf); countJava(sc); } public static void countJava(JavaSparkContext sc){ //設置數據的路徑 JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind"); //將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合 //這里需要注意的是FlatMapFunction中<String, String>,第一個表示輸入,第二個表示輸出 //與Hadoop中的map-reduce非常相似 JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); System.out.println(splitRDD.countByValue()); } } 

運行得到結果

{Heknew=1, &emsp;&emsp;“Ashley=1, “Let’s=1, anarresting=1, of.=1, pasture=1, war’s=1, wall.=1, looks=2, ain’t=7, Clayton=1, approval.=1, ideas=1,

 

 

python實現

from pyspark import SparkConf,SparkContext


conf = SparkConf().setMaster("local").setAppName("HelloWorld") sc = SparkContext(conf=conf) # 設置數據的路徑 textData = sc.textFile("./GoneWithTheWind") # 將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合 splitData = textData.flatMap(lambda line:line.split(" ")) print(splitData.countByValue()) 

運行得到結果:

defaultdict(<class 'int'>, {'Chapter': 1, '1': 1, 'SCARLETT': 1, 'O’HARA': 1, 'was': 74, 'not': 33, 'beautiful,': 1, 'but': 32, 'men': 4, 

 

spark的優越性在這個小小的程序中已經有所體現,計算一本書的每個單詞出現的次數,spark在單機上運行(讀取文件、生成臨時文件、將結果寫到硬盤),加載-運行-結束只花費了2秒時間。如果想要獲取源代碼以及數據內容,可以前往我的 github下載。



轉自:https://juejin.im/post/5c768f5b6fb9a049b348a811


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM