Spark自定義排序與分區
前言:
隨着信息時代的不斷發展,數據成了時代主題,今天的我們徜徉在數據的海洋中;由於數據的爆炸式增長,各種數據計算引擎如雨后春筍般沖擊着這個時代。作為時下最主流的計算引擎之一 Spark也是從各方面向時代展示自己的強大能力。Spark無論是在數據處理還是數據分析、挖掘方面都展現出了強大的主導能力。其分布式計算能力受到越來越多的青睞。本文將介紹spark的排序以及分區。
一、Spark自定義排序
在spark中定義了封裝了很多高級的api,在我們的日常開發中使用這些api能獲得不少的便利。但是有的時候這些默認的規則並不足以實現我們的目的,這時候需要我們了解其底層原理,編寫一套適合我們需求的處理邏輯。下面通過代碼簡單介紹一下spark如何自定義排序。
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CustomSort1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("CustomSort1").setMaster("local[*]") val sc = new SparkContext(conf) //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序 val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99") //將Driver端的數據並行化變成RDD val lines: RDD[String] = sc.parallelize(users) //切分整理數據 val userRDD: RDD[User] = lines.map(line => { val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt //(name, age, fv) new User(name, age, fv) }) //不滿足要求 //tpRDD.sortBy(tp => tp._3, false) //將RDD里面裝的User類型的數據進行排序 val sorted: RDD[User] = userRDD.sortBy(u => u) val r = sorted.collect() println(r.toBuffer) sc.stop() } } class User(val name: String, val age: Int, val fv: Int) extends Ordered[User] with Serializable { override def compare(that: User): Int = { if(this.fv == that.fv) { this.age - that.age } else { -(this.fv - that.fv) } } override def toString: String = s"name: $name, age: $age, fv: $fv" }
對於自定義排序有多種方式實現:
1、User類繼承Ordered使User類變成可排序的類。在spark中由於我們雖然測試是在本地測試,但是他會模擬集群模式,所以我們自定義的object在運行時會shuffle有網絡傳輸會涉及序列化的問題。所以需要同時繼承Serializable。
2、使用case class樣例類:
case class Man(age: Int, fv: Int) extends Ordered[Man] {}
不需要繼承序列化類,case class默認已經實現序列化。
3、定義樣例類隱式排序規則
object SortRules { implicit object OrderingUser extends Ordering[User] { override def compare(x: User, y: User): Int = { if(x.fv == y.fv) { x.age - y.age } else { y.fv - x.fv } } } }
主程序代碼:
//切分整理數據 val tpRDD: RDD[(String, Int, Int)] = lines.map(line => { val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) }) //排序(傳入了一個排序規則,不會改變數據的格式,只會改變順序) import SortRules.OrderingUser val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => User(tp._2, tp._3))
4、某些特殊數據類型不需要自定義,使用原生api更方便。
//充分利用元組的比較規則,元組的比較規則:先比第一,相等再比第二個 val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => (-tp._3, tp._2))
5、將排序規則添加到隱士轉換中
//Ordering[(Int, Int)]最終比較的規則格式 //on[(String, Int, Int)]未比較之前的數據格式 //(t =>(-t._3, t._2))怎樣將規則轉換成想要比較的格式 implicit val rules = Ordering[(Int, Int)].on[(String, Int, Int)](t =>(-t._3, t._2)) val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => tp)
二、Spark自定義分區器
1、combineByKey
在reduceByKey、groupByKey等算子都基於combineByKey算子實現。這是一個底層的算子,可以自定義一些規則,比較靈活。
Rdd.combineByKey(x=>x,(m:Int,n:Int)=>m+n,(a:Int,B:Int)=>a+b,new HashPartition(2),true,null)
參數解釋:
(1)、相同key的value放入一個分區
(2)、局部聚合
(3)、全局聚合
(4)、分區數(可以設置分區數)
(5)、是否進行map端局部聚合
(6)、序列化參數
conbineByKey是一個較為底層的api,一般情況下可能不會用到它,但是當一些高級api滿足不了我們的需求的時候它給我們提供了解決便利。
2、自定義分區器
在spark計算中不可避免的會涉及到shuffle,數據會根據不同的規則有分區器分發到不同的分區中。所以分區器決定了上游的數據發送到哪個下游。以不同專業學生數據計算不同專業的學生成績。分組取topN :
(1)、自定義分區器
//自定義分區器:majors:專業集合 class MajorParitioner(majors: Array[String]) extends Partitioner { //相當於主構造器(new的時候回執行一次) //用於存放規則的一個map val rules = new mutable.HashMap[String, Int]() var i = 0 for(major<- majors) { //rules(major) = i rules.put(major, i) i += 1 } //返回分區的數量(下一個RDD有多少分區) override def numPartitions: Int = majors.length //根據傳入的key計算分區標號 //key是一個元組(String, String) override def getPartition(key: Any): Int = { //獲取key val major= key.asInstanceOf[(String, String)]._1 //根據規則計算分區編號 rules(major) } }
(2)、使用自定義分區器
//調用自定義的分區器,並且按照指定的分區器進行分區 val majorPatitioner = new MajorParitioner(subjects); //partitionBy按照指定的分區規則進行分區 //調用partitionBy時RDD的Key是(String, String) val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(majorPatitioner ) //如果一次拿出一個分區(可以操作一個分區中的數據了) val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => { //將迭代器轉換成list,然后排序,在轉換成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).iterator }) // val r: Array[((String, String), Int)] = sorted.collect()
通過這樣自定義分區器后,數據通過shuffle之后每個分區的數據就是一個專業的學生數據,對這個分區的數據排序后取出前N個就是所需結果了。但是這個程序中還是會出現一個問題,當數據量太大的時候可能會導致內存溢出的情況,因為我們是將數據放到了list中進行排序,而list是存放於內存中。所以會導致內存溢出。那么怎么才能避免這個情況呢。我們可以在mapPartitions內部定義一個集合,不加載所有數據。,每次將這個集合排序后最小的值移除,通過多次循環后最終集合中剩下的就是需要的結果。
三、總結
無論是排序還是分區,在spark中都封裝了高級的api共我們使用,但是他不會適用於所有情況,只會適用與部分情況,而通過對這些api的底層實現了解,通過自定義規則可以編輯一套適合於我們需求的程序。這樣一來可以大大提高效率。沒有什么能適配萬物,隨機應變才是取勝之道。