

Flink 提供了專門的 Kafka 連接器,向 Kafka topic 中讀取或者寫入數據。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 機制,可提供 exactly-once 的處理語義。為此,Flink 並不完全依賴於跟蹤 Kafka 消費組的偏移量,而是在內部跟蹤和檢查偏移量。本文內容較長,可以關注收藏。
引言
當我們在使用Spark Streaming、Flink等計算框架進行數據實時處理時,使用Kafka作為一款發布與訂閱的消息系統成為了標配。Spark Streaming與Flink都提供了相對應的Kafka Consumer,使用起來非常的方便,只需要設置一下Kafka的參數,然后添加kafka的source就萬事大吉了。如果你真的覺得事情就是如此的so easy,感覺媽媽再也不用擔心你的學習了,那就真的是too young too simple sometimes naive了。本文以Flink 的Kafka Source為討論對象,首先從基本的使用入手,然后深入源碼逐一剖析,一並為你撥開Flink Kafka connector的神秘面紗。值得注意的是,本文假定讀者具備了Kafka的相關知識,關於Kafka的相關細節問題,不在本文的討論范圍之內。
Flink Kafka Consumer介紹
Flink Kafka Connector有很多個版本,可以根據你的kafka和Flink的版本選擇相應的包(maven artifact id)和類名。本文所涉及的Flink版本為1.10,Kafka的版本為2.3.4。Flink所提供的Maven依賴於類名如下表所示:
Demo示例
添加Maven依賴
<!--本文使用的是通用型的connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
簡單代碼案例
public class KafkaConnector {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟checkpoint,時間間隔為毫秒
senv.enableCheckpointing(5000L);
// 選擇狀態后端
senv.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
//senv.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
Properties props = new Properties();
// kafka broker地址
props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
// 僅kafka0.8版本需要配置
props.put("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");
// 消費者組
props.put("group.id", "test");
// 自動偏移量提交
props.put("enable.auto.commit", true);
// 偏移量提交的時間間隔,毫秒
props.put("auto.commit.interval.ms", 5000);
// kafka 消息的key序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// kafka 消息的value序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 指定kafka的消費者從哪里開始消費數據
// 共有三種方式,
// #earliest
// 當各分區下有已提交的offset時,從提交的offset開始消費;
// 無提交的offset時,從頭開始消費
// #latest
// 當各分區下有已提交的offset時,從提交的offset開始消費;
// 無提交的offset時,消費新產生的該分區下的數據
// #none
// topic各分區都存在已提交的offset時,
// 從offset后開始消費;
// 只要有一個分區不存在已提交的offset,則拋出異常
props.put("auto.offset.reset", "latest");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"qfbap_ods.code_city",
new SimpleStringSchema(),
props);
//設置checkpoint后在提交offset,即oncheckpoint模式
// 該值默認為true,
consumer.setCommitOffsetsOnCheckpoints(true);
// 最早的數據開始消費
// 該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。
//consumer.setStartFromEarliest();
// 消費者組最近一次提交的偏移量,默認。
// 如果找不到分區的偏移量,那么將會使用配置中的 auto.offset.reset 設置
//consumer.setStartFromGroupOffsets();
// 最新的數據開始消費
// 該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。
//consumer.setStartFromLatest();
// 指定具體的偏移量時間戳,毫秒
// 對於每個分區,其時間戳大於或等於指定時間戳的記錄將用作起始位置。
// 如果一個分區的最新記錄早於指定的時間戳,則只從最新記錄讀取該分區數據。
// 在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。
//consumer.setStartFromTimestamp(1585047859000L);
// 為每個分區指定偏移量
/*Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 2), 43L);
consumer1.setStartFromSpecificOffsets(specificStartOffsets);*/
/**
*
* 請注意:當 Job 從故障中自動恢復或使用 savepoint 手動恢復時,
* 這些起始位置配置方法不會影響消費的起始位置。
* 在恢復時,每個 Kafka 分區的起始位置由存儲在 savepoint 或 checkpoint 中的 offset 確定
*
*/
DataStreamSource<String> source = senv.addSource(consumer);
// TODO
source.print();
senv.execute("test kafka connector");
}
}
參數配置解讀
在Demo示例中,給出了詳細的配置信息,下面將對上面的參數配置進行逐一分析。
kakfa的properties參數配置
bootstrap.servers:kafka broker地址
zookeeper.connect:僅kafka0.8版本需要配置
group.id:消費者組
enable.auto.commit:
自動偏移量提交,該值的配置不是最終的偏移量提交模式,需要考慮用戶是否開啟了checkpoint,
在下面的源碼分析中會進行解讀
auto.commit.interval.ms:偏移量提交的時間間隔,毫秒
key.deserializer:
kafka 消息的key序列化器,如果不指定會使用ByteArrayDeserializer序列化器
value.deserializer:
kafka 消息的value序列化器,如果不指定會使用ByteArrayDeserializer序列化器
auto.offset.reset:
指定kafka的消費者從哪里開始消費數據,共有三種方式,
第一種:earliest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費第二種:latest
當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據第三種:none
topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常注意:上面的指定消費模式並不是最終的消費模式,取決於用戶在Flink程序中配置的消費模式
Flink程序用戶配置的參數
consumer.setCommitOffsetsOnCheckpoints(true)
解釋:設置checkpoint后在提交offset,即oncheckpoint模式,該值默認為true,該參數會影響偏移量的提交方式,下面的源碼中會進行分析
consumer.setStartFromEarliest()
解釋:最早的數據開始消費 ,該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。該方法為繼承父類FlinkKafkaConsumerBase的方法。
consumer.setStartFromGroupOffsets()
解釋:消費者組最近一次提交的偏移量,默認。如果找不到分區的偏移量,那么將會使用配置中的 auto.offset.reset 設置,該方法為繼承父類FlinkKafkaConsumerBase的方法。
consumer.setStartFromLatest()
解釋:最新的數據開始消費,該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。該方法為繼承父類FlinkKafkaConsumerBase的方法。
consumer.setStartFromTimestamp(1585047859000L)
解釋:指定具體的偏移量時間戳,毫秒。對於每個分區,其時間戳大於或等於指定時間戳的記錄將用作起始位置。如果一個分區的最新記錄早於指定的時間戳,則只從最新記錄讀取該分區數據。在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。
consumer.setStartFromSpecificOffsets(specificStartOffsets)
解釋:為每個分區指定偏移量,該方法為繼承父類FlinkKafkaConsumerBase的方法。
請注意:當 Job 從故障中自動恢復或使用 savepoint 手動恢復時,這些起始位置配置方法不會影響消費的起始位置。在恢復時,每個 Kafka 分區的起始位置由存儲在 savepoint 或 checkpoint 中的 offset 確定。
Flink Kafka Consumer源碼解讀
繼承關系
Flink Kafka Consumer繼承了FlinkKafkaConsumerBase抽象類,而FlinkKafkaConsumerBase抽象類又繼承了RichParallelSourceFunction,所以要實現一個自定義的source時,有兩種實現方式:一種是通過實現SourceFunction接口來自定義並行度為1的數據源;另一種是通過實現ParallelSourceFunction接口或者繼承RichParallelSourceFunction來自定義具有並行度的數據源。FlinkKafkaConsumer的繼承關系如下圖所示。
源碼解讀
FlinkKafkaConsumer源碼
先看一下FlinkKafkaConsumer的源碼,為了方面閱讀,本文將盡量給出本比較完整的源代碼片段,具體如下所示:代碼較長,在這里可以先有有一個總體的印象,下面會對重要的代碼片段詳細進行分析。
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
// 配置輪詢超時超時時間,使用flink.poll-timeout參數在properties進行配置
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
// 如果沒有可用數據,則等待輪詢所需的時間(以毫秒為單位)。 如果為0,則立即返回所有可用的記錄
//默認輪詢超時時間
public static final long DEFAULT_POLL_TIMEOUT = 100L;
// 用戶提供的kafka 參數配置
protected final Properties properties;
// 如果沒有可用數據,則等待輪詢所需的時間(以毫秒為單位)。 如果為0,則立即返回所有可用的記錄
protected final long pollTimeout;
/**
* 創建一個kafka的consumer source
* @param topic 消費的主題名稱
* @param valueDeserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
this(Collections.singletonList(topic), valueDeserializer, props);
}
/**
* 創建一個kafka的consumer source
* 該構造方法允許傳入KafkaDeserializationSchema,該反序列化類支持訪問kafka消費的額外信息
* 比如:key/value對,offsets(偏移量),topic(主題名稱)
* @param topic 消費的主題名稱
* @param deserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
/**
* 創建一個kafka的consumer source
* 該構造方法允許傳入多個topic(主題),支持消費多個主題
* @param topics 消費的主題名稱,多個主題為List集合
* @param deserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
/**
* 創建一個kafka的consumer source
* 該構造方法允許傳入多個topic(主題),支持消費多個主題,
* @param topics 消費的主題名稱,多個主題為List集合
* @param deserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象,支持獲取額外信息
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(topics, null, deserializer, props);
}
/**
* 基於正則表達式訂閱多個topic
* 如果開啟了分區發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
* 只要是能夠正則匹配上,主題一旦被創建就會立即被訂閱
* @param subscriptionPattern 主題的正則表達式
* @param valueDeserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象,支持獲取額外信息
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
/**
* 基於正則表達式訂閱多個topic
* 如果開啟了分區發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
* 只要是能夠正則匹配上,主題一旦被創建就會立即被訂閱
* @param subscriptionPattern 主題的正則表達式
* @param deserializer 該反序列化類支持訪問kafka消費的額外信息,比如:key/value對,offsets(偏移量),topic(主題名稱)
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(null, subscriptionPattern, deserializer, props);
}
private FlinkKafkaConsumer(
List<String> topics,
Pattern subscriptionPattern,
KafkaDeserializationSchema<T> deserializer,
Properties props) {
// 調用父類(FlinkKafkaConsumerBase)構造方法,PropertiesUtil.getLong方法第一個參數為Properties,第二個參數為key,第三個參數為value默認值
super(
topics,
subscriptionPattern,
deserializer,
getLong(
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
!getBoolean(props, KEY_DISABLE_METRICS, false));
this.properties = props;
setDeserializer(this.properties);
// 配置輪詢超時時間,如果在properties中配置了KEY_POLL_TIMEOUT參數,則返回具體的配置值,否則返回默認值DEFAULT_POLL_TIMEOUT
try {
if (properties.containsKey(KEY_POLL_TIMEOUT)) {
this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
} else {
this.pollTimeout = DEFAULT_POLL_TIMEOUT;
}
}
catch (Exception e) {
throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
}
}
// 父類(FlinkKafkaConsumerBase)方法重寫,該方法的作用是返回一個fetcher實例,
// fetcher的作用是連接kafka的broker,拉去數據並進行反序列化,然后將數據輸出為數據流(data stream)
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
// 確保當偏移量的提交模式為ON_CHECKPOINTS(條件1:開啟checkpoint,條件2:consumer.setCommitOffsetsOnCheckpoints(true))時,禁用自動提交
// 該方法為父類(FlinkKafkaConsumerBase)的靜態方法
// 這將覆蓋用戶在properties中配置的任何設置
// 當offset的模式為ON_CHECKPOINTS,或者為DISABLED時,會將用戶配置的properties屬性進行覆蓋
// 具體是將ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置為"false
// 可以理解為:如果開啟了checkpoint,並且設置了consumer.setCommitOffsetsOnCheckpoints(true),默認為true,
// 就會將kafka properties的enable.auto.commit強制置為false
adjustAutoCommitConfig(properties, offsetCommitMode);
return new KafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
runtimeContext.getTaskNameWithSubtasks(),
deserializer,
properties,
pollTimeout,
runtimeContext.getMetricGroup(),
consumerMetricGroup,
useMetrics);
}
//父類(FlinkKafkaConsumerBase)方法重寫
// 返回一個分區發現類,分區發現可以使用kafka broker的高級consumer API發現topic和partition的元數據
@Override
protected AbstractPartitionDiscoverer createPartitionDiscoverer(
KafkaTopicsDescriptor topicsDescriptor,
int indexOfThisSubtask,
int numParallelSubtasks) {
return new KafkaPartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
}
/**
*判斷是否在kafka的參數開啟了自動提交,即enable.auto.commit=true,
* 並且auto.commit.interval.ms>0,
* 注意:如果沒有沒有設置enable.auto.commit的參數,則默認為true
* 如果沒有設置auto.commit.interval.ms的參數,則默認為5000毫秒
* @return
*/
@Override
protected boolean getIsAutoCommitEnabled() {
//
return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}
/**
* 確保配置了kafka消息的key與value的反序列化方式,
* 如果沒有配置,則使用ByteArrayDeserializer序列化器,
* 該類的deserialize方法是直接將數據進行return,未做任何處理
* @param props
*/
private static void setDeserializer(Properties props) {
final String deSerName = ByteArrayDeserializer.class.getName();
Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
if (valDeSer != null && !valDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
}
分析
上面的代碼已經給出了非常詳細的注釋,下面將對比較關鍵的部分進行分析。
構造方法分析
FlinkKakfaConsumer提供了7種構造方法,如上圖所示。不同的構造方法分別具有不同的功能,通過傳遞的參數也可以大致分析出每種構造方法特有的功能,為了方便理解,本文將對其進行分組討論,具體如下:
單topic
/**
* 創建一個kafka的consumer source
* @param topic 消費的主題名稱
* @param valueDeserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
this(Collections.singletonList(topic), valueDeserializer, props);
}
/**
* 創建一個kafka的consumer source
* 該構造方法允許傳入KafkaDeserializationSchema,該反序列化類支持訪問kafka消費的額外信息
* 比如:key/value對,offsets(偏移量),topic(主題名稱)
* @param topic 消費的主題名稱
* @param deserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
上面兩種構造方法只支持單個topic,區別在於反序列化的方式不一樣。第一種使用的是DeserializationSchema,第二種使用的是KafkaDeserializationSchema,其中使用帶有KafkaDeserializationSchema參數的構造方法可以獲取更多的附屬信息,比如在某些場景下需要獲取key/value對,offsets(偏移量),topic(主題名稱)等信息,可以選擇使用此方式的構造方法。以上兩種方法都調用了私有的構造方法,私有構造方法的分析見下面。
多topic
/**
* 創建一個kafka的consumer source
* 該構造方法允許傳入多個topic(主題),支持消費多個主題
* @param topics 消費的主題名稱,多個主題為List集合
* @param deserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
/**
* 創建一個kafka的consumer source
* 該構造方法允許傳入多個topic(主題),支持消費多個主題,
* @param topics 消費的主題名稱,多個主題為List集合
* @param deserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象,支持獲取額外信息
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(topics, null, deserializer, props);
}
上面的兩種多topic的構造方法,可以使用一個list集合接收多個topic進行消費,區別在於反序列化的方式不一樣。第一種使用的是DeserializationSchema,第二種使用的是KafkaDeserializationSchema,其中使用帶有KafkaDeserializationSchema參數的構造方法可以獲取更多的附屬信息,比如在某些場景下需要獲取key/value對,offsets(偏移量),topic(主題名稱)等信息,可以選擇使用此方式的構造方法。以上兩種方法都調用了私有的構造方法,私有構造方法的分析見下面。
正則匹配topic
/**
* 基於正則表達式訂閱多個topic
* 如果開啟了分區發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
* 只要是能夠正則匹配上,主題一旦被創建就會立即被訂閱
* @param subscriptionPattern 主題的正則表達式
* @param valueDeserializer 反序列化類型,用於將kafka的字節消息轉換為Flink的對象,支持獲取額外信息
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
/**
* 基於正則表達式訂閱多個topic
* 如果開啟了分區發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
* 只要是能夠正則匹配上,主題一旦被創建就會立即被訂閱
* @param subscriptionPattern 主題的正則表達式
* @param deserializer 該反序列化類支持訪問kafka消費的額外信息,比如:key/value對,offsets(偏移量),topic(主題名稱)
* @param props 用戶傳入的kafka參數
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(null, subscriptionPattern, deserializer, props);
}
實際的生產環境中可能有這樣一些需求,比如有一個flink作業需要將多種不同的數據聚合到一起,而這些數據對應着不同的kafka topic,隨着業務增長,新增一類數據,同時新增了一個kafka topic,如何在不重啟作業的情況下作業自動感知新的topic。首先需要在構建FlinkKafkaConsumer時的properties中設置flink.partition-discovery.interval-millis參數為非負值,表示開啟動態發現的開關,以及設置的時間間隔。此時FLinkKafkaConsumer內部會啟動一個單獨的線程定期去kafka獲取最新的meta信息。具體的調用執行信息,參見下面的私有構造方法
私有構造方法
private FlinkKafkaConsumer(
List<String> topics,
Pattern subscriptionPattern,
KafkaDeserializationSchema<T> deserializer,
Properties props) {
// 調用父類(FlinkKafkaConsumerBase)構造方法,PropertiesUtil.getLong方法第一個參數為Properties,第二個參數為key,第三個參數為value默認值。KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值是開啟分區發現的配置參數,在properties里面配置flink.partition-discovery.interval-millis=5000(大於0的數),如果沒有配置則使用PARTITION_DISCOVERY_DISABLED=Long.MIN_VALUE(表示禁用分區發現)
super(
topics,
subscriptionPattern,
deserializer,
getLong(
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
!getBoolean(props, KEY_DISABLE_METRICS, false));
this.properties = props;
setDeserializer(this.properties);
// 配置輪詢超時時間,如果在properties中配置了KEY_POLL_TIMEOUT參數,則返回具體的配置值,否則返回默認值DEFAULT_POLL_TIMEOUT
try {
if (properties.containsKey(KEY_POLL_TIMEOUT)) {
this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
} else {
this.pollTimeout = DEFAULT_POLL_TIMEOUT;
}
}
catch (Exception e) {
throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
}
}
其他方法分析
KafkaFetcher對象創建
// 父類(FlinkKafkaConsumerBase)方法重寫,該方法的作用是返回一個fetcher實例,
// fetcher的作用是連接kafka的broker,拉去數據並進行反序列化,然后將數據輸出為數據流(data stream)
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
// 確保當偏移量的提交模式為ON_CHECKPOINTS(條件1:開啟checkpoint,條件2:consumer.setCommitOffsetsOnCheckpoints(true))時,禁用自動提交
// 該方法為父類(FlinkKafkaConsumerBase)的靜態方法
// 這將覆蓋用戶在properties中配置的任何設置
// 當offset的模式為ON_CHECKPOINTS,或者為DISABLED時,會將用戶配置的properties屬性進行覆蓋
// 具體是將ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置為"false
// 可以理解為:如果開啟了checkpoint,並且設置了consumer.setCommitOffsetsOnCheckpoints(true),默認為true,
// 就會將kafka properties的enable.auto.commit強制置為false
adjustAutoCommitConfig(properties, offsetCommitMode);
return new KafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
runtimeContext.getTaskNameWithSubtasks(),
deserializer,
properties,
pollTimeout,
runtimeContext.getMetricGroup(),
consumerMetricGroup,
useMetrics);
}
該方法的作用是返回一個fetcher實例,fetcher的作用是連接kafka的broker,拉去數據並進行反序列化,然后將數據輸出為數據流(data stream),在這里對自動偏移量提交模式進行了強制調整,即確保當偏移量的提交模式為ON_CHECKPOINTS(條件1:開啟checkpoint,條件2:consumer.setCommitOffsetsOnCheckpoints(true))時,禁用自動提交。這將覆蓋用戶在properties中配置的任何設置,簡單可以理解為:如果開啟了checkpoint,並且設置了consumer.setCommitOffsetsOnCheckpoints(true),默認為true,就會將kafka properties的enable.auto.commit強制置為false。關於offset的提交模式,見下文的偏移量提交模式分析。
判斷是否設置了自動提交
@Override
protected boolean getIsAutoCommitEnabled() {
//
return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}
判斷是否在kafka的參數開啟了自動提交,即enable.auto.commit=true,並且auto.commit.interval.ms>0, 注意:如果沒有沒有設置enable.auto.commit的參數,則默認為true, 如果沒有設置auto.commit.interval.ms的參數,則默認為5000毫秒。該方法會在FlinkKafkaConsumerBase的open方法進行初始化的時候調用。
反序列化
private static void setDeserializer(Properties props) {
// 默認的反序列化方式
final String deSerName = ByteArrayDeserializer.class.getName();
//獲取用戶配置的properties關於key與value的反序列化模式
Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
// 如果配置了,則使用用戶配置的值
if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
if (valDeSer != null && !valDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
// 沒有配置,則使用ByteArrayDeserializer進行反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
確保配置了kafka消息的key與value的反序列化方式,如果沒有配置,則使用ByteArrayDeserializer序列化器,
ByteArrayDeserializer類的deserialize方法是直接將數據進行return,未做任何處理。
FlinkKafkaConsumerBase源碼
@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
ResultTypeQueryable<T>,
CheckpointedFunction {
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
private boolean enableCommitOnCheckpoints = true;
/**
* 偏移量的提交模式,僅能通過在FlinkKafkaConsumerBase#open(Configuration)進行配置
* 該值取決於用戶是否開啟了checkpoint
*/
private OffsetCommitMode offsetCommitMode;
/**
* 配置從哪個位置開始消費kafka的消息,
* 默認為StartupMode#GROUP_OFFSETS,即從當前提交的偏移量開始消費
*/
private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
private Map<KafkaTopicPartition, Long> specificStartupOffsets;
private Long startupOffsetsTimestamp;
/**
* 確保當偏移量的提交模式為ON_CHECKPOINTS時,禁用自動提交,
* 這將覆蓋用戶在properties中配置的任何設置。
* 當offset的模式為ON_CHECKPOINTS,或者為DISABLED時,會將用戶配置的properties屬性進行覆蓋
* 具體是將ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置為"false,即禁用自動提交
* @param properties kafka配置的properties,會通過該方法進行覆蓋
* @param offsetCommitMode offset提交模式
*/
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
}
/**
* 決定是否在開啟checkpoint時,在checkpoin之后提交偏移量,
* 只有用戶配置了啟用checkpoint,該參數才會其作用
* 如果沒有開啟checkpoint,則使用kafka的配置參數:enable.auto.commit
* @param commitOnCheckpoints
* @return
*/
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
this.enableCommitOnCheckpoints = commitOnCheckpoints;
return this;
}
/**
* 從最早的偏移量開始消費,
*該模式下,Kafka 中的已經提交的偏移量將被忽略,不會用作起始位置。
*可以通過consumer1.setStartFromEarliest()進行設置
*/
public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
this.startupMode = StartupMode.EARLIEST;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
/**
* 從最新的數據開始消費,
* 該模式下,Kafka 中的 已提交的偏移量將被忽略,不會用作起始位置。
*
*/
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
/**
*指定具體的偏移量時間戳,毫秒
*對於每個分區,其時間戳大於或等於指定時間戳的記錄將用作起始位置。
* 如果一個分區的最新記錄早於指定的時間戳,則只從最新記錄讀取該分區數據。
* 在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。
*/
protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
checkArgument(startupOffsetsTimestamp >= 0, "The provided value for the startup offsets timestamp is invalid.");
long currentTimestamp = System.currentTimeMillis();
checkArgument(startupOffsetsTimestamp <= currentTimestamp,
"Startup time[%s] must be before current time[%s].", startupOffsetsTimestamp, currentTimestamp);
this.startupMode = StartupMode.TIMESTAMP;
this.startupOffsetsTimestamp = startupOffsetsTimestamp;
this.specificStartupOffsets = null;
return this;
}
/**
*
* 從具體的消費者組最近提交的偏移量開始消費,為默認方式
* 如果沒有發現分區的偏移量,使用auto.offset.reset參數配置的值
* @return
*/
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
/**
*為每個分區指定偏移量進行消費
*/
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
this.startupMode = StartupMode.SPECIFIC_OFFSETS;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
return this;
}
@Override
public void open(Configuration configuration) throws Exception {
// determine the offset commit mode
// 決定偏移量的提交模式,
// 第一個參數為是否開啟了自動提交,
// 第二個參數為是否開啟了CommitOnCheckpoint模式
// 第三個參數為是否開啟了checkpoint
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
// 省略的代碼
}
// 省略的代碼
/**
* 創建一個fetcher用於連接kafka的broker,拉去數據並進行反序列化,然后將數據輸出為數據流(data stream)
* @param sourceContext 數據輸出的上下文
* @param subscribedPartitionsToStartOffsets 當前sub task需要處理的topic分區集合,即topic的partition與offset的Map集合
* @param watermarksPeriodic 可選,一個序列化的時間戳提取器,生成periodic類型的 watermark
* @param watermarksPunctuated 可選,一個序列化的時間戳提取器,生成punctuated類型的 watermark
* @param runtimeContext task的runtime context上下文
* @param offsetCommitMode offset的提交模式,有三種,分別為:DISABLED(禁用偏移量自動提交),ON_CHECKPOINTS(僅僅當checkpoints完成之后,才提交偏移量給kafka)
* KAFKA_PERIODIC(使用kafka自動提交函數,周期性自動提交偏移量)
* @param kafkaMetricGroup Flink的Metric
* @param useMetrics 是否使用Metric
* @return 返回一個fetcher實例
* @throws Exception
*/
protected abstract AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup kafkaMetricGroup,
boolean useMetrics) throws Exception;
protected abstract boolean getIsAutoCommitEnabled();
// 省略的代碼
}
上述代碼是FlinkKafkaConsumerBase的部分代碼片段,基本上對其做了詳細注釋,里面的有些方法是FlinkKafkaConsumer繼承的,有些是重寫的。之所以在這里給出,可以對照FlinkKafkaConsumer的源碼,從而方便理解。
偏移量提交模式分析
Flink Kafka Consumer 允許有配置如何將 offset 提交回 Kafka broker(或 0.8 版本的 Zookeeper)的行為。請注意:Flink Kafka Consumer 不依賴於提交的 offset 來實現容錯保證。提交的 offset 只是一種方法,用於公開 consumer 的進度以便進行監控。
配置 offset 提交行為的方法是否相同,取決於是否為 job 啟用了 checkpointing。在這里先給出提交模式的具體結論,下面會對兩種方式進行具體的分析。基本的結論為:
開啟checkpoint
情況1:用戶通過調用 consumer 上的 setCommitOffsetsOnCheckpoints(true) 方法來啟用 offset 的提交(默認情況下為 true )
那么當 checkpointing 完成時,Flink Kafka Consumer 將提交的 offset 存儲在 checkpoint 狀態中。
這確保 Kafka broker 中提交的 offset 與 checkpoint 狀態中的 offset 一致。
注意,在這個場景中,Properties 中的自動定期 offset 提交設置會被完全忽略。
此情況使用的是ON_CHECKPOINTS情況2:用戶通過調用 consumer 上的 setCommitOffsetsOnCheckpoints("false") 方法來禁用 offset 的提交,則使用DISABLED模式提交offset
未開啟checkpoint
Flink Kafka Consumer 依賴於內部使用的 Kafka client 自動定期 offset 提交功能,因此,要禁用或啟用 offset 的提交情況1:配置了Kafka properties的參數配置了"enable.auto.commit" = "true"或者 Kafka 0.8 的 auto.commit.enable=true,使用KAFKA_PERIODIC模式提交offset,即自動提交offset
情況2:沒有配置enable.auto.commit參數,使用DISABLED模式提交offset,這意味着kafka不知道當前的消費者組的消費者每次消費的偏移量。
提交模式源碼分析
offset的提交模式
public enum OffsetCommitMode {
// 禁用偏移量自動提交
DISABLED,
// 僅僅當checkpoints完成之后,才提交偏移量給kafka
ON_CHECKPOINTS,
// 使用kafka自動提交函數,周期性自動提交偏移量
KAFKA_PERIODIC;
}
提交模式的調用
public class OffsetCommitModes {
public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {
// 如果開啟了checkinpoint,執行下面判斷
if (enableCheckpointing) {
// 如果開啟了checkpoint,進一步判斷是否在checkpoin啟用時提交(setCommitOffsetsOnCheckpoints(true)),如果是則使用ON_CHECKPOINTS模式
// 否則使用DISABLED模式
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// 若Kafka properties的參數配置了"enable.auto.commit" = "true",則使用KAFKA_PERIODIC模式提交offset
// 否則使用DISABLED模式
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}
}
小結
本文主要介紹了Flink Kafka Consumer,首先對FlinkKafkaConsumer的不同版本進行了對比,然后給出了一個完整的Demo案例,並對案例的配置參數進行了詳細解釋,接着分析了FlinkKafkaConsumer的繼承關系,並分別對FlinkKafkaConsumer以及其父類FlinkKafkaConsumerBase的源碼進行了解讀,最后從源碼層面分析了Flink Kafka Consumer的偏移量提交模式,並對每一種提交模式進行了梳理。




本文分享自微信公眾號 - 大數據技術與數倉(gh_95306769522d)。
如有侵權,請聯系 support@oschina.cn 刪除。
本文參與“OSC源創計划”,歡迎正在閱讀的你也加入,一起分享。