Flink kafka connector 端到端精確一次測試


官網博客中: Apache Flink中的端到端精確一次處理概述  對Flink 端到端精確一次處理和兩段提交的原理,有詳盡的描述

這里要寫的是,關於 Flink  kafka  端到端精確一次的測試

之前就大概測試過相應內容,應該是測試失敗了的,只得到了至少一次的結果(之前的關注點不在這個上面,下面會說明為什么只得到 至少一次 的結果)。

這一次是要做Flink HA 相關的配置,有個重要的點就是任務在異常恢復的時候,是否能保持精確一次,這個關乎線上的數據和我們代碼的寫法(如果Flink 不能保證精確一次,就需要在代碼里添加對應的內容)。

測試的前提當然是,開啟了 checkpoint,也設置了checkpoint mode 設為精確一次:

val rock = new RocksDBStateBackend(Common.CHECK_POINT_DATA_DIR)
env.setStateBackend(rock.getCheckpointBackend)
// checkpoint interval 10 minute
env.enableCheckpointing(2 * 60 * 1000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

同時 Kafka 的生產者也必須開啟精確一次的語義: FlinkKafkaProducer 沒有過期的公有構造方法,都需要制定 Kafka 生產者的一致性語義:

public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic) 
public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) 

特別吐槽下,1.10 版本 KafkaSerializationSchema 沒有提供對應的實現類,讓我這種菜鳥很尷尬

看下我的生產者寫法(很挫)

Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"")
    val sink = new FlinkKafkaProducer[String](topic+"_out" // 沒用了
      , new MyKafkaSerializationSchema[String](topic + "_out") // 指到這里了
      , Common.getProp
      , FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
 
        

MyKafkaSerializationSchema 的實現如下:

public class MyKafkaSerializationSchema<T>
        implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
    private String topic;

    public MyKafkaSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {

        return new ProducerRecord<>(topic, element.toString().getBytes());
    }

    @Override
    public String getTargetTopic(T element) {
        return null;
    }
}

由於ProducerRecord 必須要指定 topic,但是又獲取不到 FlinkKafkaProducer 中指定的topic,就先這樣寫了

測試代碼就簡單了:

val topic = "simple_string_kafka"
val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), Common.getProp)

# 開啟精確一次語義會報錯,必須在kakfa 的 prop 中指定事務過期的時間
Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"")
val sink = new FlinkKafkaProducer[String](topic+"_out" // 沒用
  , new MyKafkaSerializationSchema[String](topic + "_out") // 指到這里了
  , Common.getProp
  , FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

env.addSource(source)
  .name("source")
  .disableChaining()
  .map(str => {
    str
  })
  .disableChaining()
  .name("map1")
  .map(s => {
    s
  })
  .addSink(sink)
  .name("sink")

代碼就這樣的,然后就是打包上傳服務器。

跑起來就這樣的了:

 

 往 source 發送的數據如下:

 

 只有id和create_time 兩個字段,id 是個隨機的字符串,create_time 是當前的時間戳(每秒一條數據)

接收到 sink  topic 的數據如下:

 

 這里的時候,其實是有個疑問的,source 接收到一條數據,直接就寫到了sink,我也消費出來了,怎么保證精確一次呢?

kafka 在 0.11 版本之后,增加了對事物的支持,這就是 kafka 端到端一致性的關鍵。

不然來一條數據,直接就寫出去了,下游也消費了,這個時候異常恢復,source是后退了,sink 卻沒辦法后退了。

關鍵就這 kakfa properties 的這個參數:

// default read_uncommitted
prop.put("isolation.level", "read_committed");

kafka 消費者的隔離級別,默認是  read_uncommitted (讀未提交),意思是有消息就讀,不管是否提交。

在消費 sink topic 的消費者中添加這個配置,就可以看到,消息不是有一條就輸出一條,而是在完成一個checkpoint 后才會輸出一批數據,就對應一個checkpoint 時間,flink 處理過的數據,就這樣做到 端到端的一致性。

這樣Flink 在正常執行的時候,source 消費一條數據,處理后,直接寫到sink(也就是kafka),在做checkpoint 的時候,再提交事務,如果這個過程中異常恢復了,source 的offset 直接從 checkpoint 中獲取,而之前已經消費過的數據,雖然已經寫到kafka,但是沒有提交,就這樣做到了端到端的一致性。

在 FlinkKafkaProducer 中添加精確一次語義,會到一個異常:

 在生產者的 prop 中添加如下配置即可:

Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"")

配置合理的超時時間,不能大於kafka 的: transaction.max.timeout.ms 配置(默認 900 秒)

搞定

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM