A summary of several ways to implement WordCount in Spark


Method 1: map + reduceByKey

package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * WordCount implementation, method 1: map + reduceByKey
 *
 * @author 陳小哥cw
 * @date 2020/7/9 9:59
 */
object WordCount1 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount1")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    // split each line into words, pair each word with 1, sum per key, and print
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
  }
}

Method 2: countByValue in place of map + reduceByKey

package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * WordCount implementation, method 2: countByValue in place of map + reduceByKey
 *
 * countByValue counts how many elements of the dataset have each distinct value and
 * returns those counts to the driver (it does not require a key-value RDD).
 * It is equivalent to map(value => (value, null)).countByKey().
 *
 * @author 陳小哥cw
 * @date 2020/7/9 10:02
 */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount2")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    // countByValue is an action: it returns a local Map[String, Long] of word counts
    lines.flatMap(_.split(" ")).countByValue().foreach(println)
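
    // For comparison (an equivalent expansion, not part of the original example):
    // countByValue behaves like pairing each element with null and calling countByKey
    lines.flatMap(_.split(" ")).map(word => (word, null)).countByKey().foreach(println)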

  }
}

Method 3: aggregateByKey or foldByKey

package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount implementation, method 3: aggregateByKey or foldByKey
 *
 * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
 * 1. zeroValue: the initial value given to every key in every partition;
 * 2. seqOp: folds the values for a key into the initial value within each partition (intra-partition aggregation);
 * 3. combOp: merges the per-partition results for each key (inter-partition aggregation).
 *
 * foldByKey is a simplified form of aggregateByKey in which seqOp and combOp are the same function.
 *
 * @author 陳小哥cw
 * @date 2020/7/9 10:08
 */
object WordCount3 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount3")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    // aggregateByKey: 0 is the per-partition initial value; seqOp and combOp both add counts
    lines.flatMap(_.split(" ")).map((_, 1)).aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)

    // foldByKey: the same, using one function both within and across partitions
    lines.flatMap(_.split(" ")).map((_, 1)).foldByKey(0)(_ + _).collect().foreach(println)

  }
}
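
To make the difference between seqOp and combOp visible, here is a small self-contained sketch; the two-partition RDD and the max/sum combination are illustrative assumptions, not part of the original example.

import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("AggregateByKeyDemo"))

    // two partitions: partition 0 holds ("a",3),("b",4); partition 1 holds ("a",2),("b",5)
    val pairs = sc.makeRDD(List(("a", 3), ("b", 4), ("a", 2), ("b", 5)), 2)

    // seqOp takes the maximum per key inside each partition,
    // combOp then sums those per-partition maxima: the results are (a,5) and (b,9)
    pairs.aggregateByKey(0)((acc, v) => math.max(acc, v), _ + _).collect().foreach(println)

    sc.stop()
  }
}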

Method 4: groupByKey + map

package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount implementation, method 4: groupByKey + map
 *
 * @author 陳小哥cw
 * @date 2020/7/9 13:32
 */
object WordCount4 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount4")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    // group the (word, 1) pairs by key: each word maps to an Iterable of 1s
    val groupByKeyRDD: RDD[(String, Iterable[Int])] = lines.flatMap(_.split(" ")).map((_, 1)).groupByKey()

    // summing each Iterable of 1s gives the word count
    groupByKeyRDD.map(tuple => {
      (tuple._1, tuple._2.sum)
    }).collect().foreach(println)
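
    // An equivalent, slightly more compact variant (an alternative rewrite, not part of
    // the original example): mapValues keeps the word untouched and only sums the 1s
    groupByKeyRDD.mapValues(_.sum).collect().foreach(println)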

  }
}

Method 5: WordCount in plain Scala

package com.cw.bigdata.spark.wordcount


/**
 * WordCount implemented in plain Scala
 *
 * @author 陳小哥cw
 * @date 2020/7/9 14:22
 */
object WordCount5 {
  def main(args: Array[String]): Unit = {

    val list = List("cw is cool", "wc is beautiful", "andy is beautiful", "mike is cool")
    /**
     * Step 1: split each element of the list on the separator (a space here), then flatten.
     * map(_.split(" ")) splits every element on spaces,
     * flatten then expands the nested arrays;
     * flatMap combines these two steps into one.
     */


    val res0 = list.map(_.split(" ")).flatten
    val res1 = list.flatMap(_.split(" "))

    println("第一步結果")
    println(res0)
    println(res1)

    /**
     * Step 2: turn each word into a tuple;
     * the key is the word and the value can be anything, here 1.
     */
    val res3 = res1.map((_, 1))
    println("Step 2 result")
    println(res3)
    /**
     * Step 3: group the tuples by key (the word).
     */
    val res4 = res3.groupBy(_._1)
    println("Step 3 result")
    println(res4)
    /**
     * Final step: take the size of each key's grouped values,
     * i.e. the number of times the word occurs.
     */
    val res5 = res4.mapValues(_.size)
    println("Final step result")
    println(res5.toBuffer)
  }
}
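
The same steps can be collapsed into a single expression. This is only an equivalent rewrite of the code above (the object name is made up for illustration), not an additional method:

package com.cw.bigdata.spark.wordcount

object WordCount5OneLiner {
  def main(args: Array[String]): Unit = {
    val list = List("cw is cool", "wc is beautiful", "andy is beautiful", "mike is cool")

    // split into words, group identical words together, then count each group
    val counts = list.flatMap(_.split(" ")).groupBy(identity).mapValues(_.size).toMap

    println(counts)
  }
}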

Method 6: combineByKey

package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount implementation, method 6: combineByKey
 *
 * @author 陳小哥cw
 * @date 2020/7/9 22:55
 */
object WordCount6 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("combineByKey")

    val sc: SparkContext = new SparkContext(config)

    val lines: RDD[String] = sc.textFile("in")

    val mapRDD: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

    // wordcount with combineByKey
    mapRDD.combineByKey(
      x => x,                    // createCombiner: the first 1 seen for a key starts the count
      (x: Int, y: Int) => x + y, // mergeValue: add further 1s within a partition
      (x: Int, y: Int) => x + y  // mergeCombiners: add the per-partition counts together
    ).collect().foreach(println)

  }
}
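
As an aside, the wordcount case does not really show why combineByKey takes three separate functions, since all of them just add Ints. A minimal sketch of the more general case, where the combiner type differs from the value type, computing a per-key average; the object name and the data are illustrative, not from the original article:

import org.apache.spark.{SparkConf, SparkContext}

object CombineByKeyAverage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("CombineByKeyAverage"))

    val scores = sc.makeRDD(List(("math", 90), ("math", 70), ("english", 80)))

    scores.combineByKey(
      score => (score, 1),                                           // createCombiner: first value becomes (sum, count)
      (acc: (Int, Int), score: Int) => (acc._1 + score, acc._2 + 1), // mergeValue: fold another value into (sum, count)
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)   // mergeCombiners: merge partial (sum, count) pairs
    ).mapValues { case (sum, count) => sum.toDouble / count }
      .collect().foreach(println)

    sc.stop()
  }
}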

