代碼:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
//溫度傳感器讀取樣例類
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object SourceTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//1.從自定義的集合中讀取數據
import org.apache.flink.api.scala._
// val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718199, 35.80018327300259),
// SensorReading("sensor_6", 1547718201, 15.402984393403084),
// SensorReading("sensor_7", 1547718202, 6.720945201171228),
// SensorReading("sensor_10", 1547718205, 38.101067604893444)
// ))
// stream1.print("stream1").setParallelism(1)
// env.execute("source test")
//
// //2.從文件讀取數據
// val stream2 = env.readTextFile("sensor.txt")
// stream2.print("stream2").setParallelism(1)
// env.execute("source test")
//3.以 kafka 消息隊列的數據作為來源
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
stream3.print("stream3").setParallelism(1)
env.execute("source test")
}
}
1.啟動zookerper
2.啟動Kafka
3.啟動Producer並發送數據
4.結果
有幫助的歡迎評論打賞哈,謝謝!