Flink 讀寫Kafka
在Flink中,我們分別用Source Connectors代表連接數據源的連接器,用Sink Connector代表連接數據輸出的連接器。下面我們介紹一下Flink中用於讀寫kafka的source & sink connector。
Apache Kafka Source Connectors
Apache Kafka 是一個分布式的流平台,其核心是一個分布式的發布-訂閱消息系統,被廣泛用於消費與分發事件流。
Kafka將事件流組織成為topics。一個topic是一個事件日志(event-log),保證讀入事件的順序為事件寫入的順序。為了實現可擴展,topic可以被分為多個partition,並分布在集群中的各個節點中。但是在topic分區后,由於consumers可能會從多個partition讀入數據,所以此時只能在partition級別保證事件的順序。在Kafka中,當前讀的位置稱為偏移量(offset)。
可以通過sbt或maven構建Flink Kafka connector 的依賴,下面是一個sbt的例子:
// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.8.1"
Flink Kafka connector以並行的方式讀入事件流,每個並行的source task 都可以從一個或多個partition讀入數據。Task對於每個它當前正在讀的partition,都會追蹤當前的offset,並將這些offset數據存儲到它的檢查點數據中。當發生故障,進行恢復時,offset被取出並重置,使得數據可以在上次檢查點時的offset繼續讀數據。Flink Kafka connector並不依賴於Kafka 本身的offset-tracking 機制(也就是consumer groups機制)。下圖顯示的是partitions被分配給不同的source task:
下面是一個創建一個 kafka source 的例子:
val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "test") val stream: DataStream[String] = env.addSource( new FlinkKafkaConsumer[String]( "topic", new SimpleStringSchema(), properties))
這個FlinkKafkaConsumer 構造函數有3 個參數,第一個參數定義的是讀入的目標topic名。這里可以是單個topic、一組topics、亦或是一個正則表達式匹配所有符合規則的topics。當從多個topics讀入事件時,Kafka connectors會將所有topics的所有partitions 一視同仁,並將這些事件合並到一個單個流中。
第二個參數是一個DeserializationSchema
或KeyedDeserializationSchema
。Kafka中的消息是以純字節消息存儲,所以需要被反序列化為Java或Scala 對象。在上例中用到的SimpleStringSchema
是一個內置的DeserializationSchema
,可以將字節數字反序列化為一個String。Flink也提供了對Apache Avro以及基於text的JSON編碼的實現。我們也可以通過實現DeserializationSchema
與KeyedDeserializationSchema
這兩個公開的接口,用於實現自定義的反序列化邏輯。
第三個參數是一個Properties
對象,用於配置Kafka的客戶端。此對象至少要包含兩個條目,"bootstrap.servers"
與"group.id"
。
為了提取事件-時間的時間戳,並生成水印,我們可以為Kafka consumer提供AssignerWithPeriodicWatermark
或AssignerWithPunctuatedWatermark
,具體可以通過調用FlinkKafkaConsumer.assignTimestampsAndWatermark()
實現。這個assigner會被應用到每個partition,並影響每個partition的message序列保證。Source 實例會根據水印的propagation protocol 將partition的水印進行merge。
需要注意的是:如果一個partition變為inactive狀態,則source 實例的水印機制便無法向前推進。並最終會因為一個inactive的partition導致整個application停止運行,因為application的水印無法向前推進。
在Kafka 0.10.0版本后,提供了消息時間戳的支持。如果application是以event-time模式運行,則consumer會自動從Kakfa消息中獲取時間戳,並以此時間戳為event-time時間戳。在這種情況下,我們需要生成水印並應用AssignerWithPeriodicWatermark
或AssignerWithPunctuatedWatermark
,它們會向前推進前面被分配的Kafka時間戳。
還有一些需要注意的配置選項,在 consumer 開始讀Kafka 消息時,我們可以配置它的讀起始位置,有以下幾種:
- Kafka為consumer group(由group.id配置指定)存儲的最近的讀取位置。這個也是默認的行為,指定方式為:
FlinkKafkaConsumer.setStartFromGroupOffsets()
- 每個partition最開始的offset:
FlinkKafkaConsumer.setStartFromEarliest()
- 每個partition最近的offset:
FlinkKafkaConsumer.setStartFromLatest()
- 給定時間戳之后的records(需Kafka 版本高於0.10.x):
FlinkKafkaConsumer.setStartFromTimestamp(long)
- 為所有partition指定讀取位置,傳入參數為Map對象:
FlinkKafkaConsumer.setStartFromSpecificOffsets(Map)
需要注意的是:這些配置僅影響第一次讀取的位置。在application出現故障后恢復時,使用的是檢查點的offset(或是在以savepoint啟動時,使用的是savepoint中的offset)。
Flink Kafka consumer可以被配置為自動發現新加入的topics(通過正則表達式)或新加入的partition的模式。默認此功能是禁止的,不過可以通過指定flink.partition-discovery.interval-millis
參數(在Properties
對象中指定)開啟,只需要將此參數指定為非負整數即可。
Apache Kafka Sink Connector
Flink提供為Kafka 0.8 版本后所有 Kafka版本的sink connectors。同樣,根據環境中Flink的版本,我們需要加上相關依賴(具體可參考上文)。
下面是一個Kafka sink的例子:
val stream: DataStream[String] = ... val myProducer = new FlinkKafkaProducer[String]( "localhost:9092", // broker list "topic", // target topic new SimpleStringSchema) // serialization schema stream.addSink(myProducer)
這個FlinkKafkaProducer構造函數有3個參數。第一個參數是一個以逗號分隔的Kafka broker地址。第二個參數是目標topic名,最后是一個SerializationSchema
,用於將輸入數據(此例子中String)轉換為一個字節數組。此處的DeserializationSchema
與Kakfa source connector中的DeserializationSchema
相對應。
FlinkKafkaProducer
提供了多種構造函數的選項:
- 與Kafka source connector 類似,可以傳遞一個
Properties
對象,用於為客戶端提供自定義的配置。在使用Properties
時,broker list需要以"bootstrap.servers"
屬性的方式寫在Properties
中。 - 也可以指定一個
FlinkKafkaPartitioner
,用於控制records與Kafka partition中的映射。 - 除了使用
SerializationSchema
將記錄轉換為字節數組,我們也可以實現一個KeyedSerializationSchema
接口,用於將一條記錄轉換為兩個字節數組——一個是key,另一個是value。也可以通過它實現更多的功能,例如將目標topic重寫為多個topics。
為KAFKA SINK提供AT-LEAST-ONCE 保證
Flink Kafka sink可提供的一致性保障由它的配置決定。Kafka sink 在滿足以下條件時,可以提供at-least-once 保障:
- Flink的檢查點功能已開啟,並且application的所有source是可重置的(也就是reading offset可重置)
- 如果寫未成功,則sink connector拋出一個異常,導致application失敗並recover。這個是默認的行為。在Kafka 客戶端,我們也可以配置為:在寫入失敗時,進行重試,重試參數可由
retries
屬性指定(默認為0,設置為大於0即可),在達到重試次數后,再申明寫入失敗。我們也可以配置sink僅將寫入失敗的操作記錄到日志,而不拋出任何異常,對sink對象調用setLogFailuresOnly(true)
即可。需要注意的是這個會導致application無任何輸出一致性保證。 - sink connector在完成它的檢查點之前,需要等待Kafka ack 所有in-flight records。這個也是默認的行為。
為KAFKA SINK提供EXACTLY-ONCE一致性保證
Kafka 0.11版本之后引入了對事務寫(transactional writes)的支持。得益於此功能,Flink的Kafka sink也可以提供exactly-once的一致性保證(在正確配置了sink與Kafka的前提下)。同樣,Flink application必須啟用檢查點的功能,並且source為可重置的(也就是reading offset可重置)。
FlinkKafkaProducer
提供了一個構造方法,可以傳入一個Semantic
(語義)
參數,用於控制(由sink提供的)一致性保證。可選的參數值有:
Semantic.NONE
:提供無保證——records可能會丟失或是被寫入多次Semantic.AT_LEAST_ONCE
:可以保證沒有寫丟失,但是可能會有重復的寫入。這個是默認設置。Semantic.EXACTLY_ONCE
:基於Kafka的事務(transactions),保證每個record exactly-once 寫入
在使用Flink application結合Kafka sink 的exactly-once模式時,有幾點需要考慮的地方,這幾點也有助於理解Kafka如何處理事務。簡單地說,Kafka的事務工作流程如下:
- 開啟一個事務,將所有屬於此事務內的消息(寫入)追加到partition的末尾,並標注這些消息為uncommitted
- 在一個事務committed后,這些標記變為committed
- 從Kafka topic消費消息的consumer,可以配置為一個isolation級別(通過
isolation.level
屬性進行配置),申明是否它可以讀uncommitted消息,可讀參數為read_uncommitted
,也是默認配置。不可讀的參數為read_committed
。
如果consumer被配置為read_committed
,則它會在遇到一個uncommitted消息后,停止從一個partition消費數據,並在消息變為committed后,恢復消費數據。
所以,一個正在進行的事務可以阻止consumers從partition讀未committed的數據,並引入相當的延遲。Kafka防止高延時所做的一個改進是:在超過timeout interval時間后,會直接拒絕並關閉事務。此超時時間由transaction.timeout.ms
屬性設置。
在Flink Kafka sink中,這個超時時間非常重要,因為事務超時可能會導致數據丟失。所以我們必須謹慎配置超時時間的屬性。默認情況下,Flink Kafka sink(也就是Kafka producer端的配置)設置transaction.timeout.ms
為1小時。也就是說,我們需要對應調整Kafka broker端的transaction.max.timeout.ms
配置,因為此配置的默認時間為15分鍾,而此時間必須要大於transaction.timeout.ms
。還有一點需要注意的是:committed 消息的可見性是取決於Flink application的檢查點時間間隔的。
檢查Kafka集群的配置
Kafka集群的默認配置可能會導致數據丟失(即使一個寫操作已經被ack)。我們需要特別注意一下Kafka啟動參數:
- acks
- log.flush.interval.messages
- log.flush.interval.ms
- log.flush.*
建議查閱一下Kakfa 官方文檔,對這些配置信息有更進一步的了解。
用戶自定義partitioning以及寫入消息時間戳
在向一個Kafka topic寫入消息時,Flink Kafka sink taks可以選擇寫入topic的哪個partition。FlinkKafkaPartitioner
可以在Flink Kafka sink構造函數中指定(此類為一個抽象類,需要傳入它的一個實現類)。若是未指定,則默認的partitioner會將每個sink task均映射到一個Kafka partition中,也就是說,所有由同一個sink釋放的records會被寫入到同一個partition中。若是task的數目大於partition的數目,則其中部分partition可能會包含多個sink tasks的數據。如果partition的數目大於task的數目,則默認的配置會導致有partition的數據為空。在這種情況下,若是application是運行在event-time模式下對topic進行消費,可能會造成問題。
通過提供用戶定義的FlinkKafkaPartitioner
,我們可以控制records路由到partition的規則。例如,我們創建一個基於key的partitioner、或是輪詢的partitioner(可平均分布records)。也可以使用Kafka的partitioning規則,根據message的key,將record分布到不同的partition中。使用此方式時,需要指定KeyedSerializationSchema
,以便於從messages中抽取keys,並需要設置FlinkKafkaPartitioner
為null
,以disable默認的Flink端的partitioner。
最后,Flink Kafka sink可以配置為寫入消息時間戳(Kafka 0.10 版本之后支持)。為一條record寫入event-time時間戳到Kafka的功能可通過在sink對象上調用setWriteTimestampToKafka(true)
啟用。
References:
Vasiliki Kalavri, Fabian Hueske. Stream Processing With Apache Flink. 2019