RDD彈性分布式數據集
RDD概述
RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將數據緩存在內存中,后續的查詢能夠重用這些數據,這極大地提升了查詢速度。
Resilient:RDD中的數據可以存儲在內存中或者磁盤中。
Distributed:RDD中的數據是分布式存儲的,可用於分布式計算。
Dataset:一個數據集合,用於存放數據的。
(1) 傳統的MapReduce雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是采用非循環式的數據流模型,使得在迭代計算中要進行大量的磁盤IO操作。RDD正是解決這一缺點的抽象方法。
(2) RDD是Spark提供的最重要的抽象的概念,它是一種具有容錯機制的特殊集合,可以分布在集群的節點上,以函數式編程來操作集合,進行各種並行操作。可以把RDD的結果數據進行緩存,方便進行多次重用,避免重復計算。
RDD是一種具有容錯性、基於內存計算的抽象方法,RDD是Spark Core的底層核心,Spark則是這個抽象方法的實現。
RDD五大屬性
* - A list of partitions
一個rdd有多個分區
* - A function for computing each split
作用在每一個分區中函數
* - A list of dependencies on other RDDs
一個rdd會依賴於很多其他RDD,這里就涉及到rdd的依賴關系
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
(可選項) 針對於kv類型的rdd才會有分區函數(必須要產生shuffle),分區函數就決定了數據會流入到子rdd的那些分區中
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
(可選項)一個列表,存儲每個Partition的優先位置.數據的本地性,數據位置最優(spark在進行任務分配的時候,會優先考慮存有數據的worker節點來進行任務計算)
一個文件有3個block塊
block1-----》node1
block2-----》node2
block3-----》node3
node4
node5
創建RDD
-
-
sc.parallelize(List(1,2,3,4,5))
-
-
2、讀取外部數據源
-
val rdd1=sc.textFile("/words.txt")
-
-
3、從一個已經存在rdd經過對應的算子操作生成新的rdd
-
val rdd2=rdd1.flatMap(_.split(" "))
-
RDD編程API
RDD的算子分類
Transformation(轉換):根據數據集創建一個新的數據集,計算后返回一個新RDD;例如:一個rdd進行map操作后生了一個新的rdd。
Action(動作):對rdd結果計算后返回一個數值value給驅動程序;
例如:collect算子將數據集的所有元素收集完成返回給驅動程序。
Transformation
RDD中的所有轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。
常用的Transformation:
轉換 |
含義 |
map(func) |
返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成 |
filter(func) |
返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成 |
flatMap(func) |
類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
mapPartitions(func) |
類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是 (Int, Interator[T]) => Iterator[U] |
union(otherDataset) |
對源RDD和參數RDD求並集后返回一個新的RDD |
intersection(otherDataset) |
對源RDD和參數RDD求交集后返回一個新的RDD |
distinct([numTasks])) |
對源RDD進行去重后返回一個新的RDD |
groupByKey([numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置 |
sortByKey([ascending], [numTasks]) |
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) |
與sortByKey類似,但是更靈活 |
join(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD |
coalesce(numPartitions) |
減少 RDD 的分區數到指定值。 |
repartition(numPartitions) |
重新給 RDD 分區 |
repartitionAndSortWithinPartitions(partitioner)
|
重新給 RDD 分區,並且每個分區內以記錄的 key 排序 |
Action
觸發整個任務真正的運行
動作 |
含義 |
reduce(func) |
reduce將RDD中元素前兩個傳給輸入函數,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函數,直到最后只有一個值為止。 |
collect() |
在驅動程序中,以數組的形式返回數據集的所有元素 |
count() |
返回RDD的元素個數 |
first() |
返回RDD的第一個元素(類似於take(1)) |
take(n) |
返回一個由數據集的前n個元素組成的數組 |
takeOrdered(n, [ordering]) |
返回自然順序或者自定義順序的前 n 個元素 |
saveAsTextFile(path) |
將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本 |
saveAsSequenceFile(path) |
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。 |
saveAsObjectFile(path) |
將數據集的元素,以 Java 序列化的方式保存到指定的目錄下 |
countByKey() |
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) |
在數據集的每一個元素上,運行函數func |
foreachPartition(func) |
在數據集的每一個分區上,運行函數func |
RDD常用的算子操作
Spark Rdd的所有算子操作,請見《sparkRDD函數詳解》https://www.cnblogs.com/jifengblog/p/9369258.html
啟動spark-shell 進行測試:
spark-shell --master spark://node1:7077
練習1:map、filter
//通過並行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//對rdd1里的每一個元素乘2然后排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//過濾出大於等於5的元素
val rdd3 = rdd2.filter(_ >= 5)
//將元素以數組的方式在客戶端顯示
rdd3.collect
練習2:flatMap
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//將rdd1里面的每一個元素先切分在壓平
val rdd2 = rdd1.flatMap(_.split(" "))
rdd2.collect
練習3:交集、並集
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求並集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
練習4:join、groupByKey
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求join
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求並集
val rdd4 = rdd1 union rdd2
rdd4.collect
//按key進行分組
val rdd5=rdd4.groupByKey
rdd5.collect
練習5:cogroup
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup與groupByKey的區別
rdd3.collect
練習6:reduce
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect
練習7:reduceByKey、sortByKey
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
練習8:repartition、coalesce
val rdd1 = sc.parallelize(1 to 10,3)
//利用repartition改變rdd1分區數
//減少分區
rdd1.repartition(2).partitions.size
//增加分區
rdd1.repartition(4).partitions.size
//利用coalesce改變rdd1分區數
//減少分區
rdd1.coalesce(2).partitions.size
注意:repartition可以增加和減少rdd中的分區數,coalesce只能減少rdd分區數,增加rdd分區數不會生效。
RDD的依賴關系
RDD和它依賴的父RDD的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
窄依賴
窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用
總結:窄依賴我們形象的比喻為獨生子女
寬依賴
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition
總結:寬依賴我們形象的比喻為超生
Lineage(血統)
RDD只支持粗粒度轉換,即只記錄單個塊上執行的單個操作。將創建RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行為,當該RDD的部分分區數據丟失時,它可以根據這些信息來重新運算和恢復丟失的數據分區。
RDD的緩存
Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或者緩存數據集。當持久化某個RDD后,每一個節點都將把計算分區結果保存在內存中,對此RDD或衍生出的RDD進行的其他動作中重用。這使得后續的動作變得更加迅速。
RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,並供后面重用。
通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
DAG的生成
DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就形成了DAG,根據RDD之間依賴關系的不同將DAG划分成不同的Stage(調度階段)。對於窄依賴,partition的轉換處理在一個Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是划分Stage的依據。
Spark任務調度
各個RDD之間存在着依賴關系,這些依賴關系就形成有向無環圖DAG,DAGScheduler對這些依賴關系形成的DAG進行Stage划分,划分的規則很簡單,從后往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。完成了Stage的划分。DAGScheduler基於每個Stage生成TaskSet,並將TaskSet提交給TaskScheduler。TaskScheduler 負責具體的task調度,最后在Worker節點上啟動task。
DAGScheduler
(1)DAGScheduler對DAG有向無環圖進行Stage划分。
(2)記錄哪個RDD或者 Stage 輸出被物化(緩存),通常在一個復雜的shuffle之后,通常物化一下(cache、persist),方便之后的計算。
(3)重新提交shuffle輸出丟失的stage(stage內部計算出錯)給TaskScheduler
(4)將 Taskset 傳給底層調度器
a)– spark-cluster TaskScheduler
b)– yarn-cluster YarnClusterScheduler
c)– yarn-client YarnClientClusterScheduler
TaskScheduler
(1)為每一個TaskSet構建一個TaskSetManager 實例管理這個TaskSet 的生命周期
(2)數據本地性決定每個Task最佳位置
(3)提交 taskset( 一組task) 到集群運行並監控
(4)推測執行,碰到計算緩慢任務需要放到別的節點上重試
(5)重新提交Shuffle輸出丟失的Stage給DAGScheduler
RDD容錯機制之checkpoint
checkpoint是什么
(1)Spark 在生產環境下經常會面臨transformation的RDD非常多(例如一個Job中包含1萬個RDD)或者具體transformation的RDD本身計算特別復雜或者耗時(例如計算時長超過1個小時),這個時候就要考慮對計算結果數據持久化保存;
(2)Spark是擅長多步驟迭代的,同時擅長基於Job的復用,這個時候如果能夠對曾經計算的過程產生的數據進行復用,就可以極大的提升效率;
(3)如果采用persist把數據放在內存中,雖然是快速的,但是也是最不可靠的;如果把數據放在磁盤上,也不是完全可靠的!例如磁盤會損壞,系統管理員可能清空磁盤。
(4)Checkpoint的產生就是為了相對而言更加可靠的持久化數據,在Checkpoint的時候可以指定把數據放在本地,並且是多副本的方式,但是在生產環境下是放在HDFS上,這就天然的借助了HDFS高容錯、高可靠的特征來完成了最大化的可靠的持久化數據的方式;
假如進行一個1萬個算子操作,在9000個算子的時候persist,數據還是有可能丟失的,但是如果checkpoint,數據丟失的概率幾乎為0。
checkpoint原理機制
(1) 當RDD使用cache機制從內存中讀取數據,如果數據沒有讀到,會使用checkpoint機制讀取數據。此時如果沒有checkpoint機制,那么就需要找到父RDD重新計算數據了,因此checkpoint是個很重要的容錯機制。checkpoint就是對於一個RDD chain(鏈)如果后面需要反復使用某些中間結果RDD,可能因為一些故障導致該中間數據丟失,那么就可以針對該RDD啟動checkpoint機制,使用checkpoint首先需要調用sparkContext的setCheckpointDir方法,設置一個容錯文件系統目錄,比如hdfs,然后對RDD調用checkpoint方法。之后在RDD所處的job運行結束后,會啟動一個單獨的job來將checkpoint過的數據寫入之前設置的文件系統持久化,進行高可用。所以后面的計算在使用該RDD時,如果數據丟失了,但是還是可以從它的checkpoint中讀取數據,不需要重新計算。
(2) persist或者cache與checkpoint的區別在於,前者持久化只是將數據保存在BlockManager中但是其lineage是不變的,但是后者checkpoint執行完后,rdd已經沒有依賴RDD,只有一個checkpointRDD,checkpoint之后,RDD的lineage就改變了。persist或者cache持久化的數據丟失的可能性更大,因為可能磁盤或內存被清理,但是checkpoint的數據通常保存到hdfs上,放在了高容錯文件系統。
總結:
checkpoint就是提供了一個相對而言更加可靠的持久化數據方式
checkpoint使用
sc.setCheckpointDir("hdfs文件目錄")
rdd1.checkpoint
后面也需要一個action算子操作,才會觸發checkpoint數據寫入到HDFS
cache/persist/checkpoint
區別
三者都可以對數據進行持久化保存
cache、persist他們不會改變rdd的血統
checkpoint會改變血統
當前一個rdd數據丟失了,如何得到
首先看一下你有沒有設置cache,如果有,直接從內存中獲取得到,如果沒有,接下來看一下你有沒有設置checkpoint操作,如果有,直接獲取得到,如果也沒有,那么這個時候只能夠通過lineage血統重新計算恢復。
Spark運行架構
Spark運行基本流程
1) 構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源;
2) 資源管理器分配Executor資源並啟動Executor,Executor運行情況將隨着心跳發送到資源管理器上;
3) SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task,Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。
4) Task在Executor上運行,運行完畢釋放所有資源。
Spark運行架構特點
- 每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行tasks。
- Spark任務與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了。
- 提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark程序運行過程中SparkContext和Executor之間有大量的信息交換;如果想在遠程集群中運行,最好使用RPC將SparkContext提交給集群,不要遠離Worker運行SparkContext。
- Task采用了數據本地性和推測執行的優化機制。