Spark實現求平均值


1. 需求背景

文本文件File里面存放公司各個部門人員的工資明細 salary.txt文件數據格式如下:
deptId name salary
1001 張三01 2000
1002 李四02 2500
1003 張三05 3000
1002 王五01 2600
用程序寫出各個部門的平均工資並倒序輸出

2. 使用Spark實現

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
case class Salary(deptId: String, name: String, id: String, salary: Double)
object DeptAvgSalaryApp {
  val path = "./spark/src/main/resources/salary.txt"

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("DeptAvgSalaryApp")
    val spark: SparkContext = new SparkContext(sparkConf)
    val inputRDD: RDD[String] = spark.textFile(path)
    val dataRDD = inputRDD
      .filter(line => !line.contains("deptId"))
      .map(line => {
        // println(s"------------:${line}")
        val arrs = line.split(" ")
        Tuple2(arrs(0), arrs(3).toDouble)
      })
    // 方法一: 使用mapValue
     useMapValue(dataRDD)

    // 方法二: 使用reduceByKey
     useReduceByKey(dataRDD)

    // 方法三:使用combineByKey
     useCombineByKey(dataRDD)

    // 方法四: 使用Spark SQL
     useSparkSQL();
     spark.stop()
  }

  def useMapValue(dataRDD: RDD[(String, Double)]) = {
    /**
     * 針對數據根據key分組
     * (1001,CompactBuffer(2000.0))
     * (1003,CompactBuffer(3000.0))
     * (1002,CompactBuffer(2500.0, 2600.0))
     * 針對Tuple中value值進行處理
     * --計算平均值
     */
    dataRDD
      .groupByKey()
      .mapValues(iter => {
        var cnt: Int = 0
        var sum: Double = 0
        val it = iter.iterator
        while (it.hasNext) {
          cnt += 1
          sum += it.next()
        }
        (sum / cnt)
      }).sortBy(_._2, false)
      .collect()
      .foreach(println(_))
  }

  def useReduceByKey(dataRDD: RDD[(String, Double)]) = {
    /**
     * 原始數據處理
     * (1001,2000) ===> (1001,(2000,1))
     * (1002,2500) ===> (1002,(2500,1))
     * (1002,2600) ===> (1002,(2600,1))
     * 之后根據key進行reduceByKey操作生成如下數據
     * (1001,(2000.0,1))
     * (1003,(3000.0,1))
     * (1002,(5100.0,2))
     * 繼續針對數據操作求平均值
     * (1001,2000.0)
     * (1003,3000.0)
     * (1002,2550.0)
     */
    dataRDD.map(a => (a._1, (a._2, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(t => (t._1, t._2._1 / t._2._2))
      .sortBy(_._2, false)
      .collect()
      .foreach(println(_))
  }

  def useCombineByKey(dataRDD: RDD[(String, Double)]) = {
    /**
     * combineByKey(createCombiner, mergeValue, mergeCombiner)參數說明
     * 1. createCombiner:組合器函數,用於將RDD中V類型轉換成C類型,輸入參數為RDD[K,V]中的V,輸出為C
     *   --將key(DeptId)對應的Value數據salary,每一條數據輸出為(1,salary)
     * 2. mergeValue:合並值函數,將一個C類型和一個RDD中V類型值合並成一個C類型,輸入參數為(C,V),輸出為C
     *   --將同一分區的(1,salary1)與另外一個salary2數據合並,輸出為(1+1,salary1+salary2)
     * 3. mergeCombiners:合並組合器函數,用於將兩個C類型值合並成一個C類型,輸入參數為(C,C),輸出為C
     *   --將不同分區(n,salary1),(m,salary2) 進行合並處理,輸出為(n+m,salary1+salary2)
     * 4. numPartitions:結果RDD分區數,默認保持原有的分區數
     * 5. partitioner:分區函數,默認為HashPartitioner
     * 6. mapSideCombine:是否需要在Map端進行combine操作,類似於MapReduce中的combine,默認為true
     */
    val createCombiner = (salary: Double) => {
      Tuple2(1, salary)
    }
    val mergeValue = (c: (Int, Double), score: Double) => {
      Tuple2(c._1 + 1, c._2 + score)
    }
    val mergeCombiner = (t1: Tuple2[Int, Double], t2: Tuple2[Int, Double]) => {
      Tuple2(t1._1 + t2._1, t1._2 + t2._2)
    }

    dataRDD.combineByKey(createCombiner, mergeValue, mergeCombiner)
      .collect()
      .map(t => t match {
        case (id, (cnt, sum)) => {
          Tuple2(id, sum / cnt)
        }
      })
      .sortBy(_._2)
      .foreach(println(_))
  }

  def useSparkSQL() = {
    import org.apache.spark.sql.functions._
    val spark = SparkSession.builder()
      .appName("DeptAvgSalaryApp")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

  // 讀取數據源文件進行過濾+處理生成Dataset數據 val dataFrame
= spark .read .text(path) .filter(row => { val str = row.getString(0) if (str.contains("deptId")) { false } else { true } })/*.foreach(row => { println("--: "+ row.getString(0)) // --: 1001 張三 01 2000 println(row.toString()) // [1001 張三 01 2000] })*/.map(row => { val line = row.getString(0) val arrs = line.split(" ") Salary(arrs(0),arrs(1),arrs(2),arrs(3).toDouble) }) dataFrame.show() // Dataset數據注冊成一張表+使用SQL方式進行匯總查詢 dataFrame.createOrReplaceTempView("dept") val frame = spark.sql("select deptid,avg(salary) from dept group by deptId order by avg(salary) desc") frame.show() spark.stop() } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM