Big Data: Running Flink SQL Queries from the API


Use the Flink SQL/Table API to complete the tasks below.

The data is as follows.

The fields are, in order: name, age, sex, class, exam score

zhangsan,18,man,1707e,81.5

lisi,22,woman,1707e,77.5

wangwu,28,woman,1707e,82.0

zhaoliu,24,man,1707e,73.5

qianqi,18,woman,1707e,91.0

maba,22,man,1707e,84.0

sunjiiu,27,woman,1707e,88.0

xiaoming,20,man,1710e,73.5

xiaohong,21,woman,1710e,80.0

xiaozhang,22,man,1710e,73.5

xiaoli,19,woman,1710e,92.0

xiaowang,26,man,1710e,86.5
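
Each record above is a comma-separated line with five fields. As a minimal sketch of parsing one line defensively, the following uses plain Scala only. The names `ParseSketch`, `Record`, and `parse` are illustrative assumptions and are not part of the original job.

```scala
import scala.util.Try

object ParseSketch {
  // Hypothetical record type mirroring the five fields listed above.
  final case class Record(name: String, age: Int, sex: String, clazz: String, score: Double)

  // Returns None for lines that do not have exactly five fields,
  // or whose age or score is not numeric.
  def parse(line: String): Option[Record] =
    line.split(",").map(_.trim) match {
      case Array(name, age, sex, clazz, score) =>
        Try(Record(name, age.toInt, sex, clazz, score.toDouble)).toOption
      case _ => None
    }
}
```

A function of this shape could take the place of a bare `split` plus `toInt`/`toDouble` conversion, so that blank or malformed lines are skipped rather than raising an exception.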

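Read the data above with the Flink Table API and run the following analyses.

1. Compute the average score of each class;

2. Find the class with the highest total score, along with that total;

3. Find the name of the top-scoring student in each class;

4. Find the name of the top-scoring female student in class 1707e;

5. Compute the average age of male students and of female students in each class;

6. Find the name of the youngest student in class 1710e;

7. Count the male and female students in each class.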
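
For cross-checking the Flink results, the grouped aggregates (tasks 1, 2, 5, and 7) can also be computed with plain Scala collections. This is only a reference sketch over the twelve rows listed above; `ExpectedAnswers` and `Stu` are illustrative names, not part of the Flink job.

```scala
object ExpectedAnswers {
  final case class Stu(name: String, age: Int, sex: String, clazz: String, score: Double)

  // The twelve rows from the data listing, copied verbatim.
  val rows: List[Stu] = List(
    Stu("zhangsan", 18, "man", "1707e", 81.5),
    Stu("lisi", 22, "woman", "1707e", 77.5),
    Stu("wangwu", 28, "woman", "1707e", 82.0),
    Stu("zhaoliu", 24, "man", "1707e", 73.5),
    Stu("qianqi", 18, "woman", "1707e", 91.0),
    Stu("maba", 22, "man", "1707e", 84.0),
    Stu("sunjiiu", 27, "woman", "1707e", 88.0),
    Stu("xiaoming", 20, "man", "1710e", 73.5),
    Stu("xiaohong", 21, "woman", "1710e", 80.0),
    Stu("xiaozhang", 22, "man", "1710e", 73.5),
    Stu("xiaoli", 19, "woman", "1710e", 92.0),
    Stu("xiaowang", 26, "man", "1710e", 86.5)
  )

  // Task 1: average score per class.
  val avgScore: Map[String, Double] =
    rows.groupBy(_.clazz).map { case (c, rs) => c -> (rs.map(_.score).sum / rs.size) }

  // Task 2: class with the highest total score, and that total.
  val topClass: (String, Double) =
    rows.groupBy(_.clazz).map { case (c, rs) => c -> rs.map(_.score).sum }.maxBy(_._2)

  // Task 5: average age per (class, sex), kept as Double to avoid integer truncation.
  val avgAge: Map[(String, String), Double] =
    rows.groupBy(r => (r.clazz, r.sex)).map { case (k, rs) => k -> (rs.map(_.age).sum.toDouble / rs.size) }

  // Task 7: head count per (class, sex).
  val headCount: Map[(String, String), Int] =
    rows.groupBy(r => (r.clazz, r.sex)).map { case (k, rs) => k -> rs.size }

  def main(args: Array[String]): Unit = {
    println(avgScore)
    println(topClass)
    println(avgAge)
    println(headCount)
  }
}
```

On this data, class 1707e averages 82.5 with a total of 577.5, and class 1710e averages 81.1 with a total of 405.5, so 1707e is the answer to task 2.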

 

import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.types.Row

object Job629 {

  def main(args: Array[String]): Unit = {

    // Batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Table environment (TableEnvironment.getTableEnvironment belongs to the older Flink 1.x API)
    val tableEnvironment = TableEnvironment.getTableEnvironment(env)

    // Read the raw lines
    val dataDS: DataSet[String] = env.readTextFile("src/main/resources/job629.txt")
    // Parse each line into the Student case class
    val stuDS = dataDS.map(x => {
      val arr: Array[String] = x.split(",")
      Student(arr(0).trim, arr(1).trim.toInt, arr(2).trim, arr(3).trim, arr(4).trim.toDouble)
    })

    // Register the DataSet as table "stu"
    tableEnvironment.registerDataSet("stu", stuDS)

    // 1. Average score of each class
    val clazzAvg: Table = tableEnvironment.sqlQuery("select clazz, avg(score) from stu group by clazz")
    val sql1: DataSet[Row] = tableEnvironment.toDataSet[Row](clazzAvg)
    // Uncomment a print() call to execute that query and show its result
    // sql1.print()

    // 2. Class with the highest total score, and that total
    // (the sorted aggregate is sufficient on its own; joining it back to the
    // per-class totals adds nothing and duplicates the row if two classes tie)
    val scoreMaxClazz: Table = tableEnvironment.sqlQuery(
      """
        select clazz, sum(score) as sumScore
        from stu
        group by clazz
        order by sumScore desc
        limit 1
      """)
    val sql2: DataSet[Row] = tableEnvironment.toDataSet[Row](scoreMaxClazz)
    // sql2.print()

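    // 3. Name of the top-scoring student in each class
    // (join on class as well as score, so that a student from another class
    // with the same score is not matched)
    val scoreMaxName: Table = tableEnvironment.sqlQuery(
      """
        select s2.clazz, s1.name, s2.maxScore from stu s1 join
        (select clazz, max(score) maxScore from stu group by clazz) s2
        on s1.clazz = s2.clazz and s1.score = s2.maxScore
      """)
    val sql3: DataSet[Row] = tableEnvironment.toDataSet[Row](scoreMaxName)
    // sql3.print()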

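    // 4. Name of the top-scoring female student in class 1707e
    // (the outer side is filtered too, otherwise any student with the same score would match)
    val scoreMaxF: Table = tableEnvironment.sqlQuery(
      """
        select s1.name, s2.maxScore from stu s1 join
        (select max(score) maxScore from stu where clazz = '1707e' and sex = 'woman') s2
        on s1.score = s2.maxScore
        where s1.clazz = '1707e' and s1.sex = 'woman'
      """)
    val sql4: DataSet[Row] = tableEnvironment.toDataSet[Row](scoreMaxF)
    // sql4.print()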

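    // 5. Average age of male and of female students in each class
    // (age is an INT column, so avg(age) may come back as a truncated integer;
    // casting to double keeps the fractional part)
    val avgAge: Table = tableEnvironment.sqlQuery(
      """
        select clazz, sex, avg(cast(age as double)) from stu group by clazz, sex
      """)
    val sql5: DataSet[Row] = tableEnvironment.toDataSet[Row](avgAge)
    // sql5.print()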

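    // 6. Name of the youngest student in class 1710e
    // (the outer side is restricted to 1710e, otherwise students of the same age
    // in other classes would match)
    val minAgeName: Table = tableEnvironment.sqlQuery(
      """
        select s1.name, s2.minAge from stu s1 join
        (select min(age) minAge from stu where clazz = '1710e') s2
        on s1.age = s2.minAge
        where s1.clazz = '1710e'
      """)
    val sql6: DataSet[Row] = tableEnvironment.toDataSet[Row](minAgeName)
    // sql6.print()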

    // 7. Number of male and of female students in each class
    val count: Table = tableEnvironment.sqlQuery(
      """
        select clazz, sex, count(sex) from stu group by clazz, sex
      """)
    val sql7: DataSet[Row] = tableEnvironment.toDataSet[Row](count)
    // sql7.print()



  }
}

case class Student(name: String, age: Int, sex: String, clazz: String, score: Double)
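
Tasks 3, 4, and 6 all ask for the row that holds a group's maximum or minimum. When the aggregate is joined back to the table, the match should be limited to the same group, since a student elsewhere could have the same value. The following plain-Scala sketch shows the difference. It uses three rows from the data plus one hypothetical row named `extra`, which is not in the original data set; `TopPerClass` and `Stu` are illustrative names.

```scala
object TopPerClass {
  final case class Stu(name: String, clazz: String, score: Double)

  val rows: List[Stu] = List(
    Stu("qianqi", "1707e", 91.0),
    Stu("sunjiiu", "1707e", 88.0),
    Stu("xiaoli", "1710e", 92.0),
    Stu("extra", "1710e", 91.0) // hypothetical row: same score as the 1707e maximum
  )

  // Highest score per class, the counterpart of "max(score) ... group by clazz".
  val maxByClass: Map[String, Double] =
    rows.groupBy(_.clazz).map { case (c, rs) => c -> rs.map(_.score).max }

  // Join on score only: "extra" is reported as a top student of 1707e by accident.
  val scoreOnly: List[(String, String)] =
    for {
      s <- rows
      (c, m) <- maxByClass.toList
      if s.score == m
    } yield (c, s.name)

  // Join on class and score: only the real top student of each class remains.
  val classAndScore: List[(String, String)] =
    for {
      s <- rows
      (c, m) <- maxByClass.toList
      if s.clazz == c && s.score == m
    } yield (c, s.name)
}
```

With the twelve original rows both joins happen to agree, because no score or age collides across classes; the extra condition matters as soon as the data changes.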

