鍵值對的RDD操作與基本RDD操作一樣,只是操作的元素由基本類型改為二元組。
概述
鍵值對RDD是Spark操作中最常用的RDD,它是很多程序的構成要素,因為他們提供了並行操作各個鍵或跨界點重新進行數據分組的操作接口。
創建
Spark中有許多中創建鍵值對RDD的方式,其中包括
- 文件讀取時直接返回鍵值對RDD
- 通過List創建鍵值對RDD
在Scala中,可通過Map函數生成二元組
val listRDD = sc.parallelize(List(1,2,3,4,5)) val result = listRDD.map(x => (x,1)) result.foreach(println) //結果 (1,1) (2,1) (3,1) (4,1) (5,1)
鍵值對RDD的轉化操作
基本RDD轉化操作在此同樣適用。但因為鍵值對RDD中包含的是一個個二元組,所以需要傳遞的函數會由原來的操作單個元素改為操作二元組。
下表總結了針對單個鍵值對RDD的轉化操作,以 { (1,2) , (3,4) , (3,6) } 為例,f表示傳入的函數
函數名 | 目的 | 示例 | 結果 |
reduceByKey(f) | 合並具有相同key的值 | rdd.reduceByKey( ( x,y) => x+y ) | { (1,2) , (3,10) } |
groupByKey() | 對具有相同key的值分組 | rdd.groupByKey() | { (1,2) , (3, [4,6] ) } |
mapValues(f) | 對鍵值對中的每個值(value)應用一個函數,但不改變鍵(key) | rdd.mapValues(x => x+1) | { (1,3) , (3,5) , (3,7) } |
combineBy Key( createCombiner, mergeValue, mergeCombiners, partitioner) | 使用不同的返回類型合並具有相同鍵的值 | 下面有詳細講解 | - |
flatMapValues(f) | 對鍵值對RDD中每個值應用返回一個迭代器的函數,然后對每個元素生成一個對應的鍵值對。常用語符號化 | rdd.flatMapValues(x => ( x to 5 )) | { (1, 2) , (1, 3) , (1, 4) , (1, 5) , (3, 4) , (3, 5) } |
keys() | 獲取所有key | rdd.keys() | {1,3,3} |
values() | 獲取所有value | rdd.values() | {2,4,6} |
sortByKey() | 根據key排序 | rdd.sortByKey() | { (1,2) , (3,4) , (3,6) } |
下表總結了針對兩個鍵值對RDD的轉化操作,以rdd1 = { (1,2) , (3,4) , (3,6) } rdd2 = { (3,9) } 為例,
函數名 | 目的 | 示例 | 結果 |
subtractByKey | 刪掉rdd1中與rdd2的key相同的元素 | rdd1.subtractByKey(rdd2) | { (1,2) } |
join | 內連接 | rdd1.join(rdd2) | {(3, (4, 9)), (3, (6, 9))} |
leftOuterJoin | 左外鏈接 | rdd1.leftOuterJoin (rdd2) | {(3,( Some( 4), 9)), (3,( Some( 6), 9))} |
rightOuterJoin | 右外鏈接 | rdd1.rightOuterJoin(rdd2) | {(1,( 2, None)), (3, (4, Some( 9))), (3, (6, Some( 9)))} |
cogroup | 將兩個RDD鍾相同key的數據分組到一起 | rdd1.cogroup(rdd2) | {(1,([ 2],[])), (3, ([4, 6],[ 9]))} |
combineByKey
combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner,mapSideCombine)
combineByKey( createCombiner, mergeValue, mergeCombiners, partitioner)
combineByKey( createCombiner, mergeValue, mergeCombiners)
函數功能:
聚合各分區的元素,而每個元素都是二元組。功能與基礎RDD函數aggregate()差不多,可讓用戶返回與輸入數據類型不同的返回值。
combineByKey函數的每個參數分別對應聚合操作的各個階段。所以,理解此函數對Spark如何操作RDD會有很大幫助。
參數解析:
createCombiner:分區內 創建組合函數
mergeValue:分區內 合並值函數
mergeCombiners:多分區 合並組合器函數
partitioner:自定義分區數,默認為HashPartitioner
mapSideCombine:是否在map端進行Combine操作,默認為true
工作流程:
- combineByKey會遍歷分區中的所有元素,因此每個元素的key要么沒遇到過,要么和之前某個元素的key相同。
- 如果這是一個新的元素,函數會調用createCombiner創建那個key對應的累加器初始值。
- 如果這是一個在處理當前分區之前已經遇到的key,會調用mergeCombiners把該key累加器對應的當前value與這個新的value合並。
代碼例子:
//統計男女個數
val conf = new SparkConf ().setMaster ("local").setAppName ("app_1") val sc = new SparkContext (conf) val people = List(("男", "李四"), ("男", "張三"), ("女", "韓梅梅"), ("女", "李思思"), ("男", "馬雲")) val rdd = sc.parallelize(people,2) val result = rdd.combineByKey( (x: String) => (List(x), 1), //createCombiner (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1), //mergeValue (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)) //mergeCombiners result.foreach(println)
結果
(男, ( List( 張三, 李四, 馬雲),3 ) )
(女, ( List( 李思思, 韓梅梅),2 ) )
流程分解:
解析:兩個分區,分區一按順序V1、V2、V3遍歷
- V1,發現第一個key=男時,調用createCombiner,即
(x: String) => (List(x), 1)
- V2,第二次碰到key=男的元素,調用mergeValue,即
(peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1)
- V3,發現第一個key=女,繼續調用createCombiner,即
(x: String) => (List(x), 1)
- … …
- 待各V1、V2分區都計算完后,數據進行混洗,調用mergeCombiners,即
(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
add by jan 2017-02-27 18:34:39
以下例子都基於此RDD
(Hadoop,1) (Spark,1) (Hive,1) (Spark,1)
reduceByKey(func)
reduceByKey(func)的功能是,使用func函數合並具有相同鍵的值。
比如,reduceByKey((a,b) => a+b),有四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),對具有相同key的鍵值對進行合並后的結果就是:("spark",3)、("hadoop",8)。可以看出,(a,b) => a+b這個Lamda表達式中,a和b都是指value,比如,對於兩個具有相同key的鍵值對("spark",1)、("spark",2),a就是1,b就是2。
scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println) (Spark,2) (Hive,1) (Hadoop,1)
groupByKey()
roupByKey()的功能是,對具有相同鍵的值進行分組。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),采用groupByKey()后得到的結果是:("spark",(1,2))和("hadoop",(3,5))。
scala> pairRDD.groupByKey() res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[15] at groupByKey at <console>:34 //從上面執行結果信息中可以看出,分組后,value被保存到Iterable[Int]中 scala> pairRDD.groupByKey().foreach(println) (Spark,CompactBuffer(1, 1)) (Hive,CompactBuffer(1)) (Hadoop,CompactBuffer(1))
keys
keys只會把鍵值對RDD中的key返回形成一個新的RDD。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)構成的RDD,采用keys后得到的結果是一個RDD[Int],內容是{"spark","spark","hadoop","hadoop"}。
scala> pairRDD.keys res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at keys at <console>:34 scala> pairRDD.keys.foreach(println) Hadoop Spark Hive Spark
values
values只會把鍵值對RDD中的value返回形成一個新的RDD。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)構成的RDD,采用keys后得到的結果是一個RDD[Int],內容是{1,2,3,5}。
scala> pairRDD.values res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at values at <console>:34 scala> pairRDD.values.foreach(println) 1 1 1 1
sortByKey()
sortByKey()的功能是返回一個根據鍵排序的RDD。
scala> pairRDD.sortByKey() res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at sortByKey at <console>:34 scala> pairRDD.sortByKey().foreach(println) (Hadoop,1) (Hive,1) (Spark,1) (Spark,1)
mapValues(func)
我們經常會遇到一種情形,我們只想對鍵值對RDD的value部分進行處理,而不是同時對key和value進行處理。對於這種情形,Spark提供了mapValues(func),它的功能是,對鍵值對RDD中的每個value都應用一個函數,但是,key不會發生變化。比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)構成的pairRDD,如果執行pairRDD.mapValues(x => x+1),就會得到一個新的鍵值對RDD,它包含下面四個鍵值對("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。
scala> pairRDD.mapValues(x => x+1) res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at <console>:34 scala> pairRDD.mapValues(x => x+1).foreach(println) (Hadoop,2) (Spark,2) (Hive,2) (Spark,2)
join
join(連接)操作是鍵值對常用的操作。“連接”(join)這個概念來自於關系數據庫領域,因此,join的類型也和關系數據庫中的join一樣,包括內連接(join)、左外連接(leftOuterJoin)、右外連接(rightOuterJoin)等。最常用的情形是內連接,所以,join就表示內連接。
對於內連接,對於給定的兩個輸入數據集(K,V1)和(K,V2),只有在兩個數據集中都存在的key才會被輸出,最終得到一個(K,(V1,V2))類型的數據集。
比如,pairRDD1是一個鍵值對集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一個鍵值對集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的結果就是一個新的RDD,這個新的RDD是鍵值對集合{("spark",1,"fast"),("spark",2,"fast")}。對於這個實例,我們下面在spark-shell中運行一下:
scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5))) pairRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:27 scala> val pairRDD2 = sc.parallelize(Array(("spark","fast"))) pairRDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[25] at parallelize at <console>:27 scala> pairRDD1.join(pairRDD2) res9: org.apache.spark.rdd.RDD[(String, (Int, String))] = MapPartitionsRDD[28] at join at <console>:32 scala> pairRDD1.join(pairRDD2).foreach(println) (spark,(1,fast)) (spark,(2,fast))
詳細請參考《Spark快速大數據分析》