006 Spark中的wordcount以及TopK的程序編寫


1.啟動

  啟動HDFS

  啟動spark的local模式./spark-shell

 

2.知識點

 textFile:

  def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String]

 Filter: 

  Return a new RDD containing only the elements that satisfy a predicate.

  def filter(f: T => Boolean): RDD[T],返回里面判斷是true的RDD。

 map:

  Return a new RDD by applying a function to all elements of this RDD.
 def map[U: ClassTag](f: T => U): RDD[U],從T到U類型的一個數據轉換函數,最終返回的RDD中的數據類型是f函數返回的數據類型

 flatMap:

    Return a new RDD by first applying a function to all elements of this
RDD, and then flattening the results.
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  從T到集合類型的數據類型轉換,集合中的數據類型是U,最終返回的RDD數據類型是f函數返回的集合中的具體的類型數據。


3.編寫基礎的wordcount程序
 1 //讀取文件
 2 val rdd=sc.textFile("wc/input/wc.input")
 3 //過濾數據
 4 val filterRdd=rdd.filter(len=>len.length>0)
 5 //數據轉換
 6 val flatMapRdd=filterRdd.flatMap(line=>line.split(" ")
 7     .map(word=>(word,1)))
 8 //分組
 9 val groupByRdd=flatMapRdd.groupBy(tuple=>tuple._1)
10 //聚合
11 val wordCount=groupByRdd.map(tuple=>{
12     val word=tuple._1
13     val sum=tuple._2.toList.foldLeft(0)((a,b)=>a+b._2)
14     (word,sum)
15 })
16 //輸出
17 wordCount.foreach(println)             //控制台上的輸出
18 wordCount.saveAsTextFile("wc/output6") //HDFS上的輸出

 

4.簡化代碼(鏈式編程)

 1 sc.textFile("wc/input/wc.input").
 2 //數據過濾
 3 filter(_.length>0).
 4 //數據轉換
 5 flatMap(_.split(" ").map((_,1))).
 6 //分組
 7 groupByKey().
 8 //統計
 9 map(tuple=>(tuple._1,tuple._2.toList.sum)).
10 //輸出
11 saveAsTextFile("wc/output7")

 

5.最優化程序

   reduceByKey存在combiner。

  groupBy在大數據量的情況下,會出現OOM

1 sc.textFile("wc/input/wc.input").
2 //數據過濾
3 filter(_.length>0).
4 //數據轉換
5 flatMap(_.split(" ").map((_,1))).
6 //統計
7 reduceByKey(_+_).
8 //輸出
9 saveAsTextFile("wc/output8")

 

6.顯示結果

1 sc.textFile("wc/input/wc.input").
2 //數據過濾
3 filter(_.length>0).
4 //數據轉換
5 flatMap(_.split(" ").map((_,1))).
6 //統計
7 reduceByKey(_+_).
8 collect()

 

7.排序(第二個數,從大到小)

 1 sc.textFile("wc/input/wc.input").
 2 //數據過濾
 3 filter(_.length>0).
 4 //數據轉換
 5 flatMap(_.split(" ").map((_,1))).
 6 //統計
 7 reduceByKey(_+_).
 8 //排序
 9 sortBy(tuple=>tuple._2,ascending=false).
10 collect()

 

8.TopK(方式一)

 1 sc.textFile("wc/input/wc.input").
 2 //數據過濾
 3 filter(_.length>0).
 4 //數據轉換
 5 flatMap(_.split(" ").map((_,1))).
 6 //統計
 7 reduceByKey(_+_).
 8 //排序
 9 sortBy(tuple=>tuple._2,ascending=false).
10 take(4)

 

9.TopK(方式二,自定義)

 1 sc.textFile("wc/input/wc.input").
 2 //數據過濾
 3 filter(_.length>0).
 4 //數據轉換
 5 flatMap(_.split(" ").map((_,1))).
 6 //統計
 7 reduceByKey(_+_).
 8 //排序
 9 sortBy(tuple=>tuple._2,ascending=false).
10 top(3)(new scala.math.Ordering[(String,Int)](){
11     override def compare(x:(String,Int),y:(String,Int))={
12         val tmp=x._2.compare(y._2)
13         if(tmp!=0) tmp
14         else x._1.compare(x._1)
15     }
16     })

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM