使用 Flink SQL / Table API 完成下面的功能。
有如下数据
字段分别为:姓名,年龄,性别,班级,考试成绩
zhangsan,18,man,1707e,81.5
lisi,22,woman,1707e,77.5
wangwu,28,woman,1707e,82.0
zhaoliu,24,man,1707e,73.5
qianqi,18,woman,1707e,91.0
maba,22,man,1707e,84.0
sunjiiu,27,woman,1707e,88.0
xiaoming,20,man,1710e,73.5
xiaohong,21,woman,1710e,80.0
xiaozhang,22,man,1710e,73.5
xiaoli,19,woman,1710e,92.0
xiaowang,26,man,1710e,86.5
使用 Flink Table API 读取以上数据,进行统计分析。
1.计算各班平均成绩;
2.计算总分最高的班级名称和总分;
3.计算各班最高成绩的学生姓名;
4.计算1707e成绩最高的女生姓名;
5.计算各班男生平均年龄,女生平均年龄;
6.计算1710e最小年龄学生姓名;
7.计算各班男生个数和女生个数。
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.types.Row
/**
 * Batch job that loads student records from a comma-separated text file, registers
 * them as the Flink table `stu`, and answers seven statistical questions with Flink SQL.
 *
 * Each input line is expected to have the form `name,age,sex,clazz,score`.
 */
object Job629 {

  /**
   * Builds the seven queries and prints every result set to stdout.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Batch execution environment, run with a single parallel task.
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Table environment bound to the batch environment.
    val tableEnvironment = TableEnvironment.getTableEnvironment(env)

    // Raw input lines.
    val dataDS: DataSet[String] = env.readTextFile("src/main/resources/job629.txt")

    // Parse every non-blank line into a Student; blank lines would otherwise
    // fail with an index/number-format error while parsing.
    val stuDS = dataDS
      .filter(line => line.trim.nonEmpty)
      .map(line => {
        val arr: Array[String] = line.split(",")
        Student(arr(0).trim, arr(1).trim.toInt, arr(2).trim, arr(3).trim, arr(4).trim.toDouble)
      })

    // Expose the parsed records to SQL under the name `stu`.
    tableEnvironment.registerDataSet("stu", stuDS)

    // Converts a query result to a DataSet[Row] and prints it. print() is what
    // actually triggers execution; without a sink the job would compute nothing.
    def show(title: String, table: Table): Unit = {
      println(s"== $title ==")
      val rows: DataSet[Row] = tableEnvironment.toDataSet[Row](table)
      rows.print()
    }

    // 1. Average score of each class.
    val clazzAvg: Table = tableEnvironment.sqlQuery(
      "select clazz,avg(score) from stu group by clazz")
    show("1. average score per class", clazzAvg)

    // 2. Class with the highest total score, and that total.
    // A single aggregation sorted descending with limit 1 is sufficient; joining it
    // back onto the per-class sums only risks duplicate rows when totals tie.
    val scoreMaxClazz: Table = tableEnvironment.sqlQuery(
      """
        |select clazz, sum(score) as sumScore
        |from stu
        |group by clazz
        |order by sumScore desc
        |limit 1
        |""".stripMargin)
    show("2. class with the highest total score", scoreMaxClazz)

    // 3. Name of the top-scoring student(s) of each class.
    // The join must also match on clazz, otherwise a student from another class
    // whose score happens to equal this class's maximum would be reported.
    val scoreMaxName: Table = tableEnvironment.sqlQuery(
      """
        |select s2.clazz, s1.name, s2.maxScore
        |from stu s1 join
        |  (select clazz, max(score) maxScore from stu group by clazz) s2
        |on s1.clazz = s2.clazz and s1.score = s2.maxScore
        |""".stripMargin)
    show("3. top-scoring student per class", scoreMaxName)

    // 4. Name of the top-scoring female student in class 1707e.
    // The outer side is restricted to the same class and gender so that only
    // students from the group the maximum was computed over can match.
    val scoreMaxF: Table = tableEnvironment.sqlQuery(
      """
        |select s1.name, s2.maxScore
        |from stu s1 join
        |  (select max(score) maxScore from stu where clazz = '1707e' and sex = 'woman') s2
        |on s1.score = s2.maxScore
        |where s1.clazz = '1707e' and s1.sex = 'woman'
        |""".stripMargin)
    show("4. top-scoring woman in 1707e", scoreMaxF)

    // 5. Average age of male and of female students in each class.
    val avgAge: Table = tableEnvironment.sqlQuery(
      """
        |select clazz,sex,avg(age) from stu group by clazz,sex
        |""".stripMargin)
    show("5. average age per class and gender", avgAge)

    // 6. Name of the youngest student(s) in class 1710e.
    // The outer side is restricted to 1710e so students of the same age in
    // other classes are not returned.
    val minAgeName: Table = tableEnvironment.sqlQuery(
      """
        |select s1.name, s2.minAge
        |from stu s1 join
        |  (select min(age) minAge from stu where clazz = '1710e') s2
        |on s1.age = s2.minAge
        |where s1.clazz = '1710e'
        |""".stripMargin)
    show("6. youngest student in 1710e", minAgeName)

    // 7. Number of male and of female students in each class.
    val count: Table = tableEnvironment.sqlQuery(
      """
        |select clazz,sex,count(sex) from stu group by clazz,sex
        |""".stripMargin)
    show("7. student count per class and gender", count)
  }
}
/**
 * One student record, with fields in the same order as the columns of an input line.
 *
 * @param name  student name
 * @param age   age as a whole number
 * @param sex   gender label as it appears in the data (e.g. "man" / "woman")
 * @param clazz class identifier (e.g. "1707e")
 * @param score exam score
 */
case class Student(
  name: String,
  age: Int,
  sex: String,
  clazz: String,
  score: Double
)