1.reduceByKey(func)
功能:
使用 func 函數合並具有相同鍵的值。
示例:
val list = List("hadoop","spark","hive","spark") val rdd = sc.parallelize(list) val pairRdd = rdd.map((_,1)) pairRdd.reduceByKey(_+_).collect.foreach(println)
上例中,我們先是建立了一個list,然后建立通過這個list集合建立一個rdd
然后我們通過map函數將list的rdd轉化成鍵值對形式的rdd
然后我們通過reduceByKey方法對具有相同key的值進行func(_+_)的累加操作。
輸入結果如下
(hive,1) (spark,2) (hadoop,1) list: List[String] = List(hadoop, spark, hive, spark) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[127] at parallelize at command-3434610298353610:2 pairRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[128] at map at command-3434610298353610:3
pairRdd.collect.foreach(println) //打印pairRdd (hive,1) (spark,1) (hadoop,1) (spark,1)
我們需要留意的事情是,我們調用了reduceByKey操作的返回的結果類型是
org.apache.spark.rdd.RDD[(String, Int)]
注意,我們這里的collect()方法的作用是收集分布在各個worker的數據到driver節點。
如果不使用這個方法,每個worker的數據只在自己本地顯示,並不會在driver節點顯示。
2.groupByKey()
功能:
對具有相同key的value進行分組。
示例:
val list = List("hadoop","spark","hive","spark") val rdd = sc.parallelize(list) val pairRdd = rdd.map(x => (x,1)) pairRdd.groupByKey().collect.foreach(println)
我們同樣是對跟上面同樣的pairRdd進行groupByKey()操作
得出的結果為
(hive,CompactBuffer(1)) (spark,CompactBuffer(1, 1)) (hadoop,CompactBuffer(1)) list: List[String] = List(hadoop, spark, hive, spark) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[130] at parallelize at command-3434610298353610:2 pairRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[131] at map at command-3434610298353610:3
可以看到,結果並不是把具有相同key值進行相加,而是就簡單的進行了分組,生成一個sequence。
其實,我們可以把groupByKey()當作reduceByKey(func)操作的一部分,
reduceByKey(func)先是對rdd進行groupByKey()然后在對每個分組進行func操作。
pairRdd.reduceByKey(_+_).collect.foreach(println) 等同於 pairRdd.groupByKey().map(t => (t._1,t._2.sum)).collect.foreach(println)
我們這里通過groupByKey()后調用map遍歷每個分組,然后通過t => (t._1,t._2.sum)對每個分組的值進行累加。
因為groupByKey()操作是把具有相同類型的key收集到一起聚合成一個集合,集合中有個sum方法,對所有元素進行求和。
注意,(k,v)形式的數據,我們可以通過 ._1,._2 來訪問鍵和值,
用占位符表示就是 _._1,_._2,這里前面的兩個下划線的含義是不同的,前邊下划線是占位符,后邊的是訪問方式。
我們記不記得 ._1,._2,._3 是元組的訪問方式。我們可以把鍵值看成二維的元組。
3.reduceByKey(func)和groupByKey()的區別
reduceByKey()對於每個key對應的多個value進行了merge操作,最重要的是它能夠先在本地進行merge操作。merge可以通過func自定義。
groupByKey()也是對每個key對應的多個value進行操作,但是只是匯總生成一個sequence,本身不能自定義函數,只能通過額外通過map(func)來實現。
使用reduceByKey()的時候,本地的數據先進行merge然后再傳輸到不同節點再進行merge,最終得到最終結果。
而使用groupByKey()的時候,並不進行本地的merge,全部數據傳出,得到全部數據后才會進行聚合成一個sequence,
groupByKey()傳輸速度明顯慢於reduceByKey()。
雖然groupByKey().map(func)也能實現reduceByKey(func)功能,但是,優先使用reduceByKey(func)