Flink之ProcessFunction側輸出流


1、代碼案例

package processFunction

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.functions.{ProcessFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
 * @author yangwj
 * @date 2021/1/10 21:25
 * @version 1.0
 */
/**
 * Demo job: reads comma-separated sensor records from a text file, splits them with
 * [[SplitTemp]] and prints the main output (prefix "high") and the "low" side output
 * (prefix "low").
 */
object SideOutPutTest {

  /**
   * Builds and runs the job.
   *
   * @param args optional; `args(0)` overrides the input file path. When absent the
   *             original sample-file location is used.
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Take the path from the command line when given so the job is not tied to one machine;
    // the fallback keeps the previous behaviour for argument-less runs.
    val inputFile: String =
      args.headOption.getOrElse("G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt")
    val input: DataStream[String] = env.readTextFile(inputFile)

    // Each line is expected to be "id,timestamp,temperature".
    // Fields are trimmed because String.toLong rejects surrounding whitespace
    // (e.g. "sensor_1, 1547718199, 35.8").
    val dataStream = input.map { line =>
      val fields: Array[String] = line.split(",").map(_.trim)
      SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
    }

    val highStream = dataStream.process(new SplitTemp(35.5))

    // Must use the same id ("low") and element type as the tag SplitTemp emits to.
    val lowTag = new OutputTag[(String, Long, Double)]("low")

    highStream.print("high")
    highStream.getSideOutput(lowTag).print("low")
    env.execute("SideOutPutTest Test")
  }
}

//分流測試
/**
 * Splits sensor readings by temperature.
 *
 * Readings whose temperature is strictly greater than `threshold` are forwarded unchanged
 * on the main output. All other readings (including those equal to the threshold) are
 * emitted to the side output tagged "low" as an `(id, timestamp, temperature)` tuple.
 *
 * @param threshold temperature boundary between the main output and the "low" side output
 */
class SplitTemp(threshold: Double) extends ProcessFunction[SensorReading, SensorReading] {

  // Created once per function instance instead of once per record: the tag and its
  // implicitly derived type information do not change between elements.
  private val lowTag: OutputTag[(String, Long, Double)] =
    new OutputTag[(String, Long, Double)]("low")

  /**
   * Routes a single reading to the main output or to the "low" side output.
   *
   * @param i         the incoming sensor reading
   * @param context   context used to emit to the side output
   * @param collector collector for the main output
   */
  override def processElement(
      i: SensorReading,
      context: ProcessFunction[SensorReading, SensorReading]#Context,
      collector: Collector[SensorReading]): Unit = {
    if (i.temperature > threshold) {
      // Above the threshold: keep the reading on the main stream.
      collector.collect(i)
    } else {
      // At or below the threshold: emit to the side stream as a plain tuple.
      context.output(lowTag, (i.id, i.timestamp, i.temperature))
    }
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM