(3)Kafka整合Flink使用----使用Flink消費kafka內的數據


Kafka整合Flink使用----使用Flink消費kafka內的數據

添加依賴(代碼參照kafka官網:https://kafka.apache.org/)

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_${scala.binary.version}					</artifactId>
	<version>${flink.version}</version>
</dependency>
kafka作為Flink的數據源

使用Flink消費kafka內的數據

package com.shujia.flink.kafka

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Demo1KafkaSource {
  def main(args: Array[String]): Unit = {
    //創建flink環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    //broler地址列表
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    //消費者組,同一條數據在一個組內只處理一次
    properties.setProperty("group.id", "test")

    //創建消費者
    val flinkKakfaConsumer = new FlinkKafkaConsumer[String](
      "words", //指定topic
      new SimpleStringSchema(), //指定數據格式
      properties //指定配置文件對象
    )

    flinkKakfaConsumer.setStartFromEarliest() //盡可能從最早的記錄開始
    //flinkKakfaConsumer.setStartFromLatest() //從最新的記錄開始
    //flinkKakfaConsumer.setStartFromTimestamp() //從指定的時間開始(毫秒)
    //flinkKakfaConsumer.setStartFromGroupOffsets()  //默認的方法, 按照消費者組讀取數據,如果消費者組第一次使用,默認只讀取最新的數據

    //使用kafka source   -- 無界流
    val kafkaDS: DataStream[String] = env.addSource(flinkKakfaConsumer)
    kafkaDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM