Spark基礎 --RDD算子詳解


RDD算子分為兩類:Transformation和Action,如下圖,記住這張圖,走遍天下都不怕。

 

6162d728968a06809e54ecff4c9ef92208371dd6

Transformation:將一個RDD通過一種規則映射為另外一個RDD。

Action:返回結果或保存結果。

注意:只有action才觸發程序的執行,transformation不觸發執行。

 

RDD的操作種類有多個,分為: 單指RDD操作、Key/Value RDD操作、多個RDD聯合操作,其他操作。

 

單值RDD

 

1. Map

 map (f: T => U) : RDD[U] ,其中f定義了類型為T的元素到類型為U

 

的元素的映射,RDD[T] => RDD[U]的變換

a975d77780bc53c68d691511405d2958562729e3

舉例:

 

  var rdd=sc.makeRDD(1 to 7,3)

簡寫為  rdd.map(_+1)   //rdd.map(x=>x+1)

 

 

c638b233ef6cf24a47e99ac5826c9b5ec1619566

 

2. collect

collect(): Array[T],T是RDD中元素類型,將RDD轉化為數組。

eac7b65d68ac3365ac2c9337af2a9553b22f39a0

舉例:

val rdd = sc.makeRDD(1 to 7, 3)

rdd.collect()

 

注意:此算子非常危險,他會將所有RDD中的數據匯總到Drive端的JVM內存中,對Drive端壓力很大。

 

3. take

take(num: Int): Array[T] ,其中k是整數,T是RDD中元素類型,返回RDD中前k個元素,並保存成數組

1d1149f3d4df566bf984d57b39345af948a24ea9

舉例:

 

val rdd = sc.makeRDD(1 to 7, 3)

rdd.take(2)

 

dc68720699bc06af29e0013659bd3f8d31cb6d50

 

4. glom

glom() : RDD[Array[T]],將RDD中每個partition中元素轉換為數組

2711db919a580061cc9a3d59e52faa79f6b73dc3

舉例:

val rdd = sc.makeRDD(1 to 7, 3)

rdd.glom.collect

 

849ad259e2219d47ea2e120a086d7982ab1b8a52

 

5. coalesce

coalesce(numPartitions: Int) : RDD[T],將RDD中的partition個數合並為numPartitions個

4a3ce467897342145cef46ca96ef1c96827f1f93

舉例:

 

val rdd = sc.makeRDD(1 to 7,7)

rdd.coalesce(3) // 生成新的RDD,它包含三個Partition

 

 

6. repartition

repartition(numPartitions: Int) :RDD[T],將RDD中的partition個數均勻合並為numPartitions個

f3ade082fb989c0802ccd36c8c1c453b121c9c4f

舉例:

 

val list = Seq(Seq(),Seq(),Seq(),Seq(),Seq(),Seq(),

Seq(1,2,3,4,5,6,7))

val rdd = sc.makeRDD(list, 7).flatMap(x => x)

rdd.repartition(3) // 生成新的RDD,它包含三個Partition

 

7. filter

filter(f: T => Boolean):

RDD[T] ,其中f定義了類型為T的元素是否留下,過濾輸入RDD中的元素,將f返回true的元素留下

c3b9f54390901438e667a938699f03a410da2239

舉例:

var rdd=sc.makeRDD(1 to 7,7)

rdd.filter(_%3==0)

 

bb724ec00ec075f8449649979fef74c31fa807bf

 

 8. count

count(): Long,統計RDD中元素個數,並返回Long類型

2cf64d32ecc75d8b919c7d4447a6750ab8e8fcaa

 

val rdd = sc.makeRDD(1 to 7, 3)


rdd.count() // 統計RDD中元素總數

 

9. flatMap

flatMap(f: T =>TraversableOnce[U]): RDD[U],將函數f作用在RDD中每個元素上,並展開(flatten)

輸出的每個結果, flatMap = flatten + map,先映射(map),再拍扁(flatten )

e87b5eed666a0bca232f9e8f88a32d9eb1a4b5f0

舉例:

val rdd = sc.makeRDD(1 to 3, 3)

rdd.flatMap( x => 1 to x) // 將x映射成1~x

 

10. reduce

reduce(f: (T, T) => T): T, 按照函數f對RDD中元素,進行規約

a46509b347756e3fd5172f2483da1237bb72bf1b

舉例:

val rdd = sc.makeRDD(1 to 7, 3)

rdd.reduce((x, y) => x + y)

簡寫為:rdd.reduce(_ + _)

 

bcd14168134faa0f1edd715738bd36bd1e9f12d2

 

11. foreach

foreach(f: T => Unit):Unit,對RDD中每個元素,調用函數f

f80f059417c4d12d0dc20e7427ea056d05c7dd1e

舉例:

val rdd = sc.makeRDD(1 to 7, 3)

rdd.foreach( x => println(x))

簡寫為:rdd.foreach(println)

 

Key/Value RDD

 

首先先來看下如何創建一個Key/Value的rdd

var seq=Seq((A,1),(B,1),(C,1))

var rdd=sc.makeRDD(seq)

1. mapValues

對vaule做map操作

dc834cd4d7468f74a3b9f7d698e02e7be3aaaad1

舉例:

val pairs = Seq((A,1), (B,2), (A,2), (C, 4), (B, 1), (B, 1), (D, 1))

val rdd = sc.makeRDD(pairs, 3)

rdd.mapValues(_ + 1)

 

59da49ff2804b636a101df40f4db0664b846abbc

 

2. reduceByKey

對Key相同的value做計算

c7367af8a9867daf571d5a6f9b3abbd8b037a48c

舉例:

val pairs = Seq(('A',1), ('B',2), ('A',2), ('C', 4), ('B', 1), ('B', 1), ('D', 1))

val rdd = sc.makeRDD(pairs, 3)

rdd.reduceByKey(_ + _)

 

7840fab36f17934175fb30e994ae60806ac98f61

 

3. groupByKey

將RDD[key,value] 按照相同的key進行分組,形成RDD[key,Iterable[value]]的形式, 有點類似於sql中的groupby

52f76378b2f9aa472bbbdfd8aeba181bf4ae5c3a

舉例:

val pairs = Seq((A,1), (B,2), (A,2), (C, 4), (B, 1), (B, 1), (D, 1))

val rdd = sc.makeRDD(pairs, 3)

rdd.groupByKey()

 

注意:能用reducebykey代替就不用groupbykey,groupbykey會將所有的元素進行聚合,消耗大量內存。

 

多RDD

 

1. union

將多個RDD合並為一個RDD

a571aa725cb66d976c234dbdb62d16c0a4d2612b

舉例:

val pairs1 = Seq((A,1), (B,1), (C,1), (D, 1), (A, 2), (C, 3))

val rdd1 = sc.makeRDD(pairs1, 3)

val pairs2 = Seq((A,4), (D,1), (E, 1))

val rdd2 = sc.makeRDD(pairs2, 2)

rdd1.union(rdd2)

2. zip

zip函數用於將兩個RDD組合成Key/Value形式的RDD,如果兩個rdd中的partition數量不一致,會報錯。

a404057613ec8fc28b8e7a5406c5b16a85c69dd1

舉例:

val s1 = Seq(A, B, C, D, E)

val rdd1 = sc.makeRDD(s1)

val s2 = Seq(1, 2, 3, 4, 5)

val rdd2 = sc.makeRDD(s2)

rdd1.zip(rdd2)

 

27142c9b418b24d74439595e40a082c448be53fd

 

3. join

join相當於SQL中的內關聯join,只返回兩個RDD根據K可以關聯上的結果,join只能用於兩個RDD之間的關聯,

如果要多個RDD關聯,多關聯幾次即可

fd5f7efb80d13a7a932022c10b1c512bb272da3a

舉例:

 

val pairs1 = Seq((A,1), (B,1), (C,1), (D, 1), (A, 2), (C, 3))

val rdd1 = sc.makeRDD(pairs1, 3)

val pairs2 = Seq((A,4), (D,1), (C,1), (E, 1))

val rdd2 = sc.makeRDD(pairs2, 2)

rdd1.join(rdd2)

 

還有些是是其他rdd操作符,這里就不講解了,上述所寫如有不對之處,還請各位前輩賜教。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM