Flink 读取用户自定义source


 

package com.kpwong.aiptest import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.functions.source.SourceFunction import scala.collection.immutable import scala.util.Random object MySourceTest { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment  val myDS: DataStream[SensorReading] = env.addSource(new MySensorSource) myDS.print() env.execute() } } //自定义SourceFunction

class MySensorSource() extends SourceFunction[SensorReading]{ //定义一个flag 用来标记数据源是否正常发出数据
 var running = true System.currentTimeMillis() override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = { //定义一个随机数发生器
     val random: Random = new Random() val init: immutable.IndexedSeq[(String, Long, Double)] = (1 to 10).map(i=>("Sensor"+i,System.currentTimeMillis(),random.nextDouble())) while (running) { Thread.sleep(1000) //在上次数据基础上微调,更新温度值
         val current: immutable.IndexedSeq[(String, Long, Double)] = init.map(data=>(data._1,System.currentTimeMillis(),data._3+random.nextGaussian())) val readings: immutable.IndexedSeq[SensorReading] = current.map(data=>SensorReading(data._1,data._2,data._3)) readings.map(data=>sourceContext.collect(data)) // readings.foreach(data=>sourceContext.collect(data))
 } } override def cancel(): Unit = { running = false } }

运行结果:

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM