Flink实现求平均值


1. 需求背景

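文本文件 salary.txt 中存放公司各个部门人员的工资明细，数据格式如下（注意：第 2 节的代码按 `deptId name id salary` 四列读取每一行，因此实际文件中姓名与员工编号之间也以空格分隔，例如 `1001 张三 01 2000`，对应执行结果中的 `Emp(1001,张三,01,2000.0)`）：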
deptId name salary
1001 张三01 2000
1002 李四02 2500
1003 张三05 3000
1002 王五01 2600
编写程序计算各个部门的平均工资，并按平均工资从高到低（倒序）输出。

2. 使用Flink实现

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{KeyedProcessFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.mutable

/**
 * Per-department aggregate.
 *
 * @param deptId department id
 * @param cnt    number of salary records seen for the department
 * @param sum    total of those salaries
 * @param avg    average salary (`sum / cnt`); the aggregating code stores 0 until it computes it
 */
final case class AvgInfo(deptId: String, cnt: Int, sum: Double, avg: Double)

/**
 * One parsed line of the salary file.
 *
 * @param deptId department id
 * @param name   employee name
 * @param id     employee id
 * @param salary salary amount
 */
final case class Emp(deptId: String, name: String, id: String, salary: Double)

/**
 * Computes the average salary of each department from a whitespace-separated text file.
 *
 * The file is read as a bounded stream, keyed by department id. Two alternative ways of
 * keeping the per-department aggregate are shown: Flink keyed `MapState`
 * ([[useFlinkMapState]]) and a plain Scala `mutable.Map` ([[useScalaMap]]).
 */
object DeptAvgSalaryApp {
  /** Input file; each data line is `deptId name id salary` (or `deptId name salary`). */
  val path = "./spark/spark-study/src/main/resources/salary.txt"

  /** Builds and runs the job. `args` is not used. */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    // A single parallel instance, so one function instance sees every department.
    env.setParallelism(1)
    val inputDstream: DataStream[String] = env.readTextFile(path)
    val keyedStream = inputDstream
      // Drop the header line and blank lines before parsing.
      .filter(line => line.trim.nonEmpty && !line.contains("deptId"))
      .map(line => parseEmp(line))
      .keyBy(_.deptId)
    keyedStream.print("keyedStream: ")

    // Attempt 1: Flink keyed MapState (emits a running average per department).
    useFlinkMapState(keyedStream)
    // Attempt 2: Scala mutable.Map, emits the sorted averages when the operator closes.
    //    useScalaMap(keyedStream)

    env.execute("DeptAvgSalaryApp1")
  }

  /**
   * Parses one data line into an [[Emp]].
   *
   * Fields are separated by any run of whitespace. Four or more fields are read as
   * `deptId name id salary`; extra trailing fields are ignored. Three fields are read as
   * `deptId name salary`, with an empty id.
   *
   * @throws IllegalArgumentException if the line has fewer than three fields
   * @throws NumberFormatException    if the salary field is not a number
   */
  private def parseEmp(line: String): Emp =
    line.trim.split("\\s+") match {
      case Array(deptId, name, id, salary, _*) => Emp(deptId, name, id, salary.toDouble)
      case Array(deptId, name, salary)         => Emp(deptId, name, "", salary.toDouble)
      case _ => throw new IllegalArgumentException(s"Malformed salary line: '$line'")
    }

  /**
   * Aggregates salaries per department in Flink keyed `MapState`.
   *
   * For every incoming employee, this prints the updated running average of that
   * employee's department as `deptId:avg`. The last line printed for a department is its
   * final average.
   *
   * Keyed state is only accessible while a current key is set, i.e. inside
   * `processElement` (or timer callbacks). It cannot be iterated over all keys from
   * `close()`; doing so only exposes the last processed key. For that reason, the result
   * is emitted per element here instead of once at the end.
   */
  private def useFlinkMapState(keyedStream: KeyedStream[Emp, String]): Unit = {
    val dataStream =
      keyedStream.process(new KeyedProcessFunction[String, Emp, String] {
        // Obtained per parallel instance in open(). Entries are scoped to the current key,
        // so for a given deptId this map only ever holds that deptId's entry.
        @transient private var mapState: MapState[String, AvgInfo] = _

        override def open(parameters: Configuration): Unit = {
          mapState = getRuntimeContext.getMapState(
            new MapStateDescriptor[String, AvgInfo]("avgInfo", classOf[String], classOf[AvgInfo]))
        }

        override def processElement(emp: Emp,
                                    ctx: KeyedProcessFunction[String, Emp, String]#Context,
                                    out: Collector[String]): Unit = {
          val deptId = emp.deptId
          // MapState.get returns null when there is no entry for the key yet.
          val previous = Option(mapState.get(deptId))
          val cnt = previous.map(_.cnt).getOrElse(0) + 1
          val sum = previous.map(_.sum).getOrElse(0.0) + emp.salary
          val info = AvgInfo(deptId, cnt, sum, sum / cnt)
          mapState.put(deptId, info)
          out.collect(info.deptId + ":" + info.avg)
        }
      })

    dataStream.print("dataMapState: ")
  }

  /**
   * Aggregates salaries per department in a plain Scala `mutable.Map` owned by the
   * process function.
   *
   * When the operator is closed, this prints every department's average as `deptId:avg`,
   * in descending order of average. A single global ranking is only possible because the
   * job runs with parallelism 1, so one instance sees all departments. The map is not
   * Flink-managed state, so it is not part of checkpoints.
   */
  private def useScalaMap(keyedStream: KeyedStream[Emp, String]): Unit = {
    val dataStream =
      keyedStream.process(new KeyedProcessFunction[String, Emp, String] {
        private val dataMap = mutable.Map[String, AvgInfo]()
        // Collector remembered from processElement so that close() can emit the results.
        private var collector: Collector[String] = _

        override def processElement(emp: Emp,
                                    ctx: KeyedProcessFunction[String, Emp, String]#Context,
                                    out: Collector[String]): Unit = {
          if (collector == null) collector = out
          val updated = dataMap.get(emp.deptId)
            .map(info => info.copy(cnt = info.cnt + 1, sum = info.sum + emp.salary))
            .getOrElse(AvgInfo(emp.deptId, 1, emp.salary, 0))
          dataMap.update(emp.deptId, updated)
        }

        override def close(): Unit = {
          // An empty map emits nothing, so collector is never touched when unset.
          dataMap.values.toList
            .map(info => info.copy(avg = info.sum / info.cnt))
            .sortWith(_.avg > _.avg)
            .foreach(info => collector.collect(info.deptId + ":" + info.avg))
        }
      })

    dataStream.print("dataMap: ")
  }
}

3. 执行结果

使用mutable.Map保存中间数据 

keyedStream: > Emp(1001,张三,01,2000.0)
keyedStream: > Emp(1002,李四,02,2500.0)
keyedStream: > Emp(1003,张三,05,3000.0)
keyedStream: > Emp(1002,王五,01,2600.0)
dataMap: > 1003:3000.0
dataMap: > 1002:2550.0
dataMap: > 1001:2000.0

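使用MapState保存中间数据（目前有问题：只输出了部门 1002 的结果。原因是 MapState 属于键控状态（keyed state），只能在有"当前 key"的上下文中访问，例如 processElement；在 close() 中读取时只能看到最后处理的那个 key，即 1002 的数据）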

keyedStream: > Emp(1001,张三,01,2000.0)
dataMapState: > 1002:2550.0
keyedStream: > Emp(1002,李四,02,2500.0)
keyedStream: > Emp(1003,张三,05,3000.0)
keyedStream: > Emp(1002,王五,01,2600.0)


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM