算子的分類和 寬依賴算子、窄依賴算子


一、轉換算子轉換算子

textfile,也會惰性加載

 

Transformation,懶執行,需要Action觸發執行
filter
過濾 RDD[T]==>RDD[T],窄依賴

map
RDD[T] ->RDD[O], 窄依賴

flatMap
RDD[T]–>RDD[[O]],一對多 ,窄依賴,

mapToPair
sample
抽樣算子 RDD[T]–>RDD[O],窄依賴

sortBy
RDD[T]–>RDD[T], 根據你指定的內容排序 寬依賴

sortByKey
根據你的K排序,要求RDD中必須是KV的,寬依賴

reduceByKey
根據RDD的K分組之后聚合(累加,字符串連接) , 寬依賴

join
把兩個RDD根據K相同合並,結果RDD[K,(V1,V2)] ,寬依賴

leftOuterJoin
左連接 和下面的一致 都是寬依賴

rightOuterJoin
fullOuterJoin
union
把兩個RDD直接聚合成一個RDD,數據不合並 ,窄依賴

intersection
取兩個RDD的交集,窄依賴

subtract
mapPartitions
把整個分區的數據作為一個迭代器一次計算 數據量不是特別大 會有性能提升,窄依賴

distinct
去重算子 本質是map + reduceByKey+map 寬依賴

cogroup
(K,V) (K,W)=>(K,([V],[W])) RDD1相同的key value 放在[V]中 另一個RDD相同的key 的value 放在[W]中 寬依賴

mapPartitionsWithIndex
把整個分區的數據作為一個迭代器一次計算 多了一個分區的index 數據量不是特別大 會有性能提升,窄依賴

repartition
可以增多分區,可以減少分區,有shuffle 寬依賴
repartition = coalesce(num,shuffle=true)

coalesce
可以增多分區,也可以減少分區,默認沒有shuffle 有shuffle就 寬依賴 沒shuffle 就是窄依賴
若RDD由少的分區分到多的分區時,不讓產生shuffle, 不起作用
少 - > 多 false RDD1 a、b分區 RDD2 0:a 1:b 2: 窄依賴 。 true RDD1 a、b 分區 RDD2 0:a1 b1 / 1: a2 b2 / 3: a3 b3 寬依賴
多 - > 少 false RDD1 a、b、c 分區 RDD2: 0: a 、 1: b c 窄依賴 。 true RDD1 a、b、c 分區 RDD2 0: a1、b1、c1 1:a2、b2、c2 寬依賴

zip
兩個RDD壓成一個RDD 窄依賴

zipWithIndex
groupByKey
(K,V)–>(K,iter),根據K相同分組,分組之后把一組的V封裝成一個迭代器, 寬依賴

二、行動算子Action

saveAsTextFile等存儲算子,也會立即執行

 

觸發transformation類算子執行,一個application中有一個action算子就有一個job
Action算子
foreach
循環出值

count
結果會拿到Driver端

collect
將結果拿回Driver端 返回一個列表

first
取出第一個值

take(num)
取出num個值 返回driver端

foreachPartition
reduce
聚合數據 不過 這個是在driver端 不合適聚合大量的數據 適合聚合結果數據

countByKey
countByValue

三、持久化算子

cache
默認將數據存儲在內存中
cache() = persist() = persist(StorageLevel.MEMORY_ONLY)

persist
可以手動指定持久化級別
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
“_2” 是由副本
盡量少使用DISK_ONLY級別
checkpoint
將數據直接持久化到指定的目錄,當lineage 計算非常復雜,可以嘗試使用checkpoint ,checkpoint還可以切斷RDD的依賴關系
特殊場景使用checkpoint,對RDD使用checkpoint要慎用
checkpoint要指定目錄,可以將數據持久化到指定的目錄中,當application執行完成之后,這個目錄中的數據不會被清除

checkpoint的執行流程
1.當sparkjob執行完成之后,Spark 會從后往前回溯,找到checkpointRDD做標記
2.回溯完成之后,Spark框架會重新啟動一個job,計算標記的RDD的數據,放入指定的checkpoint目錄中
3.數據計算完成,放入目錄之后,會切斷RDD的依賴關系,當SparkApplication執行完成之后,數據目錄中的數據不會被清除
優化:對哪個RDD進行checkpoint,最好先cache下,這樣回溯完成后再計算這個CheckpointRDD數據的時候可以直接在內存中拿到放指定的目錄中
cache和persist的注意
1.cache,persist,checkpoint 都是懶執行,最小持久化單位是partition
2.cache和persist之后可以直接賦值給一個值,下次直接使用這個值,就是使用的持久化的數據
3.如果采用第二種方式,后面不能緊跟action算子
4.cache和persist的數據,當application執行完成之后會自動清除
————————————————
版權聲明:本文為CSDN博主「狂躁的辣條」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_42712704/article/details/89534037


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM