RDD算子分為兩類:Transformation和Action,如下圖,記住這張圖,走遍天下都不怕。
Transformation:將一個RDD通過一種規則映射為另外一個RDD。
Action:返回結果或保存結果。
注意:只有action才觸發程序的執行,transformation不觸發執行。
RDD的操作種類有多個,分為: 單指RDD操作、Key/Value RDD操作、多個RDD聯合操作,其他操作。
單值RDD
1. Map
map (f: T => U) : RDD[U] ,其中f定義了類型為T的元素到類型為U
的元素的映射,RDD[T] => RDD[U]的變換
舉例:
var rdd=sc.makeRDD(1 to 7,3) 簡寫為 rdd.map(_+1) //rdd.map(x=>x+1)
2. collect
collect(): Array[T],T是RDD中元素類型,將RDD轉化為數組。
舉例:
val rdd = sc.makeRDD(1 to 7, 3)
rdd.collect()
注意:此算子非常危險,他會將所有RDD中的數據匯總到Drive端的JVM內存中,對Drive端壓力很大。
3. take
take(num: Int): Array[T] ,其中k是整數,T是RDD中元素類型,返回RDD中前k個元素,並保存成數組
舉例:
val rdd = sc.makeRDD(1 to 7, 3)
rdd.take(2)
4. glom
glom() : RDD[Array[T]],將RDD中每個partition中元素轉換為數組
舉例:
val rdd = sc.makeRDD(1 to 7, 3)
rdd.glom.collect
5. coalesce
coalesce(numPartitions: Int) : RDD[T],將RDD中的partition個數合並為numPartitions個
舉例:
val rdd = sc.makeRDD(1 to 7,7) rdd.coalesce(3) // 生成新的RDD,它包含三個Partition
6. repartition
repartition(numPartitions: Int) :RDD[T],將RDD中的partition個數均勻合並為numPartitions個
舉例:
val list = Seq(Seq(),Seq(),Seq(),Seq(),Seq(),Seq(), Seq(1,2,3,4,5,6,7)) val rdd = sc.makeRDD(list, 7).flatMap(x => x) rdd.repartition(3) // 生成新的RDD,它包含三個Partition
7. filter
filter(f: T => Boolean):
RDD[T] ,其中f定義了類型為T的元素是否留下,過濾輸入RDD中的元素,將f返回true的元素留下
舉例:
var rdd=sc.makeRDD(1 to 7,7)
rdd.filter(_%3==0)
8. count
count(): Long,統計RDD中元素個數,並返回Long類型
val rdd = sc.makeRDD(1 to 7, 3) rdd.count() // 統計RDD中元素總數
9. flatMap
flatMap(f: T =>TraversableOnce[U]): RDD[U],將函數f作用在RDD中每個元素上,並展開(flatten)
輸出的每個結果, flatMap = flatten + map,先映射(map),再拍扁(flatten )
舉例:
val rdd = sc.makeRDD(1 to 3, 3) rdd.flatMap( x => 1 to x) // 將x映射成1~x
10. reduce
reduce(f: (T, T) => T): T, 按照函數f對RDD中元素,進行規約
舉例:
val rdd = sc.makeRDD(1 to 7, 3) rdd.reduce((x, y) => x + y) 簡寫為:rdd.reduce(_ + _)
11. foreach
foreach(f: T => Unit):Unit,對RDD中每個元素,調用函數f
舉例:
val rdd = sc.makeRDD(1 to 7, 3) rdd.foreach( x => println(x)) 簡寫為:rdd.foreach(println)
Key/Value RDD
首先先來看下如何創建一個Key/Value的rdd
var seq=Seq((A,1),(B,1),(C,1))
var rdd=sc.makeRDD(seq)
1. mapValues
對vaule做map操作
舉例:
val pairs = Seq((A,1), (B,2), (A,2), (C, 4), (B, 1), (B, 1), (D, 1)) val rdd = sc.makeRDD(pairs, 3) rdd.mapValues(_ + 1)
2. reduceByKey
對Key相同的value做計算
舉例:
val pairs = Seq(('A',1), ('B',2), ('A',2), ('C', 4), ('B', 1), ('B', 1), ('D', 1)) val rdd = sc.makeRDD(pairs, 3) rdd.reduceByKey(_ + _)
3. groupByKey
將RDD[key,value] 按照相同的key進行分組,形成RDD[key,Iterable[value]]的形式, 有點類似於sql中的groupby
舉例:
val pairs = Seq((A,1), (B,2), (A,2), (C, 4), (B, 1), (B, 1), (D, 1)) val rdd = sc.makeRDD(pairs, 3) rdd.groupByKey()
注意:能用reducebykey代替就不用groupbykey,groupbykey會將所有的元素進行聚合,消耗大量內存。
多RDD
1. union
將多個RDD合並為一個RDD
舉例:
val pairs1 = Seq((A,1), (B,1), (C,1), (D, 1), (A, 2), (C, 3)) val rdd1 = sc.makeRDD(pairs1, 3) val pairs2 = Seq((A,4), (D,1), (E, 1)) val rdd2 = sc.makeRDD(pairs2, 2) rdd1.union(rdd2)
2. zip
zip函數用於將兩個RDD組合成Key/Value形式的RDD,如果兩個rdd中的partition數量不一致,會報錯。
舉例:
val s1 = Seq(A, B, C, D, E) val rdd1 = sc.makeRDD(s1) val s2 = Seq(1, 2, 3, 4, 5) val rdd2 = sc.makeRDD(s2) rdd1.zip(rdd2)
3. join
join相當於SQL中的內關聯join,只返回兩個RDD根據K可以關聯上的結果,join只能用於兩個RDD之間的關聯,
如果要多個RDD關聯,多關聯幾次即可
舉例:
val pairs1 = Seq((A,1), (B,1), (C,1), (D, 1), (A, 2), (C, 3)) val rdd1 = sc.makeRDD(pairs1, 3) val pairs2 = Seq((A,4), (D,1), (C,1), (E, 1)) val rdd2 = sc.makeRDD(pairs2, 2) rdd1.join(rdd2)
還有些是是其他rdd操作符,這里就不講解了,上述所寫如有不對之處,還請各位前輩賜教。