Implement a Kafka-in, Kafka-out pipeline: Flink consumes records from one Kafka topic, transforms each record, and writes the results back to another Kafka topic.
Code:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

// Record type for the sensor lines (defined here so the example is self-contained);
// its toString produces the output shown by the console consumer below.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object KafkaTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop102:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // Consume records from the Kafka topic "sensor"
    val kafkaDataStream: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // Parse each comma-separated line into a SensorReading, then render it back to a string
    val dataStream: DataStream[String] = kafkaDataStream.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString
    })

    // Send the results to the Kafka topic "flink-sink-test"
    dataStream.addSink(new FlinkKafkaProducer011[String]("hadoop102:9092", "flink-sink-test", new SimpleStringSchema()))

    env.execute("kafka")
  }
}
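Note that the map above assumes every record splits into exactly three well-formed fields; a single malformed line would throw an exception and fail the whole job. A minimal hardening sketch (same SensorReading type; safeStream is an illustrative name, not part of the original code) drops unparseable records instead:

import scala.util.Try

// Keep only the lines that parse cleanly; bad records are silently dropped.
// A production job might route the failures to a side output instead.
val safeStream: DataStream[String] = kafkaDataStream.flatMap { line =>
  Try {
    val arr = line.split(",")
    SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString
  }.toOption.toList
}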
The producer sends data for Flink to consume:
[atguigu@hadoop102 bin]$ ./kafka-console-producer.sh --broker-list hadoop102:9092 --topic sensor
>sensor_1, 1547718199, 35.80018327300259
>sensor_1, 1547718201, 40.8
>sensor_1, 1547718202, 998
>
A console consumer checks the data that Flink writes back:
[atguigu@hadoop102 bin]$ ./kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic flink-sink-test
SensorReading(sensor_1,1547718199,35.80018327300259)
SensorReading(sensor_1,1547718201,40.8)
SensorReading(sensor_1,1547718202,998.0)

The third record was produced as 998 but comes back as 998.0: the job parses that field into the Double temperature field of SensorReading, and the sink writes the case class's toString, so the value is rendered as a Double.
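One caveat on the round trip: the three-argument FlinkKafkaProducer011 constructor used above writes with the connector's default at-least-once guarantee, so duplicates can appear in flink-sink-test after a failure and restart. The 0.11 connector can also write transactionally. Below is a minimal exactly-once sketch, assuming checkpointing is enabled and the four-argument constructor from the flink-connector-kafka-0.11 artifact (verify the signature against your Flink version); it would replace the addSink call in the job above:

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

// Transactional commits ride on checkpoints, so checkpointing must be enabled.
env.enableCheckpointing(5000)

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "hadoop102:9092")
// Must cover the longest expected checkpoint interval plus recovery time,
// and must not exceed the broker's transaction.max.timeout.ms.
producerProps.setProperty("transaction.timeout.ms", "60000")

dataStream.addSink(new FlinkKafkaProducer011[String](
  "flink-sink-test",
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
  producerProps,
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE))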