官網博客中: Apache Flink中的端到端精確一次處理概述 對Flink 端到端精確一次處理和兩段提交的原理,有詳盡的描述
這里要寫的是,關於 Flink kafka 端到端精確一次的測試
之前就大概測試過相應內容,應該是測試失敗了的,只得到了至少一次的結果(之前的關注點不在這個上面,下面會說明為什么只得到 至少一次 的結果)。
這一次是要做Flink HA 相關的配置,有個重要的點就是任務在異常恢復的時候,是否能保持精確一次,這個關乎線上的數據和我們代碼的寫法(如果Flink 不能保證精確一次,就需要在代碼里添加對應的內容)。
測試的前提當然是,開啟了 checkpoint,也設置了checkpoint mode 設為精確一次:
val rock = new RocksDBStateBackend(Common.CHECK_POINT_DATA_DIR) env.setStateBackend(rock.getCheckpointBackend) // checkpoint interval 10 minute env.enableCheckpointing(2 * 60 * 1000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
同時 Kafka 的生產者也必須開啟精確一次的語義: FlinkKafkaProducer 沒有過期的公有構造方法,都需要制定 Kafka 生產者的一致性語義:
public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic) public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize)
特別吐槽下,1.10 版本 KafkaSerializationSchema 沒有提供對應的實現類,讓我這種菜鳥很尷尬
看下我的生產者寫法(很挫)
Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"") val sink = new FlinkKafkaProducer[String](topic+"_out" // 沒用了 , new MyKafkaSerializationSchema[String](topic + "_out") // 指到這里了 , Common.getProp , FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
MyKafkaSerializationSchema 的實現如下:
public class MyKafkaSerializationSchema<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> { private String topic; public MyKafkaSerializationSchema(String topic) { this.topic = topic; } @Override public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) { return new ProducerRecord<>(topic, element.toString().getBytes()); } @Override public String getTargetTopic(T element) { return null; } }
由於ProducerRecord 必須要指定 topic,但是又獲取不到 FlinkKafkaProducer 中指定的topic,就先這樣寫了
測試代碼就簡單了:
val topic = "simple_string_kafka" val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), Common.getProp) # 開啟精確一次語義會報錯,必須在kakfa 的 prop 中指定事務過期的時間 Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"") val sink = new FlinkKafkaProducer[String](topic+"_out" // 沒用 , new MyKafkaSerializationSchema[String](topic + "_out") // 指到這里了 , Common.getProp , FlinkKafkaProducer.Semantic.EXACTLY_ONCE) env.addSource(source) .name("source") .disableChaining() .map(str => { str }) .disableChaining() .name("map1") .map(s => { s }) .addSink(sink) .name("sink")
代碼就這樣的,然后就是打包上傳服務器。
跑起來就這樣的了:
往 source 發送的數據如下:
只有id和create_time 兩個字段,id 是個隨機的字符串,create_time 是當前的時間戳(每秒一條數據)
接收到 sink topic 的數據如下:
這里的時候,其實是有個疑問的,source 接收到一條數據,直接就寫到了sink,我也消費出來了,怎么保證精確一次呢?
kafka 在 0.11 版本之后,增加了對事物的支持,這就是 kafka 端到端一致性的關鍵。
不然來一條數據,直接就寫出去了,下游也消費了,這個時候異常恢復,source是后退了,sink 卻沒辦法后退了。
關鍵就這 kakfa properties 的這個參數:
// default read_uncommitted prop.put("isolation.level", "read_committed");
kafka 消費者的隔離級別,默認是 read_uncommitted (讀未提交),意思是有消息就讀,不管是否提交。
在消費 sink topic 的消費者中添加這個配置,就可以看到,消息不是有一條就輸出一條,而是在完成一個checkpoint 后才會輸出一批數據,就對應一個checkpoint 時間,flink 處理過的數據,就這樣做到 端到端的一致性。
這樣Flink 在正常執行的時候,source 消費一條數據,處理后,直接寫到sink(也就是kafka),在做checkpoint 的時候,再提交事務,如果這個過程中異常恢復了,source 的offset 直接從 checkpoint 中獲取,而之前已經消費過的數據,雖然已經寫到kafka,但是沒有提交,就這樣做到了端到端的一致性。
在 FlinkKafkaProducer 中添加精確一次語義,會到一個異常:
在生產者的 prop 中添加如下配置即可:
Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"")
配置合理的超時時間,不能大於kafka 的: transaction.max.timeout.ms 配置(默認 900 秒)
搞定
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文