Flink生產數據到Kafka頻繁出現事務失效導致任務重啟


在生產中需要將一些數據發到kafka,而且需要做到EXACTLY_ONCE,kafka使用的版本為1.1.0,flink的版本為1.8.0,但是會很經常因為提交事務引起錯誤,甚至導致任務重啟

kafka producer的配置如下

  def getKafkaProducer(kafkaAddr: String, targetTopicName: String, kafkaProducersPoolSize: Int): FlinkKafkaProducer[String] = { val properties = new Properties() properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr) properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 6000 * 6 + "") // 設置了retries參數,可以在Kafka的Partition發生leader切換時,Flink不重啟,而是做5次嘗試:
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5") properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(1048576 * 5)) val serial = new KeyedSerializationSchemaWrapper(new SimpleStringSchema()) //val producer = new FlinkKafkaProducer011[String](targetTopicName, serial, properties, Optional.of(new KafkaProducerPartitioner[String]()), Semantic.EXACTLY_ONCE, kafkaProducersPoolSize)
    val producer = new FlinkKafkaProducer[String](targetTopicName, serial, properties, Optional.of(new KafkaProducerPartitioner[String]()), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, kafkaProducersPoolSize) producer.setWriteTimestampToKafka(true) producer }

Flink env如下

    val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(60 * 1000 * 1, CheckpointingMode.EXACTLY_ONCE) val config = env.getCheckpointConfig //RETAIN_ON_CANCELLATION在job canceled的時候會保留externalized checkpoint state
 config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //用於指定checkpoint coordinator上一個checkpoint完成之后最小等多久可以出發另一個checkpoint,當指定這個參數時,maxConcurrentCheckpoints的值為1
    config.setMinPauseBetweenCheckpoints(3000) //用於指定運行中的checkpoint最多可以有多少個,如果有設置了minPauseBetweenCheckpoints,則maxConcurrentCheckpoints這個參數就不起作用了(大於1的值不起作用)
    config.setMaxConcurrentCheckpoints(1) //指定checkpoint執行的超時時間(單位milliseconds),超時沒完成就會被abort掉
    config.setCheckpointTimeout(30000) //用於指定在checkpoint發生異常的時候,是否應該fail該task,默認為true,如果設置為false,則task會拒絕checkpoint然后繼續運行 //https://issues.apache.org/jira/browse/FLINK-11662
    config.setFailOnCheckpointingErrors(false)

然后經常會出現事務失效的問題,報錯有很多種,大概為以下

java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1002) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:619) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors. at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:278) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:263) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:804) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760) at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:105) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:650) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Checkpoint failed: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
 Checkpoint failed: Could not complete snapshot 11 for operator Sink: data_Sink (2/2).

這些錯誤基本涉及到兩階段提交、事務、checkpoint。

查看kafka documentation和研究ProducerConfig這個類后發現 kafka producer 在使用EXACTLY_ONCE的時候需要增加一些配置

the transaction timeout must be larger than the checkpoint interval, but smaller than the broker transaction.max.timeout.ms.

在getKafkaProducer增加以下配置后,出現原來的錯誤減少

    //checkpoint 間隔時間<TRANSACTION_TIMEOUT_CONFIG<kafka transaction.max.timeout.ms (默認900秒)
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 3 + "") properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

問題得到緩解。

 

參考:

https://www.cnblogs.com/wangzhuxing/p/10111831.html

http://www.heartthinkdo.com/?p=2040

http://romanmarkunas.com/web/blog/kafka-transactions-in-practice-1-producer/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM