Flink之ProcessFunction侧输出流


1、代码案例

package processFunction

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.functions.{ProcessFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
 * @author yangwj
 * @date 2021/1/10 21:25
 * @version 1.0
 */
object SideOutPutTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    val hightStream= dataStream.process(new SplitTemp(35.5))

    hightStream.print("high")
    hightStream.getSideOutput(new OutputTag[(String,Long,Double)]("low")).print("low")
    env.execute("SideOutPutTest Test")

  }
}

//分流测试
class SplitTemp(threshold:Double) extends ProcessFunction[SensorReading,SensorReading]{
  override def processElement(i: SensorReading, context: ProcessFunction[SensorReading, SensorReading]#Context, collector: Collector[SensorReading]): Unit = {

    if(i.temperature > threshold){
      //如果温度大于35.5,那么输出到主流
      collector.collect(i)
    }else {
      //如果温度小于35.5,那么输出到测流
      context.output(new OutputTag[(String, Long, Double)]("low"), (i.id, i.timestamp, i.temperature))
    }
  }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM