摘要:
RDD:彈性分布式數據集,是一種特殊集合 ‚ 支持多種來源 ‚ 有容錯機制 ‚ 可以被緩存 ‚ 支持並行操作,一個RDD代表一個分區里的數據集
RDD有兩種操作算子:
Transformation(轉換):Transformation屬於延遲計算,當一個RDD轉換成另一個RDD時並沒有立即進行轉換,僅僅是記住 了數據集的邏輯操作
Ation(執行):觸發Spark作業的運行,真正觸發轉換算子的計算
本系列主要講解Spark中常用的函數操作:
1.RDD基本轉換
2.鍵-值RDD轉換
3.Action操作篇
Ation(執行):觸發Spark作業的運行,真正觸發轉換算子的計算
本系列主要講解Spark中常用的函數操作:
1.RDD基本轉換
2.鍵-值RDD轉換
3.Action操作篇
本發所講函數
例1:
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("reduce")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 10,2)
val reduceRDD = rdd.reduce(_ + _)
val reduceRDD1 = rdd.reduce(_ - _) //如果分區數據為1結果為 -53
val countRDD = rdd.count()
val firstRDD = rdd.first()
val takeRDD = rdd.take(5) //輸出前個元素
val topRDD = rdd.top(3) //從高到底輸出前三個元素
val takeOrderedRDD = rdd.takeOrdered(3) //按自然順序從底到高輸出前三個元素
println("func +: "+reduceRDD)
println("func -: "+reduceRDD1)
println("count: "+countRDD)
println("first: "+firstRDD)
println("take:")
takeRDD.foreach(x => print(x +" "))
println("\ntop:")
topRDD.foreach(x => print(x +" "))
println("\ntakeOrdered:")
takeOrderedRDD.foreach(x => print(x +" "))
sc.stop
}
輸出:
func +: 55 func -: 15 //如果分區數據為1結果為 -53 count: 10 first: 1 take: 1 2 3 4 5 top: 10 9 8 takeOrdered: 1 2 3
(RDD依賴圖:紅色塊表示一個RDD區,黑色塊表示該分區集合,下同)
(RDD依賴圖)
9.collectAsMap():作用於K-V類型的RDD上,作用與collect不同的是collectAsMap函數不包含重復的key,對於重復的key。后面的元素覆蓋前面的元素
例2:
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("KVFunc")
val sc = new SparkContext(conf)
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val rdd = sc.parallelize(arr,2)
val countByKeyRDD = rdd.countByKey()
val collectAsMapRDD = rdd.collectAsMap()
println("countByKey:")
countByKeyRDD.foreach(print)
println("\ncollectAsMap:")
collectAsMapRDD.foreach(print)
sc.stop
}
輸出:
countByKey: (B,2)(A,2) collectAsMap: (A,2)(B,3)
(RDD依賴圖)
seqOp函數將每個分區的數據聚合成類型為U的值,comOp函數將各分區的U類型數據聚合起來得到類型為U的值
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("Fold")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1,2,3,4),2)
val aggregateRDD = rdd.aggregate(2)(_+_,_ * _)
println(aggregateRDD)
sc.stop
}
輸出:
90
步驟1:分區1:zeroValue+1+2=5 分區2:zeroValue+3+4=9
步驟2:zeroValue*分區1的結果*分區2的結果=90
(RDD依賴圖)
12.fold(zeroValue:T)(op:(T,T) => T):通過op函數聚合各分區中的元素及合並各分區的元素,op函數需要兩個參數,在開始時第一個傳入的參數為zeroValue,T為RDD數據集的數據類型,,其作用相當於SeqOp和comOp函數都相同的aggregate函數
例3
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("Fold")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)), 2)
val foldRDD = rdd.fold(("d", 0))((val1, val2) => { if (val1._2 >= val2._2) val1 else val2
})
println(foldRDD)
}
輸出:
c,5
其過程如下:
1.開始時將(“d”,0)作為op函數的第一個參數傳入,將Array中和第一個元素("a",1)作為op函數的第二個參數傳入,並比較value的值,返回value值較大的元素
2.將上一步返回的元素又作為op函數的第一個參數傳入,Array的下一個元素作為op函數的第二個參數傳入,比較大小
3.重復第2步驟
每個分區的數據集都會經過以上三步后匯聚后再重復以上三步得出最大值的那個元素,對於其他op函數也類似,只不過函數里的處理數據的方式不同而已
(RDD依賴圖)
