006 Spark中的wordcount以及TopK的程序编写


1.启动

  启动HDFS

  启动spark的local模式./spark-shell

 

2.知识点

 textFile:

  def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String]

 Filter: 

  Return a new RDD containing only the elements that satisfy a predicate.

  def filter(f: T => Boolean): RDD[T],返回里面判断是true的RDD。

 map:

  Return a new RDD by applying a function to all elements of this RDD.
 def map[U: ClassTag](f: T => U): RDD[U],从T到U类型的一个数据转换函数,最终返回的RDD中的数据类型是f函数返回的数据类型

 flatMap:

    Return a new RDD by first applying a function to all elements of this
RDD, and then flattening the results.
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  从T到集合类型的数据类型转换,集合中的数据类型是U,最终返回的RDD数据类型是f函数返回的集合中的具体的类型数据。


3.编写基础的wordcount程序
 1 //读取文件
 2 val rdd=sc.textFile("wc/input/wc.input")
 3 //过滤数据
 4 val filterRdd=rdd.filter(len=>len.length>0)
 5 //数据转换
 6 val flatMapRdd=filterRdd.flatMap(line=>line.split(" ")
 7     .map(word=>(word,1)))
 8 //分组
 9 val groupByRdd=flatMapRdd.groupBy(tuple=>tuple._1)
10 //聚合
11 val wordCount=groupByRdd.map(tuple=>{
12     val word=tuple._1
13     val sum=tuple._2.toList.foldLeft(0)((a,b)=>a+b._2)
14     (word,sum)
15 })
16 //输出
17 wordCount.foreach(println)             //控制台上的输出
18 wordCount.saveAsTextFile("wc/output6") //HDFS上的输出

 

4.简化代码(链式编程)

 1 sc.textFile("wc/input/wc.input").
 2 //数据过滤
 3 filter(_.length>0).
 4 //数据转换
 5 flatMap(_.split(" ").map((_,1))).
 6 //分组
 7 groupByKey().
 8 //统计
 9 map(tuple=>(tuple._1,tuple._2.toList.sum)).
10 //输出
11 saveAsTextFile("wc/output7")

 

5.最优化程序

   reduceByKey存在combiner。

  groupBy在大数据量的情况下,会出现OOM

1 sc.textFile("wc/input/wc.input").
2 //数据过滤
3 filter(_.length>0).
4 //数据转换
5 flatMap(_.split(" ").map((_,1))).
6 //统计
7 reduceByKey(_+_).
8 //输出
9 saveAsTextFile("wc/output8")

 

6.显示结果

1 sc.textFile("wc/input/wc.input").
2 //数据过滤
3 filter(_.length>0).
4 //数据转换
5 flatMap(_.split(" ").map((_,1))).
6 //统计
7 reduceByKey(_+_).
8 collect()

 

7.排序(第二个数,从大到小)

 1 sc.textFile("wc/input/wc.input").
 2 //数据过滤
 3 filter(_.length>0).
 4 //数据转换
 5 flatMap(_.split(" ").map((_,1))).
 6 //统计
 7 reduceByKey(_+_).
 8 //排序
 9 sortBy(tuple=>tuple._2,ascending=false).
10 collect()

 

8.TopK(方式一)

 1 sc.textFile("wc/input/wc.input").
 2 //数据过滤
 3 filter(_.length>0).
 4 //数据转换
 5 flatMap(_.split(" ").map((_,1))).
 6 //统计
 7 reduceByKey(_+_).
 8 //排序
 9 sortBy(tuple=>tuple._2,ascending=false).
10 take(4)

 

9.TopK(方式二,自定义)

 1 sc.textFile("wc/input/wc.input").
 2 //数据过滤
 3 filter(_.length>0).
 4 //数据转换
 5 flatMap(_.split(" ").map((_,1))).
 6 //统计
 7 reduceByKey(_+_).
 8 //排序
 9 sortBy(tuple=>tuple._2,ascending=false).
10 top(3)(new scala.math.Ordering[(String,Int)](){
11     override def compare(x:(String,Int),y:(String,Int))={
12         val tmp=x._2.compare(y._2)
13         if(tmp!=0) tmp
14         else x._1.compare(x._1)
15     }
16     })

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM