1.啟動
啟動HDFS
啟動spark的local模式./spark-shell
2.知識點
textFile:
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String]
Filter:
Return a new RDD containing only the elements that satisfy a predicate.
def filter(f: T => Boolean): RDD[T],返回里面判斷是true的RDD。
map:
Return a new RDD by applying a function to all elements of this RDD.
def map[U: ClassTag](f: T => U): RDD[U],從T到U類型的一個數據轉換函數,最終返回的RDD中的數據類型是f函數返回的數據類型
flatMap:
Return a new RDD by first applying a function to all elements of this
RDD, and then flattening the results.
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
從T到集合類型的數據類型轉換,集合中的數據類型是U,最終返回的RDD數據類型是f函數返回的集合中的具體的類型數據。
3.編寫基礎的wordcount程序
1 //讀取文件 2 val rdd=sc.textFile("wc/input/wc.input") 3 //過濾數據 4 val filterRdd=rdd.filter(len=>len.length>0) 5 //數據轉換 6 val flatMapRdd=filterRdd.flatMap(line=>line.split(" ") 7 .map(word=>(word,1))) 8 //分組 9 val groupByRdd=flatMapRdd.groupBy(tuple=>tuple._1) 10 //聚合 11 val wordCount=groupByRdd.map(tuple=>{ 12 val word=tuple._1 13 val sum=tuple._2.toList.foldLeft(0)((a,b)=>a+b._2) 14 (word,sum) 15 }) 16 //輸出 17 wordCount.foreach(println) //控制台上的輸出 18 wordCount.saveAsTextFile("wc/output6") //HDFS上的輸出
4.簡化代碼(鏈式編程)
1 sc.textFile("wc/input/wc.input"). 2 //數據過濾 3 filter(_.length>0). 4 //數據轉換 5 flatMap(_.split(" ").map((_,1))). 6 //分組 7 groupByKey(). 8 //統計 9 map(tuple=>(tuple._1,tuple._2.toList.sum)). 10 //輸出 11 saveAsTextFile("wc/output7")
5.最優化程序
reduceByKey存在combiner。
groupBy在大數據量的情況下,會出現OOM
1 sc.textFile("wc/input/wc.input"). 2 //數據過濾 3 filter(_.length>0). 4 //數據轉換 5 flatMap(_.split(" ").map((_,1))). 6 //統計 7 reduceByKey(_+_). 8 //輸出 9 saveAsTextFile("wc/output8")
6.顯示結果
1 sc.textFile("wc/input/wc.input"). 2 //數據過濾 3 filter(_.length>0). 4 //數據轉換 5 flatMap(_.split(" ").map((_,1))). 6 //統計 7 reduceByKey(_+_). 8 collect()
7.排序(第二個數,從大到小)
1 sc.textFile("wc/input/wc.input"). 2 //數據過濾 3 filter(_.length>0). 4 //數據轉換 5 flatMap(_.split(" ").map((_,1))). 6 //統計 7 reduceByKey(_+_). 8 //排序 9 sortBy(tuple=>tuple._2,ascending=false). 10 collect()
8.TopK(方式一)
1 sc.textFile("wc/input/wc.input"). 2 //數據過濾 3 filter(_.length>0). 4 //數據轉換 5 flatMap(_.split(" ").map((_,1))). 6 //統計 7 reduceByKey(_+_). 8 //排序 9 sortBy(tuple=>tuple._2,ascending=false). 10 take(4)
9.TopK(方式二,自定義)
1 sc.textFile("wc/input/wc.input"). 2 //數據過濾 3 filter(_.length>0). 4 //數據轉換 5 flatMap(_.split(" ").map((_,1))). 6 //統計 7 reduceByKey(_+_). 8 //排序 9 sortBy(tuple=>tuple._2,ascending=false). 10 top(3)(new scala.math.Ordering[(String,Int)](){ 11 override def compare(x:(String,Int),y:(String,Int))={ 12 val tmp=x._2.compare(y._2) 13 if(tmp!=0) tmp 14 else x._1.compare(x._1) 15 } 16 })