Spark操作:Aggregate和AggregateByKey


1. Aggregate

Aggregate即聚合操作。直接上代碼:

import org.apache.spark.{SparkConf, SparkContext}

object AggregateTest {

  def main(args:Array[String]) = {

    // 設置運行環境
    val conf = new SparkConf().setAppName("Aggregate Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar"))
    val sc = new SparkContext(conf)

    var data = List(2,5,8,1,2,6,9,4,3,5)
    var res = data.par.aggregate((0,0))(
      // seqOp
      (acc, number) => (acc._1+number, acc._2+1),
      // combOp
      (par1, par2) => (par1._1+par2._1, par1._2+par2._2)
    )

    println(res)

    sc.stop
  }

}

acc即(0,0),number即data,seqOp將data的值累加到Tuple的第一個元素,將data的個數累加到Tuple的第二個元素。由於沒有分區,所以combOp是不起作用的,這個例子里面即使分區了,combOp起作用了,結果也是一樣的。

運行結果:

(45,10)

2. AggregateByKey

AggregateByKey和Aggregate差不多,也是聚合,不過它是根據Key的值來聚合。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/6/13.
  */
object AggregateByKeyTest {

  def main(args:Array[String]) = {

    // 設置運行環境
    val conf = new SparkConf().setAppName("AggregateByKey Test").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar"))
    val sc = new SparkContext(conf)

    val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8))
    val rdd = sc.parallelize(data)

    val res : RDD[(Int,Int)] = rdd.aggregateByKey(0)(
      // seqOp
      math.max(_,_),
      // combOp
      _+_
    )

    res.collect.foreach(println)
    sc.stop
  }

}

根據Key值的不同,可以分為3個組:

(1)  (1,3),(1,2),(1,4);

(2)  (2,3);

(3)  (3,6),(3,8)。

這3個組分別進行seqOp,也就是(K,V)里面的V和0進行math.max()運算,運算結果和下一個V繼續運算,以第一個組為例,運算過程是這樣的:

0, 3 => 3

3, 2 => 3

3, 4 => 4

所以最終結果是(1,4)。combOp是對把各分區的V加起來,由於這里並沒有分區,所以實際上是不起作用的。

運行結果:

(2,3)
(1,4)
(3,8)

如果生成RDD時分成3個區:

val rdd = sc.parallelize(data,3)

運行結果就變成了:

(3,8)
(1,7)
(2,3)

這是因為一個分區返回(1,3),另一個分區返回(1,4),combOp將這兩個V加起來,就得到了(1,7)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM