RDD 重新分區,排序 repartitionAndSortWithinPartitions


需求:將rdd數據中相同班級的學生分到一個partition中,並根據分數降序排序。

此實例用到的repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,如果需要在repartition重分區之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。因為該算子可以一邊進行重分區的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。 

import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by sunxufeng on 2016/6/18.
 */
class Student {

}


//創建key類,key組合鍵為grade,score
case class StudentKey(grade:String,score:Int)
//  extends Ordered[StudentKey]{
//  def compare(that: StudentKey) : Int = {
//    var result:Int = this.grade.compareTo(that.grade)
//    if (result == 0){
//      result = this.student.compareTo(that.student)
//      if(result ==0){
//        result = that.score.compareTo(this.score)
//      }
//    }
//    result
//  }
//}

object StudentKey {
  implicit def orderingByGradeStudentScore[A <: StudentKey] : Ordering[A] = {
//    Ordering.by(fk => (fk.grade, fk.student, fk.score * -1))
    Ordering.by(fk => (fk.grade, fk.score * -1))
  }
}

object Student{
  def main(args: Array[String]) {



    //定義hdfs文件索引值
    val grade_idx:Int=0
    val student_idx:Int=1
    val course_idx:Int=2
    val score_idx:Int=3

    //定義轉化函數,不能轉化為Int類型的,給默認值0
    def safeInt(s: String): Int = try { s.toInt } catch { case _: Throwable  => 0 }

    //定義提取key的函數
    def createKey(data: Array[String]):StudentKey={
      StudentKey(data(grade_idx),safeInt(data(score_idx)))
    }

    //定義提取value的函數
    def listData(data: Array[String]):List[String]={
      List(data(grade_idx),data(student_idx),data(course_idx),data(score_idx))
    }
    
    def createKeyValueTuple(data: Array[String]) :(StudentKey,List[String]) = {
      (createKey(data),listData(data))
    }

    //創建分區類
    import org.apache.spark.Partitioner
    class StudentPartitioner(partitions: Int) extends Partitioner {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

      override def numPartitions: Int = partitions

      override def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[StudentKey]
        k.grade.hashCode() % numPartitions
      }
    }

    //設置master為local,用來進行本地調試
    val conf = new SparkConf().setAppName("Student_partition_sort").setMaster("local")
    val sc = new SparkContext(conf)

    //學生信息是打亂的
    val student_array =Array(
      "c001,n003,chinese,59",
      "c002,n004,english,79",
      "c002,n004,chinese,13",
      "c001,n001,english,88",
      "c001,n002,chinese,10",
      "c002,n006,chinese,29",
      "c001,n001,chinese,54",
      "c001,n002,english,32",
      "c001,n003,english,43",
      "c002,n005,english,80",
      "c002,n005,chinese,48",
      "c002,n006,english,69"
      )
   //將學生信息並行化為rdd
    val student_rdd = sc.parallelize(student_array)
   //生成key-value格式的rdd
    val student_rdd2 = student_rdd.map(line => line.split(",")).map(createKeyValueTuple)
    //根據StudentKey中的grade進行分區,並根據score降序排列
    val student_rdd3 = student_rdd2.repartitionAndSortWithinPartitions(new StudentPartitioner(10))
   //打印數據 student_rdd3.collect.foreach(println) } }

 

 排序后的數據:

(StudentKey(c001,88),List(c001, n001, english, 88))
(StudentKey(c001,59),List(c001, n003, chinese, 59))
(StudentKey(c001,54),List(c001, n001, chinese, 54))
(StudentKey(c001,43),List(c001, n003, english, 43))
(StudentKey(c001,32),List(c001, n002, english, 32))
(StudentKey(c001,10),List(c001, n002, chinese, 10))
(StudentKey(c002,80),List(c002, n005, english, 80))
(StudentKey(c002,79),List(c002, n004, english, 79))
(StudentKey(c002,69),List(c002, n006, english, 69))
(StudentKey(c002,48),List(c002, n005, chinese, 48))
(StudentKey(c002,29),List(c002, n006, chinese, 29))
(StudentKey(c002,13),List(c002, n004, chinese, 13))

 

參考:http://codingjunkie.net/spark-secondary-sort/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM