1. Requirements
A text file salary.txt stores the salary details of employees across a company's departments. The file format is as follows:
deptId name salary
1001 張三01 2000
1002 李四02 2500
1003 張三05 3000
1002 王五01 2600
Write a program that computes the average salary of each department and outputs the results in descending order.
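For the sample data above, department 1002 averages (2500 + 2600) / 2 = 2550, while departments 1001 and 1003 each have a single record, so the expected result, sorted by average salary in descending order, is:

1003 3000.0
1002 2550.0
1001 2000.0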
2. Implementation with Spark
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
case class Salary(deptId: String, name: String, salary: Double)
object DeptAvgSalaryApp {

  val path = "./spark/src/main/resources/salary.txt"

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("DeptAvgSalaryApp")
    val sc: SparkContext = new SparkContext(sparkConf)

    val inputRDD: RDD[String] = sc.textFile(path)
    // Drop the header line and parse every record into (deptId, salary)
    val dataRDD = inputRDD
      .filter(line => !line.contains("deptId"))
      .map(line => {
        // println(s"------------:${line}")
        val arrs = line.split(" ")
        (arrs(0), arrs(2).toDouble)
      })

    // Method 1: groupByKey + mapValues
    useMapValue(dataRDD)
    // Method 2: reduceByKey
    useReduceByKey(dataRDD)
    // Method 3: combineByKey
    useCombineByKey(dataRDD)
    // Method 4: Spark SQL
    useSparkSQL()

    sc.stop()
  }

  def useMapValue(dataRDD: RDD[(String, Double)]) = {
    /**
     * Group the records by key:
     * (1001,CompactBuffer(2000.0))
     * (1003,CompactBuffer(3000.0))
     * (1002,CompactBuffer(2500.0, 2600.0))
     * then walk over each value iterator to compute the average salary.
     */
    dataRDD
      .groupByKey()
      .mapValues(iter => {
        var cnt: Int = 0
        var sum: Double = 0
        val it = iter.iterator
        while (it.hasNext) {
          cnt += 1
          sum += it.next()
        }
        sum / cnt
      })
      .sortBy(_._2, false)
      .collect()
      .foreach(println(_))
  }

  def useReduceByKey(dataRDD: RDD[(String, Double)]) = {
    /**
     * Attach a count of 1 to every record:
     * (1001,2000) ===> (1001,(2000,1))
     * (1002,2500) ===> (1002,(2500,1))
     * (1002,2600) ===> (1002,(2600,1))
     * reduceByKey then sums salaries and counts per key:
     * (1001,(2000.0,1))
     * (1003,(3000.0,1))
     * (1002,(5100.0,2))
     * and dividing the sum by the count yields the averages:
     * (1001,2000.0)
     * (1003,3000.0)
     * (1002,2550.0)
     */
    dataRDD.map(a => (a._1, (a._2, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(t => (t._1, t._2._1 / t._2._2))
      .sortBy(_._2, false)
      .collect()
      .foreach(println(_))
  }

  def useCombineByKey(dataRDD: RDD[(String, Double)]) = {
    /**
     * combineByKey(createCombiner, mergeValue, mergeCombiners) parameters:
     * 1. createCombiner: turns the first value V seen for a key in RDD[K, V] into the combiner type C
     *    -- the first salary of a deptId becomes (1, salary)
     * 2. mergeValue: merges another value V into an existing combiner C within the same partition
     *    -- (cnt, sum) and another salary become (cnt + 1, sum + salary)
     * 3. mergeCombiners: merges two combiners C from different partitions
     *    -- (n, sum1) and (m, sum2) become (n + m, sum1 + sum2)
     * 4. numPartitions: number of partitions of the result RDD, defaults to the parent's partitioning
     * 5. partitioner: partitioner to use, defaults to HashPartitioner
     * 6. mapSideCombine: whether to combine on the map side (like a MapReduce combiner), defaults to true
     */
    val createCombiner = (salary: Double) => (1, salary)
    val mergeValue = (c: (Int, Double), salary: Double) => (c._1 + 1, c._2 + salary)
    val mergeCombiner = (t1: (Int, Double), t2: (Int, Double)) => (t1._1 + t2._1, t1._2 + t2._2)

    dataRDD.combineByKey(createCombiner, mergeValue, mergeCombiner)
      .collect()
      .map { case (id, (cnt, sum)) => (id, sum / cnt) }
      .sortBy(-_._2) // sort the local array by average salary in descending order
      .foreach(println(_))
  }

  def useSparkSQL() = {
    val spark = SparkSession.builder()
      .appName("DeptAvgSalaryApp")
      .master("local[*]")
      .getOrCreate() // reuses the SparkContext already created in main
    import spark.implicits._

    // Read the source file, drop the header line and parse each record into a Salary Dataset
    val salaryDS = spark
      .read
      .text(path)
      .filter(row => !row.getString(0).contains("deptId"))
      /*.foreach(row => {
        println("--: " + row.getString(0)) // --: 1001 張三01 2000
        println(row.toString())            // [1001 張三01 2000]
      })*/
      .map(row => {
        val arrs = row.getString(0).split(" ")
        Salary(arrs(0), arrs(1), arrs(2).toDouble)
      })
    salaryDS.show()

    // Register the Dataset as a temp view and aggregate with SQL
    salaryDS.createOrReplaceTempView("dept")
    val frame = spark.sql("select deptId, avg(salary) from dept group by deptId order by avg(salary) desc")
    frame.show()

    spark.stop()
  }
}
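As a variation on Method 4, the same aggregation can also be expressed with the DataFrame/Dataset API instead of an SQL string. The sketch below is only an illustration, not part of the original program: the helper name useDataFrameAPI is hypothetical, and it assumes it is passed the same Dataset[Salary] that useSparkSQL builds.

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{avg, desc}

// Hypothetical helper: group by department, average the salaries,
// and sort by the average in descending order, mirroring the SQL query above.
def useDataFrameAPI(salaryDS: Dataset[Salary]): Unit = {
  salaryDS
    .groupBy("deptId")
    .agg(avg("salary").as("avg_salary"))
    .orderBy(desc("avg_salary"))
    .show()
}

Called as useDataFrameAPI(salaryDS) right after salaryDS is built, it prints the same three rows as the SQL version.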