本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:
Flink大數據項目實戰:http://t.cn/EJtKhaz
1. Kafka-connector概述及FlinkKafkaConsumer(kafka source)
1.1回顧kafka
1.最初由Linkedin 開發的分布式消息中間件現已成為Apache頂級項目
2.面向大數據
3.基本概念:
1.Broker
2.Topic
3.Partition
4.Producer
5.Consumer
6.Consumer Group
7.Offset( 生產offset , 消費offset , offset lag)
1.2引入依賴
Flink讀取kafka數據需要通過maven引入依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>1.6.2</version>
</dependency>
1.3Flink KafkaConsumer
Flink KafkaConsumer目前已經出現了4個大的版本:FlinkKafkaConsumer08、FlinkKafkaConsumer09、FlinkKafkaConsumer10和FlinkKafkaConsumer11.
FlinkKafkaConsumer08和FlinkKafkaConsumer09都繼承FlinkKafkaConsumerBase,FlinkKafkaConsumerBase內部實現了CheckpointFunction接口和繼承RichParallelSourceFunction類。
FlinkKafkaConsumer11繼承FlinkKafkaConsumer10,FlinkKafkaConsumer10繼承FlinkKafkaConsumer09。FlinkKafkaConsumer081和FlinkKafkaConsumer082繼承FlinkKafkaConsumer08。
1.4 FlinkKafkaConsumer010
FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props)
三個構造參數:
1.要消費的topic(topic name / topic names/正表達式)
2.DeserializationSchema / KeyedDeserializationSchema(反序列化Kafka中的數據)
3.Kafka consumer的屬性,其中三個屬性必須提供:
a)bootstrap.servers (逗號分隔的Kafka broker列表)
b)zookeeper.connect (逗號分隔的Zookeeper server列表) (僅Kafka 0.8需要)
c)group.id(consumer group id)
1.5反序列化Schema類型
作用:對kafka里獲取的二進制數據進行反序列化
FlinkKafkaConsumer需要知道如何將Kafka中的二進制數據轉換成Java/Scala對象,DeserializationSchema定義了該轉換模式,通過T deserialize(byte[] message)
FlinkKafkaConsumer從kafka獲取的每條消息都會通過DeserializationSchema的T deserialize(byte[] message)反序列化處理
反序列化Schema類型(接口):
1.DeserializationSchema(只反序列化value)
2.KeyedDeserializationSchema
1.6 DeserializationSchema接口
1.7 KeyedDeserializationSchema接口
1.8常見反序列化Schema
SimpleStringSchema
JSONDeserializationSchema / JSONKeyValueDeserializationSchema
TypeInformationSerializationSchema/ TypeInformationKeyValueSerializationSchema(適合讀寫均是flink的場景)
AvroDeserializationSchema
1.9 FlinkKafkaConsumer010最簡樣版代碼
1.10 FlinkKafkaConsumer消費模式設置(影響從哪里開始消費)
設置FlinkKafkaConsumer消費模式示例代碼如下所示:
不同消費模式的解釋如下所示:
注意1:kafka 0.8版本, consumer提交偏移量到zookeeper,后續版本提交到kafka(一個特殊的topic: __consumer_offsets)
注意2:當作業從故障中恢復或者從savepoint還原時,上述設置的消費策略將不能決定開始消費的位置,真正的起始位置由保存點或檢查點中存儲的偏移量。
1.11理解FlinkKafkaSource的容錯性(影響消費起始位置)
如果Flink啟用了檢查點,Flink Kafka Consumer將會周期性的checkpoint其Kafka偏移量到快照。
通過實現CheckpointedFunction。
ListState<Tuple2<KafkaTopicPartition, Long>> 。
保證僅一次消費。
如果作業失敗,Flink將流程序恢復到最新檢查點的狀態,並從檢查點中存儲的偏移量開始重新消費Kafka中的記錄。(此時前面所講的消費策略就不能決定消費起始位置了,因為出故障了)。
1.12 Flink Kafka Consumer Offset提交行為
Flink Kafka Consumer Offset提交行為分為以下兩種:
1.13不同情況下消費起始位置的分析
1.14動態Partition discovery
Flink Kafka Consumer支持動態發現Kafka分區,且能保證exactly-once。
默認禁止動態發現分區,把flink.partition-discovery.interval-millis設置大於0即可啟用:
properties.setProperty(“flink.partition-discovery.interval-millis”, “30000”)
1.15動態Topic discovery
Flink Kafka Consumer支持動態發現Kafka Topic,僅限通過正則表達式指定topic的方式。
默認禁止動態發現分區,把flink.partition-discovery.interval-millis設置大於0即可啟用。
2. FlinkKafkaProducer(kafka sink)
2.1 Flink KafkaProducer
FlinkKafkaProducerBase實現CheckpointFunction接口實現容錯,同時也繼承了RichSinkFunction類。FinkKafkaProducer08繼承FlinkKafkaProducerBase。FinkKafkaProducer09繼承FlinkKafkaProducerBase,FinkKafkaProducer10繼承FinkKafkaProducer09.
FinkKafkaProducer011已經支持事務,它繼承TowPhaseCommitSinkFunction。TowPhaseCommitSinkFunction繼承RichSinkFunction。
2.2FlinkKafkaProducer
FlinkKafkaProducer包含了如下不同的構造方法:
FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema)
FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig)
FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema)
FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig)
FlinkKafkaProducer010(String topicId,SerializationSchema<T> serializationSchema,Properties producerConfig,@Nullable FlinkKafkaPartitioner<T> customPartitioner)
FlinkKafkaProducer010(String topicId,KeyedSerializationSchema<T> serializationSchema,Properties producerConfig,@Nullable FlinkKafkaPartitioner<T> customPartitioner)
Value序列化接口SerializationSchema,如果實現這個接口就需要實現如下方法:
byte[] serialize(T element);
如果key也需要實現序列化,則需要實現序列化接口KeyedSerializationSchema,然后重新如下方法:
byte[] serializeKey(T element);
byte[] serializeValue(T element);
String getTargetTopic(T element)
2.3常見序列化Schema
常見的序列化Schema:
1.TypeInformationSerializationSchema/ TypeInformationKeyValueSerializationSchema(適合讀寫均是flink的場景)
2.SimpleStringSchema
2.4 producerConfig
FlinkKafkaProducer內部KafkaProducer的配置,具體配置可以參考官網地址:
https://kafka.apache.org/documentation.html
2.5 FlinkKafkaPartitioner
默認使用FlinkFixedPartitioner,即每個subtask的數據寫到同一個Kafka partition中。
自定義分區器:繼承FlinkKafkaPartitioner(partitioner的狀態在job失敗時會丟失,不會checkpoint)。
2.6 FlinkKafkaProducer容錯