spark 緩存操作(cache checkpoint)與分區


spark cache:
1,cache 方法不是被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用
2, cache 是調用的 persist() 默認情況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空間中
3,cache 默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的
4,緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。
checkpoint 緩存:
1,檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過lineage(血統)做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷
2,為當前RDD設置檢查點。該函數將會創建一個二進制的文件,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。在checkpoint的過程中,該RDD的所有依賴於父RDD中的信息將全部被移出。對RDD進行checkpoint操作並不會馬上被執行,必須執行Action操作才能觸發
cache 和 checkpoint 區別:
1,緩存把 RDD 計算出來然后放在內存中,但是RDD 的依賴鏈(相當於數據庫中的redo 日志),當某個點某個 executor 宕了,上面cache 的RDD就會丟掉,需要通過依賴鏈重放計算出來,
2,checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存儲,不依靠RDD之間依賴鏈,是通過復制實現的高容錯
package Day3
import org.apache.spark.{SparkConf, SparkContext}
object cache_point {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("praction").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(1 to 10)
    val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
    val cache =  rdd.map(_.toString+"["+System.currentTimeMillis+"]")
    // 調用 cache 執行緩存, 值不變
    // jvm 開辟了內存
    //實際上是 使用 LRU cache 來緩存 RDD
    println(cache.cache)
    // 沒有執行緩存 , 重新計算
    println(nocache.collect)

    //使用檢查點機制
    //檢查點通過將數據寫入到HDFS文件系統實現了RDD的檢查點功能。
    sc.setCheckpointDir("hdfs://hadoop:9000/checkpoint")
    val ch1 = sc.parallelize(1 to 2)
    val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
    val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
    ch3.checkpoint
    ch2.collect
    ch3.collect
  }}
    Spark目前支持Hash分區和Range分區,用戶也可以自定義分區,Hash分區為當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據經過Shuffle過程屬於哪個分區和Reduce的個數,只有Key-Value類型的RDD才有分區的,非Key-Value類型的RDD分區的值是None
分區方式
1,HashPartitioner分區的原理:對於給定的key,計算其hashCode,並除於分區的個數取余,如果余數小於0,則用余數+分區的個數,最后返回的值就是這個key所屬的分區ID。
2,HashPartitioner分區弊端:可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據。
3,RangePartitioner分區優勢:盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都是比另一個分區內的元素小或者大;但是分區內的元素是不能保證順序的。簡單的說就是將一定范圍內的數映射到某一個分區內。
4,RangePartitioner作用:將一定范圍內的數映射到某一個分區內,在實現中,分界的算法尤為重要。用到了水塘抽樣算法
自定義分區:
package Day3
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
// 構造方法傳遞的是分區數
class custompation(numPar:Int) extends Partitioner{
  // 分區數
  override def numPartitions: Int = numPar
  // 獲取分區鍵,生成分區號
  override def getPartition(key: Any): Int = {
    key.toString.hashCode%numPar
  }}
object App{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("JDBC")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Array(1,2,3,4,5))
    // 原始分區方式   mapPartitionsWithIndex 可以拿到分區的迭代器,又可以拿到分區編號
    rdd.mapPartitionsWithIndex((x,y)=>Iterator(x+":"+y.mkString("|"))).foreach(println)
    /*
    1:2
    2:3
    3:4|5
    0:1
     */
    // 查看自定義分區方式
    rdd.map((_,1)).partitionBy(new custompation(5))
      .mapPartitionsWithIndex((x,y)=>Iterator(x+":"+y.mkString("|"))).foreach(println)
    /*
    0:(2,1)
    1:(3,1)
    2:(4,1)
    4:(1,1)
    3:(5,1)
    * */
    sc.stop()}}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM