Spark自定義排序與分區


Spark自定義排序與分區

前言:

隨着信息時代的不斷發展,數據成了時代主題,今天的我們徜徉在數據的海洋中;由於數據的爆炸式增長,各種數據計算引擎如雨后春筍般沖擊着這個時代。作為時下最主流的計算引擎之一 Spark也是從各方面向時代展示自己的強大能力。Spark無論是在數據處理還是數據分析、挖掘方面都展現出了強大的主導能力。其分布式計算能力受到越來越多的青睞。本文將介紹spark的排序以及分區。

一、Spark自定義排序

spark中定義了封裝了很多高級的api,在我們的日常開發中使用這些api能獲得不少的便利。但是有的時候這些默認的規則並不足以實現我們的目的,這時候需要我們了解其底層原理,編寫一套適合我們需求的處理邏輯。下面通過代碼簡單介紹一下spark如何自定義排序。

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object CustomSort1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("CustomSort1").setMaster("local[*]")

    val sc = new SparkContext(conf)

    //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序

    val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")

    //將Driver端的數據並行化變成RDD

    val lines: RDD[String] = sc.parallelize(users)

    //切分整理數據

    val userRDD: RDD[User] = lines.map(line => {

      val fields = line.split(" ")

      val name = fields(0)

      val age = fields(1).toInt

      val fv = fields(2).toInt

      //(name, age, fv)

      new User(name, age, fv)

    })

    //不滿足要求

    //tpRDD.sortBy(tp => tp._3, false)

    //將RDD里面裝的User類型的數據進行排序

    val sorted: RDD[User] = userRDD.sortBy(u => u)

    val r = sorted.collect()

    println(r.toBuffer)

    sc.stop()

  }

}

class User(val name: String, val age: Int, val fv: Int) extends Ordered[User] with Serializable {

  override def compare(that: User): Int = {

    if(this.fv == that.fv) {

      this.age - that.age

    } else {

      -(this.fv - that.fv)

    }

  }

  override def toString: String = s"name: $name, age: $age, fv: $fv"

}

 

對於自定義排序有多種方式實現:

1、User類繼承Ordered使User類變成可排序的類。在spark中由於我們雖然測試是在本地測試,但是他會模擬集群模式,所以我們自定義的object在運行時會shuffle有網絡傳輸會涉及序列化的問題。所以需要同時繼承Serializable。

2、使用case class樣例類:

 case class Man(age: Int, fv: Int) extends Ordered[Man] {}

 

不需要繼承序列化類,case class默認已經實現序列化。

3、定義樣例類隱式排序規則

 object SortRules {

  implicit object OrderingUser extends Ordering[User] {

    override def compare(x: User, y: User): Int = {

      if(x.fv == y.fv) {

        x.age - y.age

      } else {

        y.fv - x.fv

      }

    }

  }

}

 

 

主程序代碼:

//切分整理數據

    val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {

      val fields = line.split(" ")

      val name = fields(0)

      val age = fields(1).toInt

      val fv = fields(2).toInt

      (name, age, fv)

    })

 

    //排序(傳入了一個排序規則,不會改變數據的格式,只會改變順序)

    import SortRules.OrderingUser

    val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => User(tp._2, tp._3))

 

4、某些特殊數據類型不需要自定義,使用原生api更方便。

//充分利用元組的比較規則,元組的比較規則:先比第一,相等再比第二個

val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => (-tp._3, tp._2))

 

5、將排序規則添加到隱士轉換中

   

  //Ordering[(Int, Int)]最終比較的規則格式

    //on[(String, Int, Int)]未比較之前的數據格式

    //(t =>(-t._3, t._2))怎樣將規則轉換成想要比較的格式

    implicit val rules = Ordering[(Int, Int)].on[(String, Int, Int)](t =>(-t._3, t._2))

val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => tp)

 

二、Spark自定義分區器

1、combineByKey

reduceByKeygroupByKey等算子都基於combineByKey算子實現。這是一個底層的算子,可以自定義一些規則,比較靈活。

Rdd.combineByKey(x=>x,(m:Int,n:Int)=>m+n,(a:Int,B:Int)=>a+b,new HashPartition(2),true,null)

 

參數解釋:

(1)、相同keyvalue放入一個分區

(2)、局部聚合

(3)、全局聚合

(4)、分區數(可以設置分區數)

(5)、是否進行map端局部聚合

(6)、序列化參數

   conbineByKey是一個較為底層的api,一般情況下可能不會用到它,但是當一些高級api滿足不了我們的需求的時候它給我們提供了解決便利。

2、自定義分區器

spark計算中不可避免的會涉及到shuffle,數據會根據不同的規則有分區器分發到不同的分區中。所以分區器決定了上游的數據發送到哪個下游。以不同專業學生數據計算不同專業的學生成績。分組取topN

 1)、自定義分區器  

//自定義分區器:majors:專業集合

class MajorParitioner(majors: Array[String]) extends Partitioner {

  //相當於主構造器(new的時候回執行一次)

  //用於存放規則的一個map

  val rules = new mutable.HashMap[String, Int]()

  var i = 0

  for(major<- majors) {

    //rules(major) = i

    rules.put(major, i)

    i += 1

  }

  //返回分區的數量(下一個RDD有多少分區)

  override def numPartitions: Int = majors.length

  //根據傳入的key計算分區標號

  //key是一個元組(String, String)

  override def getPartition(key: Any): Int = {

    //獲取key

    val major= key.asInstanceOf[(String, String)]._1

    //根據規則計算分區編號

    rules(major)

  }

}

 

 2)、使用自定義分區器

//調用自定義的分區器,並且按照指定的分區器進行分區

    val majorPatitioner = new MajorParitioner(subjects);

    //partitionBy按照指定的分區規則進行分區

    //調用partitionBy時RDD的Key是(String, String)

    val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(majorPatitioner )

    //如果一次拿出一個分區(可以操作一個分區中的數據了)

    val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {

      //將迭代器轉換成list,然后排序,在轉換成迭代器返回

      it.toList.sortBy(_._2).reverse.take(topN).iterator

    })

    //

val r: Array[((String, String), Int)] = sorted.collect()

 

通過這樣自定義分區器后,數據通過shuffle之后每個分區的數據就是一個專業的學生數據,對這個分區的數據排序后取出前N個就是所需結果了。但是這個程序中還是會出現一個問題,當數據量太大的時候可能會導致內存溢出的情況,因為我們是將數據放到了list中進行排序,而list是存放於內存中。所以會導致內存溢出。那么怎么才能避免這個情況呢。我們可以在mapPartitions內部定義一個集合,不加載所有數據。,每次將這個集合排序后最小的值移除,通過多次循環后最終集合中剩下的就是需要的結果。

三、總結

  無論是排序還是分區,在spark中都封裝了高級的api共我們使用,但是他不會適用於所有情況,只會適用與部分情況,而通過對這些api的底層實現了解,通過自定義規則可以編輯一套適合於我們需求的程序。這樣一來可以大大提高效率。沒有什么能適配萬物,隨機應變才是取勝之道。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM