前言
Spark的rdd之間的關系需要通過一些特定的操作來實現,
操作比較多也,特別是一堆JOIN也挺容易讓人產生混亂的。
因此做了下小結梳理一下。
准備數據
var rdd1 = sc.makeRDD(Array(("A","a1"),("C","c1"),("D","d1"),("F","f1"),("F","f2")),2) var rdd2 = sc.makeRDD(Array(("A","a2"),("C","c2"),("C","c3"),("E","e1")),2)
這兩個RDD 有以下幾個特征:
- “A” : rdd1中有rdd2中也有且他們都只有一個
- “C”: rdd1中有rdd2中有兩個
- “D”: rdd1中有rdd2中沒有
- “E”: rdd1中沒有rdd2中有一個
- “F”: rdd1中有兩個rdd2中沒有
實驗操作
1. JOIN
類似SQL的inner join操作,返回結果是前面和后面配對成功的,過濾掉關聯不上的。
執行結果
scala> rdd1.join(rdd2).collect() res5: Array[(String, (String, String))] = Array((A,(a1,a2)), (C,(c1,c2)), (C,(c1,c3)))
可以看到,結果以左邊的Key為准。且是一對多的關系。
2. leftOuterJoin
leftOuterJoin類似於SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。
執行結果
scala> rdd1.leftOuterJoin(rdd2).collect() res6: Array[(String, (String, Option[String]))] = Array((F,(f1,None)), (F,(f2,None)), (D,(d1,None)), (A,(a1,Some(a2))), (C,(c1,Some(c2))), (C,(c1,Some(c3))))
可以看到,其實leftOuterJoin和Join非常類似,只不過Join會直接過濾掉不存在的,而leftOuterJoin會保留值為None。
3. rightOuterJoin
同上,只不過這次是以右邊為准。
執行結果
scala> rdd1.rightOuterJoin(rdd2).collect() res7: Array[(String, (Option[String], String))] = Array((A,(Some(a1),a2)), (C,(Some(c1),c2)), (C,(Some(c1),c3)), (E,(None,e1)))
4. subtractByKey
返回左邊RDD有的Key而右邊沒有對應的Key。值為左邊RDD原有的值。
執行結果
scala> rdd1.subtractByKey(rdd2).collect() res9: Array[(String, String)] = Array((D,d1), (F,f1), (F,f2))
可以看到該操作與值無關。僅僅是過濾一些指定Key。
5. cogroup
cogroup相當於SQL中的全外關聯full outer join,返回左右RDD中的記錄,關聯不上的為空。
執行結果
scala> rdd1.cogroup(rdd2).collect() res11: Array[(String, (Iterable[String], Iterable[String]))] = Array((F,(CompactBuffer(f1, f2),CompactBuffer())),
(D,(CompactBuffer(d1),CompactBuffer())), (A,(CompactBuffer(a1),CompactBuffer(a2))), (C,(CompactBuffer(c1),CompactBuffer(c2, c3))),
(E,(CompactBuffer(),CompactBuffer(e1))))