spark cache:
1,cache 方法不是被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用
2, cache 是調用的 persist() 默認情況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空間中
3,cache 默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的
4,緩存有可能丟失,或者存儲存儲於內存的數據由於內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。
checkpoint 緩存:
1,檢查點(本質是通過將RDD寫入Disk做檢查點)是為了通過lineage(血統)做容錯的輔助,lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點出現問題而丟失分區,從做檢查點的RDD開始重做Lineage,就會減少開銷
2,為當前RDD設置檢查點。該函數將會創建一個二進制的文件,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。在checkpoint的過程中,該RDD的所有依賴於父RDD中的信息將全部被移出。對RDD進行checkpoint操作並不會馬上被執行,必須執行Action操作才能觸發
cache 和 checkpoint 區別:
1,緩存把 RDD 計算出來然后放在內存中,但是RDD 的依賴鏈(相當於數據庫中的redo 日志),當某個點某個 executor 宕了,上面cache 的RDD就會丟掉,需要通過依賴鏈重放計算出來,
2,checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存儲,不依靠RDD之間依賴鏈,是通過復制實現的高容錯
package Day3
import org.apache.spark.{SparkConf, SparkContext}
object cache_point {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("praction").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(1 to 10)
val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
val cache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
// 調用 cache 執行緩存, 值不變
// jvm 開辟了內存
//實際上是 使用 LRU cache 來緩存 RDD
println(cache.cache)
// 沒有執行緩存 , 重新計算
println(nocache.collect)
//使用檢查點機制
//檢查點通過將數據寫入到HDFS文件系統實現了RDD的檢查點功能。
sc.setCheckpointDir("hdfs://hadoop:9000/checkpoint")
val ch1 = sc.parallelize(1 to 2)
val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch3.checkpoint
ch2.collect
ch3.collect
}}
Spark目前支持Hash分區和Range分區,用戶也可以自定義分區,Hash分區為當前的默認分區,Spark中分區器直接決定了RDD中分區的個數、RDD中每條數據經過Shuffle過程屬於哪個分區和Reduce的個數,只有Key-Value類型的RDD才有分區的,非Key-Value類型的RDD分區的值是None
分區方式
1,HashPartitioner分區的原理:對於給定的key,計算其hashCode,並除於分區的個數取余,如果余數小於0,則用余數+分區的個數,最后返回的值就是這個key所屬的分區ID。
2,HashPartitioner分區弊端:可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據。
3,RangePartitioner分區優勢:盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都是比另一個分區內的元素小或者大;但是分區內的元素是不能保證順序的。簡單的說就是將一定范圍內的數映射到某一個分區內。
4,RangePartitioner作用:將一定范圍內的數映射到某一個分區內,在實現中,分界的算法尤為重要。用到了水塘抽樣算法
自定義分區:
package Day3
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
// 構造方法傳遞的是分區數
class custompation(numPar:Int) extends Partitioner{
// 分區數
override def numPartitions: Int = numPar
// 獲取分區鍵,生成分區號
override def getPartition(key: Any): Int = {
key.toString.hashCode%numPar
}}
object App{
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("JDBC")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(Array(1,2,3,4,5))
// 原始分區方式 mapPartitionsWithIndex 可以拿到分區的迭代器,又可以拿到分區編號
rdd.mapPartitionsWithIndex((x,y)=>Iterator(x+":"+y.mkString("|"))).foreach(println)
/*
1:2
2:3
3:4|5
0:1
*/
// 查看自定義分區方式
rdd.map((_,1)).partitionBy(new custompation(5))
.mapPartitionsWithIndex((x,y)=>Iterator(x+":"+y.mkString("|"))).foreach(println)
/*
0:(2,1)
1:(3,1)
2:(4,1)
4:(1,1)
3:(5,1)
* */
sc.stop()}}