【spark】常用轉換操作:reduceByKey和groupByKey


1.reduceByKey(func)

功能:

  使用 func 函數合並具有相同鍵的值。

示例:

val list = List("hadoop","spark","hive","spark")
val rdd = sc.parallelize(list)
val pairRdd = rdd.map((_,1))
pairRdd.reduceByKey(_+_).collect.foreach(println)

上例中,我們先是建立了一個list,然后建立通過這個list集合建立一個rdd

然后我們通過map函數將list的rdd轉化成鍵值對形式的rdd

然后我們通過reduceByKey方法對具有相同key的值進行func(_+_)的累加操作。

輸入結果如下

(hive,1)
(spark,2)
(hadoop,1)
list: List[String] = List(hadoop, spark, hive, spark)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[127] at parallelize at command-3434610298353610:2
pairRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[128] at map at command-3434610298353610:3 
pairRdd.collect.foreach(println) //打印pairRdd
(hive,1)
(spark,1)
(hadoop,1)
(spark,1)

我們需要留意的事情是,我們調用了reduceByKey操作的返回的結果類型是

org.apache.spark.rdd.RDD[(String, Int)]  

注意,我們這里的collect()方法的作用是收集分布在各個worker的數據到driver節點。

如果不使用這個方法,每個worker的數據只在自己本地顯示,並不會在driver節點顯示。

2.groupByKey()

功能:

  對具有相同key的value進行分組。

示例:

val list = List("hadoop","spark","hive","spark")
val rdd = sc.parallelize(list)
val pairRdd = rdd.map(x => (x,1))
pairRdd.groupByKey().collect.foreach(println)

我們同樣是對跟上面同樣的pairRdd進行groupByKey()操作

得出的結果為

(hive,CompactBuffer(1))
(spark,CompactBuffer(1, 1))
(hadoop,CompactBuffer(1))
list: List[String] = List(hadoop, spark, hive, spark)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[130] at parallelize at command-3434610298353610:2
pairRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[131] at map at command-3434610298353610:3

可以看到,結果並不是把具有相同key值進行相加,而是就簡單的進行了分組,生成一個sequence。

其實,我們可以把groupByKey()當作reduceByKey(func)操作的一部分,

reduceByKey(func)先是對rdd進行groupByKey()然后在對每個分組進行func操作。

pairRdd.reduceByKey(_+_).collect.foreach(println)
等同於
pairRdd.groupByKey().map(t => (t._1,t._2.sum)).collect.foreach(println)

我們這里通過groupByKey()后調用map遍歷每個分組,然后通過t => (t._1,t._2.sum)對每個分組的值進行累加。

因為groupByKey()操作是把具有相同類型的key收集到一起聚合成一個集合,集合中有個sum方法,對所有元素進行求和。

注意,(k,v)形式的數據,我們可以通過 ._1,._2 來訪問鍵和值,

用占位符表示就是 _._1,_._2,這里前面的兩個下划線的含義是不同的,前邊下划線是占位符,后邊的是訪問方式。 

我們記不記得 ._1,._2,._3 是元組的訪問方式。我們可以把鍵值看成二維的元組。

3.reduceByKey(func)和groupByKey()的區別

reduceByKey()對於每個key對應的多個value進行了merge操作,最重要的是它能夠先在本地進行merge操作。merge可以通過func自定義。

groupByKey()也是對每個key對應的多個value進行操作,但是只是匯總生成一個sequence,本身不能自定義函數,只能通過額外通過map(func)來實現。

 

使用reduceByKey()的時候,本地的數據先進行merge然后再傳輸到不同節點再進行merge,最終得到最終結果。

而使用groupByKey()的時候,並不進行本地的merge,全部數據傳出,得到全部數據后才會進行聚合成一個sequence,

groupByKey()傳輸速度明顯慢於reduceByKey()。

雖然groupByKey().map(func)也能實現reduceByKey(func)功能,但是,優先使用reduceByKey(func)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM