Kafka整合Flink使用----使用Flink消費kafka內的數據
添加依賴(代碼參照kafka官網:https://kafka.apache.org/)
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version} </artifactId> <version>${flink.version}</version> </dependency>
kafka作為Flink的數據源
使用Flink消費kafka內的數據
package com.shujia.flink.kafka
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object Demo1KafkaSource {
def main(args: Array[String]): Unit = {
//創建flink環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
//broler地址列表
properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
//消費者組,同一條數據在一個組內只處理一次
properties.setProperty("group.id", "test")
//創建消費者
val flinkKakfaConsumer = new FlinkKafkaConsumer[String](
"words", //指定topic
new SimpleStringSchema(), //指定數據格式
properties //指定配置文件對象
)
flinkKakfaConsumer.setStartFromEarliest() //盡可能從最早的記錄開始
//flinkKakfaConsumer.setStartFromLatest() //從最新的記錄開始
//flinkKakfaConsumer.setStartFromTimestamp() //從指定的時間開始(毫秒)
//flinkKakfaConsumer.setStartFromGroupOffsets() //默認的方法, 按照消費者組讀取數據,如果消費者組第一次使用,默認只讀取最新的數據
//使用kafka source -- 無界流
val kafkaDS: DataStream[String] = env.addSource(flinkKakfaConsumer)
kafkaDS
.flatMap(_.split(","))
.map((_, 1))
.keyBy(_._1)
.sum(1)
.print()
env.execute()
}
}