Spark 學習筆記之 distinct/groupByKey/reduceByKey


distinct/groupByKey/reduceByKey:

 

distinct:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TransformationsDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("TransformationsDemo").master("local[1]").getOrCreate()
    val sc = sparkSession.sparkContext
    testDistinct(sc)
  }

  private def testDistinct(sc: SparkContext) = {
    val rdd = sc.makeRDD(Seq("aa", "bb", "cc", "aa", "cc"), 1)
    //對RDD中的元素進行去重操作
    rdd.distinct(1).collect().foreach(println)
  }
}

運行結果:

 

groupByKey:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TransformationsDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("TransformationsDemo").master("local[1]").getOrCreate()
    val sc = sparkSession.sparkContext
    testGroupByKey(sc)

  }

  private def testGroupByKey(sc: SparkContext) = {
    val rdd: RDD[(String, Int)] = sc.makeRDD(Seq(("aa", 1), ("bb", 1), ("cc", 1), ("aa", 1), ("cc", 1)), 1)
    //pair RDD,即RDD的每一行是(key, value),key相同進行聚合
    rdd.groupByKey().map(v => (v._1, v._2.sum)).collect().foreach(println)
  }
}

運行結果:

 

reduceByKey:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TransformationsDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("TransformationsDemo").master("local[1]").getOrCreate()
    val sc = sparkSession.sparkContext
    testReduceByKey(sc)

  }

  private def testReduceByKey(sc: SparkContext) = {
    val rdd: RDD[(String, Int)] = sc.makeRDD(Seq(("aa", 1), ("bb", 1), ("cc", 1), ("aa", 1), ("cc", 1)), 1)
    //pair RDD,即RDD的每一行是(key, value),key相同進行聚合
    rdd.reduceByKey(_+_).collect().foreach(println)
  }
}

運行結果:

groupByKey與 reduceByKey區別:

reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函數自定義。groupByKey也是對每個key進行操作,但只生成一個sequence。因為groupByKey不能自定義函數,我們需要先用groupByKey生成RDD,然后才能對此RDD通過map進行自定義函數操作。當調用 groupByKey時,所有的鍵值對(key-value pair) 都會被移動。在網絡上傳輸這些數據非常沒有必要。避免使用 GroupByKey。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM