Spark 學習(四)RDD自定義分區和緩存


一,簡介

二,自定義分區規則

  2.1 普通的分組TopN實現

  2.2 自定義分區規則TopN實現

三,RDD的緩存

  3.1 RDD緩存簡介

  3.2 RDD緩存方式

 

 

 

 

正文

一,簡介

  在之前的文章中,我們知道RDD的有一個特征:就是一組分片(Partition),即數據集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數目。這個分配的規則我們是可以自己定制的。同時我們一直在討論Spark快,快的方式有那些方面可以體現,RDD緩存就是其中的一個形式,這里將對這兩者進行介紹。

二,自定義分區規則

  分組求TopN的方式有多種,這里進行簡單的幾種。這里尊卑一些數據:點擊下載

  2.1 普通的分組TopN實現

  實現思路一:先對數據進行處理,然后聚合。最后進行分組排序。

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNone {

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/", 2)
        // 對每一行數據進行整理
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        // 聚合,將學科和老師聯合當做key
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        //分組排序(按學科進行分組)
        //[學科,該學科對應的老師的數據]
        val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
        // 這里取出的是每一組的數據
        // 為什么可以調用scala的sortby方法呢?因為一個學科的數據已經在一台機器上的一個scala集合里面了
        // 弊端,調用scala的sortBy當數據量過大時,有內存溢出的缺陷
        val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(4))
        println(result.collect.toBuffer)
    }
}

  實現思路二:先對數據進行處理,然后聚合,然后對數據進行單學科過濾,最后進行排序,提交

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNtwo {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        // 獲取所有學科
        val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
        // 對所有的reduce后的數據進行單學科過濾,在進行排序
        for(sb <- subjects){
            val filter: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb)
            // 這里進行了多次提交
            val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3)
            print(result.toBuffer)
        }
        sc.stop()
    }
}

  2.2 自定義分區規則TopN實現

  實現方式一:先對數據進行處理,然后聚合,而后對按照學科進行分區,然后對每一個分區進行排序

package cn.edu360.sparkTwo

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object SubjectTopNthree {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        //聚合,將學科和老師聯合當做key ---> 這里有一次shuffle
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        //計算有多少學科
        val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
        //partitionBy按照指定的分區規則進行分區
        //調用partitionBy時RDD的Key是(String, String) --->這里也有一次shuffle
        val partioned: RDD[((String, String), Int)] = reduced.partitionBy(new SubPartitioner(subjects))
        //如果一次拿出一個分區(可以操作一個分區中的數據了)
        val sorted: RDD[((String, String), Int)] = partioned.mapPartitions(it => {
            //將迭代器轉換成list,然后排序,在轉換成迭代器返回
            it.toList.sortBy(_._2).reverse.take(3).iterator
        })
        val result: Array[((String, String), Int)] = sorted.collect()
        print(result.toBuffer)
    }
}

// 自定義分區規則,需要繼承Partitioner
class SubPartitioner(subs: Array[String]) extends Partitioner{
    //相當於主構造器(new的時候回執行一次)
    //用於存放規則的一個map
    private val rules = new mutable.HashMap[String, Int]()
    var i = 0
    for(sub <- subs){
        rules.put(sub, i)
        i += 1
    }
    //返回分區的數量(下一個RDD有多少分區)
    override def numPartitions: Int = subs.length
    //根據傳入的key計算分區標號
    //key是一個元組(String, String)
    override def getPartition(key: Any): Int = {
        //獲取學科名稱
        val s: String = key.asInstanceOf[(String, String)]._1
        //根據規則計算分區編號
        rules(s)
    }
}

  實現方式二:上面的過程可以將聚合和分區操作進行合並,減少shuffle的次數

package cn.edu360.sparkTwo

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object SubjectTopNfour {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        val subjects: Array[String] = sbToTeacherOne.map(_._1._1).distinct().collect()
        // 在這里傳入分區規則,即聚合時就分區
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(new SubPartinerTwo(subjects), _+_)
        // 對每個分區進行排序
        val result: RDD[((String, String), Int)] = reduced.mapPartitions(it => {
            it.toList.sortBy(_._2).reverse.take(3).iterator
        })
        print(result.collect().toBuffer)
    }
}

class SubPartinerTwo(subs: Array[String]) extends Partitioner{
    private val rules = new mutable.HashMap[String, Int]()
    var i = 0
    for(sub <- subs){
        rules.put(sub, i)
        i += 1
    }
    override def numPartitions: Int = subs.length
    override def getPartition(key: Any): Int = {
        val subject: String = key.asInstanceOf[(String, String)]._1
        rules(subject)
    }
}

三,RDD的緩存

  3.1 RDD緩存簡介

  Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或緩存數據集。當持久化某個RDD后,每一個節點都將把計算的分片結果保存在內存中,並在對此RDD或衍生出的RDD進行的其他動作中重用。這使得后續的動作變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特征之一。可以說,緩存是Spark構建迭代式算法和快速交互式查詢的關鍵。

  3.2 RDD緩存方式

  RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。

  

  通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。

  

  緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。

  實例:

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNtwo {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        // 這里講reduced的數據集到緩存中
        val cached: RDD[((String, String), Int)] = cached.cache()
        // 獲取所有學科
        val subjects: Array[String] = cached.map(_._1._1).distinct().collect()
        // 對所有的reduce后的數據進行單學科過濾,在進行排序
        for(sb <- subjects){
            // 因為這里的多次提交和過濾,所以添加到緩存就有必要了
            val filter: RDD[((String, String), Int)] = cached.filter(_._1._1 == sb)
            // 這里進行了多次提交
            val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3)
            print(result.toBuffer)
        }
        sc.stop()
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM