(3)Kafka整合Flink使用----使用Flink消费kafka内的数据


Kafka整合Flink使用----使用Flink消费kafka内的数据

添加依赖(代码参照kafka官网:https://kafka.apache.org/)

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_${scala.binary.version}					</artifactId>
	<version>${flink.version}</version>
</dependency>
kafka作为Flink的数据源

使用Flink消费kafka内的数据

package com.shujia.flink.kafka

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Demo1KafkaSource {
  def main(args: Array[String]): Unit = {
    //创建flink环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    //broler地址列表
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    //消费者组,同一条数据在一个组内只处理一次
    properties.setProperty("group.id", "test")

    //创建消费者
    val flinkKakfaConsumer = new FlinkKafkaConsumer[String](
      "words", //指定topic
      new SimpleStringSchema(), //指定数据格式
      properties //指定配置文件对象
    )

    flinkKakfaConsumer.setStartFromEarliest() //尽可能从最早的记录开始
    //flinkKakfaConsumer.setStartFromLatest() //从最新的记录开始
    //flinkKakfaConsumer.setStartFromTimestamp() //从指定的时间开始(毫秒)
    //flinkKakfaConsumer.setStartFromGroupOffsets()  //默认的方法, 按照消费者组读取数据,如果消费者组第一次使用,默认只读取最新的数据

    //使用kafka source   -- 无界流
    val kafkaDS: DataStream[String] = env.addSource(flinkKakfaConsumer)
    kafkaDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()
  }
}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM