Notes on an EXACTLY_ONCE Error with FlinkKafkaProducer in Flink 1.9


I needed EXACTLY_ONCE semantics when writing data to Kafka with Flink's FlinkKafkaProducer.
I assumed that following the official documentation would be enough, but the job failed with an error.

Code

package com.meda.test

import java.lang
import java.util.Properties

import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// create a DataStream (upstream transformations elided)
val dStream: DataStream[MapDt] = ...

// Kafka configuration
val kafkaPro: Properties = ...

// create a FlinkKafkaProducer with the EXACTLY_ONCE semantic
// (the original snippet had a stray '[' inside the default topic name)
val kafkaSink: FlinkKafkaProducer[ResultDt] = new FlinkKafkaProducer[ResultDt](
  "flink-topic-lillcol",
  new ResultDtSerialization("flink-topic-lillcol"),
  kafkaPro,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

case class ResultDt(id: String, date_h: String, star: String, end: String, watermark: String, pv: Long, uv: Long)

class ResultDtSerialization(topic: String) extends KafkaSerializationSchema[ResultDt] {
  // the record key is left null; the value is the case class's string form
  override def serialize(t: ResultDt, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    new ProducerRecord[Array[Byte], Array[Byte]](topic, t.toString.getBytes())
  }
}
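For completeness, here is a minimal sketch of the two elided pieces. The broker address, the resultStream name, and the addSink call are my illustrative assumptions, not from the original code:

// hypothetical producer configuration (sketch)
val kafkaPro = new Properties()
kafkaPro.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address

// attach the sink; assumes the stream has already been mapped to ResultDt
val resultStream: DataStream[ResultDt] = ... // hypothetical transformed stream
resultStream.addSink(kafkaSink)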

The problem

With FlinkKafkaProducer.Semantic set to FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, the job runs without issue.

With FlinkKafkaProducer.Semantic set to FlinkKafkaProducer.Semantic.EXACTLY_ONCE, it fails with the following error:

org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
	at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:984)
	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
	at java.lang.Thread.run(Thread.java:748)
[INFO ] 2019-12-24 15:25:35,212 method:org.apache.flink.runtime.executiongraph.ExecutionGraph.transitionState(ExecutionGraph.java:1324)

The gist of the error:
the transaction timeout is larger than the maximum the broker allows (transaction.max.timeout.ms).
My first reflex was to change transaction.max.timeout.ms, but that did not solve the problem.


Solution

The official documentation has this note under Kafka Producers and Fault Tolerance:

Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes. 
This property will not allow to set transaction timeouts for the producers larger than it’s value.
FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.

In other words: Kafka brokers default the maximum transaction timeout (transaction.max.timeout.ms) to 15 minutes, and producers are not allowed to set a transaction timeout larger than that value.
By default, however, FlinkKafkaProducer011 sets the producer's transaction timeout (transaction.timeout.ms) to 1 hour, which exceeds the broker's 15-minute default for transaction.max.timeout.ms.
The documentation therefore recommends increasing transaction.max.timeout.ms before using the EXACTLY_ONCE semantic.
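If you follow the documentation's route and raise the broker-side limit, the change would look roughly like this (a sketch; the 1-hour value is an assumption, and every broker must be updated and restarted):

# server.properties on each Kafka broker (sketch)
transaction.max.timeout.ms=3600000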

By that logic, raising transaction.max.timeout.ms above 1 hour (3,600,000 ms) should have been enough, but in my tests it still failed.
What finally worked was the opposite direction: changing the producer's transaction timeout property, transaction.timeout.ms.

Lowering transaction.timeout.ms from 1 hour to 5 minutes (300,000 ms, safely below the broker's 15-minute default, so no broker-side change is needed) solved the problem.

// add the property to the producer configuration as follows:
kafkaPro.setProperty("transaction.timeout.ms", (1000 * 60 * 5).toString)
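One related point worth noting (standard Kafka behavior, not from the original post): with EXACTLY_ONCE the producer writes records transactionally, so downstream consumers should read with read_committed isolation, otherwise they may also see records from aborted transactions. A sketch of the consumer-side setting:

// downstream Kafka consumer configuration (sketch)
consumerProps.setProperty("isolation.level", "read_committed")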
