Flink Learning (9): Sinking to Kafka
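This post wires up a complete Kafka-to-Kafka pipeline: a FlinkKafkaConsumer011 reads raw sensor lines from the sensor topic, a map operator parses each line into a SensorReading, and a FlinkKafkaProducer011 writes the results to the sinkTest topic.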


package com.wyh.streamingApi.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

// Temperature sensor reading case class
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object Sink2Kafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    /**
     * Sample input records:
     * sensor_1,1547718199,35.80018327300259
     * sensor_6,1547718201,15.402984393403084
     * sensor_7,1547718202,6.720945201171228
     * sensor_10,1547718205,38.1010676048934444
     * sensor_1,1547718199,35.1
     * sensor_1,1547718199,31.0
     * sensor_1,1547718199,39
     */

    // Source operation: alternatively, read the same records from a local file
    // val inputStream = env.readTextFile("F:\\flink-study\\wyhFlinkSD\\data\\sensor.txt")

    val properties = new Properties()
    // Note: zookeeper.connect is a legacy (pre-0.9) setting; the 0.11 consumer
    // tracks offsets in Kafka itself, and the connector supplies its own
    // byte-array deserializers, so those two entries are effectively ignored.
    properties.setProperty("zookeeper.connect", "tuijian:2181")
    properties.setProperty("bootstrap.servers", "tuijian:9092")
    properties.setProperty("group.id", "test-consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest") // start from the latest offset when no committed offset exists

    // Source operation: consume the "sensor" topic from Kafka
    val inputStream = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // Transform operation: parse each CSV line into a SensorReading, then back
    // to a String so it can be serialized downstream with SimpleStringSchema
    val dataStream: DataStream[String] = inputStream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble).toString
    })

    // Sink operation: write the transformed stream to the "sinkTest" Kafka topic
    dataStream.addSink(new FlinkKafkaProducer011[String]("tuijian:9092", "sinkTest", new SimpleStringSchema()))
    dataStream.print()

    env.execute("kafka sink test")
  }
}
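The map step converts each SensorReading back to a String only because the producer is built with SimpleStringSchema. If you would rather keep the typed stream, here is a minimal sketch, not from the original post: a custom SerializationSchema[SensorReading] handed to FlinkKafkaProducer011 directly. The class name SensorReadingSchema and the typedStream variable are hypothetical; the CSV layout mirrors the input records above.

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.serialization.SerializationSchema

// Sketch: serialize SensorReading records directly, skipping the toString step
class SensorReadingSchema extends SerializationSchema[SensorReading] {
  // Emit the same comma-separated layout used by the input records
  override def serialize(element: SensorReading): Array[Byte] =
    s"${element.id},${element.timestamp},${element.temperature}".getBytes(StandardCharsets.UTF_8)
}

// A hypothetical DataStream[SensorReading] could then be sunk without the intermediate String:
// typedStream.addSink(new FlinkKafkaProducer011[SensorReading]("tuijian:9092", "sinkTest", new SensorReadingSchema))

Whether to flatten to String early or keep the typed stream is mostly a question of where downstream consumers expect the parsing to happen.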

 
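FlinkKafkaProducer011 also supports stronger delivery guarantees than the default at-least-once. A hedged sketch, assuming the same broker and topic as above and that checkpointing is enabled on the environment (transactions are only committed on checkpoints):

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

// Sketch only: exactly-once delivery via Kafka 0.11 transactions.
// Requires env.enableCheckpointing(...); replace the addSink call above with:
val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "tuijian:9092")

dataStream.addSink(new FlinkKafkaProducer011[String](
  "sinkTest",                                                    // default topic
  new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), // wrap the plain schema
  producerProps,
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE))                  // transactional writes

To verify the pipeline end to end, publish a few test lines to the sensor topic with kafka-console-producer.sh and watch the sinkTest topic with kafka-console-consumer.sh.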

