Windows下Flink讀取Kafka


代碼:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

//溫度傳感器讀取樣例類
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

//1.從自定義的集合中讀取數據
import org.apache.flink.api.scala._
// val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718199, 35.80018327300259),
// SensorReading("sensor_6", 1547718201, 15.402984393403084),
// SensorReading("sensor_7", 1547718202, 6.720945201171228),
// SensorReading("sensor_10", 1547718205, 38.101067604893444)
// ))
// stream1.print("stream1").setParallelism(1)
// env.execute("source test")
//
// //2.從文件讀取數據
// val stream2 = env.readTextFile("sensor.txt")
// stream2.print("stream2").setParallelism(1)
// env.execute("source test")

//3.以 kafka 消息隊列的數據作為來源
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
stream3.print("stream3").setParallelism(1)
env.execute("source test")
}
}

1.啟動zookerper

   

   2.啟動Kafka

   

   3.啟動Producer並發送數據

   

   4.結果

   

有幫助的歡迎評論打賞哈,謝謝!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM