Flink學習筆記:Connectors之kafka


本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:

Flink大數據項目實戰:http://t.cn/EJtKhaz

1. Kafka-connector概述及FlinkKafkaConsumer(kafka source)

1.1回顧kafka

1.最初由Linkedin 開發的分布式消息中間件現已成為Apache頂級項目

 

2.面向大數據

3.基本概念:

1.Broker

2.Topic

3.Partition

4.Producer

5.Consumer

6.Consumer Group

7.Offset( 生產offset , 消費offset , offset lag)

1.2引入依賴

Flink讀取kafka數據需要通過maven引入依賴:

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>

    <version>1.6.2</version>

</dependency>

 

1.3Flink KafkaConsumer

Flink KafkaConsumer目前已經出現了4個大的版本:FlinkKafkaConsumer08、FlinkKafkaConsumer09、FlinkKafkaConsumer10和FlinkKafkaConsumer11.

 FlinkKafkaConsumer08和FlinkKafkaConsumer09都繼承FlinkKafkaConsumerBase,FlinkKafkaConsumerBase內部實現了CheckpointFunction接口和繼承RichParallelSourceFunction類。

 FlinkKafkaConsumer11繼承FlinkKafkaConsumer10,FlinkKafkaConsumer10繼承FlinkKafkaConsumer09。FlinkKafkaConsumer081和FlinkKafkaConsumer082繼承FlinkKafkaConsumer08。

 

1.4 FlinkKafkaConsumer010

FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props)

FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props)

FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)

FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props)

FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props)

 

三個構造參數:

1.要消費的topic(topic name / topic names/正表達式)

2.DeserializationSchema / KeyedDeserializationSchema(反序列化Kafka中的數據)

3.Kafka consumer的屬性,其中三個屬性必須提供:

a)bootstrap.servers (逗號分隔的Kafka broker列表)

b)zookeeper.connect (逗號分隔的Zookeeper server列表) (僅Kafka 0.8需要)

c)group.id(consumer group id)

1.5反序列化Schema類型

作用:對kafka里獲取的二進制數據進行反序列化

FlinkKafkaConsumer需要知道如何將Kafka中的二進制數據轉換成Java/Scala對象,DeserializationSchema定義了該轉換模式,通過T deserialize(byte[] message)

FlinkKafkaConsumer從kafka獲取的每條消息都會通過DeserializationSchema的T deserialize(byte[] message)反序列化處理

反序列化Schema類型(接口):

1.DeserializationSchema(只反序列化value)

2.KeyedDeserializationSchema

1.6 DeserializationSchema接口

 

1.7 KeyedDeserializationSchema接口

 

1.8常見反序列化Schema

SimpleStringSchema

JSONDeserializationSchema / JSONKeyValueDeserializationSchema

TypeInformationSerializationSchema/ TypeInformationKeyValueSerializationSchema(適合讀寫均是flink的場景)

AvroDeserializationSchema

1.9 FlinkKafkaConsumer010最簡樣版代碼

 

1.10 FlinkKafkaConsumer消費模式設置(影響從哪里開始消費)

設置FlinkKafkaConsumer消費模式示例代碼如下所示:

 

不同消費模式的解釋如下所示:

  

注意1:kafka 0.8版本, consumer提交偏移量到zookeeper,后續版本提交到kafka(一個特殊的topic: __consumer_offsets)

注意2:當作業從故障中恢復或者從savepoint還原時,上述設置的消費策略將不能決定開始消費的位置,真正的起始位置由保存點或檢查點中存儲的偏移量。

1.11理解FlinkKafkaSource的容錯性(影響消費起始位置)

 

如果Flink啟用了檢查點,Flink Kafka Consumer將會周期性的checkpoint其Kafka偏移量到快照。

通過實現CheckpointedFunction。

ListState<Tuple2<KafkaTopicPartition, Long>> 。

保證僅一次消費。 

如果作業失敗,Flink將流程序恢復到最新檢查點的狀態,並從檢查點中存儲的偏移量開始重新消費Kafka中的記錄。(此時前面所講的消費策略就不能決定消費起始位置了,因為出故障了)。

1.12 Flink Kafka Consumer Offset提交行為

Flink Kafka Consumer Offset提交行為分為以下兩種:

 

1.13不同情況下消費起始位置的分析

 

1.14動態Partition discovery

Flink Kafka Consumer支持動態發現Kafka分區,且能保證exactly-once。 

默認禁止動態發現分區,把flink.partition-discovery.interval-millis設置大於0即可啟用:

properties.setProperty(“flink.partition-discovery.interval-millis”, “30000”)

1.15動態Topic discovery

Flink Kafka Consumer支持動態發現Kafka Topic,僅限通過正則表達式指定topic的方式。

默認禁止動態發現分區,把flink.partition-discovery.interval-millis設置大於0即可啟用。

 

2. FlinkKafkaProducer(kafka sink)

2.1 Flink KafkaProducer

FlinkKafkaProducerBase實現CheckpointFunction接口實現容錯,同時也繼承了RichSinkFunction類。FinkKafkaProducer08繼承FlinkKafkaProducerBase。FinkKafkaProducer09繼承FlinkKafkaProducerBase,FinkKafkaProducer10繼承FinkKafkaProducer09.

 

FinkKafkaProducer011已經支持事務,它繼承TowPhaseCommitSinkFunction。TowPhaseCommitSinkFunction繼承RichSinkFunction。

 

2.2FlinkKafkaProducer

FlinkKafkaProducer包含了如下不同的構造方法:

FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema)

FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig)

FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema)

FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig)

FlinkKafkaProducer010(String topicId,SerializationSchema<T> serializationSchema,Properties producerConfig,@Nullable FlinkKafkaPartitioner<T> customPartitioner)

FlinkKafkaProducer010(String topicId,KeyedSerializationSchema<T> serializationSchema,Properties producerConfig,@Nullable FlinkKafkaPartitioner<T> customPartitioner)

Value序列化接口SerializationSchema,如果實現這個接口就需要實現如下方法:

byte[] serialize(T element);

如果key也需要實現序列化,則需要實現序列化接口KeyedSerializationSchema,然后重新如下方法:

byte[] serializeKey(T element);

byte[] serializeValue(T element);

String getTargetTopic(T element) 

2.3常見序列化Schema

常見的序列化Schema:

1.TypeInformationSerializationSchema/ TypeInformationKeyValueSerializationSchema(適合讀寫均是flink的場景)

2.SimpleStringSchema

2.4 producerConfig

FlinkKafkaProducer內部KafkaProducer的配置,具體配置可以參考官網地址:

https://kafka.apache.org/documentation.html

2.5 FlinkKafkaPartitioner

默認使用FlinkFixedPartitioner,即每個subtask的數據寫到同一個Kafka partition中。

自定義分區器:繼承FlinkKafkaPartitioner(partitioner的狀態在job失敗時會丟失,不會checkpoint)。

2.6 FlinkKafkaProducer容錯

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM