spark cache:
1,cache 方法不是被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用
2, cache 是調用的 persist() 默認情況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空間中
3,cache 默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的
4,緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。
checkpoint 緩存:
1,檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過lineage(血統)做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷
2,為當前RDD設置檢查點。該函數將會創建一個二進制的文件,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。在checkpoint的過程中,該RDD的所有依賴於父RDD中的信息將全部被移出。對RDD進行checkpoint操作並不會馬上被執行,必須執行Action操作才能觸發
cache 和 checkpoint 區別:
1,緩存把 RDD 計算出來然后放在內存中,但是RDD 的依賴鏈(相當於數據庫中的redo 日志),當某個點某個 executor 宕了,上面cache 的RDD就會丟掉,需要通過依賴鏈重放計算出來,
2,checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存儲,不依靠RDD之間依賴鏈,是通過復制實現的高容錯
package Day3 import org.apache.spark.{SparkConf, SparkContext} object cache_point { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("praction").setMaster("local") val sc = new SparkContext(conf) val rdd = sc.makeRDD(1 to 10) val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]") val cache = rdd.map(_.toString+"["+System.currentTimeMillis+"]") // 調用 cache 執行緩存, 值不變 // jvm 開辟了內存 //實際上是 使用 LRU cache 來緩存 RDD println(cache.cache) // 沒有執行緩存 , 重新計算 println(nocache.collect) //使用檢查點機制 //檢查點通過將數據寫入到HDFS文件系統實現了RDD的檢查點功能。 sc.setCheckpointDir("hdfs://hadoop:9000/checkpoint") val ch1 = sc.parallelize(1 to 2) val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]") val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]") ch3.checkpoint ch2.collect ch3.collect }}
Spark目前支持Hash分區和Range分區,用戶也可以自定義分區,Hash分區為當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據經過Shuffle過程屬於哪個分區和Reduce的個數,只有Key-Value類型的RDD才有分區的,非Key-Value類型的RDD分區的值是None
分區方式
1,HashPartitioner分區的原理:對於給定的key,計算其hashCode,並除於分區的個數取余,如果余數小於0,則用余數+分區的個數,最后返回的值就是這個key所屬的分區ID。
2,HashPartitioner分區弊端:可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據。
3,RangePartitioner分區優勢:盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都是比另一個分區內的元素小或者大;但是分區內的元素是不能保證順序的。簡單的說就是將一定范圍內的數映射到某一個分區內。
4,RangePartitioner作用:將一定范圍內的數映射到某一個分區內,在實現中,分界的算法尤為重要。用到了水塘抽樣算法
自定義分區:
package Day3 import org.apache.spark.{Partitioner, SparkConf, SparkContext} // 構造方法傳遞的是分區數 class custompation(numPar:Int) extends Partitioner{ // 分區數 override def numPartitions: Int = numPar // 獲取分區鍵,生成分區號 override def getPartition(key: Any): Int = { key.toString.hashCode%numPar }} object App{ def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[4]").setAppName("JDBC") val sc = new SparkContext(conf) val rdd = sc.makeRDD(Array(1,2,3,4,5)) // 原始分區方式 mapPartitionsWithIndex 可以拿到分區的迭代器,又可以拿到分區編號 rdd.mapPartitionsWithIndex((x,y)=>Iterator(x+":"+y.mkString("|"))).foreach(println) /* 1:2 2:3 3:4|5 0:1 */ // 查看自定義分區方式 rdd.map((_,1)).partitionBy(new custompation(5)) .mapPartitionsWithIndex((x,y)=>Iterator(x+":"+y.mkString("|"))).foreach(println) /* 0:(2,1) 1:(3,1) 2:(4,1) 4:(1,1) 3:(5,1) * */ sc.stop()}}