2.1 普通的分組TopN實現
2.2 自定義分區規則TopN實現
3.1 RDD緩存簡介
3.2 RDD緩存方式
正文
一,簡介
在之前的文章中,我們知道RDD的有一個特征:就是一組分片(Partition),即數據集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數目。這個分配的規則我們是可以自己定制的。同時我們一直在討論Spark快,快的方式有那些方面可以體現,RDD緩存就是其中的一個形式,這里將對這兩者進行介紹。
二,自定義分區規則
分組求TopN的方式有多種,這里進行簡單的幾種。這里尊卑一些數據:點擊下載
2.1 普通的分組TopN實現
實現思路一:先對數據進行處理,然后聚合。最后進行分組排序。
package cn.edu360.sparkTwo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SubjectTopNone { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/", 2) // 對每一行數據進行整理 val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) // 聚合,將學科和老師聯合當做key val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_) //分組排序(按學科進行分組) //[學科,該學科對應的老師的數據] val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1) // 這里取出的是每一組的數據 // 為什么可以調用scala的sortby方法呢?因為一個學科的數據已經在一台機器上的一個scala集合里面了 // 弊端,調用scala的sortBy當數據量過大時,有內存溢出的缺陷 val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(4)) println(result.collect.toBuffer) } }
實現思路二:先對數據進行處理,然后聚合,然后對數據進行單學科過濾,最后進行排序,提交。
package cn.edu360.sparkTwo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SubjectTopNtwo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn") val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_) // 獲取所有學科 val subjects: Array[String] = reduced.map(_._1._1).distinct().collect() // 對所有的reduce后的數據進行單學科過濾,在進行排序 for(sb <- subjects){ val filter: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb) // 這里進行了多次提交 val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3) print(result.toBuffer) } sc.stop() } }
2.2 自定義分區規則TopN實現
實現方式一:先對數據進行處理,然后聚合,而后對按照學科進行分區,然后對每一個分區進行排序。
package cn.edu360.sparkTwo import org.apache.spark.{Partitioner, SparkConf, SparkContext} import org.apache.spark.rdd.RDD import scala.collection.mutable object SubjectTopNthree { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/") val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) //聚合,將學科和老師聯合當做key ---> 這里有一次shuffle val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_) //計算有多少學科 val subjects: Array[String] = reduced.map(_._1._1).distinct().collect() //partitionBy按照指定的分區規則進行分區 //調用partitionBy時RDD的Key是(String, String) --->這里也有一次shuffle val partioned: RDD[((String, String), Int)] = reduced.partitionBy(new SubPartitioner(subjects)) //如果一次拿出一個分區(可以操作一個分區中的數據了) val sorted: RDD[((String, String), Int)] = partioned.mapPartitions(it => { //將迭代器轉換成list,然后排序,在轉換成迭代器返回 it.toList.sortBy(_._2).reverse.take(3).iterator }) val result: Array[((String, String), Int)] = sorted.collect() print(result.toBuffer) } } // 自定義分區規則,需要繼承Partitioner class SubPartitioner(subs: Array[String]) extends Partitioner{ //相當於主構造器(new的時候回執行一次) //用於存放規則的一個map private val rules = new mutable.HashMap[String, Int]() var i = 0 for(sub <- subs){ rules.put(sub, i) i += 1 } //返回分區的數量(下一個RDD有多少分區) override def numPartitions: Int = subs.length //根據傳入的key計算分區標號 //key是一個元組(String, String) override def getPartition(key: Any): Int = { //獲取學科名稱 val s: String = key.asInstanceOf[(String, String)]._1 //根據規則計算分區編號 rules(s) } }
實現方式二:上面的過程可以將聚合和分區操作進行合並,減少shuffle的次數
package cn.edu360.sparkTwo import org.apache.spark.{Partitioner, SparkConf, SparkContext} import org.apache.spark.rdd.RDD import scala.collection.mutable object SubjectTopNfour { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/") val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) val subjects: Array[String] = sbToTeacherOne.map(_._1._1).distinct().collect() // 在這里傳入分區規則,即聚合時就分區 val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(new SubPartinerTwo(subjects), _+_) // 對每個分區進行排序 val result: RDD[((String, String), Int)] = reduced.mapPartitions(it => { it.toList.sortBy(_._2).reverse.take(3).iterator }) print(result.collect().toBuffer) } } class SubPartinerTwo(subs: Array[String]) extends Partitioner{ private val rules = new mutable.HashMap[String, Int]() var i = 0 for(sub <- subs){ rules.put(sub, i) i += 1 } override def numPartitions: Int = subs.length override def getPartition(key: Any): Int = { val subject: String = key.asInstanceOf[(String, String)]._1 rules(subject) } }
三,RDD的緩存
3.1 RDD緩存簡介
Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或緩存數據集。當持久化某個RDD后,每一個節點都將把計算的分片結果保存在內存中,並在對此RDD或衍生出的RDD進行的其他動作中重用。這使得后續的動作變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特征之一。可以說,緩存是Spark構建迭代式算法和快速交互式查詢的關鍵。
3.2 RDD緩存方式
RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。
通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。
實例:
package cn.edu360.sparkTwo import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SubjectTopNtwo { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]") val sc: SparkContext = new SparkContext(conf) val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn") val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => { val words: Array[String] = line.split("/") val teacher: String = words(3) val subject: String = words(2).split("[.]")(0) ((subject, teacher), 1) }) val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_) // 這里講reduced的數據集到緩存中 val cached: RDD[((String, String), Int)] = cached.cache() // 獲取所有學科 val subjects: Array[String] = cached.map(_._1._1).distinct().collect() // 對所有的reduce后的數據進行單學科過濾,在進行排序 for(sb <- subjects){ // 因為這里的多次提交和過濾,所以添加到緩存就有必要了 val filter: RDD[((String, String), Int)] = cached.filter(_._1._1 == sb) // 這里進行了多次提交 val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3) print(result.toBuffer) } sc.stop() } }