Spark RDD aggregateByKey


aggregateByKey 這個RDD有點繁瑣,整理一下使用示例,供參考

 

直接上代碼

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/10/27.
  */
object AggregateByKey {
  def main(args: Array[String]) {
    val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey")
      .setMaster("local")
    val sc: SparkContext = new SparkContext(sparkConf)

    val data = List((1, 3), (1, 2), (1, 4), (2, 3))
    var rdd = sc.parallelize(data,2)//數據拆分成兩個分區

    //合並在不同partition中的值,a,b的數據類型為zeroValue的數據類型
    def comb(a: String, b: String): String = {
      println("comb: " + a + "\t " + b)
      a + b
    }
    //合並在同一個partition中的值, a的數據類型為zeroValue的數據類型,b的數據類型為原value的數據類型
    def seq(a: String, b: Int): String = {
      println("seq: " + a + "\t " + b)
      a + b
    }

    rdd.foreach(println)
    
//
zeroValue 中立值,定義返回value的類型,並參與運算 //seqOp 用來在一個partition中合並值的 //comb 用來在不同partition中合並值的 val aggregateByKeyRDD: RDD[(Int, String)] = rdd.aggregateByKey("100")(seq,comb) //打印輸出 aggregateByKeyRDD.foreach(println) sc.stop() } }

 

輸出結果說明:

 /*
將數據拆分成兩個分區

//分區一數據
(1,3)
(1,2)
//分區二數據
(1,4)
(2,3)

//分區一相同key的數據進行合並
seq: 100     3   //(1,3)開始和中立值進行合並  合並結果為 1003
seq: 1003     2   //(1,2)再次合並 結果為 10032

//分區二相同key的數據進行合並
seq: 100     4  //(1,4) 開始和中立值進行合並 1004
seq: 100     3  //(2,3) 開始和中立值進行合並 1003

將兩個分區的結果進行合並
//key為2的,只在一個分區存在,不需要合並 (2,1003)
(2,1003)

//key為1的, 在兩個分區存在,並且數據類型一致,合並
comb: 10032     1004
(1,100321004)

* */

 

參考代碼及下面的說明進行理解 

 

官網的說明

aggregateByKey(zeroValue)(seqOpcombOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

源碼中函數的說明 

/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM