flink系列-12、Flink 的 Connectors & Flink Kafka Connectors


  • Flink 里面預定義了一些 source 和 sink。
  • Flink 內部也提供了一些 Boundled connectors。
  • 通過異步 IO 方式
  • 自定義 Source & Sink。

一、Flink 預定義 & 自定義 Source 和 Sink

https://www.cnblogs.com/xiexiandong/p/12770187.html

二、Flink 綁定的connectors

需要專門引入對應的依賴,主要是實現外部數據進出Flink。

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Apache Bahir
  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

三、Flink kafka Connector

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
Flink-Kafka-Consumer
def main(args: Array[String]): Unit = {
    // kafka 配置
    val kafkaConsumerProps: Properties = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
    kafkaConsumerProps.setProperty("group.id", KafkaConfig.CONSUMER_GROUP)
    // 獲取 kafkaConsumer
    val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](KafkaConfig.SOURCE_TOPIC, new SimpleStringSchema(), kafkaConsumerProps)

    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // 開啟spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否則打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 獲取本地運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // 添加數據源
    val sourceDataStream: DataStream[String] = env.addSource(flinkKafkaConsumer)

    sourceDataStream.print()

    // 提交運行
    env.execute("FlinkKafkaExample")
  }

創建 FlinkKafkaConsumer:
FlinkKafkaConsumer011

三個構造參數:

  • 要消費的topic(topic name / topic list/正表達式)
  • DeserializationSchema / KeyedDeserializationSchema(反序列化Kafka中的數據))
  • Kafka consumer的屬性,其中三個屬性必須提供:
    • bootstrap.servers(逗號分隔的Kafka broker列表)
    • group.id(consumer group id)

1、反序列化數據

  • 因為 kafka 中數據都是以二進制 byte 形式存儲的。讀到 Flink 系統中之后,需要將二進制數據轉化為具體的 java、scala 對象。具體需要實現一個 schema 類,定義如何序列化和反序列數據。
  • 反序列化時需要實現 DeserializationSchema 接口,並重寫 deserialize(byte[] message) 函數,如果是反序列化 kafka 中 kv 的數據時,需要實現 KeyedDeserializationSchema 接口。
    •  // kafka 數據類型樣例類
      case class KafkaEvent(message: String, eventTime: Long)
      
      //實現 DeserializationSchema 用來反序列化kafka中的數據,kafka 中的數據按照二進制存儲,系列化為 java/scala對象
      //重寫 deserialize(message: Array[Byte]) 方法
      //如果 kafka 中的數據是按照 KV 格式存儲的,需要實現 KeyedDeserializationSchema
      class KafkaEventDeserializationSchema extends DeserializationSchema[KafkaEvent] {
        // 標記為無界流
        override def isEndOfStream(nextElement: KafkaEvent): Boolean = false
        
        override def deserialize(message: Array[Byte]): KafkaEvent = {
          KafkaEvent(new String(message), System.currentTimeMillis())
        }
      
        // 得到自定義序列化類型
        override def getProducedType: TypeInformation[KafkaEvent] = TypeInformation.of(new TypeHint[KafkaEvent] {})
      }
      
  • 另外 Flink 中也提供了一些常用的序列化反序列化的 schema 類。
    • 例如,SimpleStringSchema,按字符串方式進行序列化、反序列化。
    • TypeInformationSerializationSchema,它可根據 Flink 的 TypeInformation 信息來推斷出需要選擇的 schema。
    • JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息,並返回 ObjectNode,可以使用 .get(“property”) 方法來訪問相應字段。

2、消費起始位置設置

在構造好的 FlinkKafkaConsumer 類后面調用如下相應函數,設置合適的起始位置。

  • setStartFromGroupOffsets,也是默認的策略,從 group offset 位置讀取數據,group offset 指的是 kafka broker 端記錄的某個 group 的最后一次的消費位置。但是 kafka broker 端沒有該 group 信息,會根據 kafka 的參數”auto.offset.reset”的設置來決定從哪個位置開始消費。
  • setStartFromEarliest,從 kafka 最早的位置開始讀取。
  • setStartFromLatest,從 kafka 最新的位置開始讀取。
  • setStartFromTimestamp(long),從時間戳大於或等於指定時間戳的位置開始讀取。Kafka 時戳,是指 kafka 為每條消息增加另一個時戳。該時戳可以表示消息在 proudcer 端生成時的時間、或進入到 kafka broker 時的時間。
  • setStartFromSpecificOffsets,從指定分區的 offset 位置開始讀取,如指定的 offsets 中不存某個分區,該分區從 group offset 位置開始讀取。此時需要用戶給定一個具體的分區、offset 的集合。
  • 需要注意的是,因為 Flink 框架有容錯機制,如果作業故障,如果作業開啟 checkpoint,會從上一次 checkpoint 狀態開始恢復。或者在停止作業的時候主動做 savepoint,啟動作業時從 savepoint 開始恢復。這兩種情況下恢復作業時,作業消費起始位置是從之前保存的狀態中恢復,與上面提到跟 kafka 這些單獨的配置無關
    kafka 設置消費位置
情形 誰決定起始位置
第一次啟動, 無savepoint(常規情況) 由消費模式決定
通過savepoint啟動(應用升級,比如加 大並行度) 由savepoint記錄的offset決定
有checkpoint,失敗后,job恢復的情況 由checkpoint的snapshoot中記錄的offset決定
無checkpoint,失敗后,job恢復的情況 由消費模式決定

3、topic 和 partition 動態發現

實際的生產環境中可能有這樣一些需求,比如:

  • 場景一,有一個 Flink 作業需要將五份數據聚合到一起,五份數據對應五個 kafka topic,隨着業務增長,新增一類數據,同時新增了一個 kafka topic,如何在不重啟作業的情況下作業自動感知新的 topic。
  • 場景二,作業從一個固定的 kafka topic 讀數據,開始該 topic 有 10 個 partition,但隨着業務的增長數據量變大,需要對 kafka partition 個數進行擴容,由 10 個擴容到 20。該情況下如何在不重啟作業情況下動態感知新擴容的 partition。

針對上面的兩種場景,首先需要在構建 FlinkKafkaConsumer 時的 properties 中設置 flink.partition-discovery.interval-millis 參數為非負值,表示開啟動態發現的開關,以及設置的時間間隔。此時 FlinkKafkaConsumer 內部會啟動一個單獨的線程定期去 kafka 獲取最新的 meta 信息。

kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000")
val topicPattern: Pattern = java.util.regex.Pattern.compile("test-topic-[0-9]")
val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topicPattern, new SimpleStringSchema(), kafkaConsumerProps)
  • 針對場景一,還需在構建 FlinkKafkaConsumer 時,topic 的描述可以傳一個正則表達式描述的 pattern。每次獲取最新 kafka meta 時獲取正則匹配的最新 topic 列表。
  • 針對場景二,設置前面的動態發現參數,在定期獲取 kafka 最新 meta 信息時會匹配新的 partition。為了保證數據的正確性,新發現的 partition 從最早的位置開始讀取。

4、commit offset 方式

Flink kafka consumer commit offset 方式需要區分是否開啟了 checkpoint。

  • 如果 checkpoint 關閉,commit offset 要依賴於 kafka 客戶端的 auto commit。需設置 enable.auto.commit,auto.commit.interval.ms 參數到 consumer properties,就會按固定的時間間隔定期 auto commit offset 到 kafka。
enable.auto.commit 的默認值是 true;就是默認采用自動提交的機制。
auto.commit.interval.ms 的默認值是 5000,單位是毫秒。
  • 如果開啟 checkpoint,這個時候作業消費的 offset 是 Flink 在 state 中自己管理和容錯。此時提交 offset 到 kafka,一般都是作為外部進度的監控,想實時知道作業消費的位置和 lag 情況。此時需要 setCommitOffsetsOnCheckpoints 為 true 來設置當 checkpoint 成功時提交 offset 到 kafka。此時 commit offset 的間隔就取決於 checkpoint 的間隔,所以此時從 kafka 一側看到的 lag 可能並非完全實時,如果 checkpoint 間隔比較長 lag 曲線可能會是一個鋸齒狀。

KakfaProducer 的配置
Flink-Kafka-Producer

def main(args: Array[String]): Unit = {
    // 獲取 kafkaConsumer
    val kafkaConsumerProps: Properties = new Properties()
    kafkaConsumerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
    kafkaConsumerProps.setProperty("group.id", KafkaConfig.CONSUMER_GROUP)
    val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](KafkaConfig.SOURCE_TOPIC, new SimpleStringSchema(), kafkaConsumerProps)

    // 獲取 KafkaProducer
    var kafkaProducerProps: Properties = new Properties()
    kafkaProducerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
    val flinkKafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](KafkaConfig.PRODUCER_TOPIC, new SimpleStringSchema(), kafkaProducerProps)

    val logPath: String = "/tmp/logs/flink_log"
    val conf: Configuration = new Configuration()
    // 開啟spark-webui
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日志文件
    conf.setString("web.log.path", logPath)
    // 配置 taskManager 的日志文件,否則打印日志到控制台
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
    // 獲取本地運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)

    val sourceDataStream: DataStream[String] = env.addSource(flinkKafkaConsumer)
    // 結果輸出
    sourceDataStream.print()
    // 結果存入 kafka
    sourceDataStream.addSink(flinkKafkaProducer)
    // 開啟任務
    env.execute("FlinkKafkaProducerExample")

  }
FlinkKafkaProducer011

1、Producer 分區

  • 使用 FlinkKafkaProducer 往 kafka 中寫數據時,如果不單獨設置 partition 策略,會默認使用 FlinkFixedPartitioner(此時無論是帶 key 的數據,還是不帶 key 都采用),該 partitioner 分區的方式是 task 所在的並發 id 對 topic 總 partition 數取余:parallelInstanceId % partitions.length。
    • 此時如果 sink 為 4,paritition 為 1,則 4 個 task 往同一個 partition 中寫數據。
    • 但當 sink task < partition 個數時會有部分 partition 沒有數據寫入,例如 sink task 為2,partition 總數為 4,則后面兩個 partition 將沒有數據寫入。
    • FlinkKafkaSink
  • 如果構建 FlinkKafkaProducer 時,partition 設置為 null,此時會使用 kafka producer 默認分區方式
    • 非 key 寫入的情況下,使用 round-robin 的方式進行分區,每個 task 都會輪循的寫下游的所有 partition。該方式下游的 partition 數據會比較均衡,但是缺點是 partition 個數過多的情況下需要維持過多的網絡連接,即每個 task 都會維持跟所有 partition 所在 broker 的連接。
    • 帶 key 的數據會根據 key,相同 key 數據分區的相同的 partition,如果 key 為 null,再輪詢寫。
    • FlinkKafkaSink

2、容錯

Flink kafka 09、010 版本下:

  • 能達到 at-least-once 語義(數據有可能重復),通過 setLogFailuresOnly 為 false,setFlushOnCheckpoint 為 true 來控制。
    • setLogFailuresOnly,默認為 false,是控制寫 kafka 失敗時,是否只打印失敗的 log 不拋異常讓作業停止。
    • setFlushOnCheckpoint,默認為 true,是控制是否在 checkpoint 時 fluse 數據到 kafka,保證數據已經寫到 kafka。否則數據有可能還緩存在 kafka 客戶端的 buffer 中,並沒有真正寫出到 kafka,此時作業掛掉數據即丟失,不能做到至少一次的語義。

Flink kafka 011 版本下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM