Flink Connector開發


預定義的source和sink

大多都是在測試,開發驗證中使用

 

 

自帶的連接器

參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/

 

 

基於Apache Bahir的連接器

比如寫redis: https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

有時候在Flink 項目中訪問 Redis 的方法都是自己進行的實現,推薦使用 Bahir 連接器。

 

基於異步 I/O

異步 I/O 是 Flink 提供的非常底層的與外部系統交互的方式。

在流式系統中跟外部數據源做一個關聯,比如跟mysql數據庫中的一張表進行關聯,即可在map或者flatmap中去跟數據庫建立連接讀取數據,,如果用同步IO的話會等待其響應的時間比較長,影響整個作業的吞吐。所以為了解決這種問題,而引入了異步IO的方式,以批量發送批量獲取結果來提高吞吐,具體異步IO的實現原理可以通過下面的連接查看。

 

Flink kafka connector

官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#apache-kafka-connector

Flink kafka consumer

1.構建consumer實例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
        //設置環境
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE);
​
        //設置kafka相關屬性
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
//        properties.setProperty("zookeeper.connect", "localhost:2181");// only required for Kafka 0.8
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>("myTopic", new SimpleStringSchema(), properties);
// new SimpleStringSchema():表示用什么樣的方式來反序列化kafka中的二進制數據。這里按字符串的方式來反序列化
// new FlinkKafkaConsumer010:表示kafka版本為0.10.x
// 如果kafka為0.8.x或者0.9.x 則使用FlinkKafkaConsumer08或者FlinkKafkaConsumer09
// 如果Kafka >= 1.0.0 則使用FlinkKafkaConsumer

 

2.反序列化數據

Flink Kafka Consumer需要知道怎么將kafka中的二進制數據轉換為Java/Scala對象,我們在使用時候通過定義DeserializationSchema來指定如何反序列化數據,然后在處理每一條kafak message時候通過調用deserialize(byte[] message) 方法來進行反序列化。

常用的DeserializationSchema

  • SimpleStringSchema:按字符串的方式進行序列化和反序列化

  • TypeInformationSerializationSchema:基於flink的TypeInformation來構建schema

  • JsonDeserializationSchema:使用jackson反序列化json格式消息,並返回ObjectNode,通過objectNode.get("field").as(Int/String/...)()來訪問字段

 

3.設置消費起始offset

// 從kafka最早的位置開始讀取
myConsumer.setStartFromEarliest();
// 從kafka最新的數據開始讀取
myConsumer.setStartFromLatest();
// 從時間戳>=1561281792000L的數據開始讀取
myConsumer.setStartFromTimestamp(1561281792000L); 
// (默認配置)從kafka記錄的group.id的位置開始記錄,如果沒有則根據auto.offset.reset設置
myConsumer.setStartFromGroupOffsets(); 
​
// 指定確切的offset位置
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

注意:作業故障從checkpoint自動恢復,以及手動做savepoint時,消費的位置從保存狀態中恢復,與該配置無關

 

4.Topic和Partition動態發現

Partition discovery:

kafka 分區的增加在企業中很常見,在當前分區數不能滿足以下幾種情況時就需要新增分區數

  • 流量增大,當前分區數無法支持大數據量的寫入。

  • 業務復雜,雖然寫入正常,但是后端消費處理並行度不夠。

默認情況下,分區發現是沒有開啟的,開啟也很簡單,只需要給參數flink.partition-discovery.interval-millis 賦值一個非負值即可,該非負值代表制檢測的周期,是以毫秒為單位的。實現原理是內部有一個單獨的線程定義檢測kafka meta信息進行更新。新發現的分區從earliest的位置開始讀取。 限制是動態分區發現一旦開啟無法從 flink 1.3.x 以前應用的 savepoint 恢復。這種情況下,必須先用 flink 1.3.x 創建一個 savepoint,然后從該savepoint 恢復。

Topic discovery:

增加 topic 的形式來增加並行度和吞吐量。要識別新增的 topic,除了發現新增分區里說的配置 flink.partition-discovery.interval-millis 為 非負值,以外還要求我們在配置 topic 的時候以正則表達式的形式。

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

也即是以正則的形式指定要消費的 topic。

 

5.Commit Offset的方式

分兩種情況:

1.checkpoint禁用

  • 基於kafka客戶端的auto commit定期提交offset

  • 需要配置enable.auto.commit (or auto.commit.enable for Kafka 0.8) / auto.commit.interval.ms參數到consumer properties中。

2.checkpoint開啟

  • offset自己在checkpoint state中管理和容錯,提交kafka僅作為外部監視消費進度

  • 通過setCommitOffsetsOnCheckpoints(boolean)方法控制checkpoint成功之后是否提交offset到kafka當中

 

6.Timestamp Extraction/Watermark生成

per kafka partition watermark

  • assignTimestampsAndWatermarks,每個partition一個assigner,watermark為多個partition對齊后值(木桶短板原理)

  • 不在kafka source后生成watermark,會出現扔掉部分數據情況

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
​
FlinkKafkaConsumer010<String> myConsumer =
    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
​
DataStream<String> stream = env
    .addSource(myConsumer)
    .print();

 

Flink kafka Producer

1.構建FlinkKafkaProducer

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema
// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);
​
stream.addSink(myProducer);

代碼中是對應kafka 0.11.x,其他版本構建與上面消費者基本一樣。

 

2.Kafka Producer Partitioning Scheme

  • FlinkFixedPartitioner

默認情況下producer會使用FlinkFixedPartitioner,每個flink Kafka Producer 子任務就會寫到一個kafka分區里.

 

 Sink task與kafka partition有一個對應關系:parallelInstanceId % partitions.length,如果sink task多於partition,比如4個sink task,1個partition,則4個sink task會均寫入到那一個partition中,如果sink task小於 partition,比如2個sink,4個partition,則sink task會一一對應kafka partition。剩余2個partition不會有數據寫入。

  • Partitioner設置為null

round-robin kafka partitioner 在寫數據到kafka partition時,對數據做輪詢插入,這樣數據分布會比較均勻,但是有個缺點,就是每個sink task都會跟下游的每個kafka partition維持一個連接,這樣會導致維持太多的連接

  • 自定義partitioner

flink是支持自定義分區的,比如將一定規則的數據發送到指定kafka分區。需要繼承FlinkKafkaPartitioner類,實現自定義的partitioner,注意partitioner必須是可序列化的。

 

3.Kafka Producer 容錯

  • Kafka 0.8

在kafka 0.9之前,kafka沒法保證至少一次或者精准一次的實現。

  • Kafka 0.9 and 0.10

在這兩個版本中(FlinkKafkaProducer09和FlinkKafkaProducer010),如果開啟了checkpoint,是可以實現

至少一次。除了開啟checkpoint,還需要設置setLogFailuresOnly(boolean)和setFlushOnCheckpoint(boolean)

setLogFailuresOnly(boolean):默認false,表示在寫失敗時,是否只打印失敗log

setFlushOnCheckpoint(boolean):默認true,checkpoint時保證數據寫入kafka

如果要實現至少一次,需要配置:

setLogFailuresOnly(false)+setFlushOnCheckpoint(true)

  • Kafka 0.11 and newer

開啟checkpoint,兩階段提交sink結合kafka事物,可以保證端到端的精准一次。

https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka

 

Flink Kafka 代碼示例

 

 


 

參考:flink-china 董亭亭 快手實時計算引擎團隊負責人

參考:flink 官網


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM