1. Background
A text file stores the salary details of employees across the company's departments. The data format of this salary.txt file is as follows:
deptId name id salary
1001 张三 01 2000
1002 李四 02 2500
1003 张三 05 3000
1002 王五 01 2600
Write a program that computes each department's average salary and outputs the results in descending order.
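As a baseline, here is a minimal sketch of the required computation using plain Scala collections (no Flink); the inlined rows stand in for salary.txt and match the sample above.

object DeptAvgSalarySketch {
  def main(args: Array[String]): Unit = {
    val rows = Seq(
      "1001 张三 01 2000",
      "1002 李四 02 2500",
      "1003 张三 05 3000",
      "1002 王五 01 2600")
    rows
      .map(_.split(" "))                    // split each row into fields
      .groupBy(_(0))                        // group by deptId
      .map { case (dept, rs) =>             // average the salary column
        (dept, rs.map(_(3).toDouble).sum / rs.size)
      }
      .toList
      .sortBy(-_._2)                        // descending by average
      .foreach { case (dept, avg) => println(s"$dept:$avg") }
  }
}

On the sample data this prints 1003:3000.0, then 1002:2550.0, then 1001:2000.0, which is the target ordering for the Flink version below.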
2. Flink implementation
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable

case class AvgInfo(deptId: String, cnt: Int, sum: Double, avg: Double)
case class Emp(deptId: String, name: String, id: String, salary: Double)

object DeptAvgSalaryApp {
  val path = "./spark/spark-study/src/main/resources/salary.txt"

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)

    val inputDstream: DataStream[String] = env.readTextFile(path)
    val keyedStream = inputDstream
      .filter(line => !line.contains("deptId"))   // drop the header line
      .map(line => {
        val arrs = line.split(" ")
        Emp(arrs(0), arrs(1), arrs(2), arrs(3).toDouble)
      })
      .keyBy(_.deptId)
    keyedStream.print("keyedStream: ")

    // Attempt 1: use Flink's MapState ---- does not work correctly (see section 3)
    useFlinkMapState(keyedStream)
    // Attempt 2: use a Scala mutable.Map ---- works
    // useScalaMap(keyedStream)

    env.execute("DeptAvgSalaryApp1")
  }

  private def useFlinkMapState(keyedStream: KeyedStream[Emp, String]) = {
    // Keyed state is only accessible from functions applied on a KeyedStream,
    // and each key gets its own, independent copy of the state.
    var mapState: MapState[String, AvgInfo] = null
    val dataStream = keyedStream.process(new KeyedProcessFunction[String, Emp, String] {
      // close() receives no Collector, so cache the one passed to processElement
      var out_1: Collector[String] = _

      override def open(parameters: Configuration): Unit = {
        if (mapState == null) {
          mapState = getRuntimeContext.getMapState(new MapStateDescriptor[String, AvgInfo](
            "avgInfo", classOf[String], classOf[AvgInfo]
          ))
        }
      }

      override def processElement(emp: Emp,
                                  ctx: KeyedProcessFunction[String, Emp, String]#Context,
                                  out: Collector[String]): Unit = {
        if (out_1 == null) {
          out_1 = out
        }
        val salary: Double = emp.salary
        val deptId = emp.deptId
        if (mapState.contains(deptId)) {
          val info = mapState.get(deptId)
          mapState.put(deptId, AvgInfo(deptId, info.cnt + 1, info.sum + salary, 0))
        } else {
          mapState.put(deptId, AvgInfo(deptId, 1, salary, 0))
        }
      }

      override def close(): Unit = {
        // BUG: mapState is scoped to the current key, so this loop only sees
        // the entry for whichever key was active last, not all departments.
        val it = mapState.keys().iterator
        while (it.hasNext) {
          val key = it.next()
          val inf = mapState.get(key)
          out_1.collect(inf.deptId + ":" + (inf.sum / inf.cnt))
        }
      }
    })
    dataStream.print("dataMapState: ")
  }

  private def useScalaMap(keyedStream: KeyedStream[Emp, String]) = {
    // A plain JVM map shared by the single function instance (parallelism is 1),
    // so unlike keyed state it accumulates entries for every department.
    val dataMap: mutable.Map[String, AvgInfo] = scala.collection.mutable.Map[String, AvgInfo]()
    val dataStream = keyedStream.process(new KeyedProcessFunction[String, Emp, String] {
      var out_1: Collector[String] = _

      override def processElement(emp: Emp,
                                  ctx: KeyedProcessFunction[String, Emp, String]#Context,
                                  out: Collector[String]): Unit = {
        if (out_1 == null) {
          out_1 = out
        }
        val salary: Double = emp.salary
        val deptId = emp.deptId
        if (dataMap.contains(deptId)) {
          val info = dataMap(deptId)
          dataMap.put(deptId, AvgInfo(deptId, info.cnt + 1, info.sum + salary, 0))
        } else {
          dataMap.put(deptId, AvgInfo(deptId, 1, salary, 0))
        }
      }

      override def close(): Unit = {
        // Fill in the average, then sort departments by it in descending order
        dataMap.map {
          case (deptId, info: AvgInfo) =>
            (deptId, AvgInfo(deptId, info.cnt, info.sum, info.sum / info.cnt))
        }.values
          .toList
          .sortWith((n1, n2) => n1.avg > n2.avg)
          .foreach(inf => out_1.collect(inf.deptId + ":" + (inf.sum / inf.cnt)))
      }
    })
    dataStream.print("dataMap: ")
  }
}
3. Execution results
Using a Scala mutable.Map to hold the intermediate data: with parallelism 1 there is a single function instance, so the plain JVM map sees every department, and close() can compute, sort, and emit all the averages.
keyedStream: > Emp(1001,张三,01,2000.0)
keyedStream: > Emp(1002,李四,02,2500.0)
keyedStream: > Emp(1003,张三,05,3000.0)
keyedStream: > Emp(1002,王五,01,2600.0)
dataMap: > 1003:3000.0
dataMap: > 1002:2550.0
dataMap: > 1001:2000.0
Using Flink MapState to hold the intermediate data (currently broken): keyed state is partitioned per key, so each department gets its own independent map. Iterating mapState.keys() in close() therefore only reaches the state of whichever key was active last (1002, from the final record), which is why only one department is printed. The interleaving with the keyedStream lines is just the two print sinks racing each other on stdout.
keyedStream: > Emp(1001,张三,01,2000.0)
dataMapState: > 1002:2550.0
keyedStream: > Emp(1002,李四,02,2500.0)
keyedStream: > Emp(1003,张三,05,3000.0)
keyedStream: > Emp(1002,王五,01,2600.0)
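Given the diagnosis above, here is a hedged sketch of one way to keep the keyed-state version correct: store the per-department accumulator in a per-key ValueState and emit the updated average from processElement itself, so close() never has to touch keyed state. It reuses the Emp case class and imports from section 2; the class name RunningDeptAvg and state name "deptAcc" are illustrative, not from the original code.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

class RunningDeptAvg extends KeyedProcessFunction[String, Emp, String] {
  // One (count, sum) accumulator per key, i.e. per department
  private var acc: ValueState[(Int, Double)] = _

  override def open(parameters: Configuration): Unit = {
    acc = getRuntimeContext.getState(
      new ValueStateDescriptor[(Int, Double)]("deptAcc", classOf[(Int, Double)]))
  }

  override def processElement(emp: Emp,
                              ctx: KeyedProcessFunction[String, Emp, String]#Context,
                              out: Collector[String]): Unit = {
    val (cnt, sum) = Option(acc.value()).getOrElse((0, 0.0))   // state is null until the first element
    val (newCnt, newSum) = (cnt + 1, sum + emp.salary)
    acc.update((newCnt, newSum))
    out.collect(emp.deptId + ":" + (newSum / newCnt))          // running average so far
  }
}

Wired up as keyedStream.process(new RunningDeptAvg).print("runningAvg: "), the last line emitted per department carries its final average. A global descending sort only becomes meaningful once the bounded input ends, which is why the mutable.Map variant defers sorting to close().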