預定義的source和sink
大多都是在測試,開發驗證中使用
自帶的連接器
參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/
基於Apache Bahir的連接器
比如寫redis: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
有時候在Flink 項目中訪問 Redis 的方法都是自己進行的實現,推薦使用 Bahir 連接器。
基於異步 I/O
異步 I/O 是 Flink 提供的非常底層的與外部系統交互的方式。
在流式系統中跟外部數據源做一個關聯,比如跟mysql數據庫中的一張表進行關聯,即可在map或者flatmap中去跟數據庫建立連接讀取數據,,如果用同步IO的話會等待其響應的時間比較長,影響整個作業的吞吐。所以為了解決這種問題,而引入了異步IO的方式,以批量發送批量獲取結果來提高吞吐,具體異步IO的實現原理可以通過下面的連接查看。
Flink kafka connector
Flink kafka consumer
1.構建consumer實例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設置環境 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE); //設置kafka相關屬性 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // properties.setProperty("zookeeper.connect", "localhost:2181");// only required for Kafka 0.8 properties.setProperty("group.id", "test"); FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>("myTopic", new SimpleStringSchema(), properties); // new SimpleStringSchema():表示用什么樣的方式來反序列化kafka中的二進制數據。這里按字符串的方式來反序列化 // new FlinkKafkaConsumer010:表示kafka版本為0.10.x // 如果kafka為0.8.x或者0.9.x 則使用FlinkKafkaConsumer08或者FlinkKafkaConsumer09 // 如果Kafka >= 1.0.0 則使用FlinkKafkaConsumer
2.反序列化數據
Flink Kafka Consumer需要知道怎么將kafka中的二進制數據轉換為Java/Scala對象,我們在使用時候通過定義DeserializationSchema來指定如何反序列化數據,然后在處理每一條kafak message時候通過調用deserialize(byte[] message)
方法來進行反序列化。
常用的DeserializationSchema
-
SimpleStringSchema:按字符串的方式進行序列化和反序列化
-
TypeInformationSerializationSchema:基於flink的TypeInformation來構建schema
-
JsonDeserializationSchema:使用jackson反序列化json格式消息,並返回ObjectNode,通過objectNode.get("field").as(Int/String/...)()來訪問字段
3.設置消費起始offset
// 從kafka最早的位置開始讀取 myConsumer.setStartFromEarliest(); // 從kafka最新的數據開始讀取 myConsumer.setStartFromLatest(); // 從時間戳>=1561281792000L的數據開始讀取 myConsumer.setStartFromTimestamp(1561281792000L); // (默認配置)從kafka記錄的group.id的位置開始記錄,如果沒有則根據auto.offset.reset設置 myConsumer.setStartFromGroupOffsets(); // 指定確切的offset位置 Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L); myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
注意:作業故障從checkpoint自動恢復,以及手動做savepoint時,消費的位置從保存狀態中恢復,與該配置無關
4.Topic和Partition動態發現
Partition discovery:
kafka 分區的增加在企業中很常見,在當前分區數不能滿足以下幾種情況時就需要新增分區數
-
流量增大,當前分區數無法支持大數據量的寫入。
-
業務復雜,雖然寫入正常,但是后端消費處理並行度不夠。
默認情況下,分區發現是沒有開啟的,開啟也很簡單,只需要給參數flink.partition-discovery.interval-millis 賦值一個非負值即可,該非負值代表制檢測的周期,是以毫秒為單位的。實現原理是內部有一個單獨的線程定義檢測kafka meta信息進行更新。新發現的分區從earliest的位置開始讀取。 限制是動態分區發現一旦開啟無法從 flink 1.3.x 以前應用的 savepoint 恢復。這種情況下,必須先用 flink 1.3.x 創建一個 savepoint,然后從該savepoint 恢復。
Topic discovery:
增加 topic 的形式來增加並行度和吞吐量。要識別新增的 topic,除了發現新增分區里說的配置 flink.partition-discovery.interval-millis 為 非負值,以外還要求我們在配置 topic 的時候以正則表達式的形式。
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>( java.util.regex.Pattern.compile("test-topic-[0-9]"), new SimpleStringSchema(), properties);
也即是以正則的形式指定要消費的 topic。
5.Commit Offset的方式
分兩種情況:
1.checkpoint禁用
-
基於kafka客戶端的auto commit定期提交offset
-
需要配置enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms參數到consumer properties中。
2.checkpoint開啟
-
offset自己在checkpoint state中管理和容錯,提交kafka僅作為外部監視消費進度
-
通過setCommitOffsetsOnCheckpoints(boolean)方法控制checkpoint成功之后是否提交offset到kafka當中
6.Timestamp Extraction/Watermark生成
per kafka partition watermark
-
assignTimestampsAndWatermarks,每個partition一個assigner,watermark為多個partition對齊后值(木桶短板原理)
-
不在kafka source后生成watermark,會出現扔掉部分數據情況
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties); myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); DataStream<String> stream = env .addSource(myConsumer) .print();
Flink kafka Producer
1.構建FlinkKafkaProducer
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>( "localhost:9092", // broker list "my-topic", // target topic new SimpleStringSchema()); // serialization schema // versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka; // this method is not available for earlier Kafka versions myProducer.setWriteTimestampToKafka(true); stream.addSink(myProducer);
代碼中是對應kafka 0.11.x,其他版本構建與上面消費者基本一樣。
2.Kafka Producer Partitioning Scheme
-
FlinkFixedPartitioner
默認情況下producer會使用FlinkFixedPartitioner,每個flink Kafka Producer 子任務就會寫到一個kafka分區里.
Sink task與kafka partition有一個對應關系:parallelInstanceId % partitions.length,如果sink task多於partition,比如4個sink task,1個partition,則4個sink task會均寫入到那一個partition中,如果sink task小於 partition,比如2個sink,4個partition,則sink task會一一對應kafka partition。剩余2個partition不會有數據寫入。
-
Partitioner設置為null
round-robin kafka partitioner 在寫數據到kafka partition時,對數據做輪詢插入,這樣數據分布會比較均勻,但是有個缺點,就是每個sink task都會跟下游的每個kafka partition維持一個連接,這樣會導致維持太多的連接
-
自定義partitioner
flink是支持自定義分區的,比如將一定規則的數據發送到指定kafka分區。需要繼承FlinkKafkaPartitioner類,實現自定義的partitioner,注意partitioner必須是可序列化的。
3.Kafka Producer 容錯
-
Kafka 0.8
在kafka 0.9之前,kafka沒法保證至少一次或者精准一次的實現。
-
Kafka 0.9 and 0.10
在這兩個版本中(FlinkKafkaProducer09和FlinkKafkaProducer010),如果開啟了checkpoint,是可以實現
至少一次。除了開啟checkpoint,還需要設置setLogFailuresOnly(boolean)和setFlushOnCheckpoint(boolean)
setLogFailuresOnly(boolean):默認false,表示在寫失敗時,是否只打印失敗log
setFlushOnCheckpoint(boolean):默認true,checkpoint時保證數據寫入kafka
如果要實現至少一次,需要配置:
setLogFailuresOnly(false)+setFlushOnCheckpoint(true)
-
Kafka 0.11 and newer
開啟checkpoint,兩階段提交sink結合kafka事物,可以保證端到端的精准一次。
https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka
Flink Kafka 代碼示例
參考:flink-china 董亭亭 快手實時計算引擎團隊負責人
參考:flink 官網